ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.103
Committed: Wed Oct 25 15:40:37 2006 UTC (18 years, 6 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_4_1_pre1
Changes since 1.102: +10 -0 lines
Log Message:
Add new card <EDG.dont_check_proxy>, which switches off the proxy check (as well as proxy delegation), as requested by Brian Bockelman.

File Contents

# Content
1 from Scheduler import Scheduler
2 from crab_logger import Logger
3 from crab_exceptions import *
4 from crab_util import *
5 from EdgConfig import *
6 import common
7
8 import os, sys, time
9
10 class SchedulerEdg(Scheduler):
def __init__(self):
    """
    Initialise the base Scheduler with the name "EDG" and store the list
    of job status attribute names.
    """
    Scheduler.__init__(self, "EDG")
    # The position of a name in this list is significant:
    # getStatusAttribute_ uses self.states.index(attr) to pick the matching
    # entry out of the sequence returned by the LB API.
    self.states = [
        "Acl", "cancelReason", "cancelling", "ce_node", "children",
        "children_hist", "children_num", "children_states", "condorId",
        "condor_jdl",
        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
        "lastUpdateTime", "localId", "location", "matched_jdl",
        "network_server",
        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
        "stateEnterTime", "stateEnterTimes", "subjob_failed",
        "user tags", "status", "status_code", "hierarchy",
    ]
23
def configure(self, cfg_params):
    """
    Read the scheduler settings from cfg_params and store them on self.

    cfg_params is indexed with '<SECTION>.<key>' strings and must raise
    KeyError for a missing key. Optional cards fall back to a default;
    'EDG.lfc_host', 'EDG.lcg_catalog_type' and 'EDG.lfc_home' are mandatory.

    Raises CrabException when a mandatory card is missing, when the
    copy/return/register settings are inconsistent, or when the
    EDG_WL_LOCATION environment variable is not set.
    """
    def optional(key, default):
        # Value of an optional card, or default when the card is absent.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def requiredEdg(key, purpose):
        # Value of a mandatory [EDG] card; log and raise when it is absent.
        try:
            return cfg_params['EDG.' + key]
        except KeyError:
            msg = "Error. The [EDG] section does not have '" + key + "' value"
            msg = msg + " it's necessary to know the " + purpose
            common.logger.message(msg)
            raise CrabException(msg)

    # Defaults for the attributes that rb_configure fills in, so that
    # configOpt_ can be used even when no 'EDG.rb' card is given.
    self.edg_config = ''
    self.edg_config_vo = ''
    self.rb_param_file = ''
    try:
        RB = cfg_params["EDG.rb"]
    except KeyError:
        pass
    else:
        self.rb_param_file = self.rb_configure(RB)

    self.proxyServer = optional("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)

    self.group = optional("EDG.group", None)
    self.role = optional("EDG.role", None)
    self.LCG_version = optional("EDG.lcg_version", '2')
    self.EDG_ce_black_list = optional('EDG.ce_black_list', '')
    self.EDG_ce_white_list = optional('EDG.ce_white_list', '')
    self.VO = optional('EDG.virtual_organization', 'cms')
    self.copy_input_data = optional("USER.copy_input_data", 0)
    self.return_data = optional('USER.return_data', 0)

    # Storage element settings are only mandatory when output is copied;
    # they are initialised anyway so the attributes always exist.
    self.SE = ''
    self.SE_PATH = ''
    self.copy_data = optional("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        try:
            self.SE = cfg_params['USER.storage_element']
            self.SE_PATH = cfg_params['USER.storage_path']
        except KeyError:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
        msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    self.lfc_host = requiredEdg('lfc_host', 'LFC host name')
    self.lcg_catalog_type = requiredEdg('lcg_catalog_type', 'catalog type')
    self.lfc_home = requiredEdg('lfc_home', 'home catalog dir')

    # Same pattern as for the storage element: LFN is only required when
    # registration is requested, but the attribute always exists.
    self.LFN = ''
    self.register_data = optional("USER.register_data", 0)
    if int(self.register_data) == 1:
        try:
            self.LFN = cfg_params['USER.lfn_dir']
        except KeyError:
            msg = "Error. The [USER] section does not have 'lfn_dir' value"
            msg = msg + " it's necessary for LCF registration"
            common.logger.message(msg)
            raise CrabException(msg)

    if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
        msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    self.EDG_requirements = optional('EDG.requirements', '')

    try:
        self.EDG_addJdlParam = cfg_params['EDG.additional_jdl_parameters'].split(',')
    except KeyError:
        self.EDG_addJdlParam = []

    self.EDG_retry_count = optional('EDG.retry_count', '')
    self.EDG_shallow_retry_count = optional('EDG.shallow_retry_count', '')
    self.EDG_clock_time = optional('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = optional('EDG.max_cpu_time', '')

    # Add EDG_WL_LOCATION to the python path
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    for libPath in (os.path.join(path, "lib"),
                    os.path.join(path, "lib", "python")):
        # do not grow sys.path when configure is called more than once
        if libPath not in sys.path:
            sys.path.append(libPath)

    self.proxyValid=0

    self._taskId = optional('taskId', '')
    self.jobtypeName = optional('CRAB.jobtype', '')
    # The fallback used to be stored in 'self.scheduler', leaving
    # 'self.schedulerName' (read by createXMLSchScript) undefined.
    self.schedulerName = optional('CRAB.scheduler', '')
    self.dontCheckProxy = optional("EDG.dont_check_proxy", 0)

    return
175
176
def rb_configure(self, RB):
    """
    Build the resource broker configuration for RB.

    Stores the two configuration values obtained from EdgConfig(RB) in
    self.edg_config and self.edg_config_vo, and returns (and stores in
    self.rb_param_file) the matching 'RBconfig'/'RBconfigVO' parameter
    text, or '' when either value is missing.
    """
    # reset first, so the attributes exist even if EdgConfig fails
    self.edg_config = self.edg_config_vo = self.rb_param_file = ''

    broker = EdgConfig(RB)
    self.edg_config = broker.config()
    self.edg_config_vo = broker.configVO()

    if not self.edg_config or self.edg_config_vo == '':
        return self.rb_param_file

    rbLine = 'RBconfig = "' + self.edg_config + '";\n'
    voLine = 'RBconfigVO = "' + self.edg_config_vo + '";'
    self.rb_param_file = rbLine + voLine
    return self.rb_param_file
190
191
def sched_parameter(self):
    """
    Write the files with requirements and scheduler-specific parameters.

    One file 'sched_param_<i>.clad' is written in the share directory for
    every group of consecutive jobs that have the same block in the job DB.
    Each file holds a 'Requirements' expression (job type requirements,
    user requirements, CE white/black lists, time limits and the sites of
    that block), followed by the resource broker parameters and the
    additional JDL parameters, when configured.

    self.param is left set to the name of the last file written.
    """
    index = int(common.jobDB.nJobs()) - 1
    jbt = common.job_list[index].type()

    # index of the first job of every block
    lastBlock=-1
    first = []
    for n in range(common.jobDB.nJobs()):
        currBlock=common.jobDB.block(n)
        if (currBlock!=lastBlock):
            lastBlock = currBlock
            first.append(n)

    # Collect the clauses common to all blocks and join them at the end:
    # this avoids a dangling ' && ' when the job type has no requirements.
    clauses = []
    base = jbt.getRequirements()
    if base and base.strip():
        clauses.append(base)

    if self.EDG_requirements:
        clauses.append(self.EDG_requirements)

    if self.EDG_ce_white_list:
        allowed = ['(RegExp("' + ce + '", other.GlueCEUniqueId))'
                   for ce in self.EDG_ce_white_list.split(',')]
        clauses.append('(' + ' || '.join(allowed) + ')')

    if self.EDG_ce_black_list:
        for ce in self.EDG_ce_black_list.split(','):
            clauses.append('(!RegExp("' + ce + '", other.GlueCEUniqueId))')

    if self.EDG_clock_time:
        clauses.append('other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time)

    if self.EDG_cpu_time:
        clauses.append('other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time)

    for i, firstJob in enumerate(first):
        # Site constraints belong to this block only: start again from the
        # common clauses instead of piling up the sites of earlier blocks.
        blockClauses = clauses + [
            'anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
            for arg in self.findSites_(firstJob)]

        self.param='sched_param_'+str(i)+'.clad'
        param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
        try:
            param_file.write('Requirements = '+' && '.join(blockClauses)+';\n')

            if (self.rb_param_file != ''):
                param_file.write(self.rb_param_file)

            for p in self.EDG_addJdlParam:
                param_file.write(p)
        finally:
            # close the file even when a write fails
            param_file.close()
267
268
def wsSetupEnvironment(self):
    """
    Returns part of a job script which does scheduler-specific work.

    The returned shell text strips the wrapper arguments, reports the job
    identifiers, discovers the middleware (OSG or LCG), exports the storage
    element, VO and LFC catalog settings and finally determines the CE name.

    Side effect: when copy_data is 1, a trailing '/' is appended to
    self.SE_PATH if it is missing.
    """
    script = []

    # --- wrapper arguments and job identification
    script.append(
        '# strip arguments\n'
        'echo "strip arguments"\n'
        'args=("$@")\n'
        'nargs=$#\n'
        'shift $nargs\n'
        "# job number (first parameter for job wrapper)\n"
        "NJob=${args[0]}\n"
        '# job identification to DashBoard \n'
        'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n'
        'SyncGridJobId=`echo $EDG_WL_JOBID`\n'
    )
    script.append('MonitorID=`echo ' + self._taskId + '`\n')
    script.append(
        'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
        'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
    )

    # --- middleware discovery
    script.append(
        'echo "middleware discovery " \n'
        'if [ $GRID3_APP_DIR ]; then\n'
        ' middleware=OSG \n'
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "middleware =$middleware" \n'
        'elif [ $OSG_APP ]; then \n'
        ' middleware=OSG \n'
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "middleware =$middleware" \n'
        'elif [ $VO_CMS_SW_DIR ]; then \n'
        ' middleware=LCG \n'
        ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "middleware =$middleware" \n'
        'else \n'
        ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
        ' echo "JOB_EXIT_STATUS = 10030" \n'
        ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
        ' dumpStatus $RUNTIME_AREA/$repo \n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        ' exit 1 \n'
        'fi \n'
    )

    # --- first report
    script.append(
        '# report first time to DashBoard \n'
        'dumpStatus $RUNTIME_AREA/$repo \n'
        'rm -f $RUNTIME_AREA/$repo \n'
        'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        '\n\n'
    )

    # --- storage element, only when the output is copied
    if int(self.copy_data) == 1:
        if self.SE:
            script.append('export SE=' + self.SE + '\n')
            script.append('echo "SE = $SE"\n')
        if self.SE_PATH:
            if not self.SE_PATH.endswith('/'):
                self.SE_PATH = self.SE_PATH + '/'
            script.append('export SE_PATH=' + self.SE_PATH + '\n')
            script.append('echo "SE_PATH = $SE_PATH"\n')

    script.append('export VO=' + self.VO + '\n')

    # --- LFC catalog settings
    script.append('if [ $middleware == LCG ]; then \n')
    for shellVar, wanted in (('LCG_CATALOG_TYPE', self.lcg_catalog_type),
                             ('LFC_HOST', self.lfc_host),
                             ('LFC_HOME', self.lfc_home)):
        script.append(" if [[ $" + shellVar + " != '" + wanted + "' ]]; then\n")
        script.append(' export ' + shellVar + '=' + wanted + '\n')
        script.append(' fi\n')
    script.append(
        'elif [ $middleware == OSG ]; then\n'
        ' echo "LFC catalog setting to be implemented for OSG"\n'
        'fi\n'
    )

    # --- LFN directory, only when the output is registered
    if int(self.register_data) == 1:
        script.append('if [ $middleware == LCG ]; then \n')
        script.append(' export LFN=' + self.LFN + '\n')
        script.append(
            ' lfc-ls $LFN\n'
            ' result=$?\n'
            ' echo $result\n'
            # creation of LFN dir in LFC catalog, under /grid/cms dir
            ' if [ $result != 0 ]; then\n'
            ' lfc-mkdir $LFN\n'
            ' result=$?\n'
            ' echo $result\n'
            ' fi\n'
            'elif [ $middleware == OSG ]; then\n'
            ' echo " Files registration to be implemented for OSG"\n'
            'fi\n'
            '\n'
        )

        if self.VO:
            script.append('export VO=' + self.VO + '\n')
        if self.LFN:
            script.append('if [ $middleware == LCG ]; then \n')
            script.append(' export LFN=' + self.LFN + '\n')
            script.append('fi\n' '\n')

    # --- CE name
    script.append(
        'if [ $middleware == LCG ]; then\n'
        ' CloseCEs=`edg-brokerinfo getCE`\n'
        ' echo "CloseCEs = $CloseCEs"\n'
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
        ' echo "CE = $CE"\n'
        'elif [ $middleware == OSG ]; then \n'
        ' if [ $OSG_JOB_CONTACT ]; then \n'
        ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n'
        ' else \n'
        ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n'
        ' echo "JOB_EXIT_STATUS = 10099" \n'
        ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n'
        ' dumpStatus $RUNTIME_AREA/$repo \n'
        ' rm -f $RUNTIME_AREA/$repo \n'
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        ' exit 1 \n'
        ' fi \n'
        'fi \n'
    )

    return ''.join(script)
397
def wsCopyInput(self):
    """
    Copy input data from SE to WN

    Returns the shell text that copies the input files and the pile-up
    files to the worker node with lcg-cp (LCG only; on OSG it just prints
    a notice). Returns '' when copy_input_data is disabled.
    """
    # The value may come straight from the configuration as a string, and
    # the string "0" is truthy: interpret it as a number first, like the
    # copy_data / register_data checks do, and fall back to plain
    # truthiness for non-numeric values.
    try:
        enabled = int(self.copy_input_data) != 0
    except (TypeError, ValueError):
        enabled = bool(self.copy_input_data)
    if not enabled:
        return ''

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    txt = (
        'if [ $middleware == OSG ]; then\n'
        ' #\n'
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
        ' #\n'
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
        'elif [ $middleware == LCG ]; then \n'
        ' #\n'
        ' # Copy Input Data from SE to this WN\n'
        ' #\n'
        # loop copying more than one input file per job
        ' for input_file in $cur_file_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
        ' copy_input_exit_status=$?\n'
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
        ' if [ $copy_input_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying to WN" \n'
        ' else \n'
        ' echo "input copied into WN" \n'
        ' fi \n'
        ' done \n'
        # copy a set of PU ntuples (same for each job -- but accessed randomly)
        ' for file in $cur_pu_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
        ' copy_input_pu_exit_status=$?\n'
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying pu to WN" \n'
        ' else \n'
        ' echo "input pu files copied into WN" \n'
        ' fi \n'
        ' done \n'
        ' \n'
        ' ### Check SCRATCH space available on WN : \n'
        ' df -h \n'
        'fi \n'
    )
    return txt
445
def wsCopyOutput(self):
    """
    Write a CopyResults part of a job script, e.g.
    to copy produced output into a storage element.

    Returns '' unless copy_data is 1. The generated shell loop tries srmcp
    for every output file and falls back to lcg-cp when srmcp fails; the
    lcg-cp call gets '--verbose' when the logger debug level is >= 5.
    """
    if int(self.copy_data) != 1:
        return ''

    pieces = []
    pieces.append(
        '#\n'
        '# Copy output to SE = $SE\n'
        '#\n'
        ' if [ $middleware == OSG ]; then\n'
        ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
        ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
        ' source $OSG_APP/glite/setup_glite_ui.sh\n'
        ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
        ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
        ' fi \n'
    )

    # first attempt: srmcp
    pieces.append(
        ' for out_file in $file_list ; do\n'
        ' echo "Trying to copy output file to $SE using srmcp"\n'
        ' echo "mkdir -p $HOME/.srmconfig"\n'
        ' mkdir -p $HOME/.srmconfig\n'
        ' if [ $middleware == LCG ]; then\n'
        ' echo "srmcp -retry_num 3 -retry_timeout 480000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
        ' exitstring=`srmcp -retry_num 3 -retry_timeout 480000 file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
        ' elif [ $middleware == OSG ]; then\n'
        ' echo "srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
        ' exitstring=`srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
        ' fi \n'
        ' copy_exit_status=$?\n'
        ' echo "COPY_EXIT_STATUS for srmcp = $copy_exit_status"\n'
        ' echo "STAGE_OUT = $copy_exit_status"\n'
    )

    # second attempt: lcg-cp, verbose only at high debug level
    pieces.append(
        ' if [ $copy_exit_status -ne 0 ]; then\n'
        ' echo "Possible problem with SE = $SE"\n'
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "srmcp failed, attempting lcg-cp."\n'
    )
    if common.logger.debugLevel() >= 5:
        verbose = ' --verbose'
    else:
        verbose = ''
    pieces.append(' echo "lcg-cp --vo $VO -t 2400' + verbose
                  + ' file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n')
    pieces.append(' exitstring=`lcg-cp --vo $VO -t 2400' + verbose
                  + ' file://\\`pwd\\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n')
    pieces.append(
        ' copy_exit_status=$?\n'
        ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
        ' echo "STAGE_OUT = $copy_exit_status"\n'
    )

    # final status report
    pieces.append(
        ' if [ $copy_exit_status -ne 0 ]; then\n'
        ' echo "Problems with SE = $SE"\n'
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "srmcp and lcg-cp and failed!"\n'
        ' else\n'
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "output copied into $SE/$SE_PATH directory"\n'
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "lcg-cp succeeded"\n'
        ' fi\n'
        ' else\n'
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "output copied into $SE/$SE_PATH directory"\n'
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "srmcp succeeded"\n'
        ' fi\n'
        ' done\n'
    )
    return ''.join(pieces)
515
def wsRegisterOutput(self):
    """
    Returns part of a job script which does scheduler-specific work.

    Returns '' unless register_data is 1. On LCG the generated shell text
    registers every output file into LFC with lcg-rf (sfn first, then srm)
    when the copy succeeded, otherwise it copies and registers the files to
    a close SE with lcg-cr. On OSG it only prints a notice.
    """
    if int(self.register_data) != 1:
        return ''

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    osgPart = (
        'if [ $middleware == OSG ]; then\n'
        ' #\n'
        ' # Register output to LFC deactivated in OSG mode\n'
        ' #\n'
        ' echo "Register output to LFC deactivated in OSG mode"\n'
        'elif [ $middleware == LCG ]; then \n'
        '#\n'
        '# Register output to LFC\n'
        '#\n'
    )

    # the copy to the SE worked: register the copied files
    registerPart = (
        ' if [ $copy_exit_status -eq 0 ]; then\n'
        ' for out_file in $file_list ; do\n'
        ' echo "Trying to register the output file into LFC"\n'
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1"\n'
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1 \n'
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
        ' echo "Problems with the registration to LFC" \n'
        ' echo "Try with srm protocol" \n'
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1"\n'
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1 \n'
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
        ' echo "Problems with the registration into LFC" \n'
        ' fi \n'
        ' else \n'
        ' echo "output registered to LFC"\n'
        ' fi \n'
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        ' done\n'
    )

    # the copy to the SE failed: copy and register to a close SE instead
    closeSePart = (
        ' else \n'
        ' echo "Trying to copy output file to CloseSE"\n'
        ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
        ' for out_file in $file_list ; do\n'
        ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1" \n'
        ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1 \n'
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
        ' echo "Problems with CloseSE or Catalog" \n'
        ' else \n'
        ' echo "The program was successfully executed"\n'
        ' echo "SE = $CLOSE_SE"\n'
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n'
        ' fi \n'
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        ' done\n'
        ' fi \n'
        ' exit_status=$register_exit_status\n'
        'fi \n'
    )

    return osgPart + registerPart + closeSePart
579
def loggingInfo(self, id):
    """
    Check the proxy, then run 'edg-job-get-logging-info -v 2' for the job
    id and return what runCommand returns for it.
    """
    self.checkProxy()
    return runCommand('edg-job-get-logging-info -v 2 ' + id)
589
def getExitStatus(self, id):
    """ Return the 'exit_code' status attribute of the job with id """
    attribute = 'exit_code'
    return self.getStatusAttribute_(id, attribute)
592
def queryStatus(self, id):
    """ Return the 'status' status attribute of the job with id """
    attribute = 'status'
    return self.getStatusAttribute_(id, attribute)
595
def queryDest(self, id):
    """ Return the 'destination' status attribute of the job with id """
    attribute = 'destination'
    return self.getStatusAttribute_(id, attribute)
598
599
def getStatusAttribute_(self, id, attr):
    """
    Query a status of the job with id

    Returns the value of the status attribute attr, which must be one of
    the names in self.states, or None when the LB API reports an error.
    Raises ValueError when attr is not in self.states.
    """
    self.checkProxy()
    # Bypass edg-job-status interfacing directly to C++ API
    Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
    # Instance of the Status class provided by LB API
    jobStat = Status()
    level = 0   # NOTE(review): meaning of this argument is defined by the LB API - confirm
    jobStat.getStatus(id, level)
    err, apiMsg = jobStat.get_error()
    if err:
        common.logger.debug(5,'Error caught' + apiMsg)
        return None

    # Load the attribute vector once and pick the requested entry by its
    # position in self.states. The previous code also copied every entry
    # into a dictionary that was never read, calling loadStatus once per
    # state name.
    values = jobStat.loadStatus(0)
    return values[self.states.index(attr)]
627
def queryDetailedStatus(self, id):
    """ Run 'edg-job-status' for the job with id and return its output """
    return runCommand('edg-job-status ' + id)
633
634 ##### FEDE ######
def findSites_(self, n):
    """
    Build the storage element constraint for job n.

    Returns a list holding a single expression that ORs together one
    'target.GlueSEUniqueID=="<site>"' test per destination of the job, or
    an empty list when there is no constraint to add: first destination
    "Any", destinations equal to [""], or no destinations at all.
    """
    sites = common.jobDB.destination(n)
    if len(sites)>0 and sites[0]=="Any":
        return []
    # An empty list used to yield [''], i.e. an empty anyMatch() clause in
    # the requirements; treat it like the [""] case.
    if len(sites) == 0 or sites == [""]:#CarlosDaniele
        return []
    tests = ['target.GlueSEUniqueID=="'+site+'"' for site in sites]
    return [' || '.join(tests)]
648
def createXMLSchScript(self, nj, argsList):
    """
    Create a XML-file for BOSS4.

    Appends a <task> description for nj jobs to '<jobtypeName>.xml' in the
    share directory: extra tags (retry counts, MyProxy server, VO), the
    iterator rules (job numbers, one argument string per entry of argsList),
    and a chain with the job script, its input files, stdout/stderr and
    output files.

    The whole document is assembled in memory and written in one go, so a
    failure while collecting the data no longer leaves an open or partly
    written file behind.
    """
    # INDY FIX-ME: I would pass jobType instead of job
    index = nj - 1
    job = common.job_list[index]
    jbt = job.type()
    inp_sandbox = jbt.inputSandbox(index)

    # TaskName: last but one component of the top directory path
    pathParts = common.work_space.topDir().split('/')
    taskName = pathParts[len(pathParts)-2]

    # NOTE: a requirements expression used to be built here (job type
    # requirements, CE lists, time limits) but was never written to the
    # file; that dead code has been removed.

    # --- attributes of the <extraTags> element
    to_write = ''
    if ( self.EDG_retry_count ):
        to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
    if ( self.EDG_shallow_retry_count ):
        to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
    to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
    to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'

    # --- input sandbox: script, job type files, CRAB helpers, extra files
    # INDY: script depends on jobType: it should be probably get in a different way
    script = job.scriptFilename()
    inFiles = [script]
    if inp_sandbox is not None:
        inFiles.extend(inp_sandbox)
    crabPython = os.environ['CRABDIR']+'/python/'
    for helper in ('report.py', 'DashboardAPI.py', 'Logger.py',
                   'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py'):
        inFiles.append(os.path.abspath(crabPython+helper))
    for addFile in jbt.additional_inbox_files:
        inFiles.append(os.path.abspath(addFile))

    # --- output sandbox: stdout, stderr, broker info and, if requested,
    # the numbered output files of the job type
    base = jbt.name()
    stdout = base + '__ITR3_.stdout'
    stderr = base + '__ITR3_.stderr'
    outFiles = [stdout, stderr, '.BrokerInfo']
    if int(self.return_data) == 1:
        for fl in jbt.output_file:
            outFiles.append(jbt.numberFile_(fl, '_ITR1_'))

    # --- assemble the document
    doc = []
    doc.append('<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n')
    doc.append('<task name="' +str(taskName)+'">\n')

    doc.append('<extraTags\n')
    doc.append(to_write)
    doc.append('/>\n')

    doc.append('<iterator>\n')
    doc.append('\t<iteratorRule name="ITR1">\n')
    doc.append('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
    doc.append('\t</iteratorRule>\n')
    doc.append('\t<iteratorRule name="ITR2">\n')
    for arg in argsList:
        doc.append('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
    doc.append('\t</iteratorRule>\n')
    doc.append('\t<iteratorRule name="ITR3">\n')
    doc.append('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
    doc.append('\t</iteratorRule>\n')

    # schedulerName may be missing when the configuration has no
    # 'CRAB.scheduler' entry: fall back to an empty name
    doc.append('<chain scheduler="'+str(getattr(self, 'schedulerName', ''))+'">\n')

    doc.append('<program>\n')
    doc.append('<exec> ' + os.path.basename(script) +' </exec>\n')
    doc.append('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
    doc.append('<program_types> crabjob </program_types>\n')
    doc.append('<infiles> <![CDATA[\n' + ','.join(inFiles) + '\n]]> </infiles>\n')
    doc.append('<stderr> ' + stderr + '</stderr>\n')
    doc.append('<stdout> ' + stdout + '</stdout>\n')
    doc.append('<outfiles> <![CDATA[\n' + ','.join(outFiles) + '\n]]></outfiles>\n')
    doc.append('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
    doc.append('</program>\n')
    doc.append('</chain>\n')

    doc.append('</iterator>\n')
    doc.append('</task>\n')

    # --- append to the XML file of this job type
    xml_fname = str(self.jobtypeName)+'.xml'
    xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
    try:
        xml.write(''.join(doc))
    finally:
        xml.close()

    return
859
def checkProxy(self):
    """
    Function to check the Globus proxy.

    Does nothing when the proxy was already validated (self.proxyValid) or
    when the check is switched off (self.dontCheckProxy). Otherwise:
      - creates a new VOMS proxy (192h) when the local proxy or its VOMS
        attributes are missing or have less than 10 hours left;
      - delegates a credential to the MyProxy server when none is found
        there or when the one found has less than 100 hours left.

    Raises CrabException when the proxy cannot be created or delegated.
    """
    # 're' is not imported explicitly at the top of the file
    import re

    if (self.proxyValid): return

    def asInt(text):
        # Integer value of a command output, None if missing or not a number.
        try:
            return int(str(text).strip())
        except (TypeError, ValueError):
            return None

    ### Just return if asked to do so
    # The flag may be a string taken from the configuration: "0" must
    # count as "do check", so interpret numbers numerically.
    skipCheck = self.dontCheckProxy
    if asInt(skipCheck) is not None:
        skipCheck = asInt(skipCheck) != 0
    if skipCheck:
        self.proxyValid=1
        return

    minTimeLeft=10*3600 # in seconds
    minTimeLeftServer = 100 # in hours

    # Compare the remaining times as numbers: the raw command output is a
    # string and would never compare as smaller than the thresholds.
    mustRenew = 0
    timeLeftLocal = asInt(runCommand('voms-proxy-info -timeleft 2>/dev/null'))
    if timeLeftLocal is None or timeLeftLocal <= 0:
        mustRenew = 1
    else:
        timeLeftServer = asInt(runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1'))
        if timeLeftServer is None:
            mustRenew = 1
        elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
            mustRenew = 1

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
        cmd = 'voms-proxy-init -voms '+self.VO
        # The ':/<VO>' prefix is needed for a role as well as for a group;
        # a role without a group used to give '<VO>/role=<role>'.
        if self.group or self.role:
            cmd += ':/'+self.VO
        if self.group:
            cmd += '/'+self.group
        if self.role:
            cmd += '/role='+self.role
        cmd += ' -valid 192:00'
        common.logger.debug(10,cmd)
        if os.system(cmd) != 0:
            raise CrabException("Unable to create a valid proxy!\n")

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s '+self.proxyServer
    cmd_out = runCommand(cmd,0,20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        # if myproxy exist but not long enough, renew
        # search() is required here: the 'timeleft:' text is not expected
        # at the very beginning of the output, where match() would look.
        found = re.search( r'timeleft: (\d+)', cmd_out )
        if found:
            hoursLeft = int(found.group(1))
            if hoursLeft < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('Credential delegation will expire in '+str(hoursLeft)+' hours: renew it')

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s '+self.proxyServer
        if os.system(cmd) != 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid=1
    return
938
def configOpt_(self):
    """
    Return the EDG UI command line options selecting the configuration
    files: ' -c <edg_config> ' and/or ' --config-vo <edg_config_vo> ',
    or a single blank when neither is set.
    """
    # The two attributes are only created by rb_configure; treat them as
    # empty when it has never been called instead of failing.
    edg_config = getattr(self, 'edg_config', '')
    edg_config_vo = getattr(self, 'edg_config_vo', '')

    edg_ui_cfg_opt = ' '
    if edg_config:
        edg_ui_cfg_opt = ' -c ' + edg_config + ' '
    if edg_config_vo:
        edg_ui_cfg_opt += ' --config-vo ' + edg_config_vo + ' '
    return edg_ui_cfg_opt