ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PostMortem.py
(Generate patch)

Comparing COMP/CRAB/python/PostMortem.py (file contents):
Revision 1.12 by corvo, Mon Nov 20 18:44:09 2006 UTC vs.
Revision 1.16 by slacapra, Fri Jan 4 17:30:56 2008 UTC

# Line 1 | Line 1
1   from Actor import *
2   from crab_util import *
3 + import EdgLoggingInfo
4 + import CondorGLoggingInfo
5   import common
6 < from ApmonIf import ApmonIf
5 < import Statistic
6 < #from random import random
7 < import time
8 < from ProgressBar import ProgressBar
9 < from TerminalController import TerminalController
6 > import string, os
7  
8 < class Submitter(Actor):
8 > class PostMortem(Actor):
9      def __init__(self, cfg_params, nj_list):
10          self.cfg_params = cfg_params
11          self.nj_list = nj_list
12 <        
12 >
13          if common.scheduler.boss_scheduler_name == 'condor_g':
14              # create hash of cfg file
15              self.hash = makeCksum(common.work_space.cfgFileName())
16          else:
17              self.hash = ''
18 <
18 >        
19          return
20      
21      def run(self):
22          """
23 <        The main method of the class: submit jobs in range self.nj_list
23 >        The main method of the class.
24          """
25 <        common.logger.debug(5, "Submitter::run() called")
29 <
30 <        totalCreatedJobs= 0
31 <        start = time.time()
32 <        for nj in range(common.jobDB.nJobs()):
33 <            if (common.jobDB.status(nj)=='C') or (common.jobDB.status(nj)=='RC'): totalCreatedJobs +=1
34 <            pass
25 >        common.logger.debug(5, "PostMortem::run() called")
26  
27 <        if (totalCreatedJobs==0):
28 <            common.logger.message("No jobs to be submitted: first create them")
29 <            return
30 <
31 <        # submit pre DashBoard information
32 <        params = {'jobId':'TaskMeta'}
33 <              
34 <        fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
35 <        for i in fl.readlines():
36 <            val = i.split(':')
37 <            params[val[0]] = string.strip(val[1])
38 <            fl.close()
39 <
40 <        common.logger.debug(5,'Submission DashBoard Pre-Submission report: '+str(params))
41 <                        
42 <        self.cfg_params['apmon'].sendToML(params)
43 <
44 <        #########
45 <        # Loop over jobs
46 <        njs = 0
47 <        try:
48 <            list=[]
49 <            list_of_list = []  
59 <            lastBlock=-1
60 <            count = 0
61 <            for nj in self.nj_list:
62 <                same=0
63 <                # first check that status of the job is suitable for submission
64 <                st = common.jobDB.status(nj)
65 <                if st != 'C'  and st != 'K' and st != 'A' and st != 'RC':
66 <                    long_st = crabJobStatusToString(st)
67 <                    msg = "Job # %d not submitted: status %s"%(nj+1, long_st)
68 <                    common.logger.message(msg)
69 <                    continue
70 <    
71 <                currBlock = common.jobDB.block(nj)
72 <                # SL perform listmatch only if block has changed
73 <                if (currBlock!=lastBlock):
74 <                    if common.scheduler.boss_scheduler_name != "condor_g" :
75 <                        match = common.scheduler.listMatch(nj, currBlock)
76 <                    else :
77 <                        match = "1"
78 <                    lastBlock = currBlock
79 <                else:
80 <                    common.logger.debug(1,"Sites for job "+str(nj+1)+" the same as previous job")
81 <                    same=1
82 <    
83 <                if match:
84 <                    if not same:
85 <                        common.logger.message("Found "+str(match)+" compatible site(s) for job "+str(nj+1))
86 <                    else:
87 <                        common.logger.debug(1,"Found "+str(match)+" compatible site(s) for job "+str(nj+1))
88 <                    list.append(common.jobDB.bossId(nj))
89 <    
90 <                    if nj == self.nj_list[-1]: # check that is not the last job in the list
91 <                        list_of_list.append([currBlock,list])
92 <                    else: # check if next job has same group
93 <                        nextBlock = common.jobDB.block(nj+1)
94 <                        if  currBlock != nextBlock : # if not, close this group and reset
95 <                            list_of_list.append([currBlock,list])
96 <                            list=[]
97 <                else:
98 <                    common.logger.message("No compatible site found, will not submit job "+str(nj+1))
99 <                    continue
100 <                count += 1
101 <            ### Progress Bar indicator, deactivate for debug
102 <            if not common.logger.debugLevel() :
103 <                term = TerminalController()
104 <    
105 <            for ii in range(len(list_of_list)): # Add loop DS
106 <                common.logger.debug(1,'Submitting jobs '+str(list_of_list[ii][1]))
107 < #                if not common.logger.debugLevel() :
108 < #                    try: pbar = ProgressBar(term, 'Submitting '+str(len(list_of_list[ii][1]))+' jobs')
109 < #                    except: pbar = None
110 <    
111 <                jidLista, bjidLista = common.scheduler.submit(list_of_list[ii])
112 <                bjidLista = map(int, bjidLista) # cast all bjidLista to int
113 <    
114 < #                if not common.logger.debugLevel():
115 < #                    if pbar :
116 < #                        pbar.update(float(ii+1)/float(len(list_of_list)),'please wait')
117 <    
118 <                for jj in bjidLista: # Add loop over SID returned from group submission  DS
119 <                    tmpNj = jj - 1
120 <                    jid=jidLista[bjidLista.index(jj)]
121 <                    common.logger.debug(5,"Submitted job # "+ `(jj)`)
122 <                    common.jobDB.setStatus(tmpNj, 'S')
123 <                    common.jobDB.setJobId(tmpNj, jid)
124 <                    common.jobDB.setTaskId(tmpNj, self.cfg_params['taskId'])
125 <                    njs += 1
126 <              
127 <                    ##### DashBoard report #####################  
128 <                    try:
129 <                        resFlag = 0
130 <                        if st == 'RC': resFlag = 2
131 <                        Statistic.Monitor('submit',resFlag,jid,'-----','dest')
132 <                    except:
133 <                        pass
134 <                    
135 <                    # OLI: JobID treatment, special for Condor-G scheduler
136 <                    jobId = ''
137 <                    if common.scheduler.boss_scheduler_name == 'condor_g':
138 <                        jobId = str(jj) + '_' + self.hash + '_' + jid
139 <                        common.logger.debug(5,'JobID for ML monitoring is created for CONDOR_G scheduler:'+jobId)
140 <                    else:
141 <                        jobId = str(jj) + '_' + jid
142 <                        common.logger.debug(5,'JobID for ML monitoring is created for EDG scheduler'+jobId)
143 <              
144 <                    if ( jid.find(":") != -1 ) :
145 <                        rb = jid.split(':')[1]
146 <                        rb = rb.replace('//', '')
147 <                    else :
148 <                        rb = 'OSG'
149 <              
150 <                    params = {'jobId': jobId, \
151 <                              'sid': jid, \
152 <                              'broker': rb, \
153 <                              'bossId': jj, \
154 <                              'TargetSE': string.join((common.jobDB.destination(tmpNj)),",")}
155 <              
156 <                    fl = open(common.work_space.shareDir() + '/' + self.cfg_params['apmon'].fName, 'r')
157 <                    for i in fl.readlines():
158 <                        val = i.split(':')
159 <                        params[val[0]] = string.strip(val[1])
160 <                    fl.close()
161 <    
162 <                    common.logger.debug(5,'Submission DashBoard report: '+str(params))
163 <                        
164 <                    self.cfg_params['apmon'].sendToML(params)
165 <                pass
166 <            pass
27 >        for c, v in self.nj_list.iteritems():
28 >            id = int(c)
29 >            out = common.scheduler.loggingInfo(v)
30 >            # job = common.job_list[id - 1]
31 >            jobnum_str = '%06d' % (id)
32 >            fname = common.work_space.jobDir() + '/' + self.cfg_params['CRAB.jobtype'].upper() + '_' + jobnum_str + '.loggingInfo'
33 >            if os.path.exists(fname):
34 >                common.logger.message('Logging info for job ' + str(id) + ' already present in '+fname+'\nRemove it for update')
35 >                continue
36 >            jdl = open(fname, 'w')
37 >            for line in out: jdl.write(line)
38 >            jdl.close()
39 >
40 >            reason = ''
41 >            ## SL this if-elif is the negation of OO! Mus disappear ASAP
42 >            if common.scheduler.boss_scheduler_name == "edg" or common.scheduler.boss_scheduler_name == "glite" or common.scheduler.boss_scheduler_name == "glitecoll":
43 >                loggingInfo = EdgLoggingInfo.EdgLoggingInfo()
44 >                reason = loggingInfo.decodeReason(out)
45 >            elif common.scheduler.boss_scheduler_name == "condor_g" :
46 >                loggingInfo = CondorGLoggingInfo.CondorGLoggingInfo()
47 >                reason = loggingInfo.decodeReason(out)
48 >            else :
49 >                reason = out
50  
51 <        except:
169 <            exctype, value = sys.exc_info()[:2]
170 <            print "Type: %s Value: %s"%(exctype, value)
171 <            common.logger.message("Submitter::run Exception raised: %s %s"%(exctype, value))
172 <            common.jobDB.save()
173 <        
174 <        stop = time.time()
175 <        common.logger.debug(1, "Submission Time: "+str(stop - start))
176 <        common.logger.write("Submission time :"+str(stop - start))
177 <        common.jobDB.save()
51 >            common.logger.message('Logging info for job '+ str(id) +': '+str(reason)+'\n      written to '+str(fname) )
52              
53 <        msg = '\nTotal of %d jobs submitted'%njs
54 <        if njs != len(self.nj_list) :
55 <            msg += ' (from %d requested).'%(len(self.nj_list))
56 <            pass
57 <        else:
58 <            msg += '.'
53 >            # ML reporting
54 >            jobId = ''
55 >            if common.scheduler.boss_scheduler_name == 'condor_g':
56 >                jobId = str(id) + '_' + self.hash + '_' + v
57 >            else:
58 >                jobId = str(id) + '_' + v
59 >
60 >            params = {'taskId': self.cfg_params['taskId'], 'jobId':  jobId, \
61 >                      'sid': v,
62 >                      'PostMortemCategory': loggingInfo.getCategory(), \
63 >                      'PostMortemReason': loggingInfo.getReason()}
64 >            common.apmon.sendToML(params)
65              pass
66 <        common.logger.message(msg)
66 >
67          return
68 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines