ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PostMortem.py
(Generate patch)

Comparing COMP/CRAB/python/PostMortem.py (file contents):
Revision 1.12 by corvo, Mon Nov 20 18:44:09 2006 UTC vs.
Revision 1.18 by spiga, Tue Apr 15 09:37:47 2008 UTC

# Line 1 | Line 1
1   from Actor import *
2   from crab_util import *
3 + #import EdgLoggingInfo
4 + #import CondorGLoggingInfo
5   import common
6 < from ApmonIf import ApmonIf
5 < import Statistic
6 < #from random import random
7 < import time
8 < from ProgressBar import ProgressBar
9 < from TerminalController import TerminalController
6 > import string, os
7  
8 < class Submitter(Actor):
8 > class PostMortem(Actor):
9      def __init__(self, cfg_params, nj_list):
10          self.cfg_params = cfg_params
11          self.nj_list = nj_list
12 <        
13 <        if common.scheduler.boss_scheduler_name == 'condor_g':
17 <            # create hash of cfg file
18 <            self.hash = makeCksum(common.work_space.cfgFileName())
19 <        else:
20 <            self.hash = ''
12 >
13 >        self.fname_base = common.work_space.jobDir() + self.cfg_params['CRAB.jobtype'].upper() + '_'
14  
15          return
16      
17      def run(self):
18          """
19 <        The main method of the class: submit jobs in range self.nj_list
19 >        The main method of the class.
20          """
21 <        common.logger.debug(5, "Submitter::run() called")
21 >        common.logger.debug(5, "PostMortem::run() called")
22  
23 <        totalCreatedJobs= 0
31 <        start = time.time()
32 <        for nj in range(common.jobDB.nJobs()):
33 <            if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1
34 <            pass
35 <
36 <        if (totalCreatedJobs==0):
37 <            common.logger.message("No jobs to be submitted: first create them")
38 <            return
39 <
40 <        # submit pre DashBoard information
41 <        params = {'jobId':'TaskMeta'}
42 <              
43 <        fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
44 <        for i in fl.readlines():
45 <            val = i.split(':')
46 <            params[val[0]] = string.strip(val[1])
47 <            fl.close()
23 >        self.collectLogging()
24  
25 <        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
26 <                        
27 <        self.cfg_params['apmon'].sendToML(params)
28 <
29 <        #########
30 <        # Loop over jobs
31 <        njs = 0
32 <        try:
33 <            list=[]
34 <            list_of_list = []  
35 <            lastBlock=-1
36 <            count = 0
37 <            for nj in self.nj_list:
62 <                same=0
63 <                # first check that status of the job is suitable for submission
64 <                st = common.jobDB.status(nj)
65 <                if st != 'C'  and st != 'K' and st != 'A' and st != 'RC':
66 <                    long_st = crabJobStatusToString(st)
67 <                    msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
68 <                    common.logger.message(msg)
69 <                    continue
70 <    
71 <                currBlock = common.jobDB.block(nj)
72 <                # SL perform listmatch only if block has changed
73 <                if (currBlock!=lastBlock):
74 <                    if common.scheduler.boss_scheduler_name != "condor_g" :
75 <                        match = common.scheduler.listMatch(nj, currBlock)
76 <                    else :
77 <                        match = "1"
78 <                    lastBlock = currBlock
79 <                else:
80 <                    common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job")
81 <                    same=1
82 <    
83 <                if match:
84 <                    if not same:
85 <                        common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(nj+1))
86 <                    else:
87 <                        common.logger.debug(1,"Found "+str(match)+" compatible site(s) for job "+str(nj+1))
88 <                    list.append(common.jobDB.bossId(nj))
89 <    
90 <                    if nj == self.nj_list[-1]: # check that is not the last job in the list
91 <                        list_of_list.append([currBlock,list])
92 <                    else: # check if next job has same group
93 <                        nextBlock = common.jobDB.block(nj+1)
94 <                        if  currBlock != nextBlock : # if not, close this group and reset
95 <                            list_of_list.append([currBlock,list])
96 <                            list=[]
97 <                else:
98 <                    common.logger.message("No compatible site found, will not submit job "+str(nj+1))
99 <                    continue
100 <                count += 1
101 <            ### Progress Bar indicator, deactivate for debug
102 <            if not common.logger.debugLevel() :
103 <                term = TerminalController()
104 <    
105 <            for ii in range(len(list_of_list)): # Add loop DS
106 <                common.logger.debug(1,'Submitting jobs '+str(list_of_list[ii][1]))
107 < #                if not common.logger.debugLevel() :
108 < #                    try: pbar = ProgressBar(term, 'Submitting '+str(len(list_of_list[ii][1]))+' jobs')
109 < #                    except: pbar = None
110 <    
111 <                jidLista, bjidLista = common.scheduler.submit(list_of_list[ii])
112 <                bjidLista = map(int, bjidLista) # cast all bjidLista to int
113 <    
114 < #                if not common.logger.debugLevel():
115 < #                    if pbar :
116 < #                        pbar.update(float(ii+1)/float(len(list_of_list)),'please wait')
117 <    
118 <                for jj in bjidLista: # Add loop over SID returned from group submission  DS
119 <                    tmpNj = jj - 1
120 <                    jid=jidLista[bjidLista.index(jj)]
121 <                    common.logger.debug(5,"Submitted job # "+ `(jj)`)
122 <                    common.jobDB.setStatus(tmpNj, 'S')
123 <                    common.jobDB.setJobId(tmpNj, jid)
124 <                    common.jobDB.setTaskId(tmpNj, self.cfg_params['taskId'])
125 <                    njs += 1
126 <              
127 <                    ##### DashBoard report #####################  
128 <                    try:
129 <                        resFlag = 0
130 <                        if st == 'RC': resFlag = 2
131 <                        Statistic.Monitor('submit',resFlag,jid,'-----','dest')
132 <                    except:
133 <                        pass
134 <                    
135 <                    # OLI: JobID treatment, special for Condor-G scheduler
136 <                    jobId = ''
137 <                    if common.scheduler.boss_scheduler_name == 'condor_g':
138 <                        jobId = str(jj) + '_' + self.hash + '_' + jid
139 <                        common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
140 <                    else:
141 <                        jobId = str(jj) + '_' + jid
142 <                        common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)
143 <              
144 <                    if ( jid.find(":") != -1 ) :
145 <                        rb = jid.split(':')[1]
146 <                        rb = rb.replace('//', '')
147 <                    else :
148 <                        rb = 'OSG'
149 <              
150 <                    params = {'jobId': jobId, \
151 <                              'sid': jid, \
152 <                              'broker': rb, \
153 <                              'bossId': jj, \
154 <                              'TargetSE': string.join((common.jobDB.destination(tmpNj)),",")}
155 <              
156 <                    fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
157 <                    for i in fl.readlines():
158 <                        val = i.split(':')
159 <                        params[val[0]] = string.strip(val[1])
160 <                    fl.close()
161 <    
162 <                    common.logger.debug(5,'Submission DashBoard report: '+str(params))
163 <                        
164 <                    self.cfg_params['apmon'].sendToML(params)
165 <                pass
166 <            pass
167 <
168 <        except:
169 <            exctype, value = sys.exc_info()[:2]
170 <            print "Type: %s Value: %s"%(exctype, value)
171 <            common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
172 <            common.jobDB.save()
173 <        
174 <        stop = time.time()
175 <        common.logger.debug(1, "Submission Time: "+str(stop - start))
176 <        common.logger.write("Submission time :"+str(stop - start))
177 <        common.jobDB.save()
178 <            
179 <        msg = '\nTotal of %d jobs submitted'%njs
180 <        if njs != len(self.nj_list) :
181 <            msg += ' (from %d requested).'%(len(self.nj_list))
182 <            pass
183 <        else:
184 <            msg += '.'
185 <            pass
186 <        common.logger.message(msg)
25 >
26 >    def collectLogging(self):
27 >        for id in self.nj_list:
28 >            fname = self.fname_base + str(id) + '.LoggingInfo'
29 >            if os.path.exists(fname):
30 >                common.logger.message('Logging info for job ' + str(id) + ' already present in '+fname+'\nRemove it for update')
31 >                continue
32 >            common.scheduler.loggingInfo(id,self.fname_base+str(id))
33 >            fl = open(fname, 'r')
34 >            out = "".join(fl.readlines())  
35 >            fl.close()
36 >            reason = self.decodeLogging(out)
37 >            common.logger.message('Logging info for job '+ str(id) +': '+str(reason)+'\n      written to '+str(fname) )
38          return
39 +        
40 +    def decodeLogging(self, out):
41 +        """
42 +        """
43 +        return  common.scheduler.decodeLogInfo(out)
44 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines