ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerCondor_g.py
Revision: 1.42
Committed: Fri Mar 30 02:33:47 2007 UTC (18 years, 1 month ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_5_0, CRAB_1_5_0_pre9
Changes since 1.41: +47 -119 lines
Log Message:
Retire OSG information system GridCat and move to BDII queries for
SE->CE mapping, jobmanager and site status ('Production' or not).

File Contents

# User Rev Content
1 gutsche 1.1 from Scheduler import Scheduler
2     from JobList import JobList
3     from crab_logger import Logger
4     from crab_exceptions import *
5     from crab_util import *
6 gutsche 1.42 from osg_bdii import *
7 gutsche 1.1 import time
8     import common
9 gutsche 1.25 import popen2
10 slacapra 1.39 import os
11 gutsche 1.1
class SchedulerCondor_g(Scheduler):
    def __init__(self):
        """Condor-G scheduler: direct grid submission via a local Condor-G.

        At construction time verifies that this machine can act as a
        Condor-G submit host (running condor_schedd, CLI tools in $PATH,
        sane condor configuration); raises CrabException otherwise.
        """
        Scheduler.__init__(self,"CONDOR_G")
        # job-status attribute names this scheduler can report
        self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
                        "children_hist","children_num","children_states","condorId","condor_jdl", \
                        "cpuTime","destination", "done_code","exit_code","expectFrom", \
                        "expectUpdate","globusId","jdl","jobId","jobtype", \
                        "lastUpdateTime","localId","location", "matched_jdl","network_server", \
                        "owner","parent_job", "reason","resubmitted","rsl","seed",\
                        "stateEnterTime","stateEnterTimes","subjob_failed", \
                        "user tags" , "status" , "status_code","hierarchy"]

        # check for locally running condor scheduler
        cmd = 'ps xau | grep -i condor_schedd | grep -v grep'
        cmd_out = runCommand(cmd)
        if cmd_out == None:
            msg = '[Condor-G Scheduler]: condor_schedd is not running on this machine.\n'
            msg += '[Condor-G Scheduler]: Please use another machine with installed condor and running condor_schedd or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

        self.checkExecutableInPath('condor_q')
        self.checkExecutableInPath('condor_submit')
        self.checkExecutableInPath('condor_version')

        # get version number
        cmd = 'condor_version'
        cmd_out = runCommand(cmd)
        # BUG FIX: original tested `cmd != None` (always true, cmd is the
        # command string), so a failing condor_version crashed on the
        # .find() below instead of raising the intended error message.
        if cmd_out != None :
            tmp = cmd_out.find('CondorVersion') + 15
            version = cmd_out[tmp:tmp+6].split('.')
            version_master = int(version[0])
            version_major = int(version[1])
            version_minor = int(version[2])
        else :
            msg = '[Condor-G Scheduler]: condor_version was not able to determine the installed condor version.\n'
            msg += '[Condor-G Scheduler]: Please use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

        self.checkExecutableInPath('condor_config_val')

        self.checkCondorVariablePointsToFile('GRIDMANAGER')

        # the gt2 GAHP binary variable name changed with condor 6.7.11
        if version_master >= 6 and version_major >= 7 and version_minor >= 11 :
            self.checkCondorVariablePointsToFile('GT2_GAHP')
        elif version_master >=6 and version_major < 8 :
            self.checkCondorVariablePointsToFile('GAHP')

        self.checkCondorVariablePointsToFile('GRID_MONITOR')

        self.checkCondorVariableIsTrue('ENABLE_GRID_MONITOR')

        # report the local gridmanager throttles so the user knows the limits
        max_submit = self.queryCondorVariable('GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE',100).strip()
        max_pending = self.queryCondorVariable('GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE','Unlimited').strip()

        msg = '[Condor-G Scheduler]\n'
        msg += 'Maximal number of jobs submitted to the grid : GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE = '+max_submit+'\n'
        msg += 'Maximal number of parallel submits to the grid : GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE = '+max_pending+'\n'
        msg += 'Ask the administrator of your local condor installation to increase these variables to enable more jobs to be executed on the grid in parallel.\n'
        common.logger.debug(2,msg)

        # create hash of the crab.cfg, used to build Dashboard monitor ids
        self.hash = makeCksum(common.work_space.cfgFileName())

        return
78    
79 gutsche 1.38 def getCEfromSE(self, seSite):
80 gutsche 1.42 # returns the ce including jobmanager
81     ces = jm_from_se_bdii(seSite)
82 gutsche 1.38
83 gutsche 1.42 # hardcode fnal as BDII maps cmssrm.fnal.gov to cmslcgce.fnal.gov
84     if seSite.find ('fnal.gov') >= 0 :
85     return 'cmsosgce.fnal.gov:2119/jobmanager-condor'
86    
87     # mapping ce_hostname to full ce name including jobmanager
88     ce_hostnames = {}
89     for ce in ces :
90     ce_hostnames[ce.split(':')[0].strip()] = ce
91 gutsche 1.38
92     oneSite=''
93 gutsche 1.42 if ( len(ce_hostnames.keys()) == 1 ) :
94     oneSite=ce_hostnames[ce_hostnames.keys()[0]]
95     elif ( len(ce_hostnames.keys()) > 1 ) :
96     if 'EDG.ce_white_list' in self.cfg_params.keys() and len(self.cfg_params['EDG.ce_white_list'].split(',')) == 1 and self.cfg_params['EDG.ce_white_list'].strip() in ce_hostnames.keys() :
97     oneSite = ce_hostnames[self.cfg_params['EDG.ce_white_list']]
98 gutsche 1.38 else :
99     msg = '[Condor-G Scheduler]: More than one Compute Element (CE) is available for job submission.\n'
100     msg += '[Condor-G Scheduler]: Please select one of the following CE:\n'
101     msg += '[Condor-G Scheduler]:'
102 gutsche 1.42 for host in ce_hostnames.keys() :
103 gutsche 1.38 msg += ' ' + host
104     msg += '\n'
105     msg += '[Condor-G Scheduler]: and enter this CE in the CE_white_list variable of the [EDG] section in your crab.cfg.\n'
106 gutsche 1.42 common.logger.debug(2,msg)
107 gutsche 1.38 raise CrabException(msg)
108     else :
109 gutsche 1.42 raise CrabException('[Condor-G Scheduler]: CE hostname(s) for SE '+seSite+' could not be determined from BDII.')
110 gutsche 1.40
111 gutsche 1.38 return oneSite
112 gutsche 1.40
113 gutsche 1.1 def checkExecutableInPath(self, name):
114     # check if executable is in PATH
115     cmd = 'which '+name
116     cmd_out = runCommand(cmd)
117     if cmd_out == None:
118 gutsche 1.20 msg = '[Condor-G Scheduler]: '+name+' is not in the $PATH on this machine.\n'
119     msg += '[Condor-G Scheduler]: Please use another machine with installed condor or change the Scheduler in your crab.cfg.'
120 gutsche 1.42 common.logger.debug(2,msg)
121 gutsche 1.20 raise CrabException(msg)
122 gutsche 1.1
123     def checkCondorVariablePointsToFile(self, name):
124     ## check for condor variable
125     cmd = 'condor_config_val '+name
126     cmd_out = runCommand(cmd)
127     if os.path.isfile(cmd_out) > 0 :
128 gutsche 1.20 msg = '[Condor-G Scheduler]: the variable '+name+' is not properly set for the condor installation on this machine.\n'
129     msg += '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable '+name+' properly,',
130 gutsche 1.1 'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
131 gutsche 1.42 common.logger.debug(2,msg)
132 gutsche 1.20 raise CrabException(msg)
133 gutsche 1.1
134     def checkCondorVariableIsTrue(self, name):
135     ## check for condor variable
136     cmd = 'condor_config_val '+name
137     cmd_out = runCommand(cmd)
138     if cmd_out == 'TRUE' :
139 gutsche 1.38 msg = '[Condor-G Scheduler]: the variable '+name+' is not set to true for the condor installation on this machine.\n'
140 gutsche 1.20 msg += '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable '+name+' to true,',
141 gutsche 1.1 'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
142 gutsche 1.42 common.logger.debug(2,msg)
143 gutsche 1.20 raise CrabException(msg)
144 gutsche 1.1
145 gutsche 1.25 def queryCondorVariable(self, name, default):
146 gutsche 1.1 ## check for condor variable
147     cmd = 'condor_config_val '+name
148 gutsche 1.25 out = popen2.Popen3(cmd,1)
149     exit_code = out.wait()
150     cmd_out = out.fromchild.readline().strip()
151     if exit_code != 0 :
152     cmd_out = str(default)
153    
154     return cmd_out
155 gutsche 1.1
156     def configure(self, cfg_params):
157    
158 gutsche 1.38 self.cfg_params = cfg_params
159    
160 gutsche 1.18 try:
161 gutsche 1.30 self.group = cfg_params["EDG.group"]
162     except KeyError:
163     self.group = None
164 gutsche 1.40
165 gutsche 1.30 try:
166 gutsche 1.18 self.role = cfg_params["EDG.role"]
167     except KeyError:
168     self.role = None
169    
170 gutsche 1.1 self.copy_input_data = 0
171    
172     try: self.return_data = cfg_params['USER.return_data']
173     except KeyError: self.return_data = 1
174    
175     try:
176     self.copy_data = cfg_params["USER.copy_data"]
177     if int(self.copy_data) == 1:
178     try:
179     self.SE = cfg_params['USER.storage_element']
180     self.SE_PATH = cfg_params['USER.storage_path']
181     except KeyError:
182     msg = "Error. The [USER] section does not have 'storage_element'"
183     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
184 gutsche 1.42 common.logger.debug(2,msg)
185 gutsche 1.1 raise CrabException(msg)
186     except KeyError: self.copy_data = 0
187    
188     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
189     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
190     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
191     raise CrabException(msg)
192    
193     try:
194     self.lfc_host = cfg_params['EDG.lfc_host']
195     except KeyError:
196     msg = "Error. The [EDG] section does not have 'lfc_host' value"
197     msg = msg + " it's necessary to know the LFC host name"
198 gutsche 1.42 common.logger.debug(2,msg)
199 gutsche 1.1 raise CrabException(msg)
200     try:
201     self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
202     except KeyError:
203     msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
204     msg = msg + " it's necessary to know the catalog type"
205 gutsche 1.42 common.logger.debug(2,msg)
206 gutsche 1.1 raise CrabException(msg)
207     try:
208     self.lfc_home = cfg_params['EDG.lfc_home']
209     except KeyError:
210     msg = "Error. The [EDG] section does not have 'lfc_home' value"
211     msg = msg + " it's necessary to know the home catalog dir"
212 gutsche 1.42 common.logger.debug(2,msg)
213 gutsche 1.1 raise CrabException(msg)
214 gutsche 1.40
215 gutsche 1.1 try: self.VO = cfg_params['EDG.virtual_organization']
216     except KeyError: self.VO = 'cms'
217    
218     try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
219     except KeyError: self.EDG_clock_time= ''
220 gutsche 1.29
221     try: self.GLOBUS_RSL = cfg_params['CONDORG.globus_rsl']
222     except KeyError: self.GLOBUS_RSL = ''
223 gutsche 1.40
224     # Provide an override for the batchsystem that condor_g specifies as a grid resource.
225 gutsche 1.42 # this is to handle the case where the site supports several batchsystem but bdii
226 gutsche 1.40 # only allows a site to public one.
227     try:
228     self.batchsystem = cfg_params['CONDORG.batchsystem']
229     msg = '[Condor-G Scheduler]: batchsystem overide specified in your crab.cfg'
230 gutsche 1.42 common.logger.debug(2,msg)
231 gutsche 1.40 except KeyError: self.batchsystem = ''
232 gutsche 1.1 self.register_data = 0
233    
234     # check if one and only one entry is in $CE_WHITELIST
235    
236     try:
237 gutsche 1.32 tmpGood = string.split(cfg_params['EDG.se_white_list'],',')
238 gutsche 1.1 except KeyError:
239 gutsche 1.20 msg = '[Condor-G Scheduler]: destination site is not selected properly.\n'
240 gutsche 1.32 msg += '[Condor-G Scheduler]: Please select your destination site and only your destination site in the SE_white_list variable of the [EDG] section in your crab.cfg.'
241 gutsche 1.42 common.logger.debug(2,msg)
242 gutsche 1.20 raise CrabException(msg)
243 gutsche 1.40
244 gutsche 1.1 if len(tmpGood) != 1 :
245 gutsche 1.20 msg = '[Condor-G Scheduler]: destination site is not selected properly.\n'
246 gutsche 1.32 msg += '[Condor-G Scheduler]: Please select your destination site and only your destination site in the SE_white_list variable of the [EDG] section in your crab.cfg.'
247 gutsche 1.42 common.logger.debug(2,msg)
248 gutsche 1.20 raise CrabException(msg)
249 gutsche 1.1
250     try:
251     self.UseGT4 = cfg_params['USER.use_gt_4'];
252     except KeyError:
253     self.UseGT4 = 0;
254    
255     self.proxyValid=0
256     # added here because checklistmatch is not used
257     self.checkProxy()
258 gutsche 1.6
259     self._taskId = cfg_params['taskId']
260 gutsche 1.40
261 gutsche 1.18 try: self.jobtypeName = cfg_params['CRAB.jobtype']
262     except KeyError: self.jobtypeName = ''
263 gutsche 1.40
264 gutsche 1.18 try: self.schedulerName = cfg_params['CRAB.scheduler']
265     except KeyError: self.scheduler = ''
266    
267 gutsche 1.1 return
268 gutsche 1.40
269 gutsche 1.1
    def sched_parameter(self):
        """
        Returns file with scheduler-specific parameters
        """
        # find the first/last job index of each run of consecutive jobs
        # sharing the same destination: one parameter file per run
        lastDest=''
        first = []
        last = []
        for n in range(common.jobDB.nJobs()):
            currDest=common.jobDB.destination(n)
            if (currDest!=lastDest):
                lastDest = currDest
                first.append(n)
                if n != 0:last.append(n-1)
        if len(first)>len(last) :last.append(common.jobDB.nJobs())

        for i in range(len(first)): # Add loop DS
            # one classad fragment per destination block, written to shareDir
            self.param='sched_param_'+str(i)+'.clad'
            param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')

            param_file.write('globusrsl = ')

            # extraTag maxWallTime
            if ( self.EDG_clock_time != '' ) :
                param_file.write('(maxWalltime='+self.EDG_clock_time+')')

            # extraTag additional GLOBUS_RSL
            if ( self.GLOBUS_RSL != '' ) :
                param_file.write(self.GLOBUS_RSL)

            param_file.write(';')

            param_file.close()
302    
303 gutsche 1.40
    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.
        """
        txt = ''

        txt = ''
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}\n"

        # Dashboard monitoring ids: job number + crab.cfg checksum + the
        # Globus job contact identify this job run uniquely
        txt += 'MonitorJobID=`echo ${NJob}_'+self.hash+'_$GLOBUS_GRAM_JOB_CONTACT`\n'
        txt += 'SyncGridJobId=`echo $GLOBUS_GRAM_JOB_CONTACT`\n'
        txt += 'MonitorID=`echo ' + self._taskId + '`\n'
        txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'

        # middleware discovery on the worker node: OSG via GRID3_APP_DIR or
        # OSG_APP, LCG via VO_CMS_SW_DIR; otherwise abort with code 10030
        txt += 'echo "middleware discovery " \n'
        txt += 'if [ $GRID3_APP_DIR ]; then\n'
        txt += '    middleware=OSG \n'
        txt += '    echo "SyncCE=`echo $GLOBUS_GRAM_JOB_CONTACT | cut -d/ -f3 | cut -d: -f1`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'elif [ $OSG_APP ]; then \n'
        txt += '    middleware=OSG \n'
        txt += '    echo "SyncCE=`echo $GLOBUS_GRAM_JOB_CONTACT | cut -d/ -f3 | cut -d: -f1`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'elif [ $VO_CMS_SW_DIR ]; then\n'
        txt += '    middleware=LCG \n'
        txt += '    echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'else \n'
        txt += '    echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
        txt += '    echo "JOB_EXIT_STATUS = 10030" \n'
        txt += '    echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    dumpStatus $RUNTIME_AREA/$repo \n'
        txt += '    rm -f $RUNTIME_AREA/$repo \n'
        txt += '    echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '    exit 1 \n'
        txt += 'fi\n'

        txt += '# report first time to DashBoard \n'
        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
        txt += 'rm -f $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'

        txt += '\n\n'

        # export stage-out target when copy_data is requested
        if int(self.copy_data) == 1:
            if self.SE:
                txt += 'export SE='+self.SE+'\n'
                txt += 'echo "SE = $SE"\n'
            if self.SE_PATH:
                if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
                txt += 'export SE_PATH='+self.SE_PATH+'\n'
                txt += 'echo "SE_PATH = $SE_PATH"\n'

        txt += 'export VO='+self.VO+'\n'
        # the CE is passed as the fourth wrapper argument
        txt += 'CE=${args[3]}\n'
        txt += 'echo "CE = $CE"\n'
        return txt
374    
375     def wsCopyInput(self):
376     """
377     Copy input data from SE to WN
378     """
379     txt = '\n'
380     return txt
381    
    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.
        """
        txt = ''
        if int(self.copy_data) == 1:
            txt += '#\n'
            txt += '# Copy output to SE = $SE\n'
            txt += '#\n'
            txt += '\n'
            # prepare the grid UI: if lcg-cp is missing, source the gLite UI
            # shipped in $OSG_APP; in both cases export X509_CERT_DIR
            txt += 'which lcg-cp\n'
            txt += 'lcgcp_location_exit_code=$?\n'
            txt += '\n'
            txt += 'if [ $lcgcp_location_exit_code -eq 1 ]; then\n'
            txt += ''
            txt += '    echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
            txt += '    echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
            txt += '    source $OSG_APP/glite/setup_glite_ui.sh\n'
            txt += '    export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
            txt += '    echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
            txt += 'else\n'
            txt += '    echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
            txt += '    export X509_CERT_DIR=/etc/grid-security/certificates\n'
            txt += '    echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
            txt += 'fi\n'
            # stage-out strategy: try srmcp first, fall back to lcg-cp
            txt += '    for out_file in $file_list ; do\n'
            txt += '        echo "Trying to copy output file to $SE using srmcp"\n'
            txt += '        echo "mkdir -p $HOME/.srmconfig"\n'
            txt += '        mkdir -p $HOME/.srmconfig\n'
            txt += '        echo "srmcp -retry_num 3 -retry_timeout 480000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
            txt += '        exitstring=`srmcp -retry_num 3 -retry_timeout 480000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
            txt += '        copy_exit_status=$?\n'
            txt += '        echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n'
            txt += '        echo "STAGE_OUT = $copy_exit_status"\n'
            txt += '        if [ $copy_exit_status -ne 0 ]; then\n'
            txt += '            echo "Possible problems with SE = $SE"\n'
            txt += '            echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "srmcp failed, attempting lcg-cp"\n'
            txt += '            echo "Trying to copy output file to $SE using lcg-cp"\n'
            # verbose lcg-cp only at crab debug level >= 5
            if common.logger.debugLevel() >= 5:
                txt += '            echo "lcg-cp --vo $VO --verbose -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
                txt += '            exitstring=`lcg-cp --vo $VO --verbose -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
            else:
                txt += '            echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
                txt += '            exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
            txt += '            copy_exit_status=$?\n'
            txt += '            echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
            txt += '            echo "STAGE_OUT = $copy_exit_status"\n'
            txt += '            if [ $copy_exit_status -ne 0 ]; then\n'
            txt += '                echo "Problems with SE = $SE"\n'
            txt += '                echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '                echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '                echo "lcg-cp failed. For verbose lcg-cp output, use command line option -debug 5."\n'
            txt += '                echo "lcg-cp and srmcp failed!"\n'
            txt += '            else\n'
            txt += '                echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '                echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
            txt += '                echo "output copied into $SE/$SE_PATH directory"\n'
            txt += '                echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '                echo "lcg-cp succeeded"\n'
            txt += '            fi\n'
            txt += '        else\n'
            txt += '            echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "output copied into $SE/$SE_PATH directory"\n'
            txt += '            echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "srmcp succeeded"\n'
            txt += '        fi\n'
            txt += '    done\n'

        return txt
455    
456     def wsRegisterOutput(self):
457     """
458     Returns part of a job script which does scheduler-specific work.
459     """
460    
461     txt = ''
462     return txt
463    
464     def loggingInfo(self, id):
465     """
466     retrieve the logging info from logging and bookkeeping and return it
467     """
468 gutsche 1.18 schedd = id.split('//')[0]
469     condor_id = id.split('//')[1]
470     cmd = 'condor_q -l -name ' + schedd + ' ' + condor_id
471     cmd_out = runCommand(cmd)
472 gutsche 1.20 common.logger.debug(5,"Condor-G loggingInfo cmd: "+cmd)
473     common.logger.debug(5,"Condor-G loggingInfo cmd_out: "+cmd_out)
474 gutsche 1.1 return cmd_out
475    
476     def listMatch(self, nj):
477     """
478     Check the compatibility of available resources
479     """
480 slacapra 1.39 #self.checkProxy()
481 gutsche 1.1 return 1
482    
483     def submit(self, nj):
484     """
485     Submit one OSG job.
486     """
487     self.checkProxy()
488    
489     jid = None
490     jdl = common.job_list[nj].jdlFilename()
491    
492     cmd = 'condor_submit ' + jdl
493     cmd_out = runCommand(cmd)
494     if cmd_out != None:
495     tmp = cmd_out.find('submitted to cluster') + 21
496     jid = cmd_out[tmp:].replace('.','')
497     jid = jid.replace('\n','')
498     pass
499     return jid
500    
501     def resubmit(self, nj_list):
502     """
503     Prepare jobs to be submit
504     """
505     return
506    
507     def getExitStatus(self, id):
508     return self.getStatusAttribute_(id, 'exit_code')
509    
510     def queryStatus(self, id):
511     return self.getStatusAttribute_(id, 'status')
512    
513     def queryDest(self, id):
514     return self.getStatusAttribute_(id, 'destination')
515    
516    
    def getStatusAttribute_(self, id, attr):
        """ Query a status of the job with id """

        result = ''

        if ( attr == 'exit_code' ) :
            # scan the job stdout in resDir for the JOB_EXIT_STATUS line
            jobnum_str = '%06d' % (int(id))
            # opts = common.work_space.loadSavedOptions()
            base = string.upper(common.taskDB.dict("jobtype"))
            log_file = common.work_space.resDir() + base + '_' + jobnum_str + '.stdout'
            logfile = open(log_file)
            log_line = logfile.readline()
            while log_line :
                log_line = log_line.strip()
                if log_line.startswith('JOB_EXIT_STATUS') :
                    log_line_split = log_line.split()
                    result = log_line_split[2]
                    pass
                log_line = logfile.readline()
            # NOTE(review): this reset discards the exit code parsed above,
            # so 'exit_code' queries always return '' — confirm intended
            result = ''
        elif ( attr == 'status' ) :
            # id is <schedd>//<condor id>; 6th column of condor_q output
            # carries the one-letter job state
            schedd = id.split('//')[0]
            condor_id = id.split('//')[1]
            cmd = 'condor_q -name ' + schedd + ' ' + condor_id
            cmd_out = runCommand(cmd)
            if cmd_out != None:
                status_flag = 0
                for line in cmd_out.splitlines() :
                    if line.strip().startswith(condor_id.strip()) :
                        status = line.strip().split()[5]
                        if ( status == 'I' ):
                            result = 'Scheduled'
                            break
                        elif ( status == 'U' ) :
                            result = 'Ready'
                            break
                        elif ( status == 'H' ) :
                            result = 'Hold'
                            break
                        elif ( status == 'R' ) :
                            result = 'Running'
                            break
                        elif ( status == 'X' ) :
                            result = 'Cancelled'
                            break
                        elif ( status == 'C' ) :
                            result = 'Done'
                            break
                        else :
                            result = 'Done'
                            break
                    else :
                        # job no longer in the queue: treat as finished
                        result = 'Done'
            else :
                result = 'Done'
        elif ( attr == 'destination' ) :
            # jobDB stores the SE; map it to the CE hostname via BDII
            seSite = common.jobDB.destination(int(id)-1)[0]
            oneSite = self.getCEfromSE(seSite).split(':')[0].strip()
            result = oneSite
        elif ( attr == 'reason' ) :
            result = 'status query'
        elif ( attr == 'stateEnterTime' ) :
            result = time.asctime(time.gmtime())
        return result
582     def queryDetailedStatus(self, id):
583     """ Query a detailed status of the job with id """
584     user = os.environ['USER']
585     cmd = 'condor_q -submitter ' + user
586     cmd_out = runCommand(cmd)
587     return cmd_out
588    
589     def getOutput(self, id):
590     """
591     Get output for a finished job with id.
592     Returns the name of directory with results.
593     not needed for condor-g
594     """
595 slacapra 1.39 #self.checkProxy()
596 gutsche 1.1 return ''
597    
598     def cancel(self, id):
599     """ Cancel the condor job with id """
600     self.checkProxy()
601     # query for schedd
602     user = os.environ['USER']
603     cmd = 'condor_q -submitter ' + user
604     cmd_out = runCommand(cmd)
605     schedd=''
606     if cmd_out != None:
607     for line in cmd_out.splitlines() :
608     if line.strip().startswith('--') :
609     schedd = line.strip().split()[6]
610     if line.strip().startswith(id.strip()) :
611 slacapra 1.39 # status = line.strip().split()[5]
612 gutsche 1.1 break
613     cmd = 'condor_rm -name ' + schedd + ' ' + id
614     cmd_out = runCommand(cmd)
615     return cmd_out
616    
    def createXMLSchScript(self, nj, argsList):
        """
        Create a XML-file for BOSS4.
        """

        # job steering
        index = nj - 1
        job = common.job_list[index]
        jbt = job.type()

        # input and output sandboxes
        inp_sandbox = jbt.inputSandbox(index)
        #out_sandbox = jbt.outputSandbox(index)

        # title
        title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
        jt_string = ''

        xml_fname = str(self.jobtypeName)+'.xml'
        xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')

        # TaskName: second-to-last component of the working directory path
        dir = string.split(common.work_space.topDir(), '/')
        taskName = dir[len(dir)-2]

        xml.write(str(title))
        xml.write('<task name="' +str(taskName)+'" sub_path="' + common.work_space.bossCache() + '">\n')
        xml.write(jt_string)

        # iterator rules: ITR1 = job number, ITR2 = per-job argument list,
        # ITR3 = job number zero-padded to width 6
        xml.write('<iterator>\n')
        xml.write('\t<iteratorRule name="ITR1">\n')
        xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
        xml.write('\t</iteratorRule>\n')
        xml.write('\t<iteratorRule name="ITR2">\n')
        for arg in argsList:
            xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
            pass
        xml.write('\t</iteratorRule>\n')
        xml.write('\t<iteratorRule name="ITR3">\n')
        xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
        xml.write('\t</iteratorRule>\n')

        xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
        xml.write(jt_string)


        #executable

        script = job.scriptFilename()
        xml.write('<program>\n')
        xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
        xml.write(jt_string)

        xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
        xml.write('<program_types> crabjob </program_types>\n')

        # input sanbox
        inp_box = script + ','

        if inp_sandbox != None:
            for fl in inp_sandbox:
                inp_box = inp_box + '' + fl + ','
                pass
            pass

        # monitoring helpers shipped with every job
        inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'parseCrabFjr.py')

        if (not jbt.additional_inbox_files == []):
            inp_box = inp_box + ', '
            for addFile in jbt.additional_inbox_files:
                addFile = os.path.abspath(addFile)
                inp_box = inp_box+''+addFile+','
                pass

        if inp_box[-1] == ',' : inp_box = inp_box[:-1]
        inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
        xml.write(inp_box)

        # stdout and stderr
        base = jbt.name()
        stdout = base + '__ITR3_.stdout'
        stderr = base + '__ITR3_.stderr'

        xml.write('<stderr> ' + stderr + '</stderr>\n')
        xml.write('<stdout> ' + stdout + '</stdout>\n')

        # output sanbox
        out_box = stdout + ',' + stderr + ','

        # Stuff to be returned _always_ via sandbox
        for fl in jbt.output_file_sandbox:
            out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
            pass
        pass

        if int(self.return_data) == 1:
            for fl in jbt.output_file:
                out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
                pass
            pass

        if out_box[-1] == ',' : out_box = out_box[:-1]
        out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
        xml.write(out_box)

        xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')

        xml.write('</program>\n')

        # start writing of extraTags
        to_write = ''

        # extraTag universe
        to_write += 'universe = "&quot;globus&quot;"\n'

        # extraTag globusscheduler

        # use bdii to query ce including jobmanager from site
        seSite = common.jobDB.destination(nj-1)[0]
        oneSite = self.getCEfromSE(seSite)
        # do not check the site status check for FNAL (OSG not in BDII)
        if oneSite.find('fnal.gov') < 0 :
            # query if site is in production
            status = cestate_from_ce_bdii(oneSite.split(':')[0].strip())
            if status != 'Production' :
                msg = '[Condor-G Scheduler]: Jobs cannot be submitted to site ' + oneSite.split(':')[0].strip() + ' because the site has status ' + status + ' and is currently not operational.\n'
                msg += '[Condor-G Scheduler]: Please choose another site for your jobs.'
                common.logger.debug(2,msg)
                raise CrabException(msg)

        # apply the CONDORG.batchsystem jobmanager override, if configured
        if self.batchsystem != '' :
            oneSite = oneSite.split('/')[0].strip() + '/' + self.batchsystem

        to_write += 'globusscheduler = "&quot;' + str(oneSite) + '&quot;"\n'

        # extraTag condor transfer file flag
        to_write += 'should_transfer_files = "&quot;YES&quot;"\n'

        # extraTag when to write output
        to_write += 'when_to_transfer_output = "&quot;ON_EXIT&quot;"\n'

        # extraTag switch off streaming of stdout
        to_write += 'stream_output = "&quot;false&quot;"\n'

        # extraTag switch off streaming of stderr
        to_write += 'stream_error = "&quot;false&quot;"\n'

        # extraTag condor logfile
        condor_log = jbt.name() + '__ITR3_.log'
        to_write += 'Log = "&quot;' + condor_log + '&quot;"\n'

        # extraTag condor notification
        to_write += 'notification="&quot;never&quot;"\n'

        # extraTag condor queue statement
        to_write += 'QUEUE = "&quot;1&quot;"\n'

        if (to_write != ''):
            xml.write('<extraTags\n')
            xml.write(to_write)
            xml.write('/>\n')
            pass

        xml.write('</chain>\n')

        xml.write('</iterator>\n')
        xml.write('</task>\n')

        xml.close()

        return
793    
794 gutsche 1.1 def checkProxy(self):
795     """
796     Function to check the Globus proxy.
797     """
798     if (self.proxyValid): return
799 slacapra 1.39 #timeleft = -999
800 gutsche 1.18 minTimeLeft=10*3600 # in seconds
801    
802     mustRenew = 0
803     timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
804     timeLeftServer = -999
805     if not timeLeftLocal or int(timeLeftLocal) <= 0 or not isInt(timeLeftLocal):
806     mustRenew = 1
807     else:
808     timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
809     if not timeLeftServer or not isInt(timeLeftServer):
810     mustRenew = 1
811     elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
812     mustRenew = 1
813     pass
814     pass
815    
816     if mustRenew:
817 gutsche 1.30 common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
818     cmd = 'voms-proxy-init -voms '+self.VO
819     if self.group:
820     cmd += ':/'+self.VO+'/'+self.group
821 gutsche 1.18 if self.role:
822 gutsche 1.30 cmd += '/role='+self.role
823     cmd += ' -valid 192:00'
824 gutsche 1.1 try:
825     # SL as above: damn it!
826     out = os.system(cmd)
827     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
828     except:
829     msg = "Unable to create a valid proxy!\n"
830     raise CrabException(msg)
831     pass
832 gutsche 1.18
833 gutsche 1.1 self.proxyValid=1
834     return
835 gutsche 1.40