ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerCondor_g.py
Revision: 1.83
Committed: Thu Mar 27 21:14:45 2008 UTC (17 years, 1 month ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.82: +20 -18 lines
Log Message:
Improvements in sched_parameter

File Contents

# User Rev Content
1 ewv 1.79 from SchedulerGrid import SchedulerGrid
2 gutsche 1.1 from JobList import JobList
3     from crab_logger import Logger
4     from crab_exceptions import *
5     from crab_util import *
6 gutsche 1.42 from osg_bdii import *
7 gutsche 1.1 import time
8     import common
9 gutsche 1.25 import popen2
10 slacapra 1.39 import os
11 gutsche 1.49 from BlackWhiteListParser import BlackWhiteListParser
12 gutsche 1.1
13 ewv 1.83 import pdb # Use while debugging
14    
15    
16 ewv 1.79 class SchedulerCondor_g(SchedulerGrid):
17 gutsche 1.1 def __init__(self):
18 ewv 1.79 SchedulerGrid.__init__(self,"CONDOR_G")
19 gutsche 1.1
20     # check for locally running condor scheduler
21     cmd = 'ps xau | grep -i condor_schedd | grep -v grep'
22     cmd_out = runCommand(cmd)
23     if cmd_out == None:
24 gutsche 1.20 msg = '[Condor-G Scheduler]: condor_schedd is not running on this machine.\n'
25     msg += '[Condor-G Scheduler]: Please use another machine with installed condor and running condor_schedd or change the Scheduler in your crab.cfg.'
26 gutsche 1.42 common.logger.debug(2,msg)
27 gutsche 1.20 raise CrabException(msg)
28 gutsche 1.1
29     self.checkExecutableInPath('condor_q')
30     self.checkExecutableInPath('condor_submit')
31     self.checkExecutableInPath('condor_version')
32    
33     # get version number
34     cmd = 'condor_version'
35     cmd_out = runCommand(cmd)
36     if cmd != None :
37     tmp = cmd_out.find('CondorVersion') + 15
38     version = cmd_out[tmp:tmp+6].split('.')
39     version_master = int(version[0])
40     version_major = int(version[1])
41     version_minor = int(version[2])
42     else :
43 gutsche 1.20 msg = '[Condor-G Scheduler]: condor_version was not able to determine the installed condor version.\n'
44     msg += '[Condor-G Scheduler]: Please use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
45 gutsche 1.42 common.logger.debug(2,msg)
46 gutsche 1.20 raise CrabException(msg)
47 gutsche 1.1
48     self.checkExecutableInPath('condor_config_val')
49    
50     self.checkCondorVariablePointsToFile('GRIDMANAGER')
51    
52 ewv 1.76 self.checkCondorVariablePointsToFile('GT2_GAHP',alternate_name='GAHP')
53 gutsche 1.1
54     self.checkCondorVariablePointsToFile('GRID_MONITOR')
55    
56     self.checkCondorVariableIsTrue('ENABLE_GRID_MONITOR')
57    
58 gutsche 1.25 max_submit = self.queryCondorVariable('GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE',100).strip()
59     max_pending = self.queryCondorVariable('GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE','Unlimited').strip()
60 gutsche 1.1
61 gutsche 1.20 msg = '[Condor-G Scheduler]\n'
62     msg += 'Maximal number of jobs submitted to the grid : GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE = '+max_submit+'\n'
63     msg += 'Maximal number of parallel submits to the grid : GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE = '+max_pending+'\n'
64     msg += 'Ask the administrator of your local condor installation to increase these variables to enable more jobs to be executed on the grid in parallel.\n'
65     common.logger.debug(2,msg)
66 mkirn 1.15
67 gutsche 1.29 # create hash
68     self.hash = makeCksum(common.work_space.cfgFileName())
69    
70 gutsche 1.1 return
71    
72 gutsche 1.38 def getCEfromSE(self, seSite):
73 gutsche 1.42 # returns the ce including jobmanager
74 ewv 1.58 ces = jm_from_se_bdii(seSite)
75 gutsche 1.42
76     # mapping ce_hostname to full ce name including jobmanager
77     ce_hostnames = {}
78     for ce in ces :
79     ce_hostnames[ce.split(':')[0].strip()] = ce
80 gutsche 1.38
81     oneSite=''
82 gutsche 1.42 if ( len(ce_hostnames.keys()) == 1 ) :
83     oneSite=ce_hostnames[ce_hostnames.keys()[0]]
84     elif ( len(ce_hostnames.keys()) > 1 ) :
85     if 'EDG.ce_white_list' in self.cfg_params.keys() and len(self.cfg_params['EDG.ce_white_list'].split(',')) == 1 and self.cfg_params['EDG.ce_white_list'].strip() in ce_hostnames.keys() :
86     oneSite = ce_hostnames[self.cfg_params['EDG.ce_white_list']]
87 gutsche 1.38 else :
88     msg = '[Condor-G Scheduler]: More than one Compute Element (CE) is available for job submission.\n'
89     msg += '[Condor-G Scheduler]: Please select one of the following CE:\n'
90     msg += '[Condor-G Scheduler]:'
91 gutsche 1.42 for host in ce_hostnames.keys() :
92 gutsche 1.38 msg += ' ' + host
93     msg += '\n'
94     msg += '[Condor-G Scheduler]: and enter this CE in the CE_white_list variable of the [EDG] section in your crab.cfg.\n'
95 gutsche 1.42 common.logger.debug(2,msg)
96 gutsche 1.38 raise CrabException(msg)
97     else :
98 gutsche 1.42 raise CrabException('[Condor-G Scheduler]: CE hostname(s) for SE '+seSite+' could not be determined from BDII.')
99 gutsche 1.40
100 gutsche 1.38 return oneSite
101 gutsche 1.40
102 gutsche 1.1 def checkExecutableInPath(self, name):
103     # check if executable is in PATH
104     cmd = 'which '+name
105     cmd_out = runCommand(cmd)
106     if cmd_out == None:
107 gutsche 1.20 msg = '[Condor-G Scheduler]: '+name+' is not in the $PATH on this machine.\n'
108     msg += '[Condor-G Scheduler]: Please use another machine with installed condor or change the Scheduler in your crab.cfg.'
109 gutsche 1.42 common.logger.debug(2,msg)
110 gutsche 1.20 raise CrabException(msg)
111 gutsche 1.1
112 ewv 1.76 def checkCondorVariablePointsToFile(self, name, alternate_name=None):
113 gutsche 1.1 ## check for condor variable
114     cmd = 'condor_config_val '+name
115     cmd_out = runCommand(cmd)
116 ewv 1.76 if alternate_name and not cmd_out:
117     cmd = 'condor_config_val '+alternate_name
118     cmd_out = runCommand(cmd)
119     if cmd_out:
120     cmd_out = string.strip(cmd_out)
121     if not cmd_out or not os.path.isfile(cmd_out) :
122 gutsche 1.20 msg = '[Condor-G Scheduler]: the variable '+name+' is not properly set for the condor installation on this machine.\n'
123 ewv 1.76 msg += '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable '+name+' properly, ' + \
124 gutsche 1.1 'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
125 gutsche 1.42 common.logger.debug(2,msg)
126 gutsche 1.20 raise CrabException(msg)
127 gutsche 1.1
128     def checkCondorVariableIsTrue(self, name):
129     ## check for condor variable
130     cmd = 'condor_config_val '+name
131     cmd_out = runCommand(cmd)
132     if cmd_out == 'TRUE' :
133 gutsche 1.38 msg = '[Condor-G Scheduler]: the variable '+name+' is not set to true for the condor installation on this machine.\n'
134 gutsche 1.20 msg += '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable '+name+' to true,',
135 gutsche 1.1 'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
136 gutsche 1.42 common.logger.debug(2,msg)
137 gutsche 1.20 raise CrabException(msg)
138 gutsche 1.1
139 gutsche 1.25 def queryCondorVariable(self, name, default):
140 gutsche 1.1 ## check for condor variable
141     cmd = 'condor_config_val '+name
142 gutsche 1.25 out = popen2.Popen3(cmd,1)
143     exit_code = out.wait()
144     cmd_out = out.fromchild.readline().strip()
145     if exit_code != 0 :
146     cmd_out = str(default)
147    
148     return cmd_out
149 gutsche 1.1
150     def configure(self, cfg_params):
151 ewv 1.79 SchedulerGrid.configure(self,cfg_params)
152 gutsche 1.1
153 ewv 1.79 # self.cfg_params = cfg_params
154 gutsche 1.38
155 gutsche 1.49 # init BlackWhiteListParser
156 ewv 1.79 # self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
157 gutsche 1.49
158 gutsche 1.18 try:
159 ewv 1.79 self.GLOBUS_RSL = cfg_params['CONDORG.globus_rsl']
160 gutsche 1.30 except KeyError:
161 ewv 1.79 self.GLOBUS_RSL = ''
162 gutsche 1.40
163     # Provide an override for the batchsystem that condor_g specifies as a grid resource.
164 gutsche 1.42 # this is to handle the case where the site supports several batchsystem but bdii
165 gutsche 1.40 # only allows a site to public one.
166 ewv 1.63 try:
167 ewv 1.79 self.batchsystem = cfg_params['CONDORG.batchsystem']
168     msg = '[Condor-G Scheduler]: batchsystem overide specified in your crab.cfg'
169     common.logger.debug(2,msg)
170     except KeyError:
171     self.batchsystem = ''
172 gutsche 1.1
173     # check if one and only one entry is in $CE_WHITELIST
174    
175 ewv 1.79 # redo this with SchedulerGrid SE list
176    
177 gutsche 1.1 try:
178 gutsche 1.32 tmpGood = string.split(cfg_params['EDG.se_white_list'],',')
179 gutsche 1.1 except KeyError:
180 gutsche 1.20 msg = '[Condor-G Scheduler]: destination site is not selected properly.\n'
181 gutsche 1.32 msg += '[Condor-G Scheduler]: Please select your destination site and only your destination site in the SE_white_list variable of the [EDG] section in your crab.cfg.'
182 gutsche 1.42 common.logger.debug(2,msg)
183 gutsche 1.20 raise CrabException(msg)
184 gutsche 1.40
185 gutsche 1.1 if len(tmpGood) != 1 :
186 gutsche 1.20 msg = '[Condor-G Scheduler]: destination site is not selected properly.\n'
187 gutsche 1.32 msg += '[Condor-G Scheduler]: Please select your destination site and only your destination site in the SE_white_list variable of the [EDG] section in your crab.cfg.'
188 gutsche 1.42 common.logger.debug(2,msg)
189 gutsche 1.20 raise CrabException(msg)
190 gutsche 1.1
191     try:
192     self.UseGT4 = cfg_params['USER.use_gt_4'];
193     except KeyError:
194     self.UseGT4 = 0;
195    
196     # added here because checklistmatch is not used
197     self.checkProxy()
198 gutsche 1.6
199 gutsche 1.50 self.datasetPath = ''
200     try:
201     tmp = cfg_params['CMSSW.datasetpath']
202     if string.lower(tmp)=='none':
203     self.datasetPath = None
204     self.selectNoInput = 1
205     else:
206     self.datasetPath = tmp
207     self.selectNoInput = 0
208     except KeyError:
209 ewv 1.58 msg = "Error: datasetpath not defined "
210 gutsche 1.50 raise CrabException(msg)
211 ewv 1.71
212 gutsche 1.1 return
213 gutsche 1.40
214 ewv 1.82 def sched_parameter(self,i,task):
215 gutsche 1.1 """
216     Returns file with scheduler-specific parameters
217     """
218 gutsche 1.29 lastDest=''
219     first = []
220     last = []
221 ewv 1.82
222     print "sched_parameter called with i=",i
223     print " task =",task
224    
225     for n in range(common._db.nJobs()):
226 ewv 1.83 currDest=self.blackWhiteListParser.cleanForBlackWhiteList(eval(task.jobs[i-1]['dlsDestination']))
227     print "currDest =",currDest
228     if (currDest!=lastDest):
229     lastDest = currDest
230     first.append(n)
231     if n != 0:last.append(n-1)
232 ewv 1.82 if len(first)>len(last) :last.append(common._db.nJobs())
233 gutsche 1.40
234 gutsche 1.29 for i in range(len(first)): # Add loop DS
235 ewv 1.83 self.param='sched_param_'+str(i)+'.clad'
236     param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
237 gutsche 1.29
238 ewv 1.83 param_file.write('globusrsl = ')
239 gutsche 1.29
240 ewv 1.83 # extraTag maxWallTime
241     if (self.EDG_clock_time):
242     param_file.write('(maxWalltime='+self.EDG_clock_time+')')
243 gutsche 1.40
244 ewv 1.83 # extraTag additional GLOBUS_RSL
245     if ( self.GLOBUS_RSL != '' ) :
246     param_file.write(self.GLOBUS_RSL)
247 gutsche 1.29
248 ewv 1.83 param_file.write(';')
249 gutsche 1.29
250 ewv 1.83 param_file.close()
251 gutsche 1.29
252 gutsche 1.40
253 gutsche 1.1 def wsSetupEnvironment(self):
254     """
255     Returns part of a job script which does scheduler-specific work.
256     """
257     txt = ''
258 mkirn 1.11 txt += '# strip arguments\n'
259     txt += 'echo "strip arguments"\n'
260     txt += 'args=("$@")\n'
261     txt += 'nargs=$#\n'
262     txt += 'shift $nargs\n'
263 gutsche 1.6 txt += "# job number (first parameter for job wrapper)\n"
264 ewv 1.75 txt += "NJob=${args[0]}; export NJob\n"
265 gutsche 1.6
266 fanzago 1.77 txt += '# job identification to DashBoard \n'
267     txt += 'MonitorJobID=${NJob}_'+self.hash+'_$GLOBUS_GRAM_JOB_CONTACT \n'
268     txt += 'SyncGridJobId=$GLOBUS_GRAM_JOB_CONTACT \n'
269     txt += 'MonitorID='+self._taskId+' \n'
270     txt += 'echo "MonitorJobID=$MonitorJobID" > $RUNTIME_AREA/$repo \n'
271     txt += 'echo "SyncGridJobId=$SyncGridJobId" >> $RUNTIME_AREA/$repo \n'
272     txt += 'echo "MonitorID=$MonitorID" >> $RUNTIME_AREA/$repo\n'
273 gutsche 1.6
274 fanzago 1.77 txt += 'echo ">>> GridFlavour discovery: " \n'
275 ewv 1.63 txt += 'if [ $OSG_APP ]; then \n'
276 gutsche 1.6 txt += ' middleware=OSG \n'
277 fanzago 1.77 txt += ' echo "SyncCE=`echo $GLOBUS_GRAM_JOB_CONTACT | cut -d/ -f3 | cut -d: -f1`" >> $RUNTIME_AREA/$repo \n'
278     txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
279 fanzago 1.59 txt += ' echo ">>> middleware =$middleware" \n'
280 gutsche 1.18 txt += 'elif [ $VO_CMS_SW_DIR ]; then\n'
281     txt += ' middleware=LCG \n'
282 fanzago 1.77 txt += ' echo "SyncCE=`glite-brokerinfo getCE`" >> $RUNTIME_AREA/$repo \n'
283     txt += ' echo "GridFlavour=$middleware" | tee -a $RUNTIME_AREA/$repo \n'
284 gutsche 1.1 txt += 'else \n'
285 fanzago 1.77 txt += ' echo "ERROR ==> GridFlavour not identified" \n'
286     txt += ' job_exit_code=10030 \n'
287     txt += ' func_exit \n'
288 gutsche 1.1 txt += 'fi\n'
289    
290 gutsche 1.6 txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
291 gutsche 1.1 txt += '\n\n'
292    
293     if int(self.copy_data) == 1:
294     if self.SE:
295     txt += 'export SE='+self.SE+'\n'
296     txt += 'echo "SE = $SE"\n'
297     if self.SE_PATH:
298     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
299     txt += 'export SE_PATH='+self.SE_PATH+'\n'
300     txt += 'echo "SE_PATH = $SE_PATH"\n'
301 gutsche 1.40
302 gutsche 1.1 txt += 'export VO='+self.VO+'\n'
303 mkirn 1.15 txt += 'CE=${args[3]}\n'
304 gutsche 1.1 txt += 'echo "CE = $CE"\n'
305     return txt
306    
307     def wsCopyInput(self):
308     """
309 ewv 1.58 Copy input data from SE to WN
310 gutsche 1.1 """
311     txt = '\n'
312     return txt
313    
314     def loggingInfo(self, id):
315     """
316     retrieve the logging info from logging and bookkeeping and return it
317     """
318 gutsche 1.18 schedd = id.split('//')[0]
319     condor_id = id.split('//')[1]
320     cmd = 'condor_q -l -name ' + schedd + ' ' + condor_id
321     cmd_out = runCommand(cmd)
322 gutsche 1.20 common.logger.debug(5,"Condor-G loggingInfo cmd: "+cmd)
323     common.logger.debug(5,"Condor-G loggingInfo cmd_out: "+cmd_out)
324 gutsche 1.1 return cmd_out
325    
326     def listMatch(self, nj):
327     """
328     Check the compatibility of available resources
329     """
330 slacapra 1.39 #self.checkProxy()
331 gutsche 1.1 return 1
332    
333 slacapra 1.65 def userName(self):
334     """ return the user name """
335     self.checkProxy()
336     return runCommand("voms-proxy-info -identity")
337    
338 ewv 1.73 def getAttribute(self, id, attr):
339     return self.getStatusAttribute_(id, attr)
340 gutsche 1.1
341 slacapra 1.72 def getAttribute(self, id, attr):
342     return self.getStatusAttribute_(id, attr)
343    
344 gutsche 1.1 def getExitStatus(self, id):
345     return self.getStatusAttribute_(id, 'exit_code')
346    
347     def queryStatus(self, id):
348     return self.getStatusAttribute_(id, 'status')
349    
350 ewv 1.58 def queryDest(self, id):
351 gutsche 1.1 return self.getStatusAttribute_(id, 'destination')
352    
353    
354     def getStatusAttribute_(self, id, attr):
355     """ Query a status of the job with id """
356    
357     result = ''
358 gutsche 1.40
359 gutsche 1.1 if ( attr == 'exit_code' ) :
360 gutsche 1.18 jobnum_str = '%06d' % (int(id))
361     # opts = common.work_space.loadSavedOptions()
362 slacapra 1.41 base = string.upper(common.taskDB.dict("jobtype"))
363 gutsche 1.18 log_file = common.work_space.resDir() + base + '_' + jobnum_str + '.stdout'
364     logfile = open(log_file)
365     log_line = logfile.readline()
366     while log_line :
367     log_line = log_line.strip()
368     if log_line.startswith('JOB_EXIT_STATUS') :
369     log_line_split = log_line.split()
370     result = log_line_split[2]
371     pass
372     log_line = logfile.readline()
373     result = ''
374 gutsche 1.1 elif ( attr == 'status' ) :
375 gutsche 1.18 schedd = id.split('//')[0]
376     condor_id = id.split('//')[1]
377     cmd = 'condor_q -name ' + schedd + ' ' + condor_id
378 gutsche 1.1 cmd_out = runCommand(cmd)
379     if cmd_out != None:
380 gutsche 1.18 status_flag = 0
381 gutsche 1.1 for line in cmd_out.splitlines() :
382 gutsche 1.18 if line.strip().startswith(condor_id.strip()) :
383 gutsche 1.1 status = line.strip().split()[5]
384     if ( status == 'I' ):
385     result = 'Scheduled'
386     break
387     elif ( status == 'U' ) :
388     result = 'Ready'
389     break
390     elif ( status == 'H' ) :
391     result = 'Hold'
392     break
393     elif ( status == 'R' ) :
394     result = 'Running'
395     break
396     elif ( status == 'X' ) :
397     result = 'Cancelled'
398     break
399     elif ( status == 'C' ) :
400     result = 'Done'
401     break
402     else :
403     result = 'Done'
404     break
405     else :
406     result = 'Done'
407     else :
408     result = 'Done'
409     elif ( attr == 'destination' ) :
410 gutsche 1.54 seSite = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(int(id)-1))
411 gutsche 1.43 # if no site was selected during job splitting (datasetPath=None)
412     # set to self.cfg_params['EDG.se_white_list']
413 gutsche 1.54 if self.datasetPath == 'None':
414 gutsche 1.43 seSite = self.cfg_params['EDG.se_white_list']
415 gutsche 1.42 oneSite = self.getCEfromSE(seSite).split(':')[0].strip()
416 gutsche 1.18 result = oneSite
417 gutsche 1.1 elif ( attr == 'reason' ) :
418     result = 'status query'
419     elif ( attr == 'stateEnterTime' ) :
420     result = time.asctime(time.gmtime())
421     return result
422    
423     def queryDetailedStatus(self, id):
424     """ Query a detailed status of the job with id """
425     user = os.environ['USER']
426     cmd = 'condor_q -submitter ' + user
427     cmd_out = runCommand(cmd)
428     return cmd_out
429    
430 ewv 1.81 def ce_list(self):
431     """
432     Returns string with requirement CE related, dummy for now
433     """
434     req = ''
435    
436     return req,self.EDG_ce_white_list,self.EDG_ce_black_list
437    
438 gutsche 1.18 def createXMLSchScript(self, nj, argsList):
439     """
440     Create a XML-file for BOSS4.
441     """
442    
443     # job steering
444     index = nj - 1
445     job = common.job_list[index]
446     jbt = job.type()
447    
448     # input and output sandboxes
449     inp_sandbox = jbt.inputSandbox(index)
450    
451     # title
452     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
453     jt_string = ''
454 gutsche 1.40
455 gutsche 1.18 xml_fname = str(self.jobtypeName)+'.xml'
456     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
457    
458 ewv 1.58 # TaskName
459 gutsche 1.18 dir = string.split(common.work_space.topDir(), '/')
460     taskName = dir[len(dir)-2]
461 gutsche 1.40
462 gutsche 1.18 xml.write(str(title))
463 spiga 1.61
464     #First check the X509_USER_PROXY. In not there use the default
465 mcinquil 1.62 try:
466     x509=os.environ['X509_USER_PROXY']
467     except Exception, ex:
468     import traceback
469     common.logger.debug( 6, str(ex) )
470     common.logger.debug( 6, traceback.format_exc() )
471 spiga 1.61 x509_cmd = 'ls /tmp/x509up_u`id -u`'
472     x509=runCommand(x509_cmd).strip()
473 fanzago 1.57 xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + ' task_info="' + str(x509) + '">\n')
474 gutsche 1.18 xml.write(jt_string)
475    
476     xml.write('<iterator>\n')
477     xml.write('\t<iteratorRule name="ITR1">\n')
478     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
479     xml.write('\t</iteratorRule>\n')
480     xml.write('\t<iteratorRule name="ITR2">\n')
481     for arg in argsList:
482     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
483     pass
484     xml.write('\t</iteratorRule>\n')
485     xml.write('\t<iteratorRule name="ITR3">\n')
486     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
487     xml.write('\t</iteratorRule>\n')
488    
489 ewv 1.58 xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.schedulerName)+'">\n')
490 spiga 1.44 # xmliwrite('<chain scheduler="'+str(self.schedulerName)+'">\n')
491 gutsche 1.18 xml.write(jt_string)
492    
493    
494     #executable
495    
496     script = job.scriptFilename()
497     xml.write('<program>\n')
498     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
499     xml.write(jt_string)
500 gutsche 1.40
501 gutsche 1.18 xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
502     xml.write('<program_types> crabjob </program_types>\n')
503    
504     # input sanbox
505     inp_box = script + ','
506    
507     if inp_sandbox != None:
508     for fl in inp_sandbox:
509     inp_box = inp_box + '' + fl + ','
510     pass
511     pass
512    
513     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
514     inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
515     xml.write(inp_box)
516 gutsche 1.40
517 gutsche 1.18 # stdout and stderr
518     base = jbt.name()
519     stdout = base + '__ITR3_.stdout'
520     stderr = base + '__ITR3_.stderr'
521    
522     xml.write('<stderr> ' + stderr + '</stderr>\n')
523     xml.write('<stdout> ' + stdout + '</stdout>\n')
524 gutsche 1.40
525 gutsche 1.18 # output sanbox
526 ewv 1.58 out_box = stdout + ',' + stderr + ','
527 gutsche 1.18
528 slacapra 1.36 # Stuff to be returned _always_ via sandbox
529     for fl in jbt.output_file_sandbox:
530     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
531     pass
532     pass
533    
534 gutsche 1.18 if int(self.return_data) == 1:
535     for fl in jbt.output_file:
536     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
537     pass
538     pass
539    
540     if out_box[-1] == ',' : out_box = out_box[:-1]
541     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
542     xml.write(out_box)
543 gutsche 1.40
544 gutsche 1.18 xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
545    
546     xml.write('</program>\n')
547    
548     # start writing of extraTags
549     to_write = ''
550    
551     # extraTag universe
552     to_write += 'universe = "&quot;globus&quot;"\n'
553    
554     # extraTag globusscheduler
555    
556 gutsche 1.42 # use bdii to query ce including jobmanager from site
557 gutsche 1.53 # use first job with non-empty
558     seSite = ''
559     for i in range(nj) :
560 gutsche 1.54 seSite = self.blackWhiteListParser.cleanForBlackWhiteList(common.jobDB.destination(i-1))
561 gutsche 1.53 if seSite != '' :
562     break;
563 gutsche 1.43 # if no site was selected during job splitting (datasetPath=None)
564     # set to self.cfg_params['EDG.se_white_list']
565     if seSite == '' :
566 gutsche 1.50 if self.datasetPath == None :
567     seSite = self.cfg_params['EDG.se_white_list']
568     else :
569 gutsche 1.53 msg = '[Condor-G Scheduler]: Jobs cannot be submitted to site ' + self.cfg_params['EDG.se_white_list'] + ' because the dataset ' + self.datasetPath + ' is not available at this site.\n'
570 ewv 1.58 common.logger.debug(2,msg)
571     raise CrabException(msg)
572    
573 gutsche 1.38 oneSite = self.getCEfromSE(seSite)
574 ewv 1.80
575 gutsche 1.48 # query if site is in production
576     status = cestate_from_ce_bdii(oneSite.split(':')[0].strip())
577     if status != 'Production' :
578     msg = '[Condor-G Scheduler]: Jobs cannot be submitted to site ' + oneSite.split(':')[0].strip() + ' because the site has status ' + status + ' and is currently not operational.\n'
579     msg += '[Condor-G Scheduler]: Please choose another site for your jobs.'
580     common.logger.debug(2,msg)
581     raise CrabException(msg)
582 gutsche 1.40
583 gutsche 1.42 if self.batchsystem != '' :
584     oneSite = oneSite.split('/')[0].strip() + '/' + self.batchsystem
585 ewv 1.58
586 gutsche 1.42 to_write += 'globusscheduler = "&quot;' + str(oneSite) + '&quot;"\n'
587 gutsche 1.18
588     # extraTag condor transfer file flag
589     to_write += 'should_transfer_files = "&quot;YES&quot;"\n'
590    
591     # extraTag when to write output
592     to_write += 'when_to_transfer_output = "&quot;ON_EXIT&quot;"\n'
593    
594     # extraTag switch off streaming of stdout
595     to_write += 'stream_output = "&quot;false&quot;"\n'
596    
597     # extraTag switch off streaming of stderr
598     to_write += 'stream_error = "&quot;false&quot;"\n'
599    
600     # extraTag condor logfile
601     condor_log = jbt.name() + '__ITR3_.log'
602     to_write += 'Log = "&quot;' + condor_log + '&quot;"\n'
603    
604     # extraTag condor notification
605     to_write += 'notification="&quot;never&quot;"\n'
606    
607     # extraTag condor queue statement
608     to_write += 'QUEUE = "&quot;1&quot;"\n'
609    
610     if (to_write != ''):
611     xml.write('<extraTags\n')
612     xml.write(to_write)
613     xml.write('/>\n')
614     pass
615    
616     xml.write('</chain>\n')
617    
618     xml.write('</iterator>\n')
619     xml.write('</task>\n')
620    
621     xml.close()
622 gutsche 1.29
623 gutsche 1.18 return
624    
625 gutsche 1.1 def checkProxy(self):
626     """
627     Function to check the Globus proxy.
628     """
629     if (self.proxyValid): return
630 slacapra 1.39 #timeleft = -999
631 gutsche 1.18 minTimeLeft=10*3600 # in seconds
632    
633     mustRenew = 0
634     timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
635     timeLeftServer = -999
636     if not timeLeftLocal or int(timeLeftLocal) <= 0 or not isInt(timeLeftLocal):
637     mustRenew = 1
638     else:
639     timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
640     if not timeLeftServer or not isInt(timeLeftServer):
641     mustRenew = 1
642     elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
643     mustRenew = 1
644     pass
645     pass
646    
647     if mustRenew:
648 gutsche 1.30 common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
649     cmd = 'voms-proxy-init -voms '+self.VO
650     if self.group:
651     cmd += ':/'+self.VO+'/'+self.group
652 gutsche 1.18 if self.role:
653 gutsche 1.30 cmd += '/role='+self.role
654     cmd += ' -valid 192:00'
655 gutsche 1.1 try:
656     # SL as above: damn it!
657     out = os.system(cmd)
658     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
659     except:
660     msg = "Unable to create a valid proxy!\n"
661     raise CrabException(msg)
662     pass
663 gutsche 1.18
664 gutsche 1.1 self.proxyValid=1
665     return