ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/jobSitter.py
(Generate patch)

Comparing UserCode/MitProd/Processing/bin/jobSitter.py (file contents):
Revision 1.1 by paus, Sat Jun 5 01:49:22 2010 UTC vs.
Revision 1.2 by paus, Sat Jun 5 02:36:28 2010 UTC

# Line 0 | Line 1
1 + #!/usr/bin/env python
2 + #---------------------------------------------------------------------------------------------------
3 + # Script to go through my crab jobs, get status and output and take some completion action if
4 + # needed and desired.
5 + #
6 + # Author: C.Paus                                                                      (July 1, 2008)
7 + #---------------------------------------------------------------------------------------------------
8 + import os,sys,getopt,re,string
9 + import task
10 +
11 + def removeCrabTask(crabTask):
12 +        cmd = 'crab -kill all -continue ' + crabTask.tag + ' >& /dev/null; rm -rf ' + crabTask.tag
13 +        print ' KILL and REMOVE task: ' + cmd
14 +        status = os.system(cmd)
15 +        return status
16 +
17 + def updateExitStati(tag,status):
18 +        # Make the file
19 +        file = tag + '/res/CMSSW_%d.stdout'%(status.index)
20 +        #print ' Analyzing file: ' + file
21 +        cmd = 'cat ' + file
22 +        if os.path.exists(file):
23 +            for line in os.popen(cmd).readlines():  # run command
24 +                line = line[:-1]
25 +                # get ride of empty or commented lines
26 +                if line == '' or line[0] == '#':
27 +                    continue
28 +                # no more unnecessary spaces
29 +                line = " ".join(str(line).split()).strip()
30 +
31 +                f = line.split(" ")
32 +
33 +                if f[0] == 'EXECUTABLE_EXIT_STATUS':
34 +                    status.exitCode   = int(f[2])
35 +                if f[0] == 'StageOutExitStatus':
36 +                    status.exitStatus = int(f[2])
37 +    #else:
38 +    #    print ' output file for this job not yet retrieved. '
39 +
40 + def appendBlacklistSites(tag,failedlist,blacklist,exe):
41 +    # Make the file
42 +    file = tag + '/share/crab.cfg'
43 +    # Find out whether there are any ce black listed
44 +    ceBlacklist = ""
45 +    # Get the original black list
46 +    cmd = 'cat ' + file
47 +
48 +    # finding the one line
49 +    join       = 0
50 +    fullLine   = ""
51 +    bSlash     = "\\";
52 +
53 +    # New Configuration file
54 +    fileOutput = open(tag + '_crab.cfg','w')
55 +
56 +    for line in os.popen(cmd).readlines():  # run command
57 +        line = line[:-1]
58 +        ##print 'Line: ' + line
59 +
60 +        # get ride of empty or commented lines
61 +        if line == '' or line[0] == '#':
62 +            continue
63 +        # no more unnecessary spaces
64 +        line = " ".join(str(line).split()).strip()
65 +        # join lines
66 +        if join == 1:
67 +            fullLine += line
68 +        else:
69 +            fullLine  = line
70 +        # determine if finished or more is coming
71 +        if fullLine[-1] == bSlash:
72 +            join = 1
73 +            fullLine = fullLine[:-1]
74 +        # line really ended, so now look at the whole thing
75 +        else:
76 +            join = 0
77 +            # test whether there is a directory
78 +            names      = fullLine.split('=')       # splitting every blank
79 +            ##print "FullLine: " + fullLine
80 +            if names[0] == 'ce_black_list':
81 +                ceBlacklist = fullLine
82 +                fullLine += ',' + failedlist
83 +                if blacklist != "":
84 +                    fullLine = "ce_black_list=" + blacklist
85 +                print "    new blacklist: " + fullLine
86 +
87 +            fileOutput.write(fullLine + '\n')
88 +
89 +    fileOutput.close()
90 +
91 +    if ceBlacklist == '':
92 +        print ' WARNING - site blacklisting did not work'
93 +
94 +    cmd = 'mv ' + tag + '_crab.cfg ' + file
95 +    print "\nACTION -- MOVE: " + cmd
96 +    status = 0
97 +    if exe == 1:
98 +        status = os.system(cmd)
99 +    else:
100 +        ##status = os.system('cat ' + tag + '_crab.cfg')
101 +        status = os.system('rm ' + tag + '_crab.cfg')
102 +    return status
103 +
104 + def removeJobRemainders(storageEle,storagePath,mitDataset,index,exe):
105 +    fileMit = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset + \
106 +              '_000_%d'%(index) + '.root'
107 +    fileEdm = 'srm://' + storageEle + ':8443' + storagePath + '/' + mitDataset + \
108 +              '-edm_%d'%(index) + '.root'
109 +    cmd = 'srmrm ' + fileMit + ' ' + fileEdm + ' >& /dev/null'
110 +    #print 'ACTION -- REMOVE: \n   srmrm ' + fileMit + '\n   srmrm ' + fileEdm + '\n'
111 +    status = 0
112 +    if exe == 1:
113 +        # for now not execute this # status = os.system(cmd)
114 +        print 'For now not removing file: ' + cmd
115 +    return status
116 +
117 + #===================================================================================================
118 + # Main starts here
119 + #===================================================================================================
120 + # Define string to explain usage of the script
121 + usage = \
122 +      "\nUsage: jobSitter.py [ --pattern= --blacklist=" + \
123 +      "                        --status  --help --backward --clean --extend --one --exe ]\n"
124 +
125 + # Define the valid options which can be specified and check out the command line
126 + valid = ['pattern=','blacklist=','catalog=','help','backward','kill','clean','exe','extend','one']
127 + try:
128 +    opts, args = getopt.getopt(sys.argv[1:], "", valid)
129 + except getopt.GetoptError, ex:
130 +    print usage
131 +    print str(ex)
132 +    sys.exit(1)
133 +
134 + # --------------------------------------------------------------------------------------------------
135 + # Get all parameters for this little task
136 + # --------------------------------------------------------------------------------------------------
137 + # Set defaults
138 + pattern   = ''
139 + blacklist = ''
140 + catalog   = 0
141 + clean     = 0
142 + kill      = 0
143 + exe       = 0
144 + extend    = 0
145 + one       = 0
146 + backward  = ''
147 +
148 + # Read new values from the command line
149 + for opt, arg in opts:
150 +    if opt == "--help":
151 +        print usage
152 +        sys.exit(0)
153 +    if opt == "--pattern":
154 +        pattern   = arg
155 +    if opt == "--blacklist":
156 +        blacklist = arg
157 +    if opt == "--catalog":
158 +        catalog   = int(arg)
159 +    if opt == "--clean":
160 +        clean     = 1
161 +    if opt == "--one":
162 +        one       = 1
163 +    if opt == "--exe":
164 +        exe       = 1
165 +    if opt == "--extend":
166 +        extend    = 1
167 +    if opt == "--backward":
168 +        backward  = ' -r '
169 +    if opt == "--kill":
170 +        kill      = 1
171 +
172 + # --------------------------------------------------------------------------------------------------
173 + # Here is where the real action starts -------------------------------------------------------------
174 + # --------------------------------------------------------------------------------------------------
175 +
176 + # Find the list of crab tasks to babysit
177 + crabTasks   = []
178 + datasetList = []
179 + cmd = 'find ./ -maxdepth 1 -name crab_0_\* |grep -v cfg | sort' + backward
180 + print '\n=============================================================================='
181 + print ' Summary of crab task list: \n'
182 + for line in os.popen(cmd).readlines():  # run command
183 +    line       = line[:-1]              # strip '\n'
184 +    ## print ' LINE: ' + line
185 +    f          = line.split('/')        # splitting every blank
186 +    tag        = f.pop()
187 +
188 +    crabTask   = task.Task(tag)
189 +
190 +    #print 'Pattern: ' + pattern + '  tag: ' + crabTask.mitDataset
191 +    if re.search(pattern,crabTask.mitDataset):
192 +        crabTasks.append(crabTask)
193 +        crabTask.show()
194 +
195 +    if one == 1:
196 +        break
197 +
198 + # Process the crab tasks determined to be relevant in the last query
199 + print '\n=============================================================================='
200 + print ' Process crab task list (please wait, crab commands are first fully parsed)\n'
201 + i = 0
202 + for crabTask in crabTasks:
203 +
204 +    dataset     = crabTask.cmsDataset
205 +    storageEle  = crabTask.storageEle
206 +    storagePath = crabTask.storagePath
207 +
208 +    if kill == 1:
209 +            crabTask.killAndRemove(1)
210 +            continue
211 +
212 +    crabTask.loadAllLfns(crabTask.mitCfg + '/' + crabTask.mitVersion + '/' + \
213 +                         crabTask.mitDataset + '.lfns')
214 +    #if crabTask.status == 'cataloged':
215 +    #    ##removeCrabTask(crabTask)
216 +    #    crabTask.killAndRemove(1)
217 +    #    continue
218 +
219 +    # make sure catalog is up to date
220 +    f   = storagePath.split(" ")
221 +    cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' \
222 +          + crabTask.mitVersion + ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
223 +
224 +    if catalog == 3:
225 +            cmd = 'catalog.sh -cegt -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
226 +                   ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
227 +    if catalog == 4:
228 +            cmd = 'catalog.sh -eg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
229 +                   ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --retry'
230 +    if catalog == 5:
231 +            cmd = 'catalog.sh -ceg -m ' + crabTask.mitCfg + ' ' + crabTask.mitVersion + \
232 +                   ' ' + crabTask.mitDataset + '/' + crabTask.tag + ' --remove'
233 +    print ' CATALOG: ' + cmd
234 +    if catalog != 0:
235 +            os.system(cmd)
236 +
237 +    # break out of the loop as only cataloging is required
238 +    if catalog > 1:
239 +            continue
240 +
241 +    # do we need to extend the task
242 +    if extend == 1:
243 +        print '\n------------------------------------------------------------------------------'
244 +        print '  --> EXTEND ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
245 +              + '\n       -> ' + dataset   + '  (' + crabTask.mitDataset + ')'\
246 +              + '\n       -> ' + storageEle \
247 +              + '\n       -> ' + storagePath
248 +        print '------------------------------------------------------------------------------\n'
249 +        cmd = 'crab -extend -c ' + crabTask.tag
250 +        os.system(cmd)
251 +
252 +    print '\n------------------------------------------------------------------------------'
253 +    print '  --> STATUS ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
254 +          + '\n       -> ' + dataset   + '  (' + crabTask.mitDataset + ')'\
255 +          + '\n       -> ' + storageEle \
256 +          + '\n       -> ' + storagePath
257 +    print '------------------------------------------------------------------------------\n'
258 +
259 +    # interact with crab to get the job status
260 +    crabTask.getJobStati()
261 +    if len(crabTask.jobStati) < 1:
262 +        print ' ERROR - dropped empty crab task from the work list.'
263 +        print '         crab task id: ' + crabTask.tag
264 +        continue
265 +    else:
266 +        print ' '
267 +        print ' Task status: ' + crabTask.status
268 +        if crabTask.status == 'completed' or crabTask.status == 'finished':
269 +            crabTask.remove(clean)
270 +        print ' '
271 +
272 +    # review failing sites
273 +    siteList = ""
274 +    if len(crabTask.failingSites) > 0:
275 +        nSites = 0
276 +        siteList = ",".join(crabTask.failingSites)
277 +        print " Failing sites (consider blacklisting them)"
278 +        for site,nAbort in crabTask.failingSites.iteritems():
279 +            nSites += 1
280 +            print '    ' + site + '(%d'%nAbort  + ')'
281 +    appendBlacklistSites(crabTask.tag,siteList,blacklist,exe)
282 +
283 +    # review all job stati and update exit stati if needed
284 +    for status in crabTask.jobStati:
285 +        if status.tag == 'Retrieved' and (status.exitCode < 0 and status.exitStatus < 0):
286 +            updateExitStati(crabTask.tag,status)
287 +            #status.showCompact()
288 +
289 +    # review all job stati and propose action
290 +    subList   = ''
291 +    resubList = ''
292 +    for status in crabTask.jobStati:
293 +        ##print ' %4.0d '%(status.index) + ' --> ' + status.tag
294 +        if ((status.tag == 'Created' and status.outputFile == 0)):
295 +            status.showCompact()
296 +            if subList == '':
297 +                subList += '%d'%(status.index)
298 +            else:
299 +                subList += ',%d'%(status.index)
300 +        if ((status.tag == 'Aborted'  or status.exitCode > 0 or status.exitStatus > 0) or
301 +            (status.tag == 'Retrieved' and status.outputFile == 0)):
302 +            status.showCompact()
303 +            if resubList == '':
304 +                resubList += '%d'%(status.index)
305 +            else:
306 +                resubList += ',%d'%(status.index)
307 +            # for failed job first remove remainders before resubmitting
308 +            if status.outputFile == 1:
309 +                removeJobRemainders(storageEle,storagePath,crabTask.mitDataset,status.index,exe)
310 +
311 +    if subList != '':
312 +        if not re.search('-',subList) and not re.search(',',subList):
313 +            subList = subList + ',999999999'
314 +        cmd = 'crab -c ' + crabTask.tag + ' -submit ' + subList
315 +        print '\nACTION -- SUBMIT.PY: ' + cmd
316 +        if exe == 1:
317 +            status = os.system(cmd)
318 +    if resubList != '':
319 +        cmd = 'crab -c ' + crabTask.tag + ' -resubmit ' + resubList
320 +        print '\nACTION -- RE-SUBMIT.PY: ' + cmd
321 +        if exe == 1:
322 +            status = os.system(cmd)
323 +
324 +
325 +    print '\n------------------------------------------------------------------------------'
326 +    print '  --> GETOUTPUT ' + crabTask.tag + ' -- wait crab commands first fully parsed -- ' \
327 +          + '\n       -> ' + dataset \
328 +          + '\n       -> ' + storageEle \
329 +          + '\n       -> ' + storagePath
330 +    print '------------------------------------------------------------------------------\n'
331 +
332 +    cmd = 'crab -getoutput -continue ' + crabTask.tag
333 +    status = os.system(cmd)
334 +
335 +    cmd = 'cleanupLog.py --crabId ' + crabTask.tag
336 +    status = os.system(cmd)
337 +
338 +    i += 1

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines