ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.93
Committed: Fri Oct 6 22:24:17 2006 UTC (18 years, 6 months ago) by mkirn
Content type: text/x-python
Branch: MAIN
Changes since 1.92: +2 -7 lines
Log Message:
Changed messages, fixed minor bug for SE output

File Contents

# Content
1 from Scheduler import Scheduler
2 from crab_logger import Logger
3 from crab_exceptions import *
4 from crab_util import *
5 from EdgConfig import *
6 import common
7
8 import os, sys, time
9
10 class SchedulerEdg(Scheduler):
def __init__(self):
    """
    Register this scheduler under the name "EDG" and define the ordered
    list of job-status attribute names.

    The position of a name in self.states is used by getStatusAttribute_
    as the index into the values returned by the LB API, so the order
    must not be changed.
    """
    Scheduler.__init__(self, "EDG")
    attribute_names = (
        "Acl",
        "cancelReason",
        "cancelling",
        "ce_node",
        "children",
        "children_hist",
        "children_num",
        "children_states",
        "condorId",
        "condor_jdl",
        "cpuTime",
        "destination",
        "done_code",
        "exit_code",
        "expectFrom",
        "expectUpdate",
        "globusId",
        "jdl",
        "jobId",
        "jobtype",
        "lastUpdateTime",
        "localId",
        "location",
        "matched_jdl",
        "network_server",
        "owner",
        "parent_job",
        "reason",
        "resubmitted",
        "rsl",
        "seed",
        "stateEnterTime",
        "stateEnterTimes",
        "subjob_failed",
        "user tags",
        "status",
        "status_code",
        "hierarchy",
    )
    self.states = list(attribute_names)
    return
22
def configure(self, cfg_params):
    """
    Read the scheduler settings from cfg_params and store them on self.

    cfg_params is indexed with 'SECTION.key' strings and is expected to
    raise KeyError for a missing entry.  Optional entries fall back to a
    default; mandatory ones (the three LFC entries, the storage entries
    when copy_data is 1, lfn_dir when register_data is 1) raise
    CrabException after logging the message.

    Also appends $EDG_WL_LOCATION/lib and $EDG_WL_LOCATION/lib/python to
    sys.path and resets the cached proxy-validity flag.

    Raises CrabException for a missing mandatory entry, for an
    inconsistent return_data/copy_data/register_data combination, or
    when EDG_WL_LOCATION is not set in the environment.
    """
    def option(key, default):
        # Optional entry: a missing key silently yields the default.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def required(key, msg):
        # Mandatory entry: log and abort when the key is missing.
        try:
            return cfg_params[key]
        except KeyError:
            common.logger.message(msg)
            raise CrabException(msg)

    # Resource-broker specific configuration files (only if an RB is named).
    try:
        edgConfig = EdgConfig(cfg_params["EDG.rb"])
        self.edg_config = edgConfig.config()
        self.edg_config_vo = edgConfig.configVO()
    except KeyError:
        self.edg_config = ''
        self.edg_config_vo = ''

    self.proxyServer = option("EDG.proxy_server", 'myproxy.cern.ch')
    # NOTE(review): assumed to be logged whether or not the default was used.
    common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

    self.group = option("EDG.group", None)
    self.role = option("EDG.role", None)
    self.LCG_version = option("EDG.lcg_version", '2')
    self.EDG_requirements = option('EDG.requirements', '')
    self.EDG_retry_count = option('EDG.retry_count', '')
    self.EDG_ce_black_list = option('EDG.ce_black_list', '')
    self.EDG_ce_white_list = option('EDG.ce_white_list', '')
    self.EDG_clock_time = option('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = option('EDG.max_cpu_time', '')
    self.VO = option('EDG.virtual_organization', 'cms')
    self.return_data = option('USER.return_data', 0)

    # Defined unconditionally so later readers never hit AttributeError.
    self.SE = ''
    self.SE_PATH = ''
    self.LFN = ''

    self.copy_data = option("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        msg = "Error. The [USER] section does not have 'storage_element'"
        msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
        self.SE = required('USER.storage_element', msg)
        self.SE_PATH = required('USER.storage_path', msg)

    if int(self.return_data) == 0 and int(self.copy_data) == 0:
        msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    msg = "Error. The [EDG] section does not have 'lfc_host' value"
    msg = msg + " it's necessary to know the LFC host name"
    self.lfc_host = required('EDG.lfc_host', msg)

    msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
    msg = msg + " it's necessary to know the catalog type"
    self.lcg_catalog_type = required('EDG.lcg_catalog_type', msg)

    msg = "Error. The [EDG] section does not have 'lfc_home' value"
    msg = msg + " it's necessary to know the home catalog dir"
    self.lfc_home = required('EDG.lfc_home', msg)

    self.register_data = option("USER.register_data", 0)
    if int(self.register_data) == 1:
        msg = "Error. The [USER] section does not have 'lfn_dir' value"
        msg = msg + " it's necessary for LCF registration"
        self.LFN = required('USER.lfn_dir', msg)

    if int(self.copy_data) == 0 and int(self.register_data) == 1:
        msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    # Add EDG_WL_LOCATION to the python path
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    sys.path.append(os.path.join(path, "lib"))
    sys.path.append(os.path.join(path, "lib", "python"))

    # Force checkProxy to verify the proxy again.
    self.proxyValid = 0

    self._taskId = option('taskId', '')
    self.jobtypeName = option('CRAB.jobtype', '')
    # The fallback used to be stored in self.scheduler, leaving
    # self.schedulerName (read by createXMLSchScript) undefined.
    self.schedulerName = option('CRAB.scheduler', '')

    return
173
174
def sched_parameter(self):
    """
    Write the files with requirements and scheduler-specific parameters.

    Jobs are scanned in order and a new group starts whenever the
    destination differs from that of the previous job.  For every group i
    a file 'sched_param_<i>.clad' is written in the share directory; it
    holds the 'Requirements' expression (job-type requirements, user
    requirements, CE white/black lists, wall-clock/CPU limits and the
    storage-element constraint of the group's first job) and, when both
    are configured, the RBconfig/RBconfigVO entries.

    self.param is left set to the name of the last file written.
    """
    nJobs = int(common.jobDB.nJobs())
    # The job type of the last job supplies the common requirements.
    jbt = common.job_list[nJobs - 1].type()

    # Index of the first job of every run of jobs with equal destination.
    first = []
    lastDest = ''
    for n in range(nJobs):
        currDest = common.jobDB.destination(n)
        if currDest != lastDest:
            lastDest = currDest
            first.append(n)

    # Collect the individual clauses and join them once, so that an empty
    # job-type requirement can no longer produce a leading ' && '.
    clauses = []
    base = jbt.getRequirements()
    if base and base.strip():
        clauses.append(base)

    if self.EDG_requirements:
        clauses.append(self.EDG_requirements)

    if self.EDG_ce_white_list:
        allowed = ['(RegExp("' + ce + '", other.GlueCEUniqueId))'
                   for ce in self.EDG_ce_white_list.split(',')]
        clauses.append('(' + ' || '.join(allowed) + ')')

    if self.EDG_ce_black_list:
        for ce in self.EDG_ce_black_list.split(','):
            clauses.append('(!RegExp("' + ce + '", other.GlueCEUniqueId))')

    if self.EDG_clock_time:
        clauses.append('other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time)

    if self.EDG_cpu_time:
        clauses.append('other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time)

    for i, firstJob in enumerate(first):
        self.param = 'sched_param_' + str(i) + '.clad'
        param_file = open(common.work_space.shareDir() + '/' + self.param, 'w')
        try:
            jobClauses = list(clauses)
            sites = self.findSites_(firstJob)
            # Only the last site expression is used, and an empty
            # expression is skipped instead of emitting 'anyMatch(..., ())'.
            if sites and sites[-1]:
                jobClauses.append('anyMatch(other.storage.CloseSEs, (' + str(sites[-1]) + '))')

            # NOTE(review): requirements are written whenever any clause
            # exists, also without a site constraint -- confirm this is
            # the intended behaviour for "Any" destinations.
            if jobClauses:
                param_file.write('Requirements = ' + ' && '.join(jobClauses) + ';\n')

            if self.edg_config and self.edg_config_vo != '':
                param_file.write('RBconfig = "' + self.edg_config + '";\n')
                param_file.write('RBconfigVO = "' + self.edg_config_vo + '";')
        finally:
            # Close the file even if a write or findSites_ fails.
            param_file.close()
251
252
def wsSetupEnvironment(self):
    """
    Returns part of a job script which does scheduler-specific work.

    The returned shell fragment strips the wrapper arguments, reports the
    job identifiers to the dashboard repository file, detects the
    middleware flavour (OSG or LCG), exports the storage/catalog related
    variables taken from the configuration and finally determines the CE
    name.  As a side effect self.SE_PATH gets a trailing '/' appended
    when copy_data is 1 and the path lacks one.
    """
    repo_tee = ' | tee -a $RUNTIME_AREA/$repo'

    # Lines shared by both fatal-error branches of the generated script.
    abort_tail = [
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        ' echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',
        ' exit 1 \n',
    ]
    # Body of the two branches that identify OSG middleware.
    osg_branch = [
        ' middleware=OSG \n',
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`"' + repo_tee + ' \n',
        ' echo "GridFlavour=`echo $middleware`"' + repo_tee + ' \n',
        ' echo "middleware =$middleware" \n',
    ]

    script = [
        '# strip arguments\n',
        'echo "strip arguments"\n',
        'args=("$@")\n',
        'nargs=$#\n',
        'shift $nargs\n',
        '# job number (first parameter for job wrapper)\n',
        'NJob=${args[0]}\n',
        '# job identification to DashBoard \n',
        'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n',
        'SyncGridJobId=`echo $EDG_WL_JOBID`\n',
        'MonitorID=`echo ' + self._taskId + '`\n',
        'echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        'echo "SyncGridJobId=`echo $SyncGridJobId`"' + repo_tee + ' \n',
        'echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',
        'echo "middleware discovery " \n',
        'if [ $GRID3_APP_DIR ]; then\n',
    ]
    script.extend(osg_branch)
    script.append('elif [ $OSG_APP ]; then \n')
    script.extend(osg_branch)
    script.extend([
        'elif [ $VO_CMS_SW_DIR ]; then \n',
        ' middleware=LCG \n',
        ' echo "SyncCE=`edg-brokerinfo getCE`"' + repo_tee + ' \n',
        ' echo "GridFlavour=`echo $middleware`"' + repo_tee + ' \n',
        ' echo "middleware =$middleware" \n',
        'else \n',
        ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n',
        ' echo "JOB_EXIT_STATUS = 10030" \n',
        ' echo "JobExitCode=10030"' + repo_tee + ' \n',
    ])
    script.extend(abort_tail)
    script.extend([
        'fi \n',
        '# report first time to DashBoard \n',
        'dumpStatus $RUNTIME_AREA/$repo \n',
        'rm -f $RUNTIME_AREA/$repo \n',
        'echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        'echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',
        '\n\n',
    ])

    if int(self.copy_data) == 1:
        if self.SE:
            script.append('export SE=' + self.SE + '\n')
            script.append('echo "SE = $SE"\n')
        if self.SE_PATH:
            if not self.SE_PATH.endswith('/'):
                self.SE_PATH = self.SE_PATH + '/'
            script.append('export SE_PATH=' + self.SE_PATH + '\n')
            script.append('echo "SE_PATH = $SE_PATH"\n')

    script.append('export VO=' + self.VO + '\n')

    # LFC catalog settings: (shell variable, configured value)
    script.append('if [ $middleware == LCG ]; then \n')
    for shell_var, value in (('LCG_CATALOG_TYPE', self.lcg_catalog_type),
                             ('LFC_HOST', self.lfc_host),
                             ('LFC_HOME', self.lfc_home)):
        script.append(' if [[ $' + shell_var + " != '" + value + "' ]]; then\n")
        script.append(' export ' + shell_var + '=' + value + '\n')
        script.append(' fi\n')
    script.extend([
        'elif [ $middleware == OSG ]; then\n',
        ' echo "LFC catalog setting to be implemented for OSG"\n',
        'fi\n',
    ])

    if int(self.register_data) == 1:
        # creation of LFN dir in LFC catalog, under /grid/cms dir
        script.extend([
            'if [ $middleware == LCG ]; then \n',
            ' export LFN=' + self.LFN + '\n',
            ' lfc-ls $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' if [ $result != 0 ]; then\n',
            ' lfc-mkdir $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' fi\n',
            'elif [ $middleware == OSG ]; then\n',
            ' echo " Files registration to be implemented for OSG"\n',
            'fi\n',
            '\n',
        ])
        # NOTE(review): assumed to belong to the register_data branch,
        # the only case in which configure defines self.LFN -- confirm.
        if self.VO:
            script.append('export VO=' + self.VO + '\n')
        if self.LFN:
            script.extend([
                'if [ $middleware == LCG ]; then \n',
                ' export LFN=' + self.LFN + '\n',
                'fi\n',
                '\n',
            ])

    script.extend([
        'if [ $middleware == LCG ]; then\n',
        ' CloseCEs=`edg-brokerinfo getCE`\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n",
        ' else \n',
        ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n',
        ' echo "JOB_EXIT_STATUS = 10099" \n',
        ' echo "JobExitCode=10099"' + repo_tee + ' \n',
    ])
    script.extend(abort_tail)
    script.extend([
        ' fi \n',
        'fi \n',
    ])

    return ''.join(script)
381
def wsCopyInput(self):
    """
    Copy input data from SE to WN

    Returns the shell fragment doing the copy.  In OSG mode the fragment
    only prints that the copy is deactivated; in LCG mode it copies with
    lcg-cp every file of $cur_file_list and of $cur_pu_list (pile-up
    ntuples) into the current directory and then prints the disk usage.
    """
    fragment = (
        ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n',
        ' #\n',
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        ' #\n',
        ' # Copy Input Data from SE to this WN\n',
        ' #\n',
        ### changed by georgia (put a loop copying more than one input files per jobs)
        ' for input_file in $cur_file_list \n',
        ' do \n',
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n',
        ' copy_input_exit_status=$?\n',
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n',
        ' if [ $copy_input_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying to WN" \n',
        ' else \n',
        ' echo "input copied into WN" \n',
        ' fi \n',
        ' done \n',
        ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
        ' for file in $cur_pu_list \n',
        ' do \n',
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n',
        ' copy_input_pu_exit_status=$?\n',
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n',
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying pu to WN" \n',
        ' else \n',
        ' echo "input pu files copied into WN" \n',
        ' fi \n',
        ' done \n',
        ' \n',
        ' ### Check SCRATCH space available on WN : \n',
        ' df -h \n',
        'fi \n',
    )
    return ''.join(fragment)
428
def wsCopyOutput(self):
    """
    Write a CopyResults part of a job script, e.g.
    to copy produced output into a storage element.

    Returns an empty string unless copy_data is 1.  The generated
    fragment tries srmcp for every file of $file_list and falls back to
    lcg-cp when srmcp fails; lcg-cp gets '--verbose' when the logger
    debug level is 5 or more.
    """
    if int(self.copy_data) != 1:
        return ''

    srm_target = 'file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file'
    # Same target with the inner back-quotes escaped, for use inside `...`.
    srm_target_sub = 'file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file'
    srmcp_lcg = 'srmcp -retry_num 3 -retry_timeout 480000 '
    srmcp_osg = 'srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR '
    repo_tee = ' | tee -a $RUNTIME_AREA/$repo\n'

    # Report lines shared by both successful outcomes.
    stage_ok = [
        ' echo "StageOutSE = $SE"' + repo_tee,
        ' echo "StageOutCatalog = "' + repo_tee,
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' echo "StageOutExitStatus = 0"' + repo_tee,
    ]

    pieces = [
        '#\n',
        '# Copy output to SE = $SE\n',
        '#\n',
        ' if [ $middleware == OSG ]; then\n',
        ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n',
        ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n',
        ' source $OSG_APP/glite/setup_glite_ui.sh\n',
        ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n',
        ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n',
        ' fi \n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to copy output file to $SE using srmcp"\n',
        ' echo "mkdir -p $HOME/.srmconfig"\n',
        ' mkdir -p $HOME/.srmconfig\n',
        ' if [ $middleware == LCG ]; then\n',
        ' echo "' + srmcp_lcg + srm_target + '"\n',
        ' exitstring=`' + srmcp_lcg + srm_target_sub + ' 2>&1`\n',
        ' elif [ $middleware == OSG ]; then\n',
        ' echo "' + srmcp_osg + srm_target + '"\n',
        ' exitstring=`' + srmcp_osg + srm_target_sub + ' 2>&1`\n',
        ' fi \n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS for srmcp = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' echo "Possible problem with SE = $SE"\n',
        ' echo "StageOutExitStatus = 198"' + repo_tee,
        ' echo "StageOutExitStatusReason = $exitstring"' + repo_tee,
        ' echo "srmcp failed, attempting lcg-cp."\n',
    ]

    if common.logger.debugLevel() >= 5:
        lcg_cp = 'lcg-cp --vo $VO -t 2400 --verbose '
    else:
        lcg_cp = 'lcg-cp --vo $VO -t 2400 '
    pieces.append(' echo "' + lcg_cp + 'file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n')
    pieces.append(' exitstring=`' + lcg_cp + 'file://\\`pwd\\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n')

    pieces.extend([
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' echo "Problems with SE = $SE"\n',
        ' echo "StageOutExitStatus = 198"' + repo_tee,
        ' echo "StageOutExitStatusReason = $exitstring"' + repo_tee,
        ' echo "srmcp and lcg-cp and failed!"\n',
        ' else\n',
    ])
    pieces.extend(stage_ok)
    pieces.extend([
        ' echo "lcg-cp succeeded"\n',
        ' fi\n',
        ' else\n',
    ])
    pieces.extend(stage_ok)
    pieces.extend([
        ' echo "srmcp succeeded"\n',
        ' fi\n',
        ' done\n',
    ])
    return ''.join(pieces)
498
def wsRegisterOutput(self):
    """
    Returns part of a job script which does scheduler-specific work.

    Returns an empty string unless register_data is 1.  Otherwise the
    fragment registers every file of $file_list into the LFC (first with
    an sfn:// and then with an srm:// replica name) when the preceding
    copy succeeded, or copies and registers it to the first close SE with
    lcg-cr when the copy failed.  Registration is deactivated in OSG mode.
    """
    if int(self.register_data) != 1:
        return ''

    status_report = (
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
    )
    stage_out_line = ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
    rf_sfn = 'lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1'
    rf_srm = 'lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1'
    cr_close = 'lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1'

    chunks = [
        ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Register output to LFC deactivated in OSG mode\n',
        ' #\n',
        ' echo "Register output to LFC deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        '#\n',
        '# Register output to LFC\n',
        '#\n',
        ' if [ $copy_exit_status -eq 0 ]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to register the output file into LFC"\n',
        ' echo "' + rf_sfn + '"\n',
        ' ' + rf_sfn + ' \n',
        status_report,
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration to LFC" \n',
        ' echo "Try with srm protocol" \n',
        ' echo "' + rf_srm + '"\n',
        ' ' + rf_srm + ' \n',
        status_report,
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration into LFC" \n',
        ' fi \n',
        ' else \n',
        ' echo "output registered to LFC"\n',
        ' fi \n',
        stage_out_line,
        ' done\n',
        ' else \n',
        ' echo "Trying to copy output file to CloseSE"\n',
        ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n',
        ' for out_file in $file_list ; do\n',
        ' echo "' + cr_close + '" \n',
        ' ' + cr_close + ' \n',
        status_report,
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with CloseSE or Catalog" \n',
        ' else \n',
        ' echo "The program was successfully executed"\n',
        ' echo "SE = $CLOSE_SE"\n',
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n',
        ' fi \n',
        stage_out_line,
        ' done\n',
        ' fi \n',
        ' exit_status=$register_exit_status\n',
        'fi \n',
    ]
    return ''.join(chunks)
562
def loggingInfo(self, id):
    """
    Retrieve the logging info of job `id` from logging and bookkeeping
    and return it.

    The proxy is checked first; the information is obtained by running
    'edg-job-get-logging-info -v 2 <id>' and the command output is
    returned as delivered by runCommand.
    """
    self.checkProxy()
    return runCommand('edg-job-get-logging-info -v 2 ' + id)
572
def getExitStatus(self, id):
    """ Return the 'exit_code' status attribute of the job with id """
    exit_code = self.getStatusAttribute_(id, 'exit_code')
    return exit_code
575
def queryStatus(self, id):
    """ Return the 'status' status attribute of the job with id """
    status = self.getStatusAttribute_(id, 'status')
    return status
578
def queryDest(self, id):
    """ Return the 'destination' status attribute of the job with id """
    destination = self.getStatusAttribute_(id, 'destination')
    return destination
581
582
def getStatusAttribute_(self, id, attr):
    """
    Query a status of the job with id

    Bypasses edg-job-status by using the Status class of the LB wrapper
    module directly.  Returns the value found at the position that attr
    has in self.states, or None (after a debug log) when the LB API
    reports an error.

    Raises ValueError when attr is not one of self.states.
    """
    self.checkProxy()
    Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
    level = 0
    st = 0
    # Instance of the Status class provided by LB API
    jobStat = Status()
    jobStat.getStatus(id, level)
    err, apiMsg = jobStat.get_error()
    if err:
        common.logger.debug(5, 'Error caught' + apiMsg)
        return None
    # Load the attribute vector once instead of once per known state;
    # the previously built (and never used) hash table is gone.
    values = jobStat.loadStatus(st)
    return values[self.states.index(attr)]
606
def queryDetailedStatus(self, id):
    """
    Query a detailed status of the job with id

    Runs 'edg-job-status <id>' and returns the command output as
    delivered by runCommand.
    """
    return runCommand('edg-job-status ' + id)
612
613 ##### FEDE ######
def findSites_(self, n):
    """
    Build the storage-element constraint for job n.

    Returns a list holding one expression of the form
    'target.GlueSEUniqueID=="a" || target.GlueSEUniqueID=="b"' built from
    the destinations of job n, or an empty list when the first
    destination is "Any", when the destination is [""] or when there is
    no destination at all.
    """
    sites = common.jobDB.destination(n)
    if len(sites) > 0 and sites[0] == "Any":
        return []
    # An empty destination list used to produce [''] and thereby an
    # empty 'anyMatch(..., ())' clause in the requirements.
    if len(sites) == 0 or sites == [""]:  # CarlosDaniele
        return []
    terms = ['target.GlueSEUniqueID=="' + site + '"' for site in sites]
    return [' || '.join(terms)]
627
def createXMLSchScript(self, nj, argsList):
    """
    Create a XML-file for BOSS4.

    nj       -- number of jobs; job nj-1 of common.job_list supplies the
                job type, the script name and the input sandbox.
    argsList -- one argument string per job, written as the elements of
                iterator rule ITR2.

    The task description is appended to '<jobtypeName>.xml' in the share
    directory.  It contains the extra tags (retry count, MyProxy server,
    VO), the iterator rules ITR1-ITR3 and one chain/program element with
    the executable, its arguments and the input/output sandboxes.
    Output files of the job type are added to the output sandbox only
    when return_data is 1.
    """
    # FIX-ME (INDY): I would pass jobType instead of job
    index = nj - 1
    job = common.job_list[index]
    jbt = job.type()
    inp_sandbox = jbt.inputSandbox(index)

    # TaskName: last but one component of the top directory path
    pathParts = common.work_space.topDir().split('/')
    taskName = pathParts[len(pathParts) - 2]

    extraTags = ''
    if self.EDG_retry_count:
        extraTags = extraTags + 'RetryCount = "' + self.EDG_retry_count + '"\n'
    extraTags = extraTags + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
    extraTags = extraTags + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'

    # INDY: script depends on jobType: it should be probably get in a
    # different way
    script = job.scriptFilename()

    # Input sandbox: wrapper script, job-type files, CRAB helper modules
    # and the additional files requested by the job type.
    crabPython = os.environ['CRABDIR'] + '/python/'
    inFiles = [script]
    if inp_sandbox != None:
        inFiles.extend(inp_sandbox)
    for helper in ('report.py', 'DashboardAPI.py', 'Logger.py',
                   'ProcInfo.py', 'apmon.py'):
        inFiles.append(os.path.abspath(crabPython + helper))
    for addFile in jbt.additional_inbox_files:
        inFiles.append(os.path.abspath(addFile))

    base = jbt.name()
    stdout = base + '__ITR3_.stdout'
    stderr = base + '__ITR3_.stderr'

    # INDY: something similar should be also done for infiles (if it
    # makes sense!)
    outFiles = [stdout, stderr, '.BrokerInfo']
    if int(self.return_data) == 1:
        for fl in jbt.output_file:
            outFiles.append(jbt.numberFile_(fl, '_ITR1_'))

    # Tolerate a missing attribute (configure may not have defined it).
    schedulerName = getattr(self, 'schedulerName', '')

    doc = [
        '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n',
        '<task name="' + str(taskName) + '">\n',
        '<extraTags\n',
        extraTags,
        '/>\n',
        '<iterator>\n',
        '\t<iteratorRule name="ITR1">\n',
        '\t\t<ruleElement> 1:' + str(nj) + ' </ruleElement>\n',
        '\t</iteratorRule>\n',
        '\t<iteratorRule name="ITR2">\n',
    ]
    for arg in argsList:
        doc.append('\t\t<ruleElement> <![CDATA[\n' + arg + '\n\t\t]]> </ruleElement>\n')
    doc.extend([
        '\t</iteratorRule>\n',
        '\t<iteratorRule name="ITR3">\n',
        '\t\t<ruleElement> 1:' + str(nj) + ':1:6 </ruleElement>\n',
        '\t</iteratorRule>\n',
        '<chain scheduler="' + str(schedulerName) + '">\n',
        '<program>\n',
        '<exec> ' + os.path.basename(script) + ' </exec>\n',
        ### only one .sh JDL has arguments:
        '<args> <![CDATA[\n _ITR2_ \n]]> </args>\n',
        '<program_types> crabjob </program_types>\n',
        '<infiles> <![CDATA[\n' + ','.join(inFiles) + '\n]]> </infiles>\n',
        '<stderr> ' + stderr + '</stderr>\n',
        '<stdout> ' + stdout + '</stdout>\n',
        '<outfiles> <![CDATA[\n' + ','.join(outFiles) + '\n]]></outfiles>\n',
        '<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n',
        '</program>\n',
        '</chain>\n',
        '</iterator>\n',
        '</task>\n',
    ])

    # Everything is computed before the file is touched, so a failure
    # above no longer leaves a half-written task description behind.
    xml_fname = str(self.jobtypeName) + '.xml'
    xml = open(common.work_space.shareDir() + '/' + xml_fname, 'a')
    try:
        xml.writelines(doc)
    finally:
        xml.close()

    return
843
def checkProxy(self):
    """
    Function to check the Globus proxy.

    Does nothing when a previous call already validated the proxy.
    Otherwise:
      * a new VOMS proxy (96h, with the configured group/role) is created
        when 'voms-proxy-info' reports no usable time left or less than
        10 hours for either the proxy or its attribute certificate;
      * the credential is delegated to the MyProxy server when
        'myproxy-info' returns nothing or reports fewer than 100 hours
        left.
    On success the result is cached in self.proxyValid.

    Raises CrabException when the proxy cannot be created or delegated.
    """
    if self.proxyValid:
        return

    minTimeLeft = 10 * 3600  # in seconds
    minTimeLeftServer = 100  # in hours

    def asInt(text):
        # Command output is text (or nothing at all); None marks
        # "not a number" so callers never compare strings with ints.
        try:
            return int(text)
        except (TypeError, ValueError):
            return None

    mustRenew = 0
    timeLeftLocal = asInt(runCommand('voms-proxy-info -timeleft 2>/dev/null'))
    if timeLeftLocal is None or timeLeftLocal <= 0:
        mustRenew = 1
    else:
        timeLeftServer = asInt(runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1'))
        if timeLeftServer is None:
            mustRenew = 1
        elif timeLeftLocal < minTimeLeft or timeLeftServer < minTimeLeft:
            mustRenew = 1

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
        cmd = 'voms-proxy-init -voms ' + self.VO
        if self.group:
            cmd += ':/' + self.VO + '/' + self.group
        if self.role:
            cmd += '/role=' + self.role
        cmd += ' -valid 96:00'
        common.logger.debug(10, cmd)
        if os.system(cmd) > 0:
            raise CrabException("Unable to create a valid proxy!\n")

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd_out = runCommand('myproxy-info -d -s ' + self.proxyServer, 0, 20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        # if myproxy exist but not long enough, renew.  'timeleft:' is
        # not at the start of the output, hence search() and not match().
        found = re.search(r'timeleft: (\d+)', cmd_out)
        if found:
            hoursLeft = int(found.group(1))
            if hoursLeft < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('No credential delegation will expire in '+str(hoursLeft)+' hours: renew it')

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s ' + self.proxyServer
        if os.system(cmd) > 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid = 1
    return
916
def configOpt_(self):
    """
    Return the command-line options selecting the EDG UI configuration.

    The result is ' -c <config> ' when self.edg_config is set (a single
    blank otherwise), followed by ' --config-vo <config_vo> ' when
    self.edg_config_vo is set.
    """
    if self.edg_config:
        options = ' -c %s ' % self.edg_config
    else:
        options = ' '
    # NOTE(review): treated as independent of the edg_config test above.
    if self.edg_config_vo:
        options = options + ' --config-vo %s ' % self.edg_config_vo
    return options