root/cvsroot/COMP/CRAB/python/SchedulerCondor_g.py
Revision: 1.47
Committed: Tue Jul 17 14:08:24 2007 UTC (17 years, 9 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.46: +2 -2 lines
Log Message:
 remove mkdir ~/.srmconfig

File Contents

from Scheduler import Scheduler
from JobList import JobList
from crab_logger import Logger
from crab_exceptions import *
from crab_util import *
from osg_bdii import *
import time
import common
import popen2
import os
import string  # used below for string.split / string.upper

class SchedulerCondor_g(Scheduler):
    """
    Scheduler class to submit CRAB jobs directly to Grid Compute Elements
    via Condor-G.
    """
    def __init__(self):
        Scheduler.__init__(self, "CONDOR_G")
        self.states = [ "Acl", "cancelReason", "cancelling", "ce_node", "children",
                        "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
                        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
                        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
                        "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
                        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
                        "stateEnterTime", "stateEnterTimes", "subjob_failed",
                        "user tags", "status", "status_code", "hierarchy" ]

        # check for a locally running condor scheduler
        cmd = 'ps xau | grep -i condor_schedd | grep -v grep'
        cmd_out = runCommand(cmd)
        if cmd_out == None:
            msg = '[Condor-G Scheduler]: condor_schedd is not running on this machine.\n'
            msg += '[Condor-G Scheduler]: Please use another machine with installed condor and running condor_schedd or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

        self.checkExecutableInPath('condor_q')
        self.checkExecutableInPath('condor_submit')
        self.checkExecutableInPath('condor_version')

        # get version number
        cmd = 'condor_version'
        cmd_out = runCommand(cmd)
        if cmd_out != None :
            tmp = cmd_out.find('CondorVersion') + 15
            version = cmd_out[tmp:tmp+6].split('.')
            version_master = int(version[0])
            version_major = int(version[1])
            version_minor = int(version[2])
        else :
            msg = '[Condor-G Scheduler]: condor_version was not able to determine the installed condor version.\n'
            msg += '[Condor-G Scheduler]: Please use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

        self.checkExecutableInPath('condor_config_val')

        self.checkCondorVariablePointsToFile('GRIDMANAGER')

        if version_master >= 6 and version_major >= 7 and version_minor >= 11 :
            self.checkCondorVariablePointsToFile('GT2_GAHP')
        elif version_master >= 6 and version_major < 8 :
            self.checkCondorVariablePointsToFile('GAHP')

        self.checkCondorVariablePointsToFile('GRID_MONITOR')

        self.checkCondorVariableIsTrue('ENABLE_GRID_MONITOR')

        max_submit = self.queryCondorVariable('GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE', 100).strip()
        max_pending = self.queryCondorVariable('GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE', 'Unlimited').strip()

        msg = '[Condor-G Scheduler]\n'
        msg += 'Maximal number of jobs submitted to the grid : GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE = '+max_submit+'\n'
        msg += 'Maximal number of parallel submits to the grid : GRIDMANAGER_MAX_PENDING_SUBMITS_PER_RESOURCE = '+max_pending+'\n'
        msg += 'Ask the administrator of your local condor installation to increase these variables to enable more jobs to be executed on the grid in parallel.\n'
        common.logger.debug(2,msg)

        # create hash of the crab.cfg file name, used to build the Dashboard MonitorJobID
        self.hash = makeCksum(common.work_space.cfgFileName())

        return

    def getCEfromSE(self, seSite):
        # returns the CE including the jobmanager for a given SE
        ces = jm_from_se_bdii(seSite)

        # hardcode FNAL as BDII maps cmssrm.fnal.gov to cmslcgce.fnal.gov
        if seSite.find('fnal.gov') >= 0 :
            return 'cmsosgce.fnal.gov:2119/jobmanager-condor'

        # map ce_hostname to the full CE name including jobmanager
        ce_hostnames = {}
        for ce in ces :
            ce_hostnames[ce.split(':')[0].strip()] = ce

        oneSite = ''
        if ( len(ce_hostnames.keys()) == 1 ) :
            oneSite = ce_hostnames[ce_hostnames.keys()[0]]
        elif ( len(ce_hostnames.keys()) > 1 ) :
            if 'EDG.ce_white_list' in self.cfg_params.keys() \
                   and len(self.cfg_params['EDG.ce_white_list'].split(',')) == 1 \
                   and self.cfg_params['EDG.ce_white_list'].strip() in ce_hostnames.keys() :
                oneSite = ce_hostnames[self.cfg_params['EDG.ce_white_list'].strip()]
            else :
                msg = '[Condor-G Scheduler]: More than one Compute Element (CE) is available for job submission.\n'
                msg += '[Condor-G Scheduler]: Please select one of the following CE:\n'
                msg += '[Condor-G Scheduler]:'
                for host in ce_hostnames.keys() :
                    msg += ' ' + host
                msg += '\n'
                msg += '[Condor-G Scheduler]: and enter this CE in the CE_white_list variable of the [EDG] section in your crab.cfg.\n'
                common.logger.debug(2,msg)
                raise CrabException(msg)
        else :
            raise CrabException('[Condor-G Scheduler]: CE hostname(s) for SE '+seSite+' could not be determined from BDII.')

        return oneSite
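
    # Illustrative (SE names assumed): for an SE under 'fnal.gov' the hardcoded
    # branch returns 'cmsosgce.fnal.gov:2119/jobmanager-condor'; otherwise the
    # BDII lookup must resolve to exactly one CE hostname, or to a single CE
    # listed in EDG.ce_white_list.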

    def checkExecutableInPath(self, name):
        # check if executable is in PATH
        cmd = 'which '+name
        cmd_out = runCommand(cmd)
        if cmd_out == None:
            msg = '[Condor-G Scheduler]: '+name+' is not in the $PATH on this machine.\n'
            msg += '[Condor-G Scheduler]: Please use another machine with installed condor or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

    def checkCondorVariablePointsToFile(self, name):
        ## check that the condor variable points to an existing file
        cmd = 'condor_config_val '+name
        cmd_out = runCommand(cmd)
        if cmd_out == None or not os.path.isfile(cmd_out.strip()) :
            msg = '[Condor-G Scheduler]: the variable '+name+' is not properly set for the condor installation on this machine.\n'
            msg += '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable '+name+' properly, '
            msg += 'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

    def checkCondorVariableIsTrue(self, name):
        ## check that the condor variable is set to true
        cmd = 'condor_config_val '+name
        cmd_out = runCommand(cmd)
        if cmd_out == None or cmd_out.strip().upper() != 'TRUE' :
            msg = '[Condor-G Scheduler]: the variable '+name+' is not set to true for the condor installation on this machine.\n'
            msg += '[Condor-G Scheduler]: Please ask the administrator of the local condor installation to set the variable '+name+' to true, '
            msg += 'use another machine with properly installed condor or change the Scheduler in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

    def queryCondorVariable(self, name, default):
        ## query a condor variable, falling back to the given default
        cmd = 'condor_config_val '+name
        out = popen2.Popen3(cmd, 1)
        exit_code = out.wait()
        cmd_out = out.fromchild.readline().strip()
        if exit_code != 0 :
            cmd_out = str(default)

        return cmd_out
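
    # Illustrative use (defaults as in __init__ above):
    #   self.queryCondorVariable('GRIDMANAGER_MAX_SUBMITTED_JOBS_PER_RESOURCE', 100)
    # returns the configured value, or the string '100' when condor_config_val
    # exits with a non-zero status.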
    def configure(self, cfg_params):

        self.cfg_params = cfg_params

        try:
            self.group = cfg_params["EDG.group"]
        except KeyError:
            self.group = None

        try:
            self.role = cfg_params["EDG.role"]
        except KeyError:
            self.role = None

        self.copy_input_data = 0

        try: self.return_data = cfg_params['USER.return_data']
        except KeyError: self.return_data = 1

        try:
            self.copy_data = cfg_params["USER.copy_data"]
            if int(self.copy_data) == 1:
                try:
                    self.SE = cfg_params['USER.storage_element']
                    self.SE_PATH = cfg_params['USER.storage_path']
                except KeyError:
                    msg = "Error. The [USER] section does not have 'storage_element'"
                    msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
                    common.logger.debug(2,msg)
                    raise CrabException(msg)
        except KeyError: self.copy_data = 0

        if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
            msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        try:
            self.lfc_host = cfg_params['EDG.lfc_host']
        except KeyError:
            msg = "Error. The [EDG] section does not have 'lfc_host' value"
            msg = msg + " it's necessary to know the LFC host name"
            common.logger.debug(2,msg)
            raise CrabException(msg)
        try:
            self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
        except KeyError:
            msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
            msg = msg + " it's necessary to know the catalog type"
            common.logger.debug(2,msg)
            raise CrabException(msg)
        try:
            self.lfc_home = cfg_params['EDG.lfc_home']
        except KeyError:
            msg = "Error. The [EDG] section does not have 'lfc_home' value"
            msg = msg + " it's necessary to know the home catalog dir"
            common.logger.debug(2,msg)
            raise CrabException(msg)

        try: self.VO = cfg_params['EDG.virtual_organization']
        except KeyError: self.VO = 'cms'

        try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
        except KeyError: self.EDG_clock_time = ''

        try: self.GLOBUS_RSL = cfg_params['CONDORG.globus_rsl']
        except KeyError: self.GLOBUS_RSL = ''

        # Provide an override for the batchsystem that condor_g specifies as a grid resource.
        # This handles the case where a site supports several batchsystems but BDII
        # only allows the site to publish one.
        try:
            self.batchsystem = cfg_params['CONDORG.batchsystem']
            msg = '[Condor-G Scheduler]: batchsystem override specified in your crab.cfg'
            common.logger.debug(2,msg)
        except KeyError: self.batchsystem = ''

        self.register_data = 0

        # check that one and only one entry is in EDG.se_white_list
        try:
            tmpGood = string.split(cfg_params['EDG.se_white_list'],',')
        except KeyError:
            msg = '[Condor-G Scheduler]: destination site is not selected properly.\n'
            msg += '[Condor-G Scheduler]: Please select your destination site and only your destination site in the SE_white_list variable of the [EDG] section in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

        if len(tmpGood) != 1 :
            msg = '[Condor-G Scheduler]: destination site is not selected properly.\n'
            msg += '[Condor-G Scheduler]: Please select your destination site and only your destination site in the SE_white_list variable of the [EDG] section in your crab.cfg.'
            common.logger.debug(2,msg)
            raise CrabException(msg)

        try:
            self.UseGT4 = cfg_params['USER.use_gt_4']
        except KeyError:
            self.UseGT4 = 0

        self.proxyValid = 0
        # added here because checklistmatch is not used
        self.checkProxy()

        self._taskId = cfg_params['taskId']

        try: self.jobtypeName = cfg_params['CRAB.jobtype']
        except KeyError: self.jobtypeName = ''

        try: self.schedulerName = cfg_params['CRAB.scheduler']
        except KeyError: self.schedulerName = ''

        return

    def sched_parameter(self):
        """
        Returns file with scheduler-specific parameters
        """
        lastDest = ''
        first = []
        last = []
        for n in range(common.jobDB.nJobs()):
            currDest = common.jobDB.destination(n)
            if (currDest != lastDest):
                lastDest = currDest
                first.append(n)
                if n != 0: last.append(n-1)
        if len(first) > len(last): last.append(common.jobDB.nJobs())

        for i in range(len(first)):  # Add loop DS
            self.param = 'sched_param_'+str(i)+'.clad'
            param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')

            param_file.write('globusrsl = ')

            # extraTag maxWallTime
            if ( self.EDG_clock_time != '' ) :
                param_file.write('(maxWalltime='+self.EDG_clock_time+')')

            # extraTag additional GLOBUS_RSL
            if ( self.GLOBUS_RSL != '' ) :
                param_file.write(self.GLOBUS_RSL)

            param_file.write(';')

            param_file.close()
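
        # Illustrative .clad content (values assumed): with
        # EDG.max_wall_clock_time = 120 and CONDORG.globus_rsl = (jobtype=single),
        # sched_param_0.clad would contain:
        #   globusrsl = (maxWalltime=120)(jobtype=single);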
    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.
        """
        txt = ''
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}\n"

        txt += 'MonitorJobID=`echo ${NJob}_'+self.hash+'_$GLOBUS_GRAM_JOB_CONTACT`\n'
        txt += 'SyncGridJobId=`echo $GLOBUS_GRAM_JOB_CONTACT`\n'
        txt += 'MonitorID=`echo ' + self._taskId + '`\n'
        txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'

        txt += 'echo "middleware discovery " \n'
        txt += 'if [ $GRID3_APP_DIR ]; then\n'
        txt += '    middleware=OSG \n'
        txt += '    echo "SyncCE=`echo $GLOBUS_GRAM_JOB_CONTACT | cut -d/ -f3 | cut -d: -f1`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'elif [ $OSG_APP ]; then \n'
        txt += '    middleware=OSG \n'
        txt += '    echo "SyncCE=`echo $GLOBUS_GRAM_JOB_CONTACT | cut -d/ -f3 | cut -d: -f1`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'elif [ $VO_CMS_SW_DIR ]; then\n'
        txt += '    middleware=LCG \n'
#        txt += '    echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "SyncCE=`glite-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "middleware =$middleware" \n'
        txt += 'else \n'
        txt += '    echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
        txt += '    echo "JOB_EXIT_STATUS = 10030" \n'
        txt += '    echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    dumpStatus $RUNTIME_AREA/$repo \n'
        txt += '    rm -f $RUNTIME_AREA/$repo \n'
        txt += '    echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += '    echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        txt += '    exit 1 \n'
        txt += 'fi\n'

        txt += '# report first time to DashBoard \n'
        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
        txt += 'rm -f $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'

        txt += '\n\n'

        if int(self.copy_data) == 1:
            if self.SE:
                txt += 'export SE='+self.SE+'\n'
                txt += 'echo "SE = $SE"\n'
            if self.SE_PATH:
                if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
                txt += 'export SE_PATH='+self.SE_PATH+'\n'
                txt += 'echo "SE_PATH = $SE_PATH"\n'

        txt += 'export VO='+self.VO+'\n'
        txt += 'CE=${args[3]}\n'
        txt += 'echo "CE = $CE"\n'
        return txt
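
    # Note (derived from the script above): the job wrapper reports
    #   MonitorJobID = ${NJob}_<cfg checksum>_$GLOBUS_GRAM_JOB_CONTACT
    #   MonitorID    = the CRAB taskId
    # to Dashboard through $RUNTIME_AREA/$repo.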
    def wsCopyInput(self):
        """
        Copy input data from SE to WN
        """
        txt = '\n'
        return txt

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.
        """
        txt = ''
        if int(self.copy_data) == 1:
            txt += '#\n'
            txt += '# Copy output to SE = $SE\n'
            txt += '#\n'
            txt += '\n'
            txt += 'which lcg-cp\n'
            txt += 'lcgcp_location_exit_code=$?\n'
            txt += '\n'
            txt += 'if [ $lcgcp_location_exit_code -eq 1 ]; then\n'
            txt += '    echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
            txt += '    echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
            txt += '    source $OSG_APP/glite/setup_glite_ui.sh\n'
            txt += '    export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
            txt += '    echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
            txt += 'else\n'
            txt += '    echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
            txt += '    export X509_CERT_DIR=/etc/grid-security/certificates\n'
            txt += '    echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
            txt += 'fi\n'
            txt += 'for out_file in $file_list ; do\n'
            txt += '    echo "Trying to copy output file to $SE using srmcp"\n'
            # txt += '    echo "mkdir -p $HOME/.srmconfig"\n'
            # txt += '    mkdir -p $HOME/.srmconfig\n'
            txt += '    echo "srmcp -retry_num 3 -retry_timeout 480000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
            txt += '    exitstring=`srmcp -retry_num 3 -retry_timeout 480000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
            txt += '    copy_exit_status=$?\n'
            txt += '    echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n'
            txt += '    echo "STAGE_OUT = $copy_exit_status"\n'
            txt += '    if [ $copy_exit_status -ne 0 ]; then\n'
            txt += '        echo "Possible problems with SE = $SE"\n'
            txt += '        echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '        echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '        echo "srmcp failed, attempting lcg-cp"\n'
            txt += '        echo "Trying to copy output file to $SE using lcg-cp"\n'
            if common.logger.debugLevel() >= 5:
                txt += '        echo "lcg-cp --vo $VO --verbose -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
                txt += '        exitstring=`lcg-cp --vo $VO --verbose -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
            else:
                txt += '        echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
                txt += '        exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
            txt += '        copy_exit_status=$?\n'
            txt += '        echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
            txt += '        echo "STAGE_OUT = $copy_exit_status"\n'
            txt += '        if [ $copy_exit_status -ne 0 ]; then\n'
            txt += '            echo "Problems with SE = $SE"\n'
            txt += '            echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "lcg-cp and srmcp failed!"\n'
            txt += '            SE=""\n'
            txt += '            echo "SE = $SE"\n'
            txt += '            SE_PATH=""\n'
            txt += '            echo "SE_PATH = $SE_PATH"\n'
            txt += '        else\n'
            txt += '            echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "output copied into $SE/$SE_PATH directory"\n'
            txt += '            echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '            echo "lcg-cp succeeded"\n'
            txt += '        fi\n'
            txt += '    else\n'
            txt += '        echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '        echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
            txt += '        echo "output copied into $SE/$SE_PATH directory"\n'
            txt += '        echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
            txt += '        echo "srmcp succeeded"\n'
            txt += '    fi\n'
            txt += 'done\n'
            txt += 'exit_status=$copy_exit_status\n'

        return txt
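
    # Illustrative stage-out URL built by the script above (SE and SE_PATH taken
    # from crab.cfg): srm://<storage_element>:8443<storage_path><out_file>,
    # with a gsiftp:// fallback via lcg-cp if srmcp fails.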
    def wsRegisterOutput(self):
        """
        Returns part of a job script which does scheduler-specific work.
        """
        txt = ''
        return txt

    def loggingInfo(self, id):
        """
        retrieve the logging info from logging and bookkeeping and return it
        """
        schedd = id.split('//')[0]
        condor_id = id.split('//')[1]
        cmd = 'condor_q -l -name ' + schedd + ' ' + condor_id
        cmd_out = runCommand(cmd)
        common.logger.debug(5,"Condor-G loggingInfo cmd: "+cmd)
        common.logger.debug(5,"Condor-G loggingInfo cmd_out: "+cmd_out)
        return cmd_out

    def listMatch(self, nj):
        """
        Check the compatibility of available resources
        """
        #self.checkProxy()
        return 1

    def submit(self, nj):
        """
        Submit one OSG job.
        """
        self.checkProxy()

        jid = None
        jdl = common.job_list[nj].jdlFilename()

        cmd = 'condor_submit ' + jdl
        cmd_out = runCommand(cmd)
        if cmd_out != None:
            tmp = cmd_out.find('submitted to cluster') + 21
            jid = cmd_out[tmp:].replace('.','')
            jid = jid.replace('\n','')
            pass
        return jid
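
    # Illustrative condor_submit output (assumed typical format):
    #   "1 job(s) submitted to cluster 1234."
    # The slice starting just past 'submitted to cluster' yields the cluster id,
    # and the replace() calls strip the trailing '.' and newline, so jid = '1234'.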
    def resubmit(self, nj_list):
        """
        Prepare jobs to be submitted
        """
        return

    def getExitStatus(self, id):
        return self.getStatusAttribute_(id, 'exit_code')

    def queryStatus(self, id):
        return self.getStatusAttribute_(id, 'status')

    def queryDest(self, id):
        return self.getStatusAttribute_(id, 'destination')

    def getStatusAttribute_(self, id, attr):
        """ Query a status of the job with id """

        result = ''

        if ( attr == 'exit_code' ) :
            jobnum_str = '%06d' % (int(id))
            # opts = common.work_space.loadSavedOptions()
            base = string.upper(common.taskDB.dict("jobtype"))
            log_file = common.work_space.resDir() + base + '_' + jobnum_str + '.stdout'
            logfile = open(log_file)
            log_line = logfile.readline()
            while log_line :
                log_line = log_line.strip()
                if log_line.startswith('JOB_EXIT_STATUS') :
                    log_line_split = log_line.split()
                    result = log_line_split[2]
                    pass
                log_line = logfile.readline()
            logfile.close()
        elif ( attr == 'status' ) :
            schedd = id.split('//')[0]
            condor_id = id.split('//')[1]
            cmd = 'condor_q -name ' + schedd + ' ' + condor_id
            cmd_out = runCommand(cmd)
            if cmd_out != None:
                status_flag = 0
                for line in cmd_out.splitlines() :
                    if line.strip().startswith(condor_id.strip()) :
                        status = line.strip().split()[5]
                        if ( status == 'I' ):
                            result = 'Scheduled'
                            break
                        elif ( status == 'U' ) :
                            result = 'Ready'
                            break
                        elif ( status == 'H' ) :
                            result = 'Hold'
                            break
                        elif ( status == 'R' ) :
                            result = 'Running'
                            break
                        elif ( status == 'X' ) :
                            result = 'Cancelled'
                            break
                        elif ( status == 'C' ) :
                            result = 'Done'
                            break
                        else :
                            result = 'Done'
                            break
                    else :
                        result = 'Done'
            else :
                result = 'Done'
        elif ( attr == 'destination' ) :
            seSite = common.jobDB.destination(int(id)-1)[0]
            # if no site was selected during job splitting (datasetPath=None)
            # set to self.cfg_params['EDG.se_white_list']
            if seSite == '' :
                seSite = self.cfg_params['EDG.se_white_list']
            oneSite = self.getCEfromSE(seSite).split(':')[0].strip()
            result = oneSite
        elif ( attr == 'reason' ) :
            result = 'status query'
        elif ( attr == 'stateEnterTime' ) :
            result = time.asctime(time.gmtime())
        return result
    def queryDetailedStatus(self, id):
        """ Query a detailed status of the job with id """
        user = os.environ['USER']
        cmd = 'condor_q -submitter ' + user
        cmd_out = runCommand(cmd)
        return cmd_out

    def getOutput(self, id):
        """
        Get output for a finished job with id.
        Returns the name of directory with results.
        not needed for condor-g
        """
        #self.checkProxy()
        return ''

    def cancel(self, id):
        """ Cancel the condor job with id """
        self.checkProxy()
        # query for schedd
        user = os.environ['USER']
        cmd = 'condor_q -submitter ' + user
        cmd_out = runCommand(cmd)
        schedd = ''
        if cmd_out != None:
            for line in cmd_out.splitlines() :
                if line.strip().startswith('--') :
                    schedd = line.strip().split()[6]
                if line.strip().startswith(id.strip()) :
                    # status = line.strip().split()[5]
                    break
        cmd = 'condor_rm -name ' + schedd + ' ' + id
        cmd_out = runCommand(cmd)
        return cmd_out

    def createXMLSchScript(self, nj, argsList):
        """
        Create a XML-file for BOSS4.
        """

        # job steering
        index = nj - 1
        job = common.job_list[index]
        jbt = job.type()

        # input and output sandboxes
        inp_sandbox = jbt.inputSandbox(index)
        #out_sandbox = jbt.outputSandbox(index)

        # title
        title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
        jt_string = ''

        xml_fname = str(self.jobtypeName)+'.xml'
        xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')

        # TaskName
        dir = string.split(common.work_space.topDir(), '/')
        taskName = dir[len(dir)-2]

        xml.write(str(title))
        xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + ' task_info="' + os.environ["X509_USER_PROXY"] + '">\n')
        # xml.write('<task name="' +str(taskName)+'" sub_path="' + common.work_space.bossCache() + '">\n')
        xml.write(jt_string)

        xml.write('<iterator>\n')
        xml.write('\t<iteratorRule name="ITR1">\n')
        xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
        xml.write('\t</iteratorRule>\n')
        xml.write('\t<iteratorRule name="ITR2">\n')
        for arg in argsList:
            xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
            pass
        xml.write('\t</iteratorRule>\n')
        xml.write('\t<iteratorRule name="ITR3">\n')
        xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
        xml.write('\t</iteratorRule>\n')

        xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.schedulerName)+'">\n')
        # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
        xml.write(jt_string)

        # executable
        script = job.scriptFilename()
        xml.write('<program>\n')
        xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
        xml.write(jt_string)

        xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
        xml.write('<program_types> crabjob </program_types>\n')

        # input sandbox
        inp_box = script + ','

        if inp_sandbox != None:
            for fl in inp_sandbox:
                inp_box = inp_box + '' + fl + ','
                pass
            pass

        inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + ','+\
                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'parseCrabFjr.py')

        if (not jbt.additional_inbox_files == []):
            inp_box = inp_box + ', '
            for addFile in jbt.additional_inbox_files:
                addFile = os.path.abspath(addFile)
                inp_box = inp_box+''+addFile+','
                pass

        if inp_box[-1] == ',' : inp_box = inp_box[:-1]
        inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
        xml.write(inp_box)

        # stdout and stderr
        base = jbt.name()
        stdout = base + '__ITR3_.stdout'
        stderr = base + '__ITR3_.stderr'

        xml.write('<stderr> ' + stderr + '</stderr>\n')
        xml.write('<stdout> ' + stdout + '</stdout>\n')

        # output sandbox
        out_box = stdout + ',' + stderr + ','

        # Stuff to be returned _always_ via sandbox
        for fl in jbt.output_file_sandbox:
            out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
            pass

        if int(self.return_data) == 1:
            for fl in jbt.output_file:
                out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
                pass
            pass

        if out_box[-1] == ',' : out_box = out_box[:-1]
        out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
        xml.write(out_box)

        xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')

        xml.write('</program>\n')

        # start writing of extraTags
        to_write = ''

        # extraTag universe
        to_write += 'universe = "&quot;globus&quot;"\n'

        # extraTag globusscheduler
        # use BDII to query the CE (including jobmanager) for the site
        seSite = common.jobDB.destination(nj-1)[0]
        # if no site was selected during job splitting (datasetPath=None)
        # set to self.cfg_params['EDG.se_white_list']
        if seSite == '' :
            seSite = self.cfg_params['EDG.se_white_list']
        oneSite = self.getCEfromSE(seSite)
        # skip the site status check for FNAL (OSG, not in BDII)
        if oneSite.find('fnal.gov') < 0 :
            # query if site is in production
            status = cestate_from_ce_bdii(oneSite.split(':')[0].strip())
            if status != 'Production' :
                msg = '[Condor-G Scheduler]: Jobs cannot be submitted to site ' + oneSite.split(':')[0].strip() + ' because the site has status ' + status + ' and is currently not operational.\n'
                msg += '[Condor-G Scheduler]: Please choose another site for your jobs.'
                common.logger.debug(2,msg)
                raise CrabException(msg)

        if self.batchsystem != '' :
            oneSite = oneSite.split('/')[0].strip() + '/' + self.batchsystem

        to_write += 'globusscheduler = "&quot;' + str(oneSite) + '&quot;"\n'

        # extraTag condor transfer file flag
        to_write += 'should_transfer_files = "&quot;YES&quot;"\n'

        # extraTag when to write output
        to_write += 'when_to_transfer_output = "&quot;ON_EXIT&quot;"\n'

        # extraTag switch off streaming of stdout
        to_write += 'stream_output = "&quot;false&quot;"\n'

        # extraTag switch off streaming of stderr
        to_write += 'stream_error = "&quot;false&quot;"\n'

        # extraTag condor logfile
        condor_log = jbt.name() + '__ITR3_.log'
        to_write += 'Log = "&quot;' + condor_log + '&quot;"\n'

        # extraTag condor notification
        to_write += 'notification="&quot;never&quot;"\n'

        # extraTag condor queue statement
        to_write += 'QUEUE = "&quot;1&quot;"\n'

        if (to_write != ''):
            xml.write('<extraTags\n')
            xml.write(to_write)
            xml.write('/>\n')
            pass

        xml.write('</chain>\n')

        xml.write('</iterator>\n')
        xml.write('</task>\n')

        xml.close()

        return
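
    # Illustrative shape of the generated BOSS4 XML (element order as written above):
    #   <task name="..." sub_path="..." task_info="$X509_USER_PROXY">
    #     <iterator>
    #       three <iteratorRule> blocks (ITR1: job ids, ITR2: args, ITR3: padded ids)
    #       <chain name="..._ITR1_" scheduler="...">
    #         <program> exec, args, infiles, stdout/stderr, outfiles, BossAttr </program>
    #         <extraTags universe, globusscheduler, transfer/streaming flags, ... />
    #       </chain>
    #     </iterator>
    #   </task>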
    def checkProxy(self):
        """
        Function to check the Globus proxy.
        """
        if (self.proxyValid): return
        #timeleft = -999
        minTimeLeft = 10*3600  # in seconds

        mustRenew = 0
        timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
        timeLeftServer = -999
        if not timeLeftLocal or not isInt(timeLeftLocal) or int(timeLeftLocal) <= 0:
            mustRenew = 1
        else:
            timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
            if not timeLeftServer or not isInt(timeLeftServer):
                mustRenew = 1
            elif int(timeLeftLocal) < minTimeLeft or int(timeLeftServer) < minTimeLeft:
                mustRenew = 1
            pass
        pass

        if mustRenew:
            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
            cmd = 'voms-proxy-init -voms '+self.VO
            if self.group:
                cmd += ':/'+self.VO+'/'+self.group
            if self.role:
                cmd += '/role='+self.role
            cmd += ' -valid 192:00'
            try:
                # SL as above: damn it!
                out = os.system(cmd)
                if (out>0): raise CrabException("Unable to create a valid proxy!\n")
            except:
                msg = "Unable to create a valid proxy!\n"
                raise CrabException(msg)
            pass

        self.proxyValid = 1
        return
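
    # Illustrative renewal command built above (group and role assumed to be set
    # in crab.cfg): voms-proxy-init -voms cms:/cms/<group>/role=<role> -valid 192:00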
    def submitTout(self, list):
        return 120