ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.65
Committed: Tue Jun 27 15:39:58 2006 UTC (18 years, 10 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_2_0_pre8
Changes since 1.64: +0 -3 lines
Log Message:
removed GridSyncJobId from the fields which are reported every time from the WN script to the DashBoard

File Contents

# Content
1 from Scheduler import Scheduler
2 from crab_logger import Logger
3 from crab_exceptions import *
4 from crab_util import *
5 from EdgConfig import *
6 import common
7
8 import os, sys, time
9
10 class SchedulerEdg(Scheduler):
def __init__(self):
    """Set up the scheduler under the name "EDG" and list the job status fields.

    The order of ``self.states`` is significant: getStatusAttribute_ uses the
    position of a field name in this list as the index into the sequence
    returned by the logging-and-bookkeeping ``loadStatus`` call.
    """
    Scheduler.__init__(self, "EDG")
    self.states = [
        "Acl",
        "cancelReason",
        "cancelling",
        "ce_node",
        "children",
        "children_hist",
        "children_num",
        "children_states",
        "condorId",
        "condor_jdl",
        "cpuTime",
        "destination",
        "done_code",
        "exit_code",
        "expectFrom",
        "expectUpdate",
        "globusId",
        "jdl",
        "jobId",
        "jobtype",
        "lastUpdateTime",
        "localId",
        "location",
        "matched_jdl",
        "network_server",
        "owner",
        "parent_job",
        "reason",
        "resubmitted",
        "rsl",
        "seed",
        "stateEnterTime",
        "stateEnterTimes",
        "subjob_failed",
        "user tags",
        "status",
        "status_code",
        "hierarchy",
    ]
22
def configure(self, cfg_params):
    """Load the EDG scheduler settings from ``cfg_params`` and validate them.

    Optional keys fall back to defaults; the LFC settings (``EDG.lfc_host``,
    ``EDG.lcg_catalog_type``, ``EDG.lfc_home``) are mandatory.  The storage
    attributes ``SE``, ``SE_PATH`` and ``LFN`` are always defined (empty when
    the corresponding feature is switched off) so that later code can test
    them without risking an AttributeError.

    Parameters:
        cfg_params: mapping of "SECTION.key" names to values; a missing key
            must raise KeyError.

    Raises:
        CrabException: when a mandatory entry is missing, when the
            return_data/copy_data/register_data combination is inconsistent,
            or when the EDG_WL_LOCATION environment variable is not set.
    """

    def optional(key, default):
        # EAFP lookup so that any mapping raising KeyError is supported.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def required(key, msg):
        # Mandatory entry: log the problem and abort the configuration.
        try:
            return cfg_params[key]
        except KeyError:
            common.logger.message(msg)
            raise CrabException(msg)

    # Resource broker: either both config files come from EdgConfig or both
    # stay empty (a KeyError raised while building them is treated the same
    # as a missing 'EDG.rb' entry).
    try:
        RB = cfg_params["EDG.rb"]
        edgConfig = EdgConfig(RB)
        self.edg_config = edgConfig.config()
        self.edg_config_vo = edgConfig.configVO()
    except KeyError:
        self.edg_config = ''
        self.edg_config_vo = ''

    self.proxyServer = optional("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

    self.LCG_version = optional("EDG.lcg_version", '2')
    self.EDG_requirements = optional('EDG.requirements', '')
    self.EDG_retry_count = optional('EDG.retry_count', '')
    self.EDG_ce_black_list = optional('EDG.ce_black_list', '')
    self.EDG_ce_white_list = optional('EDG.ce_white_list', '')
    self.VO = optional('EDG.virtual_organization', 'cms')
    self.return_data = optional('USER.return_data', 1)

    try:
        self.copy_input_data = common.analisys_common_info['copy_input_data']
    except KeyError:
        self.copy_input_data = 0

    # Defined up front so they exist even when copy/registration is disabled.
    self.SE = ''
    self.SE_PATH = ''
    self.LFN = ''

    self.copy_data = optional("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        try:
            self.SE = cfg_params['USER.storage_element']
            self.SE_PATH = cfg_params['USER.storage_path']
        except KeyError:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    if int(self.return_data) == 0 and int(self.copy_data) == 0:
        msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    self.lfc_host = required(
        'EDG.lfc_host',
        "Error. The [EDG] section does not have 'lfc_host' value"
        + " it's necessary to know the LFC host name")
    self.lcg_catalog_type = required(
        'EDG.lcg_catalog_type',
        "Error. The [EDG] section does not have 'lcg_catalog_type' value"
        + " it's necessary to know the catalog type")
    self.lfc_home = required(
        'EDG.lfc_home',
        "Error. The [EDG] section does not have 'lfc_home' value"
        + " it's necessary to know the home catalog dir")

    self.register_data = optional("USER.register_data", 0)
    if int(self.register_data) == 1:
        self.LFN = required(
            'USER.lfn_dir',
            "Error. The [USER] section does not have 'lfn_dir' value"
            + " it's necessary for LCF registration")

    if int(self.copy_data) == 0 and int(self.register_data) == 1:
        msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    self.EDG_clock_time = optional('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = optional('EDG.max_cpu_time', '')

    # Add the EDG_WL_LOCATION library directories to the python path.
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    for libPath in (os.path.join(path, "lib"),
                    os.path.join(path, "lib", "python")):
        # Avoid piling up duplicates when configure() runs more than once.
        if libPath not in sys.path:
            sys.path.append(libPath)

    # checkProxy() flips this to 1 once the proxies have been verified.
    self.proxyValid = 0

    self._taskId = optional('taskId', '')
    return
162
163
def sched_parameter(self):
    """Write the scheduler-specific parameter file, if there is anything to write.

    When both ``self.edg_config`` and ``self.edg_config_vo`` are set, the file
    ``sched_param.clad`` is created in the share directory of the work space
    with the ``RBconfig`` and ``RBconfigVO`` entries, and ``self.param`` is set
    to its name.

    Returns:
        1 if the file was written, 0 otherwise.
    """
    if not (self.edg_config and self.edg_config_vo != ''):
        return 0

    self.param = 'sched_param.clad'
    param_file = open(common.work_space.shareDir() + '/' + self.param, 'w')
    try:
        param_file.write('RBconfig = "' + self.edg_config + '";\n')
        param_file.write('RBconfigVO = "' + self.edg_config_vo + '";')
    finally:
        # Release the handle even when one of the writes fails.
        param_file.close()
    return 1
178
def wsSetupEnvironment(self):
    """Return the shell fragment that prepares the job environment.

    The fragment records the monitoring identifiers, detects the middleware
    flavour (LCG or OSG), exports the storage, VO and LFC catalogue settings
    and finally determines the CE name.  When copy_data is enabled and
    ``self.SE_PATH`` is set, a trailing '/' is appended to it if missing.
    """
    report = ' | tee -a $RUNTIME_AREA/$repo'

    # Lines emitted for both OSG detection branches.
    osg_branch = [
        ' middleware=OSG \n',
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`"' + report + ' \n',
        ' echo "GridFlavour=`echo $middleware`"' + report + ' \n',
        ' echo "middleware =$middleware" \n',
    ]

    parts = [
        "# job number (first parameter for job wrapper)\n",
        "NJob=$1\n",
        '# job identification to DashBoard \n',
        'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n',
        'SyncGridJobId=`echo $EDG_WL_JOBID`\n',
        'MonitorID=`echo ' + self._taskId + '`\n',
        'echo "MonitorJobID=`echo $MonitorJobID`"' + report + ' \n',
        'echo "SyncGridJobId=`echo $SyncGridJobId`"' + report + ' \n',
        'echo "MonitorID=`echo $MonitorID`"' + report + '\n',
        'echo "middleware discovery " \n',
        'if [ $VO_CMS_SW_DIR ]; then \n',
        ' middleware=LCG \n',
        ' echo "SyncCE=`edg-brokerinfo getCE`"' + report + ' \n',
        ' echo "GridFlavour=`echo $middleware`"' + report + ' \n',
        ' echo "middleware =$middleware" \n',
        'elif [ $GRID3_APP_DIR ]; then\n',
    ]
    parts.extend(osg_branch)
    parts.append('elif [ $OSG_APP ]; then \n')
    parts.extend(osg_branch)
    parts.extend([
        'else \n',
        ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n',
        ' echo "JOB_EXIT_STATUS = 10030" \n',
        ' echo "JobExitCode=10030"' + report + ' \n',
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`"' + report + ' \n',
        ' echo "MonitorID=`echo $MonitorID`"' + report + '\n',
        ' exit 1 \n',
        'fi \n',
        '# report first time to DashBoard \n',
        'dumpStatus $RUNTIME_AREA/$repo \n',
        'rm -f $RUNTIME_AREA/$repo \n',
        'echo "MonitorJobID=`echo $MonitorJobID`"' + report + ' \n',
        'echo "MonitorID=`echo $MonitorID`"' + report + '\n',
        '\n\n',
    ])

    if int(self.copy_data) == 1:
        if self.SE:
            parts.append('export SE=' + self.SE + '\n')
            parts.append('echo "SE = $SE"\n')
        if self.SE_PATH:
            if self.SE_PATH[-1] != '/':
                self.SE_PATH = self.SE_PATH + '/'
            parts.append('export SE_PATH=' + self.SE_PATH + '\n')
            parts.append('echo "SE_PATH = $SE_PATH"\n')

    parts.append('export VO=' + self.VO + '\n')

    # LFC catalogue settings: only overridden when they differ from the
    # values already present in the job environment.
    parts.append('if [ $middleware == LCG ]; then \n')
    for shell_name, value in (('LCG_CATALOG_TYPE', self.lcg_catalog_type),
                              ('LFC_HOST', self.lfc_host),
                              ('LFC_HOME', self.lfc_home)):
        parts.append(' if [[ $' + shell_name + ' != \'' + value + '\' ]]; then\n')
        parts.append(' export ' + shell_name + '=' + value + '\n')
        parts.append(' fi\n')
    parts.append('elif [ $middleware == OSG ]; then\n')
    parts.append(' echo "LFC catalog setting to be implemented for OSG"\n')
    parts.append('fi\n')

    if int(self.register_data) == 1:
        # Make sure the LFN directory exists in the LFC catalogue.
        parts.extend([
            'if [ $middleware == LCG ]; then \n',
            ' export LFN=' + self.LFN + '\n',
            ' lfc-ls $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' if [ $result != 0 ]; then\n',
            ' lfc-mkdir $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' fi\n',
            'elif [ $middleware == OSG ]; then\n',
            ' echo " Files registration to be implemented for OSG"\n',
            'fi\n',
            '\n',
        ])
        if self.VO:
            parts.append('export VO=' + self.VO + '\n')
        if self.LFN:
            parts.extend([
                'if [ $middleware == LCG ]; then \n',
                ' export LFN=' + self.LFN + '\n',
                'fi\n',
                '\n',
            ])

    parts.extend([
        'if [ $middleware == LCG ]; then\n',
        ' CloseCEs=`edg-brokerinfo getCE`\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n",
        ' else \n',
        ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n',
        ' echo "JOB_EXIT_STATUS = 10099" \n',
        ' echo "JobExitCode=10099"' + report + ' \n',
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`"' + report + ' \n',
        ' echo "MonitorID=`echo $MonitorID`"' + report + '\n',
        ' exit 1 \n',
        ' fi \n',
        'fi \n',
    ])

    return ''.join(parts)
301
def wsCopyInput(self):
    """Return the shell fragment that copies the input data from the SE to the WN.

    The flag is re-read from ``common.analisys_common_info['copy_input_data']``
    on every call (0 when the key is missing) and stored in
    ``self.copy_input_data``.  An empty string is returned when copying is
    disabled.  In OSG mode the generated fragment only prints a notice; in LCG
    mode it copies every file of ``$cur_file_list`` and ``$cur_pu_list`` with
    lcg-cp and reports the exit status of each transfer.
    """
    try:
        self.copy_input_data = common.analisys_common_info['copy_input_data']
    except KeyError:
        self.copy_input_data = 0

    if int(self.copy_input_data) != 1:
        return ''

    lines = [
        # Deactivated for OSG (waiting for an LCG UI installed on OSG).
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n',
        ' #\n',
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        ' #\n',
        ' # Copy Input Data from SE to this WN\n',
        ' #\n',
        # Loop so that more than one input file per job can be copied.
        ' for input_file in $cur_file_list \n',
        ' do \n',
        ' lcg-cp --vo $VO lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n',
        ' copy_input_exit_status=$?\n',
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n',
        ' if [ $copy_input_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying to WN" \n',
        ' else \n',
        ' echo "input copied into WN" \n',
        ' fi \n',
        ' done \n',
        # Set of pile-up ntuples, the same for every job.
        ' for file in $cur_pu_list \n',
        ' do \n',
        ' lcg-cp --vo $VO lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n',
        # The status must land in the variable that is echoed and tested
        # below; it used to be stored in copy_input_exit_status, leaving
        # copy_input_pu_exit_status empty.
        ' copy_input_pu_exit_status=$?\n',
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n',
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying pu to WN" \n',
        ' else \n',
        ' echo "input pu files copied into WN" \n',
        ' fi \n',
        ' done \n',
        ' \n',
        ' ### Check SCRATCH space available on WN : \n',
        ' df -h \n',
        'fi \n',
    ]
    return ''.join(lines)
352
def wsCopyOutput(self):
    """Return the shell fragment that copies the produced output to the SE.

    An empty string is returned unless ``self.copy_data`` is 1.  The fragment
    uses globus-url-copy in OSG mode and lcg-cp in LCG mode, one call per file
    of ``$file_list``, and reports the outcome through ``$RUNTIME_AREA/$repo``.

    The copy command runs inside a command substitution so that its output is
    captured in ``$exitstring`` (reported as StageOutExitStatusReason on
    failure).  The escaped backticks around ``pwd`` are only correct in that
    nested position: outside a substitution the shell would hand literal
    backticks to the copy command.
    """
    if int(self.copy_data) != 1:
        return ''

    target = 'file://\\`pwd\\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1'
    lines = [
        '#\n',
        '# Copy output to SE = $SE\n',
        '#\n',
        'if [ $exe_result -eq 0 ]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to copy output file to $SE "\n',
        # globus-* for OSG, lcg-* for LCG
        ' if [ $middleware == OSG ]; then\n',
        ' echo "globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n',
        ' exitstring=`globus-url-copy ' + target + '`\n',
        # $? still holds the status of the substituted command here.
        ' copy_exit_status=$? \n',
        ' echo "$exitstring"\n',
        ' elif [ $middleware == LCG ]; then \n',
        ' echo "lcg-cp --vo cms -t 1200 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n',
        ' exitstring=`lcg-cp --vo cms -t 1200 ' + target + '`\n',
        ' copy_exit_status=$? \n',
        ' echo "$exitstring"\n',
        ' fi \n',
        ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' echo "Problems with SE = $SE"\n',
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n',
        ' else\n',
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' done\n',
        'fi\n',
    ]
    return ''.join(lines)
391
def wsRegisterOutput(self):
    """Return the shell fragment that registers the output files in the LFC.

    An empty string is returned unless ``self.register_data`` is 1.  In OSG
    mode the fragment only prints a notice.  In LCG mode it registers each
    file of ``$file_list`` with lcg-rf (sfn first, then srm) when both the
    executable and the copy succeeded, or uploads to the first close SE with
    lcg-cr when only the copy failed.
    """
    if int(self.register_data) != 1:
        return ''

    script = (
        # Deactivated for OSG (waiting for an LCG UI installed on OSG).
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Register output to LFC deactivated in OSG mode\n',
        ' #\n',
        ' echo "Register output to LFC deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        '#\n',
        '# Register output to LFC\n',
        '#\n',
        ' if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to register the output file into LFC"\n',
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n',
        ' lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n',
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration to LFC" \n',
        ' echo "Try with srm protocol" \n',
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n',
        ' lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n',
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration into LFC" \n',
        ' fi \n',
        ' else \n',
        ' echo "output registered to LFC"\n',
        ' fi \n',
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' done\n',
        ' elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n',
        ' echo "Trying to copy output file to CloseSE"\n',
        ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n',
        ' for out_file in $file_list ; do\n',
        ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n',
        ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n',
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with CloseSE" \n',
        ' else \n',
        ' echo "The program was successfully executed"\n',
        ' echo "SE = $CLOSE_SE"\n',
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n',
        ' fi \n',
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' done\n',
        ' else\n',
        ' echo "Problem with the executable"\n',
        ' fi \n',
        'fi \n',
    )
    return ''.join(script)
456
def loggingInfo(self, id):
    """Return the logging-and-bookkeeping information of the job ``id``.

    The proxy is checked first; the result is whatever runCommand returns
    for ``edg-job-get-logging-info -v 2 <id>``.
    """
    self.checkProxy()
    return runCommand('edg-job-get-logging-info -v 2 ' + id)
466
def listMatch(self, nj):
    """Check which resources are compatible with job number ``nj``.

    Runs ``edg-job-list-match`` on the job's JDL file and returns the result
    of parseListMatch_ on its output.

    Raises:
        CrabException: when the command produces no output.
    """
    self.checkProxy()
    jdl_path = common.job_list[nj].jdlFilename()
    command = 'edg-job-list-match ' + self.configOpt_() + jdl_path
    output = runCommand(command, 0, 10)
    if output:
        return self.parseListMatch_(output, jdl_path)
    raise CrabException("ERROR: " + command + " failed!")
479
def parseListMatch_(self, out, jdl):
    """Parse the output of edg-job-list-match and count the matching sites.

    Every line is stripped of surrounding whitespace before it is examined.
    From the first line containing "CEId" onwards, each line containing a
    ':' contributes the text before its first ':' as a CE name.

    Parameters:
        out: the text printed by the list-match command.
        jdl: name of the JDL file (not used by the parser).

    Returns:
        The number of distinct CE names found, or 0 as soon as a line
        starting with "No Computing Element matching" is met.
    """
    reVO = re.compile(r'Selected Virtual Organisation name.*')
    reCE = re.compile(r'(.*:.*)')
    reCEId = re.compile(r'CEId.*')
    reNO = re.compile(r'No Computing Element matching')
    reRB = re.compile(r'Connecting to host')

    # Strip once up front: the anchored matches below must not be defeated
    # by leading blanks in the command output.
    lines = [line.strip() for line in out.splitlines()]

    CEs = []
    for i, line in enumerate(lines):
        if reNO.match(line):
            common.logger.debug(5, line)
            return 0

        voMatch = reVO.match(line)
        if voMatch:
            common.logger.debug(5, "VO " + voMatch.group())

        rbMatch = reRB.match(line)
        if rbMatch:
            common.logger.debug(5, "RB " + rbMatch.group())

        if reCEId.search(line):
            # Everything from the header line to the end may hold a CE entry.
            for lineCE in lines[i:]:
                ceMatch = reCE.match(lineCE)
                if ceMatch:
                    CEs.append(ceMatch.group(1).strip().split(':')[0])

    common.logger.debug(5, "All CE :" + str(CEs))

    # Order-preserving removal of duplicates.
    sites = []
    for ce in CEs:
        if ce not in sites:
            sites.append(ce)

    common.logger.debug(5, "All Sites :" + str(sites))
    common.logger.message("Matched Sites :" + str(sites))
    return len(sites)
537
def noMatchFound_(self, jdl):
    """Log the requirements found in the JDL file, then report the failure.

    The first line of ``jdl`` that starts with "Requirements" is scanned for
    double-quoted strings; each one is logged as a software, numeric or CE
    requirement.

    Raises:
        CrabException: always, with "No compatible resources found!".
    """
    reReq = re.compile(r'Requirements')
    reString = re.compile(r'"\S*"')

    f = open(jdl, 'r')
    try:
        jdl_lines = f.readlines()
    finally:
        # The handle used to be left open until garbage collection.
        f.close()

    for line in jdl_lines:
        line = line.strip()
        if not reReq.match(line):
            continue
        for req in reString.findall(line):
            if re.search("VO", req):
                common.logger.message("SW required: " + req)
            elif re.search(r'"\d+', req):
                common.logger.message("Other req : " + req)
            else:
                common.logger.message("CE required: " + req)
        # Only the first Requirements line is reported.
        break

    raise CrabException("No compatible resources found!")
556
def submit(self, nj):
    """Submit EDG job number ``nj``.

    Returns:
        The job identifier (the first "https..." text in the output of
        edg-job-submit), or None when the command returned nothing or its
        output contains no such identifier.
    """
    self.checkProxy()
    jdl = common.job_list[nj].jdlFilename()

    cmd = 'edg-job-submit ' + self.configOpt_() + jdl
    cmd_out = runCommand(cmd)
    if cmd_out is None:
        return None

    # A failed submission may print text without any job URL; report it as
    # "no id" instead of crashing on a missing match object.
    found = re.search(r'https.+', cmd_out)
    if found is None:
        return None
    return found.group()
573
def resubmit(self, nj_list):
    """Prepare the jobs of ``nj_list`` for resubmission.

    Nothing needs to be done for this scheduler; None is returned.
    """
    return None
579
def getExitStatus(self, id):
    """Return the 'exit_code' status attribute of the job ``id``."""
    exit_code = self.getStatusAttribute_(id, 'exit_code')
    return exit_code
582
def queryStatus(self, id):
    """Return the 'status' status attribute of the job ``id``."""
    status = self.getStatusAttribute_(id, 'status')
    return status
585
def queryDest(self, id):
    """Return the 'destination' status attribute of the job ``id``."""
    destination = self.getStatusAttribute_(id, 'destination')
    return destination
588
589
def getStatusAttribute_(self, id, attr):
    """Query one status attribute of the job ``id`` through the LB API.

    The ``Status`` class of ``edg_wl_userinterface_common_LbWrapper`` is used
    directly instead of running edg-job-status.

    Parameters:
        id: the job identifier.
        attr: one of the names listed in ``self.states``.

    Returns:
        The requested attribute, or None when the API reports an error.

    Raises:
        ValueError: when ``attr`` is not in ``self.states``.
    """
    self.checkProxy()
    Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')

    level = 0
    st = 0
    jobStat = Status()
    jobStat.getStatus(id, level)
    err, apiMsg = jobStat.get_error()
    if err:
        common.logger.debug(5, 'Error caught' + apiMsg)
        return None

    # The status vector is ordered like self.states; fetch it once and pick
    # the requested field instead of copying every field into a table that
    # nobody reads.
    return jobStat.loadStatus(st)[self.states.index(attr)]
613
def queryDetailedStatus(self, id):
    """Return what runCommand yields for ``edg-job-status <id>``."""
    return runCommand('edg-job-status ' + id)
619
def getOutput(self, id):
    """Retrieve the output of the finished job ``id``.

    Runs ``edg-job-get-output`` into the results directory of the work space
    and returns the name of the directory holding the results, built as
    ``<resDir><USER>_<last component of id>``.
    """
    self.checkProxy()
    runCommand('edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id)

    # Determine the output directory name.
    pieces = [common.work_space.resDir(), os.environ['USER'], '_' + os.path.basename(id)]
    out_dir = pieces[0]
    out_dir += pieces[1]
    out_dir += pieces[2]
    return out_dir
635
def cancel(self, id):
    """Cancel the EDG job ``id`` and return the output of the cancel command."""
    self.checkProxy()
    return runCommand('edg-job-cancel --noint ' + id)
642
def createSchScript(self, nj):
    """Create the JDL file for EDG job number ``nj``.

    The JDL holds the executable and its arguments, the input and output
    sandboxes, the requirements expression, the virtual organisation, the
    optional retry count and the MyProxy server.  The whole text is prepared
    before the file is opened, so a failure while collecting the information
    does not leave a truncated JDL behind.

    Requirements are the job-type requirements, the user requirements, the CE
    white list (alternatives joined with ||), one negated clause per
    black-listed CE, and the wall-clock and CPU time limits (480 and 1000
    unless overridden by the configuration), all joined with &&.
    """
    job = common.job_list[nj]
    jbt = job.type()
    inp_sandbox = jbt.inputSandbox(nj)
    out_sandbox = jbt.outputSandbox(nj)
    script = job.scriptFilename()

    def quoted(names):
        # Render names as a comma-separated list of double-quoted strings.
        return ', '.join(['"' + name + '"' for name in names])

    def splitList(text):
        # Comma-separated configuration value -> stripped, non-empty items.
        # Blanks around an entry would otherwise end up inside the RegExp
        # and an empty entry would produce RegExp("").
        return [item.strip() for item in text.split(',') if item.strip()]

    # ---- input sandbox -------------------------------------------------
    inp_files = [script]
    if inp_sandbox is not None:
        inp_files.extend(inp_sandbox)
    # Monitoring helpers shipped with every job.
    crab_python = os.environ['CRABDIR'] + '/python/'
    for helper in ('report.py', 'DashboardAPI.py', 'Logger.py',
                   'ProcInfo.py', 'apmon.py'):
        inp_files.append(os.path.abspath(crab_python + helper))
    for addFile in jbt.additional_inbox_files:
        inp_files.append(os.path.abspath(addFile))

    # ---- output sandbox ------------------------------------------------
    out_files = [job.stdout()]
    if job.stdout() != job.stderr():
        out_files.append(job.stderr())
    out_files.append('.BrokerInfo')
    if int(self.return_data) == 1 and out_sandbox is not None:
        out_files.extend(out_sandbox)

    # ---- requirements --------------------------------------------------
    clauses = []
    job_req = jbt.getRequirements()
    if job_req:
        clauses.append(job_req)
    if self.EDG_requirements:
        clauses.append(self.EDG_requirements)

    if self.EDG_ce_white_list:
        allowed = ['(RegExp("' + ce + '", other.GlueCEUniqueId))'
                   for ce in splitList(self.EDG_ce_white_list)]
        if allowed:
            clauses.append('(' + ' || '.join(allowed) + ')')

    if self.EDG_ce_black_list:
        for ce in splitList(self.EDG_ce_black_list):
            clauses.append('(!RegExp("' + ce + '", other.GlueCEUniqueId))')

    clockTime = 480
    if self.EDG_clock_time:
        clockTime = self.EDG_clock_time
    clauses.append('((other.GlueCEPolicyMaxWallClockTime == 0) || '
                   '(other.GlueCEPolicyMaxWallClockTime>=' + str(clockTime) + '))')

    cpuTime = 1000
    if self.EDG_cpu_time:
        cpuTime = self.EDG_cpu_time
    clauses.append('((other.GlueCEPolicyMaxCPUTime == 0) || '
                   '(other.GlueCEPolicyMaxCPUTime>=' + str(cpuTime) + '))')

    # ---- assemble the JDL text -----------------------------------------
    text = [
        '# This JDL was generated by '
        + common.prog_name + ' (version ' + common.prog_version_str + ')\n',
        'Executable = "' + os.path.basename(script) + '";\n',
        'Arguments = "' + str(nj + 1) + ' '
        + jbt.getJobTypeArguments(nj, "EDG") + '";\n',
        'InputSandbox = { ' + quoted(inp_files) + ' };\n',
        'StdOutput = "' + job.stdout() + '";\n',
        'StdError = "' + job.stderr() + '";\n',
        'OutputSandbox = { ' + quoted(out_files) + ' };\n',
    ]
    if clauses:
        text.append('Requirements = ' + ' && '.join(clauses) + ';\n')
    text.append('VirtualOrganisation = "' + self.VO + '";\n')
    if self.EDG_retry_count:
        # str() so that a numeric retry count does not break the concatenation.
        text.append('RetryCount = ' + str(self.EDG_retry_count) + ';\n')
    text.append('MyProxyServer = "' + self.proxyServer + '";\n')

    jdl = open(job.jdlFilename(), 'w')
    try:
        jdl.write(''.join(text))
    finally:
        jdl.close()
    return
790
def checkProxy(self):
    """Make sure a usable VOMS proxy and a MyProxy delegation exist.

    Nothing is done when a previous call already succeeded
    (``self.proxyValid``).  Otherwise:

    * the local proxy is renewed with ``voms-proxy-init`` when it is missing,
      when its remaining time cannot be read, or when the proxy or its VOMS
      attributes have less than 10 hours left;
    * the credential on ``self.proxyServer`` is (re)delegated with
      ``myproxy-init`` when ``myproxy-info`` prints nothing or reports less
      than 100 hours left.

    Raises:
        CrabException: when the proxy cannot be created or delegated.
    """
    if self.proxyValid:
        return

    minTimeLeft = 10 * 3600   # in seconds
    minTimeLeftServer = 100   # in hours

    def secondsFrom(cmd):
        # Run cmd and return its output as an int, or None when it is empty
        # or not a number (e.g. an error message).
        cmd_out = runCommand(cmd)
        if not cmd_out:
            return None
        try:
            return int(cmd_out)
        except (TypeError, ValueError):
            return None

    mustRenew = 0
    timeLeftLocal = secondsFrom('voms-proxy-info -timeleft')
    if timeLeftLocal is None or timeLeftLocal <= 0:
        mustRenew = 1
    else:
        timeLeftServer = secondsFrom('voms-proxy-info -actimeleft | head -1')
        if timeLeftServer is None:
            mustRenew = 1
        elif timeLeftLocal < minTimeLeft or timeLeftServer < minTimeLeft:
            # Numeric comparison: the raw command output is text.
            mustRenew = 1

    if mustRenew:
        common.logger.message("No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
        # os.system so that the user can type the pass phrase interactively.
        out = os.system('voms-proxy-init -voms cms -valid 96:00')
        if out != 0:
            raise CrabException("Unable to create a valid proxy!\n")

    # A valid VOMS proxy exists now; check the delegation on the myproxy server.
    renewProxy = 0
    cmd_out = runCommand('myproxy-info -d -s ' + self.proxyServer, 0, 20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server ' + self.proxyServer + ' will do now')
        renewProxy = 1
    else:
        # "timeleft:" is not at the start of the output, so search for it.
        found = re.search(r'timeleft: (\d+)', cmd_out)
        if found:
            hoursLeft = int(found.group(1))
            if hoursLeft < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('Credential delegation will expire in ' + str(hoursLeft) + ' hours: renew it')

    if renewProxy:
        out = os.system('myproxy-init -d -n -s ' + self.proxyServer)
        if out != 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver " + self.proxyServer + " !\n")

    # Cache the result so that later calls return immediately.
    self.proxyValid = 1
    return
861
def configOpt_(self):
    """Return the configuration options to insert into edg-job-* command lines.

    The result is a single blank when no resource-broker configuration is
    set; otherwise it carries ``-c <config>`` and/or ``--config-vo <config>``,
    each surrounded by blanks.
    """
    pieces = [' ']
    if self.edg_config:
        pieces = [' -c ', self.edg_config, ' ']
    if self.edg_config_vo:
        pieces += [' --config-vo ', self.edg_config_vo, ' ']
    return ''.join(pieces)