ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.1.2.2
Committed: Thu Jul 20 18:32:30 2006 UTC (18 years, 9 months ago) by spiga
Content type: text/x-python
Branch: CRAB_BOSS4
Changes since 1.1.2.1: +89 -3 lines
Log Message:
add list match for boss4 and changed some hardcoded parameter

File Contents

# User Rev Content
1 spiga 1.1.2.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     from EdgConfig import *
6     import common
7    
8     import os, sys, time
9    
10     class SchedulerGlite(Scheduler):
11     def __init__(self):
12     Scheduler.__init__(self,"GLITE")
13     self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
14     "children_hist","children_num","children_states","condorId","condor_jdl", \
15     "cpuTime","destination", "done_code","exit_code","expectFrom", \
16     "expectUpdate","globusId","jdl","jobId","jobtype", \
17     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
18     "owner","parent_job", "reason","resubmitted","rsl","seed",\
19     "stateEnterTime","stateEnterTimes","subjob_failed", \
20     "user tags" , "status" , "status_code","hierarchy"]
21     return
22    
23     def configure(self, cfg_params):
24    
25     try:
26     RB = cfg_params["EDG.rb"]
27     edgConfig = EdgConfig(RB)
28     self.edg_config = edgConfig.config()
29     self.edg_config_vo = edgConfig.configVO()
30     except KeyError:
31     self.edg_config = ''
32     self.edg_config_vo = ''
33    
34     try:
35     self.proxyServer = cfg_params["EDG.proxy_server"]
36     except KeyError:
37     self.proxyServer = 'myproxy.cern.ch'
38     common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)
39    
40     try: self.LCG_version = cfg_params["EDG.lcg_version"]
41     except KeyError: self.LCG_version = '2'
42    
43     try: self.EDG_requirements = cfg_params['EDG.requirements']
44     except KeyError: self.EDG_requirements = ''
45    
46     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
47     except KeyError: self.EDG_retry_count = ''
48    
49     try:
50     self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
51     except KeyError:
52     self.EDG_ce_black_list = ''
53    
54     try:
55     self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
56     except KeyError: self.EDG_ce_white_list = ''
57    
58     try: self.VO = cfg_params['EDG.virtual_organization']
59     except KeyError: self.VO = 'cms'
60    
61     try: self.return_data = cfg_params['USER.return_data']
62     except KeyError: self.return_data = 1
63    
64     try:
65     self.copy_input_data = common.analisys_common_info['copy_input_data']
66     except KeyError: self.copy_input_data = 0
67    
68     try:
69     self.copy_data = cfg_params["USER.copy_data"]
70     if int(self.copy_data) == 1:
71     try:
72     self.SE = cfg_params['USER.storage_element']
73     self.SE_PATH = cfg_params['USER.storage_path']
74     except KeyError:
75     msg = "Error. The [USER] section does not have 'storage_element'"
76     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
77     common.logger.message(msg)
78     raise CrabException(msg)
79     except KeyError: self.copy_data = 0
80    
81     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
82     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
83     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
84     raise CrabException(msg)
85    
86     try:
87     self.lfc_host = cfg_params['EDG.lfc_host']
88     except KeyError:
89     msg = "Error. The [EDG] section does not have 'lfc_host' value"
90     msg = msg + " it's necessary to know the LFC host name"
91     common.logger.message(msg)
92     raise CrabException(msg)
93     try:
94     self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
95     except KeyError:
96     msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
97     msg = msg + " it's necessary to know the catalog type"
98     common.logger.message(msg)
99     raise CrabException(msg)
100     try:
101     self.lfc_home = cfg_params['EDG.lfc_home']
102     except KeyError:
103     msg = "Error. The [EDG] section does not have 'lfc_home' value"
104     msg = msg + " it's necessary to know the home catalog dir"
105     common.logger.message(msg)
106     raise CrabException(msg)
107    
108     try:
109     self.register_data = cfg_params["USER.register_data"]
110     if int(self.register_data) == 1:
111     try:
112     self.LFN = cfg_params['USER.lfn_dir']
113     except KeyError:
114     msg = "Error. The [USER] section does not have 'lfn_dir' value"
115     msg = msg + " it's necessary for LCF registration"
116     common.logger.message(msg)
117     raise CrabException(msg)
118     except KeyError: self.register_data = 0
119    
120     if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
121     msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
122     msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
123     common.logger.message(msg)
124     raise CrabException(msg)
125    
126     try: self.EDG_requirements = cfg_params['EDG.requirements']
127     except KeyError: self.EDG_requirements = ''
128    
129     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
130     except KeyError: self.EDG_retry_count = ''
131    
132     try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
133     except KeyError: self.EDG_clock_time= ''
134    
135     try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
136     except KeyError: self.EDG_cpu_time = ''
137    
138 spiga 1.1.2.2
139     try: self.scheduler = cfg_params['CRAB.scheduler']
140     except KeyError: self.scheduler = ''
141    
142     try: self.jobtypeName = cfg_params['CRAB.jobtype']
143     except KeyError: self.jobtypeName = ''
144    
145 spiga 1.1.2.1 # Add EDG_WL_LOCATION to the python path
146     try:
147     path = os.environ['EDG_WL_LOCATION']
148     except:
149     msg = "Error: the EDG_WL_LOCATION variable is not set."
150     raise CrabException(msg)
151    
152     libPath=os.path.join(path, "lib")
153     sys.path.append(libPath)
154     libPath=os.path.join(path, "lib", "python")
155     sys.path.append(libPath)
156    
157     self.proxyValid=0
158     return
159    
160    
161     def sched_parameter(self):
162     """
163     Returns file with scheduler-specific parameters
164     """
165     if (self.edg_config and self.edg_config_vo != ''):
166     self.param='sched_param.clad'
167     param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
168     param_file.write('RBconfig = "'+self.edg_config+'";\n')
169     param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
170     param_file.close()
171     return 1
172     else:
173     return 0
174    
175     def wsSetupEnvironment(self):
176     """
177     Returns part of a job script which does scheduler-specific work.
178     """
179     txt = ''
180     txt += 'echo "middleware discovery " \n'
181     txt += 'if [ $VO_CMS_SW_DIR ]; then\n'
182     txt += ' middleware=LCG \n'
183     txt += ' echo "middleware =$middleware" \n'
184     txt += 'elif [ $GRID3_APP_DIR ]; then\n'
185     txt += ' middleware=OSG \n'
186     txt += ' echo "middleware =$middleware" \n'
187     txt += 'elif [ $OSG_APP ]; then \n'
188     txt += ' middleware=OSG \n'
189     txt += ' echo "middleware =$middleware" \n'
190     txt += 'else \n'
191     txt += ' echo "SET_CMS_ENV 1 ==> middleware not identified" \n'
192     txt += ' echo "JOB_EXIT_STATUS = 1"\n'
193     txt += ' exit 1\n'
194     txt += 'fi\n'
195    
196     txt += '\n\n'
197    
198     ### OLI: removed to move DashBoard reporting header to front of wrapper script
199     # txt += 'if [ $middleware == LCG ]; then \n'
200     # txt += ' echo "SyncGridJobId=`echo $EDG_WL_JOBID`" | tee -a $RUNTIME_AREA/$repo\n'
201     # txt += 'fi\n'
202    
203     if int(self.copy_data) == 1:
204     if self.SE:
205     txt += 'export SE='+self.SE+'\n'
206     txt += 'echo "SE = $SE"\n'
207     if self.SE_PATH:
208     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
209     txt += 'export SE_PATH='+self.SE_PATH+'\n'
210     txt += 'echo "SE_PATH = $SE_PATH"\n'
211    
212     txt += 'export VO='+self.VO+'\n'
213     ### FEDE: add some line for LFC catalog setting
214     txt += 'if [ $middleware == LCG ]; then \n'
215     txt += ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
216     txt += ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
217     txt += ' fi\n'
218     txt += ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
219     txt += ' export LFC_HOST='+self.lfc_host+'\n'
220     txt += ' fi\n'
221     txt += ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
222     txt += ' export LFC_HOME='+self.lfc_home+'\n'
223     txt += ' fi\n'
224     txt += 'elif [ $middleware == OSG ]; then\n'
225     txt += ' echo "LFC catalog setting to be implemented for OSG"\n'
226     txt += 'fi\n'
227     #####
228     if int(self.register_data) == 1:
229     txt += 'if [ $middleware == LCG ]; then \n'
230     txt += ' export LFN='+self.LFN+'\n'
231     txt += ' lfc-ls $LFN\n'
232     txt += ' result=$?\n'
233     txt += ' echo $result\n'
234     ### creation of LFN dir in LFC catalog, under /grid/cms dir
235     txt += ' if [ $result != 0 ]; then\n'
236     txt += ' lfc-mkdir $LFN\n'
237     txt += ' result=$?\n'
238     txt += ' echo $result\n'
239     txt += ' fi\n'
240     txt += 'elif [ $middleware == OSG ]; then\n'
241     txt += ' echo " Files registration to be implemented for OSG"\n'
242     txt += 'fi\n'
243     txt += '\n'
244    
245     if self.VO:
246     txt += 'export VO='+self.VO+'\n'
247     if self.LFN:
248     txt += 'if [ $middleware == LCG ]; then \n'
249     txt += ' export LFN='+self.LFN+'\n'
250     txt += 'fi\n'
251     txt += '\n'
252    
253     txt += 'if [ $middleware == LCG ]; then\n'
254     txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
255     txt += ' echo "CloseCEs = $CloseCEs"\n'
256     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
257     txt += ' echo "CE = $CE"\n'
258     txt += 'elif [ $middleware == OSG ]; then \n'
259     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
260     txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
261     txt += ' else \n'
262     txt += ' echo "SET_ENV 1 ==> ERROR in setting CE name - OSG mode -" \n'
263     txt += ' exit 1 \n'
264     txt += ' fi \n'
265     txt += 'fi \n'
266    
267     return txt
268    
269     def wsCopyInput(self):
270     """
271     Copy input data from SE to WN
272     """
273     txt = ''
274     try:
275     self.copy_input_data = common.analisys_common_info['copy_input_data']
276     #print "self.copy_input_data = ", self.copy_input_data
277     except KeyError: self.copy_input_data = 0
278     if int(self.copy_input_data) == 1:
279     ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
280     txt += 'if [ $middleware == OSG ]; then\n'
281     txt += ' #\n'
282     txt += ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
283     txt += ' #\n'
284     txt += ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
285     txt += 'elif [ $middleware == LCG ]; then \n'
286     txt += ' #\n'
287     txt += ' # Copy Input Data from SE to this WN\n'
288     txt += ' #\n'
289     ### changed by georgia (put a loop copying more than one input files per jobs)
290     txt += ' for input_file in $cur_file_list \n'
291     txt += ' do \n'
292     txt += ' lcg-cp --vo $VO lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
293     txt += ' copy_input_exit_status=$?\n'
294     txt += ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
295     txt += ' if [ $copy_input_exit_status -ne 0 ]; then \n'
296     txt += ' echo "Problems with copying to WN" \n'
297     txt += ' else \n'
298     txt += ' echo "input copied into WN" \n'
299     txt += ' fi \n'
300     txt += ' done \n'
301     ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
302     txt += ' for file in $cur_pu_list \n'
303     txt += ' do \n'
304     txt += ' lcg-cp --vo $VO lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
305     txt += ' copy_input_exit_status=$?\n'
306     txt += ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
307     txt += ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
308     txt += ' echo "Problems with copying pu to WN" \n'
309     txt += ' else \n'
310     txt += ' echo "input pu files copied into WN" \n'
311     txt += ' fi \n'
312     txt += ' done \n'
313     txt += ' \n'
314     txt += ' ### Check SCRATCH space available on WN : \n'
315     txt += ' df -h \n'
316     txt += 'fi \n'
317    
318     return txt
319    
320     def wsCopyOutput(self):
321     """
322     Write a CopyResults part of a job script, e.g.
323     to copy produced output into a storage element.
324     """
325     txt = ''
326     if int(self.copy_data) == 1:
327     txt += '#\n'
328     txt += '# Copy output to SE = $SE\n'
329     txt += '#\n'
330     txt += 'if [ $exe_result -eq 0 ]; then\n'
331     txt += ' for out_file in $file_list ; do\n'
332     txt += ' echo "Trying to copy output file to $SE "\n'
333     ## OLI_Daniele globus-* for OSG, lcg-* for LCG
334     txt += ' if [ $middleware == OSG ]; then\n'
335     txt += ' echo "globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
336     txt += ' copy_exit_status=`globus-url-copy file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
337     #txt += ' exitstring=`globus-url-copy file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
338     txt += ' elif [ $middleware == LCG ]; then \n'
339     txt += ' echo "lcg-cp --vo cms -t 1200 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
340     txt += ' copy_exit_status=`lcg-cp --vo cms -t 1200 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
341     #txt += ' exitstring=`lcg-cp --vo cms -t 30 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
342     txt += ' fi \n'
343     #txt += ' copy_exit_status=$?\n'
344     txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
345     txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
346     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
347     txt += ' echo "Problems with SE = $SE"\n'
348     txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
349     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
350     txt += ' else\n'
351     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
352     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
353     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
354     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
355     txt += ' fi\n'
356     txt += ' done\n'
357     txt += 'fi\n'
358     return txt
359    
360     def wsRegisterOutput(self):
361     """
362     Returns part of a job script which does scheduler-specific work.
363     """
364    
365     txt = ''
366     if int(self.register_data) == 1:
367     ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
368     txt += 'if [ $middleware == OSG ]; then\n'
369     txt += ' #\n'
370     txt += ' # Register output to LFC deactivated in OSG mode\n'
371     txt += ' #\n'
372     txt += ' echo "Register output to LFC deactivated in OSG mode"\n'
373     txt += 'elif [ $middleware == LCG ]; then \n'
374     txt += '#\n'
375     txt += '# Register output to LFC\n'
376     txt += '#\n'
377     txt += ' if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
378     txt += ' for out_file in $file_list ; do\n'
379     txt += ' echo "Trying to register the output file into LFC"\n'
380     txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n'
381     txt += ' lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n'
382     txt += ' register_exit_status=$?\n'
383     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
384     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
385     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
386     txt += ' echo "Problems with the registration to LFC" \n'
387     txt += ' echo "Try with srm protocol" \n'
388     txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n'
389     txt += ' lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n'
390     txt += ' register_exit_status=$?\n'
391     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
392     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
393     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
394     txt += ' echo "Problems with the registration into LFC" \n'
395     txt += ' fi \n'
396     txt += ' else \n'
397     txt += ' echo "output registered to LFC"\n'
398     txt += ' fi \n'
399     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
400     txt += ' done\n'
401     txt += ' elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
402     txt += ' echo "Trying to copy output file to CloseSE"\n'
403     txt += ' CLOSE_SE=`glite-brokerinfo getCloseSEs | head -1`\n'
404     txt += ' for out_file in $file_list ; do\n'
405     txt += ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
406     txt += ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
407     txt += ' register_exit_status=$?\n'
408     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
409     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
410     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
411     txt += ' echo "Problems with CloseSE" \n'
412     txt += ' else \n'
413     txt += ' echo "The program was successfully executed"\n'
414     txt += ' echo "SE = $CLOSE_SE"\n'
415     txt += ' echo "LFN for the file is LFN=${LFN}/$out_file"\n'
416     txt += ' fi \n'
417     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
418     txt += ' done\n'
419     txt += ' else\n'
420     txt += ' echo "Problem with the executable"\n'
421     txt += ' fi \n'
422     txt += 'fi \n'
423     return txt
424    
425     def loggingInfo(self, id):
426     """
427     retrieve the logging info from logging and bookkeeping and return it
428     """
429     self.checkProxy()
430     cmd = 'glite-job-logging-info -v 2 ' + id
431     #cmd_out = os.popen(cmd)
432     cmd_out = runCommand(cmd)
433     return cmd_out
434    
435     def listMatch(self, nj):
436     """
437     Check the compatibility of available resources
438     """
439     self.checkProxy()
440 spiga 1.1.2.2 # jdl = common.job_list[nj].jdlFilename()
441     jdl = common.work_space.shareDir()+"fake.jdl"
442 spiga 1.1.2.1 cmd = 'glite-job-list-match ' + self.configOpt_() + jdl
443     cmd_out = runCommand(cmd,0,10)
444     if not cmd_out:
445     raise CrabException("ERROR: "+cmd+" failed!")
446    
447     return self.parseListMatch_(cmd_out, jdl)
448    
449     def parseListMatch_(self, out, jdl):
450     """
451     Parse the f* output of edg-list-match and produce something sensible
452     """
453     reComment = re.compile( r'^\**$' )
454     reEmptyLine = re.compile( r'^$' )
455     reVO = re.compile( r'Selected Virtual Organisation name.*' )
456     reLine = re.compile( r'.*')
457     reCE = re.compile( r'(.*:.*)')
458     reCEId = re.compile( r'CEId.*')
459     reNO = re.compile( r'No Computing Element matching' )
460     reRB = re.compile( r'Connecting to host' )
461     next = 0
462     CEs=[]
463     Match=0
464    
465     #print out
466     lines = reLine.findall(out)
467    
468     i=0
469     CEs=[]
470     for line in lines:
471     string.strip(line)
472     #print line
473     if reNO.match( line ):
474     common.logger.debug(5,line)
475     return 0
476     pass
477     if reVO.match( line ):
478     VO =reVO.match( line ).group()
479     common.logger.debug(5,"VO "+VO)
480     pass
481    
482     if reRB.match( line ):
483     RB = reRB.match(line).group()
484     common.logger.debug(5,"RB "+RB)
485     pass
486    
487     if reCEId.search( line ):
488     for lineCE in lines[i:-1]:
489     if reCE.match( lineCE ):
490     CE = string.strip(reCE.search(lineCE).group(1))
491     CEs.append(CE.split(':')[0])
492     pass
493     pass
494     pass
495     i=i+1
496     pass
497    
498     common.logger.debug(5,"All CE :"+str(CEs))
499    
500     sites = []
501     [sites.append(it) for it in CEs if not sites.count(it)]
502    
503     common.logger.debug(5,"All Sites :"+str(sites))
504     return len(sites)
505    
506     def noMatchFound_(self, jdl):
507     reReq = re.compile( r'Requirements' )
508     reString = re.compile( r'"\S*"' )
509     f = file(jdl,'r')
510     for line in f.readlines():
511     line= line.strip()
512     if reReq.match(line):
513     for req in reString.findall(line):
514     if re.search("VO",req):
515     common.logger.message( "SW required: "+req)
516     continue
517     if re.search('"\d+',req):
518     common.logger.message("Other req : "+req)
519     continue
520     common.logger.message( "CE required: "+req)
521     break
522     pass
523     raise CrabException("No compatible resources found!")
524    
525     def submit(self, nj):
526     """
527     Submit one EDG job.
528     """
529    
530     self.checkProxy()
531     jid = None
532     jdl = common.job_list[nj].jdlFilename()
533    
534     cmd = 'glite-job-submit ' + self.configOpt_() + jdl
535     cmd_out = runCommand(cmd)
536     if cmd_out != None:
537     reSid = re.compile( r'https.+' )
538     jid = reSid.search(cmd_out).group()
539     pass
540     return jid
541    
542     def resubmit(self, nj_list):
543     """
544     Prepare jobs to be submit
545     """
546     return
547    
548     def getExitStatus(self, id):
549     return self.getStatusAttribute_(id, 'exit_code')
550    
551     def queryStatus(self, id):
552     return self.getStatusAttribute_(id, 'status')
553    
554     def queryDest(self, id):
555     return self.getStatusAttribute_(id, 'destination')
556    
557    
558     def getStatusAttribute_(self, id, attr):
559     """ Query a status of the job with id """
560    
561     self.checkProxy()
562     hstates = {}
563     Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
564     # Bypass edg-job-status interfacing directly to C++ API
565     # Job attribute vector to retrieve status without edg-job-status
566     level = 0
567     # Instance of the Status class provided by LB API
568     jobStat = Status()
569     st = 0
570     jobStat.getStatus(id, level)
571     err, apiMsg = jobStat.get_error()
572     if err:
573     common.logger.debug(5,'Error caught' + apiMsg)
574     return None
575     else:
576     for i in range(len(self.states)):
577     # Fill an hash table with all information retrieved from LB API
578     hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
579     result = jobStat.loadStatus(st)[ self.states.index(attr) ]
580     return result
581    
582     def queryDetailedStatus(self, id):
583     """ Query a detailed status of the job with id """
584     cmd = 'glite-job-status '+id
585     cmd_out = runCommand(cmd)
586     return cmd_out
587    
588     def getOutput(self, id):
589     """
590     Get output for a finished job with id.
591     Returns the name of directory with results.
592     """
593    
594     self.checkProxy()
595     cmd = 'glite-job-get-output --dir ' + common.work_space.resDir() + ' ' + id
596     cmd_out = runCommand(cmd)
597    
598     # Determine the output directory name
599     dir = common.work_space.resDir()
600     dir += os.environ['USER']
601     dir += '_' + os.path.basename(id)
602     return dir
603    
604     def cancel(self, id):
605     """ Cancel the EDG job with id """
606     self.checkProxy()
607     cmd = 'glite-job-cancel --noint ' + id
608     cmd_out = runCommand(cmd)
609     return cmd_out
610    
611    
612 spiga 1.1.2.2 def createFakeJdl(self,nj): # TMP Just waiting listmatch functionalitly
613     # implementation into BOSS4 Daniele
614     """
615     Create a fake jdl considering
616     only requirements
617     """
618     job = common.job_list[nj]
619     jbt = job.type()
620     inp_storage_subdir = ''
621    
622    
623     SPL = inp_storage_subdir
624     if ( SPL and SPL[-1] != '/' ) : SPL = SPL + '/'
625    
626     jdl = open(common.work_space.shareDir()+"fake.jdl","w")
627    
628     script = job.scriptFilename()
629     jdl.write('Executable = "' + os.path.basename(script) +'";\n')
630    
631     req='Requirements = '
632     noreq=req
633     req = req + jbt.getRequirements()
634     #### and USER REQUIREMENT
635     if self.EDG_requirements:
636     if (req != noreq):
637     req = req + ' && '
638     req = req + self.EDG_requirements
639     #### FEDE #####
640     if self.EDG_ce_white_list:
641     ce_white_list = string.split(self.EDG_ce_white_list,',')
642     #print "req = ", req
643     for i in range(len(ce_white_list)):
644     if i == 0:
645     if (req != noreq):
646     req = req + ' && '
647     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
648     pass
649     else:
650     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
651     req = req + ')'
652    
653     if self.EDG_ce_black_list:
654     ce_black_list = string.split(self.EDG_ce_black_list,',')
655     for ce in ce_black_list:
656     if (req != noreq):
657     req = req + ' && '
658     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
659     pass
660    
661     ###############
662     clockTime=480
663     if self.EDG_clock_time:
664     clockTime= self.EDG_clock_time
665     if (req != noreq):
666     req = req + ' && '
667     req = req + '((other.GlueCEPolicyMaxWallClockTime == 0) || (other.GlueCEPolicyMaxWallClockTime>='+str(clockTime)+'))'
668    
669     cpuTime=1000
670     if self.EDG_cpu_time:
671     cpuTime=self.EDG_cpu_time
672     if (req != noreq):
673     req = req + ' && '
674     req = req + '((other.GlueCEPolicyMaxCPUTime == 0) || (other.GlueCEPolicyMaxCPUTime>='+str(cpuTime)+'))'
675    
676     if (req != noreq):
677     req = req + ';\n'
678     jdl.write(req)
679    
680     jdl.write('VirtualOrganisation = "' + self.VO + '";\n')
681    
682     if ( self.EDG_retry_count ):
683     jdl.write('RetryCount = '+self.EDG_retry_count+';\n')
684     pass
685    
686     jdl.write('MyProxyServer = "' + self.proxyServer + '";\n')
687    
688     jdl.close()
689     return
690 spiga 1.1.2.1 def createXMLSchScript(self, nj):
691     """
692     Create a XML-file for BOSS4.
693     """
694     job = common.job_list[nj]
695     jbt = job.type()
696     inp_sandbox = jbt.inputSandbox(nj)
697     out_sandbox = jbt.outputSandbox(nj)
698    
699     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
700     jt_string = ''
701    
702 spiga 1.1.2.2 xml_fname = str(self.jobtypeName)+'.xml'
703 spiga 1.1.2.1 xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
704    
705     #TaskName
706     dir = string.split(common.work_space.topDir(), '/')
707     taskName = dir[len(dir)-2]
708    
709     to_writeReq = ''
710     to_write = ''
711     req=' '
712     req = req + jbt.getRequirements()
713     if self.EDG_requirements:
714     if (req == ' '):
715     req = req + self.EDG_requirements
716     else:
717     req = req + ' && ' + self.EDG_requirements
718     if self.EDG_ce_white_list:
719     ce_white_list = string.split(self.EDG_ce_white_list,',')
720     for i in range(len(ce_white_list)):
721     if i == 0:
722     if (req == ' '):
723     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
724     else:
725     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
726     pass
727     else:
728     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
729     req = req + ')'
730    
731     if self.EDG_ce_black_list:
732     ce_black_list = string.split(self.EDG_ce_black_list,',')
733     for ce in ce_black_list:
734     if (req == ' '):
735     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
736     else:
737     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
738     pass
739     if self.EDG_clock_time:
740     if (req == ' '):
741     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
742     else:
743     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
744    
745     if self.EDG_cpu_time:
746     if (req == ' '):
747     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
748     else:
749     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
750     if (req != ' '):
751     req = req + '\n'
752     to_writeReq = req
753    
754    
755     if ( self.EDG_retry_count ):
756     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
757     pass
758    
759     to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
760     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
761    
762    
763     #TaskName
764     dir = string.split(common.work_space.topDir(), '/')
765     taskName = dir[len(dir)-2]
766    
767     if nj == 0:
768     xml.write(str(title))
769     xml.write('<task name="' +str(taskName)+'">\n')
770     xml.write(jt_string)
771     #Here it must pass the extra Tags.. (into Task & out of chain)
772     if (to_writeReq != ''):
773     xml.write('<extraTags>\n')
774     xml.write('<Requirements>\n')
775     xml.write('<![CDATA[\n')
776     xml.write(to_writeReq)
777     xml.write(']]>\n')
778     xml.write('</Requirements>\n')
779     xml.write('</extraTags>\n')
780     pass
781    
782     if (to_write != ''):
783     xml.write('<extraTags\n')
784     xml.write(to_write)
785     xml.write('/>\n')
786     pass
787     pass
788    
789 spiga 1.1.2.2 xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
790 spiga 1.1.2.1 xml.write(jt_string)
791    
792     #executable
793     script = job.scriptFilename()
794     xml.write('<program exec="' + os.path.basename(script) +'"\n')
795     xml.write(jt_string)
796    
797    
798     ### only one .sh JDL has arguments:
799     ### Fabio
800     xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "GLITE") +'"\n')
801     xml.write('program_types="crabjob"\n')
802     inp_box = 'infiles="'
803     inp_box = inp_box + '' + script + ','
804    
805     if inp_sandbox != None:
806     for fl in inp_sandbox:
807     inp_box = inp_box + '' + fl + ','
808     pass
809     pass
810    
811     # Marco (VERY TEMPORARY ML STUFF)
812     inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
813     os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
814     os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
815     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
816     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
817     # End Marco
818    
819     if (not jbt.additional_inbox_files == []):
820     inp_box = inp_box + ', '
821     for addFile in jbt.additional_inbox_files:
822     addFile = os.path.abspath(addFile)
823     inp_box = inp_box+''+addFile+','
824     pass
825    
826     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
827     inp_box = inp_box + ' "\n'
828     xml.write(inp_box)
829    
830     xml.write('stderr="' + job.stdout() + '"\n')
831     xml.write('stdout="' + job.stderr() + '"\n')
832    
833    
834     if job.stdout() == job.stderr():
835     out_box = 'outfiles="' + \
836     job.stdout() + ',.BrokerInfo,'
837     else:
838     out_box = 'outfiles="' + \
839     job.stdout() + ',' + \
840     job.stderr() + ',.BrokerInfo,'
841    
842     if int(self.return_data) == 1:
843     if out_sandbox != None:
844     for fl in out_sandbox:
845     out_box = out_box + '' + fl + ','
846     pass
847     pass
848     pass
849    
850     if out_box[-1] == ',' : out_box = out_box[:-1]
851     out_box = out_box + '"'
852     xml.write(out_box+'\n')
853    
854     xml.write('group="'+taskName+'"\n')
855     xml.write('BossAttr="[crabjob.INTERNAL_ID=' + str(nj+1) +']"\n')
856    
857    
858    
859     xml.write('/>\n')
860     xml.write('</chain>\n')
861    
862     if int(nj) == int(common.jobDB.nJobs()-1): xml.write('</task>\n')
863    
864    
865     xml.close()
866    
867     return
868    
869    
870    
871     def checkProxy(self):
872     """
873     Function to check the Globus proxy.
874     """
875     if (self.proxyValid): return
876     timeleft = -999
877     minTimeLeft=10*3600 # in seconds
878    
879     minTimeLeftServer = 100 # in hours
880    
881     #cmd = 'voms-proxy-info -exists -valid '+str(minTimeLeft)+':00'
882     #cmd = 'voms-proxy-info -timeleft'
883     mustRenew = 0
884     timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
885     timeLeftServer = -999
886     if not timeLeftLocal or int(timeLeftLocal) <= 0 or not isInt(timeLeftLocal):
887     mustRenew = 1
888     else:
889     timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
890     if not timeLeftServer or not isInt(timeLeftServer):
891     mustRenew = 1
892     elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
893     mustRenew = 1
894     pass
895     pass
896    
897     if mustRenew:
898     common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
899     cmd = 'voms-proxy-init -voms cms -valid 96:00'
900     try:
901     # SL as above: damn it!
902     out = os.system(cmd)
903     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
904     except:
905     msg = "Unable to create a valid proxy!\n"
906     raise CrabException(msg)
907     # cmd = 'grid-proxy-info -timeleft'
908     # cmd_out = runCommand(cmd,0,20)
909     pass
910    
911     ## now I do have a voms proxy valid, and I check the myproxy server
912     renewProxy = 0
913     cmd = 'myproxy-info -d -s '+self.proxyServer
914     cmd_out = runCommand(cmd,0,20)
915     if not cmd_out:
916     common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
917     renewProxy = 1
918     else:
919     # if myproxy exist but not long enough, renew
920     reTime = re.compile( r'timeleft: (\d+)' )
921     #print "<"+str(reTime.search( cmd_out ).group(1))+">"
922     if reTime.match( cmd_out ):
923     time = reTime.search( line ).group(1)
924     if time < minTimeLeftServer:
925     renewProxy = 1
926     common.logger.message('No credential delegation will expire in '+time+' hours: renew it')
927     pass
928     pass
929    
930     # if not, create one.
931     if renewProxy:
932     cmd = 'myproxy-init -d -n -s '+self.proxyServer
933     out = os.system(cmd)
934     if (out>0):
935     raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")
936     pass
937    
938     # cache proxy validity
939     self.proxyValid=1
940     return
941    
942     def configOpt_(self):
943     edg_ui_cfg_opt = ' '
944     if self.edg_config:
945     edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
946     if self.edg_config_vo:
947     edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
948     return edg_ui_cfg_opt