ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerCondor_g.py
Revision: 1.13
Committed: Mon Jul 31 14:04:13 2006 UTC (18 years, 9 months ago) by mkirn
Content type: text/x-python
Branch: MAIN
Changes since 1.12: +7 -2 lines
Log Message:
lcg-cp verbose for debug level >= 5

File Contents

# User Rev Content
import os
import string   # used below via string.split / string.upper; was never imported explicitly
import sys
import time

import common
from Scheduler import Scheduler
from JobList import JobList
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
from GridCatService import GridCatHostService
11    
12     class SchedulerCondor_g(Scheduler):
13     def __init__(self):
14     Scheduler.__init__(self,"CONDOR_G")
15     self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
16     "children_hist","children_num","children_states","condorId","condor_jdl", \
17     "cpuTime","destination", "done_code","exit_code","expectFrom", \
18     "expectUpdate","globusId","jdl","jobId","jobtype", \
19     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
20     "owner","parent_job", "reason","resubmitted","rsl","seed",\
21     "stateEnterTime","stateEnterTimes","subjob_failed", \
22     "user tags" , "status" , "status_code","hierarchy"]
23    
24     # check for locally running condor scheduler
25     cmd = 'ps xau | grep -i condor_schedd | grep -v grep'
26     cmd_out = runCommand(cmd)
27     if cmd_out == None:
28     print '[Condor-G Scheduler]: condor_schedd is not running on this machine.'
29     print '[Condor-G Scheduler]: Please use another machine with installed condor and running condor_schedd or change the Scheduler in your crab.cfg.'
30     sys.exit(1)
31    
32     self.checkExecutableInPath('condor_q')
33     self.checkExecutableInPath('condor_submit')
34     self.checkExecutableInPath('condor_version')
35    
36     # get version number
37     cmd = 'condor_version'
38     cmd_out = runCommand(cmd)
39     if cmd != None :
40     tmp = cmd_out.find('CondorVersion') + 15
41     version = cmd_out[tmp:tmp+6].split('.')
42     version_master = int(version[0])
43     version_major = int(version[1])
44     version_minor = int(version[2])
45     else :
46     print '[Condor-G Scheduler]: condor_version was not able to determine the installed condor version.'
47     print '[Condor-G Scheduler]: Please use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
48     sys.exit(1)
49    
50     self.checkExecutableInPath('condor_config_val')
51    
52     self.checkCondorVariablePointsToFile('GRIDMANAGER')
53    
54     if version_master >= 6 and version_major >= 7 and version_minor >= 11 :
55     self.checkCondorVariablePointsToFile('GT2_GAHP')
56     else :
57     self.checkCondorVariablePointsToFile('GAHP')
58    
59     self.checkCondorVariablePointsToFile('GRID_MONITOR')
60    
61     self.checkCondorVariableIsTrue('ENABLE_GRID_MONITOR')
62    
63     max_submit = self.queryCondorVariable('GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE')
64     max_pending = self.queryCondorVariable('GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE')
65    
66     print '[Condor-G Scheduler]'
67     print 'Maximal number of jobs submitted to the grid : GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE = ',max_submit
68     print 'Maximal number of parallel submits to the grid : GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE = ',max_pending
69     print 'Ask the administrator of your local condor installation to increase these variables to enable more jobs to be executed on the grid in parallel.\n'
70    
71     return
72    
73     def checkExecutableInPath(self, name):
74     # check if executable is in PATH
75     cmd = 'which '+name
76     cmd_out = runCommand(cmd)
77     if cmd_out == None:
78     print '[Condor-G Scheduler]: ',name,' is not in the $PATH on this machine.'
79     print '[Condor-G Scheduler]: Please use another machine with installed condor or change the Scheduler in your crab.cfg.'
80     sys.exit(1)
81    
82     def checkCondorVariablePointsToFile(self, name):
83     ## check for condor variable
84     cmd = 'condor_config_val '+name
85     cmd_out = runCommand(cmd)
86     if os.path.isfile(cmd_out) > 0 :
87     print '[Condor-G Scheduler]: the variable ',name,' is not properly set for the condor installation on this machine.'
88     print '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable ',name,' properly,',
89     'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
90     sys.exit(1)
91    
92     def checkCondorVariableIsTrue(self, name):
93     ## check for condor variable
94     cmd = 'condor_config_val '+name
95     cmd_out = runCommand(cmd)
96     if cmd_out == 'TRUE' :
97     print '[Condor-G Scheduler]: the variable ',name,' is not set to true for the condor installation on this machine.'
98     print '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable ',name,' to true,',
99     'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
100     sys.exit(1)
101    
102     def queryCondorVariable(self, name):
103     ## check for condor variable
104     cmd = 'condor_config_val '+name
105     return runCommand(cmd)
106    
107     def configure(self, cfg_params):
108    
109     self.copy_input_data = 0
110    
111     try: self.return_data = cfg_params['USER.return_data']
112     except KeyError: self.return_data = 1
113    
114     try:
115     self.copy_data = cfg_params["USER.copy_data"]
116     if int(self.copy_data) == 1:
117     try:
118     self.SE = cfg_params['USER.storage_element']
119     self.SE_PATH = cfg_params['USER.storage_path']
120     except KeyError:
121     msg = "Error. The [USER] section does not have 'storage_element'"
122     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
123     common.logger.message(msg)
124     raise CrabException(msg)
125     except KeyError: self.copy_data = 0
126    
127     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
128     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
129     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
130     raise CrabException(msg)
131    
132     try:
133     self.lfc_host = cfg_params['EDG.lfc_host']
134     except KeyError:
135     msg = "Error. The [EDG] section does not have 'lfc_host' value"
136     msg = msg + " it's necessary to know the LFC host name"
137     common.logger.message(msg)
138     raise CrabException(msg)
139     try:
140     self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
141     except KeyError:
142     msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
143     msg = msg + " it's necessary to know the catalog type"
144     common.logger.message(msg)
145     raise CrabException(msg)
146     try:
147     self.lfc_home = cfg_params['EDG.lfc_home']
148     except KeyError:
149     msg = "Error. The [EDG] section does not have 'lfc_home' value"
150     msg = msg + " it's necessary to know the home catalog dir"
151     common.logger.message(msg)
152     raise CrabException(msg)
153    
154     try: self.VO = cfg_params['EDG.virtual_organization']
155     except KeyError: self.VO = 'cms'
156    
157     try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
158     except KeyError: self.EDG_clock_time= ''
159    
160     self.register_data = 0
161    
162     # check if one and only one entry is in $CE_WHITELIST
163    
164     try:
165     tmpGood = string.split(cfg_params['EDG.ce_white_list'],',')
166     except KeyError:
167     print '[Condor-G Scheduler]: destination site is not selected properly.'
168     print '[Condor-G Scheduler]: Please select your destination site and only your destination site in the CE_white_list variable of the [EDG] section in your crab.cfg.'
169     sys.exit(1)
170    
171     if len(tmpGood) != 1 :
172     print '[Condor-G Scheduler]: destination site is not selected properly.'
173     print '[Condor-G Scheduler]: Please select your destination site and only your destination site in the CE_white_list variable of the [EDG] section in your crab.cfg.'
174     sys.exit(1)
175    
176     # activate Boss per default
177     try:
178     self.UseBoss = cfg_params['CRAB.use_boss'];
179     except KeyError:
180     self.UseBoss = '1';
181    
182     try:
183     self.UseGT4 = cfg_params['USER.use_gt_4'];
184     except KeyError:
185     self.UseGT4 = 0;
186    
187     self.proxyValid=0
188     # added here because checklistmatch is not used
189     self.checkProxy()
190 gutsche 1.6
191     self._taskId = cfg_params['taskId']
192    
193 gutsche 1.1 return
194    
195    
196     def sched_parameter(self):
197     """
198     Returns file with scheduler-specific parameters
199     """
200     return 0
201    
202     def wsSetupEnvironment(self):
203     """
204     Returns part of a job script which does scheduler-specific work.
205     """
206     txt = ''
207 gutsche 1.6
208     txt = ''
209 mkirn 1.11 txt += '# strip arguments\n'
210     txt += 'echo "strip arguments"\n'
211     txt += 'args=("$@")\n'
212     txt += 'nargs=$#\n'
213     txt += 'shift $nargs\n'
214 gutsche 1.6 txt += "# job number (first parameter for job wrapper)\n"
215 mkirn 1.11 #txt += "NJob=$1\n"
216     txt += "NJob=${args[0]}\n"
217 gutsche 1.6
218     # create hash of cfg file
219     hash = makeCksum(common.work_space.cfgFileName())
220 gutsche 1.7 txt += 'MonitorJobID=`echo ${NJob}_'+hash+'_$GLOBUS_GRAM_JOB_CONTACT`\n'
221     txt += 'SyncGridJobId=`echo $GLOBUS_GRAM_JOB_CONTACT`\n'
222     txt += 'MonitorID=`echo ' + self._taskId + '`\n'
223     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
224     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
225     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
226 gutsche 1.6
227 fanzago 1.2 txt += 'echo "middleware discovery " \n'
228 gutsche 1.1 txt += 'if [ $VO_CMS_SW_DIR ]; then\n'
229 gutsche 1.6 txt += ' middleware=LCG \n'
230     txt += ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
231     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
232     txt += ' echo "middleware =$middleware" \n'
233 gutsche 1.1 txt += 'elif [ $GRID3_APP_DIR ]; then\n'
234 gutsche 1.6 txt += ' middleware=OSG \n'
235     txt += ' echo "SyncCE=`echo $hostname`" | tee -a $RUNTIME_AREA/$repo \n'
236     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
237     txt += ' echo "middleware =$middleware" \n'
238 gutsche 1.1 txt += 'elif [ $OSG_APP ]; then \n'
239 gutsche 1.6 txt += ' middleware=OSG \n'
240     txt += ' echo "SyncCE=`echo $hostname`" | tee -a $RUNTIME_AREA/$repo \n'
241     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
242     txt += ' echo "middleware =$middleware" \n'
243 gutsche 1.1 txt += 'else \n'
244 gutsche 1.6 txt += ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
245     txt += ' echo "JOB_EXIT_STATUS = 10030" \n'
246     txt += ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
247     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
248 gutsche 1.7 txt += ' rm -f $RUNTIME_AREA/$repo \n'
249     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
250     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
251 gutsche 1.6 txt += ' exit 1 \n'
252 gutsche 1.1 txt += 'fi\n'
253    
254 gutsche 1.6 txt += '# report first time to DashBoard \n'
255     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
256 gutsche 1.7 txt += 'rm -f $RUNTIME_AREA/$repo \n'
257     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
258     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
259 gutsche 1.6
260 gutsche 1.1 txt += '\n\n'
261    
262     if int(self.copy_data) == 1:
263     if self.SE:
264     txt += 'export SE='+self.SE+'\n'
265     txt += 'echo "SE = $SE"\n'
266     if self.SE_PATH:
267     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
268     txt += 'export SE_PATH='+self.SE_PATH+'\n'
269     txt += 'echo "SE_PATH = $SE_PATH"\n'
270    
271     txt += 'export VO='+self.VO+'\n'
272 mkirn 1.12 # txt += 'CE=$4\n'
273     txt += 'CE=${args[3]}
274 gutsche 1.1 txt += 'echo "CE = $CE"\n'
275     return txt
276    
277     def wsCopyInput(self):
278     """
279     Copy input data from SE to WN
280     """
281     txt = '\n'
282     return txt
283    
284     def wsCopyOutput(self):
285     """
286     Write a CopyResults part of a job script, e.g.
287     to copy produced output into a storage element.
288     """
289     txt = ''
290     if int(self.copy_data) == 1:
291     txt += '#\n'
292     txt += '# Copy output to SE = $SE\n'
293     txt += '#\n'
294 gutsche 1.9 #txt += 'if [ $exe_result -eq 0 ]; then\n'
295     txt += ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
296     txt += ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
297     txt += ' source $OSG_APP/glite/setup_glite_ui.sh\n'
298     txt += ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
299     txt += ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
300 gutsche 1.1 txt += ' for out_file in $file_list ; do\n'
301 gutsche 1.9 txt += ' echo "Trying to copy output file to $SE using lcg-cp"\n'
302 mkirn 1.13 if common.logger.debugLevel() >= 5:
303     txt += ' echo "lcg-cp --vo $VO --verbose -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
304     txt += ' exitstring=`lcg-cp --vo $VO --verbose -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
305     else:
306     txt += ' echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
307     txt += ' exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
308 gutsche 1.1 txt += ' copy_exit_status=$?\n'
309 gutsche 1.9 txt += ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
310 gutsche 1.1 txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
311     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
312 gutsche 1.9 txt += ' echo "Possible problem with SE = $SE"\n'
313 gutsche 1.1 txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
314     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
315 mkirn 1.13 txt += ' echo "lcg-cp failed. For verbose lcg-cp output, use command line option -debug 5."\n'
316 gutsche 1.9 txt += ' echo "lcg-cp failed, attempting srmcp"\n'
317     txt += ' echo "mkdir -p $HOME/.srmconfig"\n'
318     txt += ' mkdir -p $HOME/.srmconfig\n'
319 mkirn 1.10 txt += ' echo "srmcp -retry_num 5 -retry_timeout 480000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
320     txt += ' exitstring=`srmcp -retry_num 5 -retry_timeout 480000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
321 gutsche 1.9 txt += ' copy_exit_status=$?\n'
322     txt += ' echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n'
323     txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
324     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
325     txt += ' echo "Problems with SE = $SE"\n'
326     txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
327     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
328     txt += ' echo "lcg-cp and srmcp failed"\n'
329     txt += ' echo "If storage_path in your config file contains a ? you may need a \? instead."\n'
330     txt += ' else\n'
331     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
332     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
333     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
334     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
335     txt += ' echo "srmcp succeeded"\n'
336     txt += ' fi\n'
337 gutsche 1.1 txt += ' else\n'
338     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
339     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
340     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
341     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
342 gutsche 1.9 txt += ' echo "lcg-cp succeeded"\n'
343 gutsche 1.1 txt += ' fi\n'
344     txt += ' done\n'
345 gutsche 1.9 #txt += 'fi\n'
346 gutsche 1.1
347     return txt
348    
349     def wsRegisterOutput(self):
350     """
351     Returns part of a job script which does scheduler-specific work.
352     """
353    
354     txt = ''
355     return txt
356    
357     def loggingInfo(self, id):
358     """
359     retrieve the logging info from logging and bookkeeping and return it
360     """
361     self.checkProxy()
362     cmd = 'condor_q -l -analyze ' + id
363     cmd_out = os.popen(cmd)
364     return cmd_out
365    
366     def listMatch(self, nj):
367     """
368     Check the compatibility of available resources
369     """
370     self.checkProxy()
371     return 1
372    
373     def submit(self, nj):
374     """
375     Submit one OSG job.
376     """
377     self.checkProxy()
378    
379     jid = None
380     jdl = common.job_list[nj].jdlFilename()
381    
382     cmd = 'condor_submit ' + jdl
383     cmd_out = runCommand(cmd)
384     if cmd_out != None:
385     tmp = cmd_out.find('submitted to cluster') + 21
386     jid = cmd_out[tmp:].replace('.','')
387     jid = jid.replace('\n','')
388     pass
389     return jid
390    
391     def resubmit(self, nj_list):
392     """
393     Prepare jobs to be submit
394     """
395     return
396    
397     def getExitStatus(self, id):
398     return self.getStatusAttribute_(id, 'exit_code')
399    
400     def queryStatus(self, id):
401     return self.getStatusAttribute_(id, 'status')
402    
403     def queryDest(self, id):
404     return self.getStatusAttribute_(id, 'destination')
405    
406    
407     def getStatusAttribute_(self, id, attr):
408     """ Query a status of the job with id """
409    
410     self.checkProxy()
411     result = ''
412    
413     if ( attr == 'exit_code' ) :
414     for i in range(common.jobDB.nJobs()) :
415     if ( id == common.jobDB.jobId(i) ) :
416     jobnum_str = '%06d' % (int(i)+1)
417     opts = common.work_space.loadSavedOptions()
418     base = string.upper(opts['-jobtype'])
419     log_file = common.work_space.resDir() + base + '_' + jobnum_str + '.stdout'
420     logfile = open(log_file)
421     log_line = logfile.readline()
422     while log_line :
423     log_line = log_line.strip()
424     if log_line.startswith('JOB_EXIT_STATUS') :
425     log_line_split = log_line.split()
426     result = log_line_split[2]
427     pass
428     log_line = logfile.readline()
429     elif ( attr == 'status' ) :
430     user = os.environ['USER']
431     cmd = 'condor_q -submitter ' + user
432     cmd_out = runCommand(cmd)
433     if cmd_out != None:
434     for line in cmd_out.splitlines() :
435     if line.strip().startswith(id.strip()) :
436     status = line.strip().split()[5]
437     if ( status == 'I' ):
438     result = 'Scheduled'
439     break
440     elif ( status == 'U' ) :
441     result = 'Ready'
442     break
443     elif ( status == 'H' ) :
444     result = 'Hold'
445     break
446     elif ( status == 'R' ) :
447     result = 'Running'
448     break
449     elif ( status == 'X' ) :
450     result = 'Cancelled'
451     break
452     elif ( status == 'C' ) :
453     result = 'Done'
454     break
455     else :
456     result = 'Done'
457     break
458     else :
459     result = 'Done'
460     else :
461     result = 'Done'
462     elif ( attr == 'destination' ) :
463     for i in range(common.jobDB.nJobs()) :
464     if ( id == common.jobDB.jobId(i) ) :
465     jobnum_str = '%06d' % (int(i)+1)
466     opts = common.work_space.loadSavedOptions()
467     base = string.upper(opts['-jobtype'])
468     log_file = common.work_space.resDir() + base + '_' + jobnum_str + '.stdout'
469     logfile = open(log_file)
470     log_line = logfile.readline()
471     while log_line :
472     log_line = log_line.strip()
473     if log_line.startswith('GridJobId') :
474     log_line_split = log_line.split()
475     result = os.path.split(log_line_split[2])[0]
476     pass
477     log_line = logfile.readline()
478     elif ( attr == 'reason' ) :
479     result = 'status query'
480     elif ( attr == 'stateEnterTime' ) :
481     result = time.asctime(time.gmtime())
482     return result
483    
484     def queryDetailedStatus(self, id):
485     """ Query a detailed status of the job with id """
486     user = os.environ['USER']
487     cmd = 'condor_q -submitter ' + user
488     cmd_out = runCommand(cmd)
489     return cmd_out
490    
491     def getOutput(self, id):
492     """
493     Get output for a finished job with id.
494     Returns the name of directory with results.
495     not needed for condor-g
496     """
497     self.checkProxy()
498     return ''
499    
500     def cancel(self, id):
501     """ Cancel the condor job with id """
502     self.checkProxy()
503     # query for schedd
504     user = os.environ['USER']
505     cmd = 'condor_q -submitter ' + user
506     cmd_out = runCommand(cmd)
507     schedd=''
508     if cmd_out != None:
509     for line in cmd_out.splitlines() :
510     if line.strip().startswith('--') :
511     schedd = line.strip().split()[6]
512     if line.strip().startswith(id.strip()) :
513     status = line.strip().split()[5]
514     break
515     cmd = 'condor_rm -name ' + schedd + ' ' + id
516     cmd_out = runCommand(cmd)
517     return cmd_out
518    
519     def createSchScript(self, nj):
520     """
521     Create a JDL-file for condor
522     """
523    
524     job = common.job_list[nj]
525     jbt = job.type()
526     inp_sandbox = jbt.inputSandbox(nj)
527     out_sandbox = jbt.outputSandbox(nj)
528    
529     # write EDG style JDL if UseBoss == 1
530    
531     if self.UseBoss == '1' :
532    
533     title = '# This JDL was generated by '+\
534     common.prog_name+' (version '+common.prog_version_str+')\n'
535    
536     jdl_fname = job.jdlFilename()
537     jdl = open(jdl_fname, 'w')
538     jdl.write(title)
539    
540     jdl.write('universe = "globus";\n')
541    
542     # use gridcat to query site
543     gridcat_service_url = "http://osg-cat.grid.iu.edu/services.php"
544     hostSvc = ''
545     try:
546     hostSvc = GridCatHostService(gridcat_service_url,common.analisys_common_info['sites'][0])
547     except StandardError, ex:
548     gridcat_service_url = "http://osg-itb.ivdgl.org/gridcat/services.php"
549     try:
550     hostSvc = GridCatHostService(gridcat_service_url,common.analisys_common_info['sites'][0])
551     except StandardError, ex:
552     print '[Condor-G Scheduler]: selected site: ',common.analisys_common_info['sites'][0],' is not an OSG site!\n'
553     print '[Condor-G Scheduler]: Direct Condor-G submission to LCG sites is not possible!\n'
554     sys.exit(1)
555    
556     batchsystem = hostSvc.batchSystem()
557     if batchsystem <> '' : batchsystem='-'+batchsystem
558     jdl_globusscheduler = 'globusscheduler = "' + common.analisys_common_info['sites'][0] + '/jobmanager' + batchsystem + '";\n'
559     jdl.write(jdl_globusscheduler)
560     if ( self.EDG_clock_time != '' ) :
561     jdl.write('globusrsl = "(maxWalltime='+self.EDG_clock_time+')";\n')
562    
563     jdl.write('ENABLE_GRID_MONITOR = "TRUE";\n')
564    
565     script = job.scriptFilename()
566     jdl.write('Executable = "' + os.path.basename(script) + '";\n')
567    
568     jdl.write('should_transfer_files = "YES";\n')
569     jdl.write('when_to_transfer_output = "ON_EXIT";\n')
570    
571     inp_box = 'InputSandbox = { '
572     inp_box = inp_box + '"' + script + '" ,'
573    
574     if inp_sandbox != None:
575     for fl in inp_sandbox:
576     inp_box = inp_box + '"' + fl + '" ,'
577     pass
578     pass
579    
580     for addFile in jbt.additional_inbox_files:
581     addFile = os.path.abspath(addFile)
582     inp_box = inp_box + '"' + addFile + '" ,'
583     pass
584    
585     inp_box = inp_box+' "' + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + '", "' +\
586 gutsche 1.3 os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + '", "'+\
587 gutsche 1.1 os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + '", "'+\
588     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + '", "'+\
589     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + '"'
590    
591     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
592     inp_box = inp_box + ' };\n'
593     jdl.write(inp_box)
594    
595     out_box = 'OutputSandbox = { '
596     out_box = out_box + '"' + job.stdout() + '" ,'
597     out_box = out_box + '"' + job.stderr() + '" ,'
598    
599     if int(self.return_data) == 1:
600     if out_sandbox != None:
601     for fl in out_sandbox:
602     out_box = out_box + '"' + fl + '" ,'
603     pass
604     pass
605     pass
606    
607     if out_box[-1] == ',' : out_box = out_box[:-1]
608     out_box = out_box + ' };\n'
609     jdl.write(out_box)
610    
611 gutsche 1.4 #firstEvent = common.jobDB.firstEvent(nj)
612     #maxEvents = common.jobDB.maxEvents(nj)
613     jdl.write('Arguments = "' + str(nj+1)+' '+jbt.getJobTypeArguments(nj, "CONDOR") + '";\n')
614 gutsche 1.1
615     jdl.write('StdOutput = "' + job.stdout() + '";\n')
616     jdl.write('stream_output = "false";\n')
617     jdl.write('StdError = "' + job.stderr() + '";\n')
618     jdl.write('stream_error = "false";\n')
619     logentry = 'Log = "' + job.stderr() + '";\n'
620     logentry = logentry.replace('stderr','log')
621     jdl.write(logentry)
622     jdl.write('notification="never";\n')
623     jdl.write('QUEUE = "1";\n')
624     jdl.close()
625    
626     else :
627    
628     title = '# This JDL was generated by '+\
629     common.prog_name+' (version '+common.prog_version_str+')\n'
630    
631     jdl_fname = job.jdlFilename()
632     jdl = open(jdl_fname, 'w')
633     jdl.write(title)
634    
635    
636     # use gridcat to query site
637     gridcat_service_url = "http://osg-cat.grid.iu.edu/services.php"
638     hostSvc = ''
639     try:
640     hostSvc = GridCatHostService(gridcat_service_url,common.analisys_common_info['sites'][0])
641     except StandardError, ex:
642     gridcat_service_url = "http://osg-itb.ivdgl.org/gridcat/services.php"
643     try:
644     hostSvc = GridCatHostService(gridcat_service_url,common.analisys_common_info['sites'][0])
645     except StandardError, ex:
646     print '[Condor-G Scheduler]: selected site: ',common.analisys_common_info['sites'][0],' is not an OSG site!\n'
647     print '[Condor-G Scheduler]: Direct Condor-G submission to LCG sites is not possible!\n'
648     sys.exit(1)
649    
650     batchsystem = hostSvc.batchSystem()
651    
652     if self.UseGT4 == '1' :
653    
654     jdl.write('universe = grid\n')
655     jdl.write('grid_type = gt4\n')
656     jdl_globusscheduler = 'globusscheduler = ' + common.analisys_common_info['sites'][0] + '\n'
657     jdl.write(jdl_globusscheduler)
658     jdl_jobmanager = 'jobmanager_type = ' + batchsystem + '\n'
659     jdl.write(jdl_jobmanager)
660     if ( self.EDG_clock_time != '' ) :
661     jdl.write('globusrsl = (maxWalltime='+self.EDG_clock_time+')\n')
662    
663     else :
664    
665     if batchsystem <> '' : batchsystem='-'+batchsystem
666     jdl.write('universe = globus\n')
667     jdl_globusscheduler = 'globusscheduler = ' + common.analisys_common_info['sites'][0] + '/jobmanager' + batchsystem + '\n'
668     jdl.write(jdl_globusscheduler)
669     if ( self.EDG_clock_time != '' ) :
670     jdl.write('globusrsl = (maxWalltime='+self.EDG_clock_time+')\n')
671    
672     jdl.write('ENABLE_GRID_MONITOR = TRUE\n')
673    
674     script = job.scriptFilename()
675     jdl.write('Executable = ' + common.work_space.jobDir() + '/' + os.path.basename(script) + '\n')
676    
677     jdl.write('should_transfer_files = YES\n')
678     jdl.write('when_to_transfer_output = ON_EXIT\n')
679    
680     inp_box = 'transfer_input_files ='
681     inp_box = inp_box + script + ','
682    
683     if inp_sandbox != None:
684     for fl in inp_sandbox:
685     inp_box = inp_box + fl + ','
686     pass
687     pass
688    
689     for addFile in jbt.additional_inbox_files:
690     addFile = os.path.abspath(addFile)
691     inp_box = inp_box + addFile + ','
692     pass
693    
694     inp_box = inp_box+ os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
695     os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
696     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
697     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
698    
699     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
700     inp_box = inp_box + '\n'
701     jdl.write(inp_box)
702    
703     out_box = 'transfer_output_files = '
704    
705     if int(self.return_data) == 1:
706     if out_sandbox != None:
707     for fl in out_sandbox:
708     out_box = out_box + common.work_space.resDir() + '/' + fl + ','
709     pass
710     pass
711     pass
712    
713     if out_box[-1] == ',' : out_box = out_box[:-1]
714     out_box = out_box + '\n'
715     jdl.write(out_box)
716    
717     firstEvent = common.jobDB.firstEvent(nj)
718     maxEvents = common.jobDB.maxEvents(nj)
719     jdl.write('Arguments = ' + str(nj+1)+' '+str(firstEvent)+' '+str(maxEvents)+' '+common.analisys_common_info['sites'][0]+'\n')
720     # arguments for Boss
721     jdl.write('Output = ' + common.work_space.resDir() + '/' + job.stdout() + '\n')
722     jdl.write('stream_output = false\n')
723     jdl.write('Error = ' + common.work_space.resDir() + '/' + job.stderr() + '\n')
724     jdl.write('stream_error = false\n')
725     logentry = 'Log = ' + common.work_space.resDir() + '/' + job.stderr() + '\n'
726     logentry = logentry.replace('stderr','log')
727     jdl.write(logentry)
728     jdl.write('notification=never\n')
729     jdl.write('QUEUE 1\n')
730     jdl.close()
731     return
732    
733     def checkProxy(self):
734     """
735     Function to check the Globus proxy.
736     """
737     if (self.proxyValid): return
738     timeleft = -999
739     minTimeLeft=10 # in hours
740     cmd = 'voms-proxy-info -exists -valid '+str(minTimeLeft)+':00'
741     # SL Here I have to use os.system since the stupid command exit with >0 if no valid proxy is found
742     cmd_out = os.system(cmd)
743     if (cmd_out>0):
744 gutsche 1.5 common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
745     cmd = 'voms-proxy-init -voms cms -valid 96:00'
746 gutsche 1.1 try:
747     # SL as above: damn it!
748     out = os.system(cmd)
749     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
750     except:
751     msg = "Unable to create a valid proxy!\n"
752     raise CrabException(msg)
753     # cmd = 'grid-proxy-info -timeleft'
754     # cmd_out = runCommand(cmd,0,20)
755     pass
756     self.proxyValid=1
757     return