ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PostMortem.py
(Generate patch)

Comparing COMP/CRAB/python/PostMortem.py (file contents):
Revision 1.2 by spiga, Tue Nov 8 13:25:22 2005 UTC vs.
Revision 1.12 by corvo, Mon Nov 20 18:44:09 2006 UTC

# Line 1 | Line 1
1   from Actor import *
2 + from crab_util import *
3   import common
4 < import string, os
4 > from ApmonIf import ApmonIf
5 > import Statistic
6 > #from random import random
7 > import time
8 > from ProgressBar import ProgressBar
9 > from TerminalController import TerminalController
10  
11 < class PostMortem(Actor):
12 <    def __init__(self, cfg_params, nj_list, use_boss):
11 > class Submitter(Actor):
12 >    def __init__(self, cfg_params, nj_list):
13          self.cfg_params = cfg_params
14          self.nj_list = nj_list
15 <        self.flag_useboss = use_boss
15 >        
16 >        if common.scheduler.boss_scheduler_name == 'condor_g':
17 >            # create hash of cfg file
18 >            self.hash = makeCksum(common.work_space.cfgFileName())
19 >        else:
20 >            self.hash = ''
21 >
22          return
23      
24      def run(self):
25          """
26 <        The main method of the class.
26 >        The main method of the class: submit jobs in range self.nj_list
27          """
28 <        common.logger.debug(5, "PostMortem::run() called")
28 >        common.logger.debug(5, "Submitter::run() called")
29 >
30 >        totalCreatedJobs= 0
31 >        start = time.time()
32 >        for nj in range(common.jobDB.nJobs()):
33 >            if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1
34 >            pass
35  
36 <        if len(self.nj_list)==0:
37 <            common.logger.debug(5, "No jobs to check")
36 >        if (totalCreatedJobs==0):
37 >            common.logger.message("No jobs to be submitted: first create them")
38              return
39  
40 <        # run a list-match on first job
41 <        for nj in self.nj_list:
42 <            if self.flag_useboss == 1 :
43 <                id = common.scheduler.boss_SID(nj+1)
44 <            else:
45 <                id = common.jobDB.jobId(nj)
46 <            out = common.scheduler.loggingInfo(id)
47 <            job = common.job_list[nj]
48 <            jdl_fname = string.replace(job.jdlFilename(),'jdl','loggingInfo')
49 <            if os.path.exists(jdl_fname):
50 <                common.logger.message('Logging info for job '+str(nj+1)+' already present in '+jdl_fname+' Remove it for update')
51 <                continue
52 <            jdl = open(jdl_fname, 'w')
53 <            for line in out: jdl.write(line)
54 <            jdl.close()
55 <            common.logger.message('Logging info for job '+str(nj+1)+' written to '+jdl_fname)
56 <            
40 >        # submit pre DashBoard information
41 >        params = {'jobId':'TaskMeta'}
42 >              
43 >        fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
44 >        for i in fl.readlines():
45 >            val = i.split(':')
46 >            params[val[0]] = string.strip(val[1])
47 >            fl.close()
48 >
49 >        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
50 >                        
51 >        self.cfg_params['apmon'].sendToML(params)
52 >
53 >        #########
54 >        # Loop over jobs
55 >        njs = 0
56 >        try:
57 >            list=[]
58 >            list_of_list = []  
59 >            lastBlock=-1
60 >            count = 0
61 >            for nj in self.nj_list:
62 >                same=0
63 >                # first check that status of the job is suitable for submission
64 >                st = common.jobDB.status(nj)
65 >                if st != 'C'  and st != 'K' and st != 'A' and st != 'RC':
66 >                    long_st = crabJobStatusToString(st)
67 >                    msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
68 >                    common.logger.message(msg)
69 >                    continue
70 >    
71 >                currBlock = common.jobDB.block(nj)
72 >                # SL perform listmatch only if block has changed
73 >                if (currBlock!=lastBlock):
74 >                    if common.scheduler.boss_scheduler_name != "condor_g" :
75 >                        match = common.scheduler.listMatch(nj, currBlock)
76 >                    else :
77 >                        match = "1"
78 >                    lastBlock = currBlock
79 >                else:
80 >                    common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job")
81 >                    same=1
82 >    
83 >                if match:
84 >                    if not same:
85 >                        common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(nj+1))
86 >                    else:
87 >                        common.logger.debug(1,"Found "+str(match)+" compatible site(s) for job "+str(nj+1))
88 >                    list.append(common.jobDB.bossId(nj))
89 >    
90 >                    if nj == self.nj_list[-1]: # check that is not the last job in the list
91 >                        list_of_list.append([currBlock,list])
92 >                    else: # check if next job has same group
93 >                        nextBlock = common.jobDB.block(nj+1)
94 >                        if  currBlock != nextBlock : # if not, close this group and reset
95 >                            list_of_list.append([currBlock,list])
96 >                            list=[]
97 >                else:
98 >                    common.logger.message("No compatible site found, will not submit job "+str(nj+1))
99 >                    continue
100 >                count += 1
101 >            ### Progress Bar indicator, deactivate for debug
102 >            if not common.logger.debugLevel() :
103 >                term = TerminalController()
104 >    
105 >            for ii in range(len(list_of_list)): # Add loop DS
106 >                common.logger.debug(1,'Submitting jobs '+str(list_of_list[ii][1]))
107 > #                if not common.logger.debugLevel() :
108 > #                    try: pbar = ProgressBar(term, 'Submitting '+str(len(list_of_list[ii][1]))+' jobs')
109 > #                    except: pbar = None
110 >    
111 >                jidLista, bjidLista = common.scheduler.submit(list_of_list[ii])
112 >                bjidLista = map(int, bjidLista) # cast all bjidLista to int
113 >    
114 > #                if not common.logger.debugLevel():
115 > #                    if pbar :
116 > #                        pbar.update(float(ii+1)/float(len(list_of_list)),'please wait')
117 >    
118 >                for jj in bjidLista: # Add loop over SID returned from group submission  DS
119 >                    tmpNj = jj - 1
120 >                    jid=jidLista[bjidLista.index(jj)]
121 >                    common.logger.debug(5,"Submitted job # "+ `(jj)`)
122 >                    common.jobDB.setStatus(tmpNj, 'S')
123 >                    common.jobDB.setJobId(tmpNj, jid)
124 >                    common.jobDB.setTaskId(tmpNj, self.cfg_params['taskId'])
125 >                    njs += 1
126 >              
127 >                    ##### DashBoard report #####################  
128 >                    try:
129 >                        resFlag = 0
130 >                        if st == 'RC': resFlag = 2
131 >                        Statistic.Monitor('submit',resFlag,jid,'-----','dest')
132 >                    except:
133 >                        pass
134 >                    
135 >                    # OLI: JobID treatment, special for Condor-G scheduler
136 >                    jobId = ''
137 >                    if common.scheduler.boss_scheduler_name == 'condor_g':
138 >                        jobId = str(jj) + '_' + self.hash + '_' + jid
139 >                        common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
140 >                    else:
141 >                        jobId = str(jj) + '_' + jid
142 >                        common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)
143 >              
144 >                    if ( jid.find(":") != -1 ) :
145 >                        rb = jid.split(':')[1]
146 >                        rb = rb.replace('//', '')
147 >                    else :
148 >                        rb = 'OSG'
149 >              
150 >                    params = {'jobId': jobId, \
151 >                              'sid': jid, \
152 >                              'broker': rb, \
153 >                              'bossId': jj, \
154 >                              'TargetSE': string.join((common.jobDB.destination(tmpNj)),",")}
155 >              
156 >                    fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
157 >                    for i in fl.readlines():
158 >                        val = i.split(':')
159 >                        params[val[0]] = string.strip(val[1])
160 >                    fl.close()
161 >    
162 >                    common.logger.debug(5,'Submission DashBoard report: '+str(params))
163 >                        
164 >                    self.cfg_params['apmon'].sendToML(params)
165 >                pass
166              pass
167  
168 +        except:
169 +            exctype, value = sys.exc_info()[:2]
170 +            print "Type: %s Value: %s"%(exctype, value)
171 +            common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
172 +            common.jobDB.save()
173 +        
174 +        stop = time.time()
175 +        common.logger.debug(1, "Submission Time: "+str(stop - start))
176 +        common.logger.write("Submission time :"+str(stop - start))
177 +        common.jobDB.save()
178 +            
179 +        msg = '\nTotal of %d jobs submitted'%njs
180 +        if njs != len(self.nj_list) :
181 +            msg += ' (from %d requested).'%(len(self.nj_list))
182 +            pass
183 +        else:
184 +            msg += '.'
185 +            pass
186 +        common.logger.message(msg)
187          return
42

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines