ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGrid.py
Revision: 1.6
Committed: Fri Apr 7 09:40:22 2006 UTC (19 years ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.5: +37 -4 lines
Log Message:
added changes about black and white ce list

File Contents

# User Rev Content
1 gutsche 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     import common
6    
7     import os, sys, time
8    
9     class SchedulerGrid(Scheduler):
10     def __init__(self):
11     Scheduler.__init__(self,"GRID")
12     self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
13     "children_hist","children_num","children_states","condorId","condor_jdl", \
14     "cpuTime","destination", "done_code","exit_code","expectFrom", \
15     "expectUpdate","globusId","jdl","jobId","jobtype", \
16     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
17     "owner","parent_job", "reason","resubmitted","rsl","seed",\
18     "stateEnterTime","stateEnterTimes","subjob_failed", \
19     "user tags" , "status" , "status_code","hierarchy"]
20     return
21    
22     def configure(self, cfg_params):
23    
24 spiga 1.3 try:
25     RB = cfg_params["EDG.rb"]
26     edgConfig = EdgConfig(RB)
27     self.edg_config = edgConfig.config()
28     self.edg_config_vo = edgConfig.configVO()
29     except KeyError:
30     self.edg_config = ''
31     self.edg_config_vo = ''
32 gutsche 1.1
33    
34     try: self.LCG_version = cfg_params["EDG.lcg_version"]
35     except KeyError: self.LCG_version = '2'
36    
37     try: self.EDG_requirements = cfg_params['EDG.requirements']
38     except KeyError: self.EDG_requirements = ''
39    
40     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
41     except KeyError: self.EDG_retry_count = ''
42    
43 fanzago 1.6 try:
44     self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
45     #print "self.EDG_ce_black_list = ", self.EDG_ce_black_list
46     except KeyError:
47     self.EDG_ce_black_list = ''
48    
49     try:
50     self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
51     #print "self.EDG_ce_white_list = ", self.EDG_ce_white_list
52     except KeyError: self.EDG_ce_white_list = ''
53    
54 gutsche 1.1 try: self.VO = cfg_params['EDG.virtual_organization']
55     except KeyError: self.VO = 'cms'
56    
57     try: self.return_data = cfg_params['USER.return_data']
58     except KeyError: self.return_data = 1
59    
60     try:
61     self.copy_input_data = common.analisys_common_info['copy_input_data']
62     #print "self.copy_input_data = ", self.copy_input_data
63     except KeyError: self.copy_input_data = 0
64    
65     try:
66     self.copy_data = cfg_params["USER.copy_data"]
67     if int(self.copy_data) == 1:
68     try:
69     self.SE = cfg_params['USER.storage_element']
70     self.SE_PATH = cfg_params['USER.storage_path']
71     except KeyError:
72     msg = "Error. The [USER] section does not have 'storage_element'"
73     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
74     common.logger.message(msg)
75     raise CrabException(msg)
76     except KeyError: self.copy_data = 0
77    
78     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
79     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
80     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
81     raise CrabException(msg)
82 fanzago 1.6
83 gutsche 1.1 try:
84     self.lfc_host = cfg_params['EDG.lfc_host']
85     except KeyError:
86     msg = "Error. The [EDG] section does not have 'lfc_host' value"
87     msg = msg + " it's necessary to know the LFC host name"
88     common.logger.message(msg)
89     raise CrabException(msg)
90     try:
91     self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
92     except KeyError:
93     msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
94     msg = msg + " it's necessary to know the catalog type"
95     common.logger.message(msg)
96     raise CrabException(msg)
97     try:
98     self.lfc_home = cfg_params['EDG.lfc_home']
99     except KeyError:
100     msg = "Error. The [EDG] section does not have 'lfc_home' value"
101     msg = msg + " it's necessary to know the home catalog dir"
102     common.logger.message(msg)
103     raise CrabException(msg)
104    
105     try:
106     self.register_data = cfg_params["USER.register_data"]
107     if int(self.register_data) == 1:
108     try:
109     self.LFN = cfg_params['USER.lfn_dir']
110     except KeyError:
111     msg = "Error. The [USER] section does not have 'lfn_dir' value"
112     msg = msg + " it's necessary for LCF registration"
113     common.logger.message(msg)
114     raise CrabException(msg)
115     except KeyError: self.register_data = 0
116    
117     if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
118     msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
119     msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
120     common.logger.message(msg)
121     raise CrabException(msg)
122    
123     try: self.EDG_requirements = cfg_params['EDG.requirements']
124     except KeyError: self.EDG_requirements = ''
125    
126     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
127     except KeyError: self.EDG_retry_count = ''
128    
129     try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
130     except KeyError: self.EDG_clock_time= ''
131    
132     try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
133     except KeyError: self.EDG_cpu_time = ''
134    
135     # Add EDG_WL_LOCATION to the python path
136    
137     try:
138     path = os.environ['EDG_WL_LOCATION']
139     except:
140     msg = "Error: the EDG_WL_LOCATION variable is not set."
141     raise CrabException(msg)
142    
143     libPath=os.path.join(path, "lib")
144     sys.path.append(libPath)
145     libPath=os.path.join(path, "lib", "python")
146     sys.path.append(libPath)
147    
148     self.proxyValid=0
149     return
150    
151    
152     def sched_parameter(self):
153     """
154     Returns file with scheduler-specific parameters
155     """
156    
157     if (self.edg_config and self.edg_config_vo != ''):
158     self.param='sched_param.clad'
159     param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
160     param_file.write('RBconfig = "'+self.edg_config+'";\n')
161     param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
162     param_file.close()
163     return 1
164     else:
165     return 0
166    
167     def wsSetupEnvironment(self):
168     """
169     Returns part of a job script which does scheduler-specific work.
170     """
171     txt = ''
172 fanzago 1.2 txt += 'echo "middleware discovery " \n'
173 gutsche 1.1 txt += 'if [ $VO_CMS_SW_DIR ]; then\n'
174 spiga 1.3 txt += ' middleware=LCG \n'
175     txt += ' echo "middleware =$middleware" \n'
176 gutsche 1.1 txt += 'elif [ $GRID3_APP_DIR ]; then\n'
177 spiga 1.3 txt += ' middleware=OSG \n'
178     txt += ' echo "middleware =$middleware" \n'
179 gutsche 1.1 txt += 'elif [ $OSG_APP ]; then \n'
180 spiga 1.3 txt += ' middleware=OSG \n'
181     txt += ' echo "middleware =$middleware" \n'
182 gutsche 1.1 txt += 'else \n'
183 spiga 1.3 txt += ' echo "SET_CMS_ENV 1 ==> middleware not identified" \n'
184     txt += ' echo "JOB_EXIT_STATUS = 1"\n'
185     txt += ' exit 1\n'
186 gutsche 1.1 txt += 'fi\n'
187    
188     txt += '\n\n'
189    
190 spiga 1.3 txt += 'if [ $middleware == LCG ]; then \n'
191     txt += ' echo "SyncGridJobId=`echo $EDG_WL_JOBID`" | tee -a $RUNTIME_AREA/$repo\n'
192     txt += 'fi\n'
193 gutsche 1.1
194     if int(self.copy_data) == 1:
195     if self.SE:
196     txt += 'export SE='+self.SE+'\n'
197     txt += 'echo "SE = $SE"\n'
198     if self.SE_PATH:
199     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
200     txt += 'export SE_PATH='+self.SE_PATH+'\n'
201     txt += 'echo "SE_PATH = $SE_PATH"\n'
202    
203     txt += 'export VO='+self.VO+'\n'
204     ### FEDE: add some line for LFC catalog setting
205 spiga 1.3 txt += 'if [ $middleware == LCG ]; then \n'
206     txt += ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
207     txt += ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
208     txt += ' fi\n'
209     txt += ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
210     txt += ' export LFC_HOST='+self.lfc_host+'\n'
211     txt += ' fi\n'
212     txt += ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
213     txt += ' export LFC_HOME='+self.lfc_home+'\n'
214     txt += ' fi\n'
215     txt += 'elif [ $middleware == OSG ]; then\n'
216     txt += ' echo "LFC catalog setting to be implemented for OSG"\n'
217 gutsche 1.1 txt += 'fi\n'
218     #####
219     if int(self.register_data) == 1:
220 spiga 1.3 txt += 'if [ $middleware == LCG ]; then \n'
221     txt += ' export LFN='+self.LFN+'\n'
222     txt += ' lfc-ls $LFN\n'
223     txt += ' result=$?\n'
224     txt += ' echo $result\n'
225 gutsche 1.1 ### creation of LFN dir in LFC catalog, under /grid/cms dir
226 spiga 1.3 txt += ' if [ $result != 0 ]; then\n'
227     txt += ' lfc-mkdir $LFN\n'
228     txt += ' result=$?\n'
229     txt += ' echo $result\n'
230     txt += ' fi\n'
231     txt += 'elif [ $middleware == OSG ]; then\n'
232     txt += ' echo " Files registration to be implemented for OSG"\n'
233 gutsche 1.1 txt += 'fi\n'
234     txt += '\n'
235    
236     if self.VO:
237     txt += 'export VO='+self.VO+'\n'
238     if self.LFN:
239 spiga 1.3 txt += 'if [ $middleware == LCG ]; then \n'
240     txt += ' export LFN='+self.LFN+'\n'
241     txt += 'fi\n'
242 gutsche 1.1 txt += '\n'
243    
244     txt += 'if [ $middleware == LCG ]; then\n'
245 spiga 1.3 txt += ' CloseCEs=`edg-brokerinfo getCE`\n'
246     txt += ' echo "CloseCEs = $CloseCEs"\n'
247     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
248     txt += ' echo "CE = $CE"\n'
249 gutsche 1.1 txt += 'elif [ $middleware == OSG ]; then \n'
250 spiga 1.3 txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
251     txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
252     txt += ' else \n'
253     txt += ' echo "SET_ENV 1 ==> ERROR in setting CE name - OSG mode -" \n'
254     txt += ' exit 1 \n'
255     txt += ' fi \n'
256 gutsche 1.1 txt += 'fi \n'
257    
258     return txt
259    
260     def wsCopyInput(self):
261     """
262     Copy input data from SE to WN
263     """
264     txt = ''
265     try:
266     self.copy_input_data = common.analisys_common_info['copy_input_data']
267     #print "self.copy_input_data = ", self.copy_input_data
268     except KeyError: self.copy_input_data = 0
269     if int(self.copy_input_data) == 1:
270     ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
271     txt += 'if [ $middleware == OSG ]; then\n'
272     txt += ' #\n'
273     txt += ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
274     txt += ' #\n'
275 fanzago 1.4 txt += ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
276 gutsche 1.1 txt += 'elif [ $middleware == LCG ]; then \n'
277     txt += ' #\n'
278     txt += ' # Copy Input Data from SE to this WN\n'
279     txt += ' #\n'
280     ### changed by georgia (put a loop copying more than one input files per jobs)
281     txt += ' for input_file in $cur_file_list \n'
282     txt += ' do \n'
283     txt += ' lcg-cp --vo $VO lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
284     txt += ' copy_input_exit_status=$?\n'
285     txt += ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
286     txt += ' if [ $copy_input_exit_status -ne 0 ]; then \n'
287     txt += ' echo "Problems with copying to WN" \n'
288     txt += ' else \n'
289     txt += ' echo "input copied into WN" \n'
290     txt += ' fi \n'
291     txt += ' done \n'
292     ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
293     txt += ' for file in $cur_pu_list \n'
294     txt += ' do \n'
295     txt += ' lcg-cp --vo $VO lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
296     txt += ' copy_input_exit_status=$?\n'
297     txt += ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
298     txt += ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
299     txt += ' echo "Problems with copying pu to WN" \n'
300     txt += ' else \n'
301     txt += ' echo "input pu files copied into WN" \n'
302     txt += ' fi \n'
303     txt += ' done \n'
304     txt += ' \n'
305     txt += ' ### Check SCRATCH space available on WN : \n'
306     txt += ' df -h \n'
307     txt += 'fi \n'
308    
309     return txt
310    
311     def wsCopyOutput(self):
312     """
313     Write a CopyResults part of a job script, e.g.
314     to copy produced output into a storage element.
315     """
316     txt = ''
317     if int(self.copy_data) == 1:
318     txt += '#\n'
319     txt += '# Copy output to SE = $SE\n'
320     txt += '#\n'
321     txt += 'if [ $exe_result -eq 0 ]; then\n'
322     txt += ' for out_file in $file_list ; do\n'
323     txt += ' echo "Trying to copy output file to $SE "\n'
324     ## OLI_Daniele globus-* for OSG, lcg-* for LCG
325     txt += ' if [ $middleware == OSG ]; then\n'
326     txt += ' echo "globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
327 fanzago 1.2 txt += ' copy_exit_status=`globus-url-copy file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
328     #txt += ' exitstring=`globus-url-copy file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
329 gutsche 1.1 txt += ' elif [ $middleware == LCG ]; then \n'
330 spiga 1.3 txt += ' echo "lcg-cp --vo cms -t 1200 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
331     txt += ' copy_exit_status=`lcg-cp --vo cms -t 1200 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
332 fanzago 1.2 #txt += ' exitstring=`lcg-cp --vo cms -t 30 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
333 gutsche 1.1 txt += ' fi \n'
334 fanzago 1.2 #txt += ' copy_exit_status=$?\n'
335 gutsche 1.1 txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
336     txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
337     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
338     txt += ' echo "Problems with SE = $SE"\n'
339     txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
340     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
341     txt += ' else\n'
342     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
343     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
344     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
345     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
346     txt += ' fi\n'
347     txt += ' done\n'
348     txt += 'fi\n'
349     return txt
350    
351     def wsRegisterOutput(self):
352     """
353     Returns part of a job script which does scheduler-specific work.
354     """
355    
356     txt = ''
357     if int(self.register_data) == 1:
358     ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
359     txt += 'if [ $middleware == OSG ]; then\n'
360     txt += ' #\n'
361     txt += ' # Register output to LFC deactivated in OSG mode\n'
362     txt += ' #\n'
363 fanzago 1.4 txt += ' echo "Register output to LFC deactivated in OSG mode"\n'
364 gutsche 1.1 txt += 'elif [ $middleware == LCG ]; then \n'
365     txt += '#\n'
366     txt += '# Register output to LFC\n'
367     txt += '#\n'
368     txt += ' if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
369     txt += ' for out_file in $file_list ; do\n'
370     txt += ' echo "Trying to register the output file into LFC"\n'
371     txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n'
372     txt += ' lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n'
373     txt += ' register_exit_status=$?\n'
374     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
375     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
376     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
377     txt += ' echo "Problems with the registration to LFC" \n'
378     txt += ' echo "Try with srm protocol" \n'
379     txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n'
380     txt += ' lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n'
381     txt += ' register_exit_status=$?\n'
382     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
383     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
384     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
385     txt += ' echo "Problems with the registration into LFC" \n'
386     txt += ' fi \n'
387     txt += ' else \n'
388     txt += ' echo "output registered to LFC"\n'
389     txt += ' fi \n'
390     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
391     txt += ' done\n'
392     txt += ' elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
393     txt += ' echo "Trying to copy output file to CloseSE"\n'
394     txt += ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
395     txt += ' for out_file in $file_list ; do\n'
396     txt += ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
397     txt += ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
398     txt += ' register_exit_status=$?\n'
399     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
400     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
401     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
402     txt += ' echo "Problems with CloseSE" \n'
403     txt += ' else \n'
404     txt += ' echo "The program was successfully executed"\n'
405     txt += ' echo "SE = $CLOSE_SE"\n'
406     txt += ' echo "LFN for the file is LFN=${LFN}/$out_file"\n'
407     txt += ' fi \n'
408     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
409     txt += ' done\n'
410     txt += ' else\n'
411     txt += ' echo "Problem with the executable"\n'
412     txt += ' fi \n'
413     txt += 'fi \n'
414     return txt
415    
416     def loggingInfo(self, id):
417     """
418     retrieve the logging info from logging and bookkeeping and return it
419     """
420     self.checkProxy()
421     cmd = 'edg-job-get-logging-info -v 2 ' + id
422     #cmd_out = os.popen(cmd)
423     cmd_out = runCommand(cmd)
424     return cmd_out
425    
426     def listMatch(self, nj):
427     """
428     Check the compatibility of available resources
429     """
430     self.checkProxy()
431     jdl = common.job_list[nj].jdlFilename()
432     cmd = 'edg-job-list-match ' + self.configOpt_() + jdl
433     cmd_out = runCommand(cmd,0,10)
434     if not cmd_out:
435     raise CrabException("ERROR: "+cmd+" failed!")
436    
437     return self.parseListMatch_(cmd_out, jdl)
438    
439     def parseListMatch_(self, out, jdl):
440     """
441     Parse the f* output of edg-list-match and produce something sensible
442     """
443     reComment = re.compile( r'^\**$' )
444     reEmptyLine = re.compile( r'^$' )
445     reVO = re.compile( r'Selected Virtual Organisation name.*' )
446     reLine = re.compile( r'.*')
447     reCE = re.compile( r'(.*:.*)')
448     reCEId = re.compile( r'CEId.*')
449     reNO = re.compile( r'No Computing Element matching' )
450     reRB = re.compile( r'Connecting to host' )
451     next = 0
452     CEs=[]
453     Match=0
454    
455     #print out
456     lines = reLine.findall(out)
457    
458     i=0
459     CEs=[]
460     for line in lines:
461     string.strip(line)
462     #print line
463     if reNO.match( line ):
464     common.logger.debug(5,line)
465     return 0
466     pass
467     if reVO.match( line ):
468     VO =reVO.match( line ).group()
469     common.logger.debug(5,"VO "+VO)
470     pass
471    
472     if reRB.match( line ):
473     RB = reRB.match(line).group()
474     common.logger.debug(5,"RB "+RB)
475     pass
476    
477     if reCEId.search( line ):
478     for lineCE in lines[i:-1]:
479     if reCE.match( lineCE ):
480     CE = string.strip(reCE.search(lineCE).group(1))
481     CEs.append(CE.split(':')[0])
482     pass
483     pass
484     pass
485     i=i+1
486     pass
487    
488     common.logger.debug(5,"All CE :"+str(CEs))
489    
490     sites = []
491     [sites.append(it) for it in CEs if not sites.count(it)]
492    
493     common.logger.debug(5,"All Sites :"+str(sites))
494     return len(sites)
495    
496     def noMatchFound_(self, jdl):
497     reReq = re.compile( r'Requirements' )
498     reString = re.compile( r'"\S*"' )
499     f = file(jdl,'r')
500     for line in f.readlines():
501     line= line.strip()
502     if reReq.match(line):
503     for req in reString.findall(line):
504     if re.search("VO",req):
505     common.logger.message( "SW required: "+req)
506     continue
507     if re.search('"\d+',req):
508     common.logger.message("Other req : "+req)
509     continue
510     common.logger.message( "CE required: "+req)
511     break
512     pass
513     raise CrabException("No compatible resources found!")
514    
515     def submit(self, nj):
516     """
517     Submit one EDG job.
518     """
519    
520     self.checkProxy()
521     jid = None
522     jdl = common.job_list[nj].jdlFilename()
523    
524     cmd = 'edg-job-submit ' + self.configOpt_() + jdl
525     cmd_out = runCommand(cmd)
526     if cmd_out != None:
527     reSid = re.compile( r'https.+' )
528     jid = reSid.search(cmd_out).group()
529     pass
530     return jid
531    
532     def resubmit(self, nj_list):
533     """
534     Prepare jobs to be submit
535     """
536     return
537    
538     def getExitStatus(self, id):
539     return self.getStatusAttribute_(id, 'exit_code')
540    
541     def queryStatus(self, id):
542     return self.getStatusAttribute_(id, 'status')
543    
544     def queryDest(self, id):
545     return self.getStatusAttribute_(id, 'destination')
546    
547    
548     def getStatusAttribute_(self, id, attr):
549     """ Query a status of the job with id """
550    
551     self.checkProxy()
552     hstates = {}
553     Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
554     # Bypass edg-job-status interfacing directly to C++ API
555     # Job attribute vector to retrieve status without edg-job-status
556     level = 0
557     # Instance of the Status class provided by LB API
558     jobStat = Status()
559     st = 0
560     jobStat.getStatus(id, level)
561     err, apiMsg = jobStat.get_error()
562     if err:
563     common.logger.debug(5,'Error caught' + apiMsg)
564     return None
565     else:
566     for i in range(len(self.states)):
567     # Fill an hash table with all information retrieved from LB API
568     hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
569     result = jobStat.loadStatus(st)[ self.states.index(attr) ]
570     return result
571    
572     def queryDetailedStatus(self, id):
573     """ Query a detailed status of the job with id """
574     cmd = 'edg-job-status '+id
575     cmd_out = runCommand(cmd)
576     return cmd_out
577    
578     def getOutput(self, id):
579     """
580     Get output for a finished job with id.
581     Returns the name of directory with results.
582     """
583    
584     self.checkProxy()
585     cmd = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id
586     cmd_out = runCommand(cmd)
587    
588     # Determine the output directory name
589     dir = common.work_space.resDir()
590     dir += os.getlogin()
591     dir += '_' + os.path.basename(id)
592     return dir
593    
594     def cancel(self, id):
595     """ Cancel the EDG job with id """
596     self.checkProxy()
597     cmd = 'edg-job-cancel --noint ' + id
598     cmd_out = runCommand(cmd)
599     return cmd_out
600    
601     def createSchScript(self, nj):
602     """
603     Create a JDL-file for EDG.
604     """
605    
606     job = common.job_list[nj]
607     jbt = job.type()
608     inp_sandbox = jbt.inputSandbox(nj)
609     out_sandbox = jbt.outputSandbox(nj)
610     inp_storage_subdir = ''
611    
612     title = '# This JDL was generated by '+\
613     common.prog_name+' (version '+common.prog_version_str+')\n'
614     jt_string = ''
615    
616    
617    
618     SPL = inp_storage_subdir
619     if ( SPL and SPL[-1] != '/' ) : SPL = SPL + '/'
620    
621     jdl_fname = job.jdlFilename()
622     jdl = open(jdl_fname, 'w')
623     jdl.write(title)
624    
625     script = job.scriptFilename()
626     jdl.write('Executable = "' + os.path.basename(script) +'";\n')
627     jdl.write(jt_string)
628    
629     ### only one .sh JDL has arguments:
630     firstEvent = common.jobDB.firstEvent(nj)
631     maxEvents = common.jobDB.maxEvents(nj)
632     jdl.write('Arguments = "' + str(nj+1)+' '+str(firstEvent)+' '+str(maxEvents)+'";\n')
633    
634     inp_box = 'InputSandbox = { '
635     inp_box = inp_box + '"' + script + '",'
636    
637     if inp_sandbox != None:
638     for fl in inp_sandbox:
639     inp_box = inp_box + ' "' + fl + '",'
640     pass
641     pass
642    
643     #if common.use_jam:
644     # inp_box = inp_box+' "'+common.bin_dir+'/'+common.run_jam+'",'
645    
646     # Marco (VERY TEMPORARY ML STUFF)
647     inp_box = inp_box+' "' + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + '", "' +\
648 gutsche 1.5 os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + '", "'+\
649 gutsche 1.1 os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + '", "'+\
650     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + '", "'+\
651     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py') + '"'
652     # End Marco
653    
654     if (not jbt.additional_inbox_files == []):
655     inp_box = inp_box + ', '
656     for addFile in jbt.additional_inbox_files:
657     addFile = os.path.abspath(addFile)
658     inp_box = inp_box+' "'+addFile+'",'
659     pass
660    
661     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
662     inp_box = inp_box + ' };\n'
663     jdl.write(inp_box)
664    
665     jdl.write('StdOutput = "' + job.stdout() + '";\n')
666     jdl.write('StdError = "' + job.stderr() + '";\n')
667    
668    
669     if job.stdout() == job.stderr():
670     out_box = 'OutputSandbox = { "' + \
671     job.stdout() + '", ".BrokerInfo",'
672     else:
673     out_box = 'OutputSandbox = { "' + \
674     job.stdout() + '", "' + \
675     job.stderr() + '", ".BrokerInfo",'
676    
677     if int(self.return_data) == 1:
678     if out_sandbox != None:
679     for fl in out_sandbox:
680     out_box = out_box + ' "' + fl + '",'
681     pass
682     pass
683     pass
684    
685     if out_box[-1] == ',' : out_box = out_box[:-1]
686     out_box = out_box + ' };'
687     jdl.write(out_box+'\n')
688    
689    
690     req='Requirements = '
691     req = req + jbt.getRequirements()
692     # ### if at least a CE exists ...
693     # if common.analisys_common_info['sites']:
694     # if common.analisys_common_info['sw_version']:
695     # req='Requirements = '
696     # req=req + 'Member("VO-cms-' + \
697     # common.analisys_common_info['sw_version'] + \
698     # '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
699     # if len(common.analisys_common_info['sites'])>0:
700     # req = req + ' && ('
701     # for i in range(len(common.analisys_common_info['sites'])):
702     # req = req + 'other.GlueCEInfoHostName == "' \
703     # + common.analisys_common_info['sites'][i] + '"'
704     # if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
705     # req = req + ' || '
706     # req = req + ')'
707     #### and USER REQUIREMENT
708     if self.EDG_requirements:
709     if (req == 'Requirement = '):
710     req = req + self.EDG_requirements
711     else:
712     req = req + ' && ' + self.EDG_requirements
713 fanzago 1.6 #### FEDE #####
714     if self.EDG_ce_white_list:
715     ce_white_list = string.split(self.EDG_ce_white_list,',')
716     #print "req = ", req
717     for i in range(len(ce_white_list)):
718     if i == 0:
719     if (req == 'Requirement = '):
720     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
721     else:
722     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
723     pass
724     else:
725     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
726     req = req + ')'
727    
728     if self.EDG_ce_black_list:
729     ce_black_list = string.split(self.EDG_ce_black_list,',')
730     for ce in ce_black_list:
731     if (req == 'Requirement = '):
732     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
733     else:
734     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
735     pass
736     ###############
737 gutsche 1.1 if self.EDG_clock_time:
738     if (req == 'Requirement = '):
739     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
740     else:
741     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
742 fanzago 1.6
743 gutsche 1.1 if self.EDG_cpu_time:
744     if (req == 'Requirement = '):
745     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
746     else:
747     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
748     if (req != 'Requirement = '):
749     req = req + ';\n'
750     jdl.write(req)
751    
752     jdl.write('VirtualOrganisation = "' + self.VO + '";\n')
753    
754     if ( self.EDG_retry_count ):
755     jdl.write('RetryCount = '+self.EDG_retry_count+';\n')
756     pass
757    
758     jdl.close()
759     return
760    
761     def checkProxy(self):
762     """
763     Function to check the Globus proxy.
764     """
765     if (self.proxyValid): return
766     timeleft = -999
767     minTimeLeft=10 # in hours
768     cmd = 'voms-proxy-info -exists -valid '+str(minTimeLeft)+':00'
769     # SL Here I have to use os.system since the stupid command exit with >0 if no valid proxy is found
770     cmd_out = os.system(cmd)
771     if (cmd_out>0):
772     common.logger.message( "No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 24h\n")
773 spiga 1.3 cmd = 'voms-proxy-init -voms cms -valid 100:00'
774 gutsche 1.1 try:
775     # SL as above: damn it!
776     out = os.system(cmd)
777     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
778     except:
779     msg = "Unable to create a valid proxy!\n"
780     raise CrabException(msg)
781     # cmd = 'grid-proxy-info -timeleft'
782     # cmd_out = runCommand(cmd,0,20)
783     pass
784     self.proxyValid=1
785     return
786    
787     def configOpt_(self):
788     edg_ui_cfg_opt = ' '
789     if self.edg_config:
790     edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
791     if self.edg_config_vo:
792     edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
793     return edg_ui_cfg_opt