ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.108
Committed: Wed Dec 6 10:27:30 2006 UTC (18 years, 4 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.107: +3 -2 lines
Log Message:
fix serious bug in adding requirements

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.50 from EdgConfig import *
6 nsmirnov 1.1 import common
7    
8 corvo 1.107 import os, sys, time, gzip
9 nsmirnov 1.1
10     class SchedulerEdg(Scheduler):
11     def __init__(self):
12     Scheduler.__init__(self,"EDG")
13 slacapra 1.8 self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
14     "children_hist","children_num","children_states","condorId","condor_jdl", \
15     "cpuTime","destination", "done_code","exit_code","expectFrom", \
16     "expectUpdate","globusId","jdl","jobId","jobtype", \
17     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
18     "owner","parent_job", "reason","resubmitted","rsl","seed",\
19     "stateEnterTime","stateEnterTimes","subjob_failed", \
20     "user tags" , "status" , "status_code","hierarchy"]
21 slacapra 1.103
22 nsmirnov 1.1 return
23    
24     def configure(self, cfg_params):
25 spiga 1.49
26 slacapra 1.46 try:
27 fanzago 1.99 RB=cfg_params["EDG.rb"]
28     self.rb_param_file=self.rb_configure(RB)
29 slacapra 1.46 except KeyError:
30 fanzago 1.99 self.rb_param_file=''
31     pass
32 slacapra 1.51 try:
33     self.proxyServer = cfg_params["EDG.proxy_server"]
34     except KeyError:
35     self.proxyServer = 'myproxy.cern.ch'
36     common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)
37 spiga 1.49
38 slacapra 1.79 try:
39 spiga 1.91 self.group = cfg_params["EDG.group"]
40     except KeyError:
41     self.group = None
42    
43     try:
44 slacapra 1.79 self.role = cfg_params["EDG.role"]
45     except KeyError:
46     self.role = None
47    
48 nsmirnov 1.1 try: self.LCG_version = cfg_params["EDG.lcg_version"]
49     except KeyError: self.LCG_version = '2'
50    
51 fanzago 1.48 try:
52     self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
53     except KeyError:
54     self.EDG_ce_black_list = ''
55    
56     try:
57     self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
58     except KeyError: self.EDG_ce_white_list = ''
59    
60 fanzago 1.14 try: self.VO = cfg_params['EDG.virtual_organization']
61     except KeyError: self.VO = 'cms'
62    
63 slacapra 1.102 try: self.copy_input_data = cfg_params["USER.copy_input_data"]
64     except KeyError: self.copy_input_data = 0
65    
66 spiga 1.49 try: self.return_data = cfg_params['USER.return_data']
67 spiga 1.91 except KeyError: self.return_data = 0
68    
69 fanzago 1.14 try:
70     self.copy_data = cfg_params["USER.copy_data"]
71 fanzago 1.30 if int(self.copy_data) == 1:
72     try:
73     self.SE = cfg_params['USER.storage_element']
74     self.SE_PATH = cfg_params['USER.storage_path']
75     except KeyError:
76     msg = "Error. The [USER] section does not have 'storage_element'"
77     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
78     common.logger.message(msg)
79     raise CrabException(msg)
80     except KeyError: self.copy_data = 0
81    
82     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
83     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
84     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
85     raise CrabException(msg)
86 fanzago 1.38
87     try:
88     self.lfc_host = cfg_params['EDG.lfc_host']
89     except KeyError:
90     msg = "Error. The [EDG] section does not have 'lfc_host' value"
91     msg = msg + " it's necessary to know the LFC host name"
92     common.logger.message(msg)
93     raise CrabException(msg)
94     try:
95     self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
96     except KeyError:
97     msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
98     msg = msg + " it's necessary to know the catalog type"
99     common.logger.message(msg)
100     raise CrabException(msg)
101     try:
102     self.lfc_home = cfg_params['EDG.lfc_home']
103     except KeyError:
104     msg = "Error. The [EDG] section does not have 'lfc_home' value"
105     msg = msg + " it's necessary to know the home catalog dir"
106     common.logger.message(msg)
107     raise CrabException(msg)
108 fanzago 1.30
109 fanzago 1.14 try:
110     self.register_data = cfg_params["USER.register_data"]
111 fanzago 1.30 if int(self.register_data) == 1:
112     try:
113     self.LFN = cfg_params['USER.lfn_dir']
114     except KeyError:
115     msg = "Error. The [USER] section does not have 'lfn_dir' value"
116 fanzago 1.36 msg = msg + " it's necessary for LCF registration"
117 fanzago 1.30 common.logger.message(msg)
118     raise CrabException(msg)
119     except KeyError: self.register_data = 0
120    
121     if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
122     msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
123     msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
124     common.logger.message(msg)
125     raise CrabException(msg)
126 fanzago 1.14
127     try: self.EDG_requirements = cfg_params['EDG.requirements']
128     except KeyError: self.EDG_requirements = ''
129 slacapra 1.97
130 slacapra 1.101 try: self.EDG_addJdlParam = string.split(cfg_params['EDG.additional_jdl_parameters'],',')
131     except KeyError: self.EDG_addJdlParam = []
132    
133 fanzago 1.14 try: self.EDG_retry_count = cfg_params['EDG.retry_count']
134     except KeyError: self.EDG_retry_count = ''
135 slacapra 1.97
136     try: self.EDG_shallow_retry_count= cfg_params['EDG.shallow_retry_count']
137     except KeyError: self.EDG_shallow_retry_count = ''
138    
139 fanzago 1.14 try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
140     except KeyError: self.EDG_clock_time= ''
141 slacapra 1.97
142 fanzago 1.14 try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
143     except KeyError: self.EDG_cpu_time = ''
144    
145 fanzago 1.16 # Add EDG_WL_LOCATION to the python path
146    
147     try:
148     path = os.environ['EDG_WL_LOCATION']
149     except:
150     msg = "Error: the EDG_WL_LOCATION variable is not set."
151     raise CrabException(msg)
152    
153     libPath=os.path.join(path, "lib")
154     sys.path.append(libPath)
155     libPath=os.path.join(path, "lib", "python")
156     sys.path.append(libPath)
157 nsmirnov 1.1
158 slacapra 1.18 self.proxyValid=0
159 gutsche 1.60
160 slacapra 1.61 try:
161     self._taskId = cfg_params['taskId']
162     except:
163     self._taskId = ''
164 gutsche 1.60
165 slacapra 1.81 try: self.jobtypeName = cfg_params['CRAB.jobtype']
166     except KeyError: self.jobtypeName = ''
167    
168     try: self.schedulerName = cfg_params['CRAB.scheduler']
169     except KeyError: self.scheduler = ''
170    
171 slacapra 1.103 try: self.dontCheckProxy=cfg_params["EDG.dont_check_proxy"]
172     except KeyError: self.dontCheckProxy = 0
173    
174 nsmirnov 1.1 return
175    
176 fanzago 1.10
177 fanzago 1.99 def rb_configure(self, RB):
178     self.edg_config = ''
179     self.edg_config_vo = ''
180     self.rb_param_file = ''
181    
182     edgConfig = EdgConfig(RB)
183     self.edg_config = edgConfig.config()
184     self.edg_config_vo = edgConfig.configVO()
185    
186     if (self.edg_config and self.edg_config_vo != ''):
187     self.rb_param_file = 'RBconfig = "'+self.edg_config+'";\nRBconfigVO = "'+self.edg_config_vo+'";'
188 fanzago 1.100 #print "rb_param_file = ", self.rb_param_file
189 fanzago 1.99 return self.rb_param_file
190    
191    
192 fanzago 1.10 def sched_parameter(self):
193     """
194 spiga 1.88 Returns file with requirements and scheduler-specific parameters
195 fanzago 1.10 """
196 spiga 1.88 index = int(common.jobDB.nJobs()) - 1
197     job = common.job_list[index]
198     jbt = job.type()
199    
200 slacapra 1.95 lastBlock=-1
201 spiga 1.88 first = []
202     for n in range(common.jobDB.nJobs()):
203 slacapra 1.95 currBlock=common.jobDB.block(n)
204     if (currBlock!=lastBlock):
205     lastBlock = currBlock
206 spiga 1.88 first.append(n)
207    
208     req = ''
209     req = req + jbt.getRequirements()
210    
211     if self.EDG_requirements:
212     if (req == ' '):
213     req = req + self.EDG_requirements
214     else:
215     req = req + ' && ' + self.EDG_requirements
216 slacapra 1.95
217 spiga 1.88 if self.EDG_ce_white_list:
218     ce_white_list = string.split(self.EDG_ce_white_list,',')
219     for i in range(len(ce_white_list)):
220     if i == 0:
221     if (req == ' '):
222 spiga 1.106 req = req + '((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
223 spiga 1.88 else:
224 spiga 1.106 req = req + ' && ((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
225 spiga 1.88 pass
226     else:
227 spiga 1.106 req = req + ' || (RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
228 spiga 1.88 req = req + ')'
229    
230     if self.EDG_ce_black_list:
231     ce_black_list = string.split(self.EDG_ce_black_list,',')
232     for ce in ce_black_list:
233     if (req == ' '):
234 spiga 1.106 req = req + '(!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
235 spiga 1.88 else:
236 spiga 1.106 req = req + ' && (!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
237 spiga 1.88 pass
238     if self.EDG_clock_time:
239     if (req == ' '):
240     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
241     else:
242     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
243    
244     if self.EDG_cpu_time:
245     if (req == ' '):
246     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
247     else:
248     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
249    
250     for i in range(len(first)): # Add loop DS
251 slacapra 1.108 groupReq = req
252 spiga 1.88 self.param='sched_param_'+str(i)+'.clad'
253 fanzago 1.10 param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
254 spiga 1.88
255     itr4=self.findSites_(first[i])
256 slacapra 1.95 for arg in itr4:
257 slacapra 1.108 groupReq = groupReq + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
258     param_file.write('Requirements = '+groupReq +';\n')
259 spiga 1.88
260 fanzago 1.99 if (self.rb_param_file != ''):
261     param_file.write(self.rb_param_file)
262 spiga 1.88
263 slacapra 1.101 if len(self.EDG_addJdlParam):
264     for p in self.EDG_addJdlParam:
265     param_file.write(p)
266    
267 fanzago 1.10 param_file.close()
268 spiga 1.88
269 fanzago 1.13
270 nsmirnov 1.2 def wsSetupEnvironment(self):
271     """
272     Returns part of a job script which does scheduler-specific work.
273     """
274 spiga 1.49 txt = ''
275 mkirn 1.75 txt += '# strip arguments\n'
276     txt += 'echo "strip arguments"\n'
277     txt += 'args=("$@")\n'
278     txt += 'nargs=$#\n'
279     txt += 'shift $nargs\n'
280 gutsche 1.60 txt += "# job number (first parameter for job wrapper)\n"
281 mkirn 1.75 #txt += "NJob=$1\n"
282     txt += "NJob=${args[0]}\n"
283 gutsche 1.60
284     txt += '# job identification to DashBoard \n'
285 gutsche 1.64 txt += 'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n'
286     txt += 'SyncGridJobId=`echo $EDG_WL_JOBID`\n'
287     txt += 'MonitorID=`echo ' + self._taskId + '`\n'
288     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
289     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
290     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
291 gutsche 1.60
292 spiga 1.49 txt += 'echo "middleware discovery " \n'
293 gutsche 1.84 txt += 'if [ $GRID3_APP_DIR ]; then\n'
294 spiga 1.49 txt += ' middleware=OSG \n'
295 gutsche 1.60 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
296     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
297 spiga 1.49 txt += ' echo "middleware =$middleware" \n'
298     txt += 'elif [ $OSG_APP ]; then \n'
299     txt += ' middleware=OSG \n'
300 gutsche 1.60 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
301     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
302 spiga 1.49 txt += ' echo "middleware =$middleware" \n'
303 gutsche 1.84 txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
304     txt += ' middleware=LCG \n'
305     txt += ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
306     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
307     txt += ' echo "middleware =$middleware" \n'
308 spiga 1.49 txt += 'else \n'
309 gutsche 1.60 txt += ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
310     txt += ' echo "JOB_EXIT_STATUS = 10030" \n'
311     txt += ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
312     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
313 gutsche 1.64 txt += ' rm -f $RUNTIME_AREA/$repo \n'
314     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
315     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
316 gutsche 1.60 txt += ' exit 1 \n'
317     txt += 'fi \n'
318    
319     txt += '# report first time to DashBoard \n'
320     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
321 gutsche 1.64 txt += 'rm -f $RUNTIME_AREA/$repo \n'
322     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
323     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
324    
325 spiga 1.49 txt += '\n\n'
326    
327 fanzago 1.30 if int(self.copy_data) == 1:
328 fanzago 1.14 if self.SE:
329     txt += 'export SE='+self.SE+'\n'
330 fanzago 1.15 txt += 'echo "SE = $SE"\n'
331 fanzago 1.14 if self.SE_PATH:
332     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
333     txt += 'export SE_PATH='+self.SE_PATH+'\n'
334 fanzago 1.15 txt += 'echo "SE_PATH = $SE_PATH"\n'
335 spiga 1.49
336 fanzago 1.38 txt += 'export VO='+self.VO+'\n'
337 spiga 1.88 ### add some line for LFC catalog setting
338 spiga 1.49 txt += 'if [ $middleware == LCG ]; then \n'
339     txt += ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
340     txt += ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
341     txt += ' fi\n'
342     txt += ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
343     txt += ' export LFC_HOST='+self.lfc_host+'\n'
344     txt += ' fi\n'
345     txt += ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
346     txt += ' export LFC_HOME='+self.lfc_home+'\n'
347     txt += ' fi\n'
348     txt += 'elif [ $middleware == OSG ]; then\n'
349     txt += ' echo "LFC catalog setting to be implemented for OSG"\n'
350 fanzago 1.38 txt += 'fi\n'
351     #####
352 fanzago 1.30 if int(self.register_data) == 1:
353 spiga 1.49 txt += 'if [ $middleware == LCG ]; then \n'
354     txt += ' export LFN='+self.LFN+'\n'
355     txt += ' lfc-ls $LFN\n'
356     txt += ' result=$?\n'
357     txt += ' echo $result\n'
358 fanzago 1.36 ### creation of LFN dir in LFC catalog, under /grid/cms dir
359 spiga 1.49 txt += ' if [ $result != 0 ]; then\n'
360     txt += ' lfc-mkdir $LFN\n'
361     txt += ' result=$?\n'
362     txt += ' echo $result\n'
363     txt += ' fi\n'
364     txt += 'elif [ $middleware == OSG ]; then\n'
365     txt += ' echo " Files registration to be implemented for OSG"\n'
366 fanzago 1.36 txt += 'fi\n'
367     txt += '\n'
368 spiga 1.49
369     if self.VO:
370     txt += 'export VO='+self.VO+'\n'
371     if self.LFN:
372     txt += 'if [ $middleware == LCG ]; then \n'
373     txt += ' export LFN='+self.LFN+'\n'
374     txt += 'fi\n'
375     txt += '\n'
376    
377     txt += 'if [ $middleware == LCG ]; then\n'
378     txt += ' CloseCEs=`edg-brokerinfo getCE`\n'
379     txt += ' echo "CloseCEs = $CloseCEs"\n'
380     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
381     txt += ' echo "CE = $CE"\n'
382     txt += 'elif [ $middleware == OSG ]; then \n'
383     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
384 slacapra 1.81 txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
385 spiga 1.49 txt += ' else \n'
386 gutsche 1.60 txt += ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n'
387     txt += ' echo "JOB_EXIT_STATUS = 10099" \n'
388     txt += ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n'
389     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
390 gutsche 1.64 txt += ' rm -f $RUNTIME_AREA/$repo \n'
391     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
392     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
393 spiga 1.49 txt += ' exit 1 \n'
394     txt += ' fi \n'
395     txt += 'fi \n'
396    
397 nsmirnov 1.2 return txt
398 fanzago 1.15
399 fanzago 1.39 def wsCopyInput(self):
400     """
401     Copy input data from SE to WN
402     """
403     txt = ''
404 slacapra 1.102 if not self.copy_input_data: return txt
405 slacapra 1.90
406 spiga 1.49 ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
407 slacapra 1.92 txt += 'if [ $middleware == OSG ]; then\n'
408     txt += ' #\n'
409     txt += ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
410     txt += ' #\n'
411     txt += ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
412     txt += 'elif [ $middleware == LCG ]; then \n'
413     txt += ' #\n'
414     txt += ' # Copy Input Data from SE to this WN\n'
415     txt += ' #\n'
416     ### changed by georgia (put a loop copying more than one input files per jobs)
417     txt += ' for input_file in $cur_file_list \n'
418     txt += ' do \n'
419     txt += ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
420     txt += ' copy_input_exit_status=$?\n'
421     txt += ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
422     txt += ' if [ $copy_input_exit_status -ne 0 ]; then \n'
423     txt += ' echo "Problems with copying to WN" \n'
424     txt += ' else \n'
425     txt += ' echo "input copied into WN" \n'
426     txt += ' fi \n'
427     txt += ' done \n'
428     ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
429     txt += ' for file in $cur_pu_list \n'
430     txt += ' do \n'
431     txt += ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
432     txt += ' copy_input_pu_exit_status=$?\n'
433     txt += ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
434     txt += ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
435     txt += ' echo "Problems with copying pu to WN" \n'
436     txt += ' else \n'
437     txt += ' echo "input pu files copied into WN" \n'
438     txt += ' fi \n'
439     txt += ' done \n'
440     txt += ' \n'
441     txt += ' ### Check SCRATCH space available on WN : \n'
442     txt += ' df -h \n'
443     txt += 'fi \n'
444 spiga 1.49
445 fanzago 1.39 return txt
446    
447 fanzago 1.14 def wsCopyOutput(self):
448     """
449     Write a CopyResults part of a job script, e.g.
450     to copy produced output into a storage element.
451     """
452     txt = ''
453 fanzago 1.30 if int(self.copy_data) == 1:
454 fanzago 1.14 txt += '#\n'
455 fanzago 1.15 txt += '# Copy output to SE = $SE\n'
456 fanzago 1.14 txt += '#\n'
457 gutsche 1.73 txt += ' if [ $middleware == OSG ]; then\n'
458     txt += ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
459     txt += ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
460     txt += ' source $OSG_APP/glite/setup_glite_ui.sh\n'
461     txt += ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
462     txt += ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
463     txt += ' fi \n'
464 spiga 1.91
465 corvo 1.34 txt += ' for out_file in $file_list ; do\n'
466 spiga 1.91 txt += ' echo "Trying to copy output file to $SE using srmcp"\n'
467     txt += ' echo "mkdir -p $HOME/.srmconfig"\n'
468     txt += ' mkdir -p $HOME/.srmconfig\n'
469     txt += ' if [ $middleware == LCG ]; then\n'
470     txt += ' echo "srmcp -retry_num 3 -retry_timeout 480000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
471     txt += ' exitstring=`srmcp -retry_num 3 -retry_timeout 480000 file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
472     txt += ' elif [ $middleware == OSG ]; then\n'
473     txt += ' echo "srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
474     txt += ' exitstring=`srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
475     txt += ' fi \n'
476 gutsche 1.73 txt += ' copy_exit_status=$?\n'
477 spiga 1.91 txt += ' echo "COPY_EXIT_STATUS for srmcp = $copy_exit_status"\n'
478 corvo 1.34 txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
479 spiga 1.91
480 corvo 1.34 txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
481 gutsche 1.73 txt += ' echo "Possible problem with SE = $SE"\n'
482 corvo 1.34 txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
483     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
484 mkirn 1.93 txt += ' echo "srmcp failed, attempting lcg-cp."\n'
485 spiga 1.91 if common.logger.debugLevel() >= 5:
486     txt += ' echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
487     txt += ' exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
488     else:
489     txt += ' echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
490     txt += ' exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
491 spiga 1.88 txt += ' copy_exit_status=$?\n'
492 spiga 1.91 txt += ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
493 slacapra 1.87 txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
494 spiga 1.91
495 gutsche 1.73 txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
496     txt += ' echo "Problems with SE = $SE"\n'
497     txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
498     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
499 mkirn 1.93 txt += ' echo "srmcp and lcg-cp and failed!"\n'
500 gutsche 1.73 txt += ' else\n'
501     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
502     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
503     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
504     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
505 spiga 1.91 txt += ' echo "lcg-cp succeeded"\n'
506 gutsche 1.73 txt += ' fi\n'
507 corvo 1.34 txt += ' else\n'
508     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
509     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
510     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
511     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
512 spiga 1.91 txt += ' echo "srmcp succeeded"\n'
513 corvo 1.34 txt += ' fi\n'
514     txt += ' done\n'
515 fanzago 1.14 return txt
516    
517     def wsRegisterOutput(self):
518     """
519     Returns part of a job script which does scheduler-specific work.
520     """
521    
522     txt = ''
523 fanzago 1.30 if int(self.register_data) == 1:
524 spiga 1.49 ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
525     txt += 'if [ $middleware == OSG ]; then\n'
526     txt += ' #\n'
527     txt += ' # Register output to LFC deactivated in OSG mode\n'
528     txt += ' #\n'
529     txt += ' echo "Register output to LFC deactivated in OSG mode"\n'
530     txt += 'elif [ $middleware == LCG ]; then \n'
531 fanzago 1.14 txt += '#\n'
532 fanzago 1.36 txt += '# Register output to LFC\n'
533 fanzago 1.14 txt += '#\n'
534 fanzago 1.66 txt += ' if [ $copy_exit_status -eq 0 ]; then\n'
535 spiga 1.49 txt += ' for out_file in $file_list ; do\n'
536     txt += ' echo "Trying to register the output file into LFC"\n'
537 fanzago 1.71 txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1"\n'
538 fanzago 1.70 txt += ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1 \n'
539 fanzago 1.14 txt += ' register_exit_status=$?\n'
540 fanzago 1.15 txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
541     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
542 fanzago 1.14 txt += ' if [ $register_exit_status -ne 0 ]; then \n'
543 spiga 1.49 txt += ' echo "Problems with the registration to LFC" \n'
544     txt += ' echo "Try with srm protocol" \n'
545 fanzago 1.71 txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1"\n'
546 fanzago 1.70 txt += ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1 \n'
547 spiga 1.49 txt += ' register_exit_status=$?\n'
548     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
549     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
550     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
551     txt += ' echo "Problems with the registration into LFC" \n'
552     txt += ' fi \n'
553     txt += ' else \n'
554     txt += ' echo "output registered to LFC"\n'
555 fanzago 1.14 txt += ' fi \n'
556 spiga 1.49 txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
557     txt += ' done\n'
558 fanzago 1.66 txt += ' else \n'
559 spiga 1.49 txt += ' echo "Trying to copy output file to CloseSE"\n'
560     txt += ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
561     txt += ' for out_file in $file_list ; do\n'
562 fanzago 1.71 txt += ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1" \n'
563 fanzago 1.70 txt += ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1 \n'
564 spiga 1.49 txt += ' register_exit_status=$?\n'
565     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
566     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
567     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
568 fanzago 1.66 txt += ' echo "Problems with CloseSE or Catalog" \n'
569 spiga 1.49 txt += ' else \n'
570     txt += ' echo "The program was successfully executed"\n'
571     txt += ' echo "SE = $CLOSE_SE"\n'
572     txt += ' echo "LFN for the file is LFN=${LFN}/$out_file"\n'
573     txt += ' fi \n'
574     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
575     txt += ' done\n'
576     txt += ' fi \n'
577 fanzago 1.68 txt += ' exit_status=$register_exit_status\n'
578 fanzago 1.14 txt += 'fi \n'
579     return txt
580 nsmirnov 1.1
581 spiga 1.23 def loggingInfo(self, id):
582 slacapra 1.7 """
583     retrieve the logging info from logging and bookkeeping and return it
584     """
585 slacapra 1.18 self.checkProxy()
586 fanzago 1.24 cmd = 'edg-job-get-logging-info -v 2 ' + id
587 slacapra 1.32 cmd_out = runCommand(cmd)
588 slacapra 1.7 return cmd_out
589    
590 nsmirnov 1.1 def queryDetailedStatus(self, id):
591     """ Query a detailed status of the job with id """
592     cmd = 'edg-job-status '+id
593     cmd_out = runCommand(cmd)
594     return cmd_out
595    
596 slacapra 1.80 ##### FEDE ######
597 spiga 1.88 def findSites_(self, n):
598     itr4 =[]
599     sites = common.jobDB.destination(n)
600     if len(sites)>0 and sites[0]=="Any":
601     return itr4
602     itr = ''
603     if sites != [""]:#CarlosDaniele
604     for site in sites:
605     #itr = itr + 'target.GlueSEUniqueID=="'+site+'" || '
606     itr = itr + 'target.GlueSEUniqueID=="'+site+'" || '
607     itr = itr[0:-4]
608     itr4.append( itr )
609 slacapra 1.80 return itr4
610    
611     def createXMLSchScript(self, nj, argsList):
612 spiga 1.88
613 slacapra 1.80 """
614     Create a XML-file for BOSS4.
615     """
616     # job = common.job_list[nj]
617     """
618     INDY
619 spiga 1.88 [begin] FIX-ME:
620     I would pass jobType instead of job
621 slacapra 1.80 """
622     index = nj - 1
623     job = common.job_list[index]
624     jbt = job.type()
625    
626     inp_sandbox = jbt.inputSandbox(index)
627     out_sandbox = jbt.outputSandbox(index)
628 nsmirnov 1.5 """
629 spiga 1.88 [end] FIX-ME
630 nsmirnov 1.5 """
631 slacapra 1.6
632 slacapra 1.81
633     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
634     jt_string = ''
635    
636     xml_fname = str(self.jobtypeName)+'.xml'
637     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
638    
639     #TaskName
640     dir = string.split(common.work_space.topDir(), '/')
641     taskName = dir[len(dir)-2]
642    
643     to_writeReq = ''
644     to_write = ''
645    
646     req=' '
647     req = req + jbt.getRequirements()
648    
649     if self.EDG_requirements:
650     if (req == ' '):
651     req = req + self.EDG_requirements
652     else:
653     req = req + ' && ' + self.EDG_requirements
654     if self.EDG_ce_white_list:
655     ce_white_list = string.split(self.EDG_ce_white_list,',')
656     for i in range(len(ce_white_list)):
657     if i == 0:
658     if (req == ' '):
659     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
660     else:
661     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
662     pass
663     else:
664     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
665     req = req + ')'
666    
667     if self.EDG_ce_black_list:
668     ce_black_list = string.split(self.EDG_ce_black_list,',')
669     for ce in ce_black_list:
670     if (req == ' '):
671     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
672     else:
673     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
674     pass
675     if self.EDG_clock_time:
676     if (req == ' '):
677     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
678     else:
679     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
680    
681     if self.EDG_cpu_time:
682     if (req == ' '):
683     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
684     else:
685     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
686    
687     if ( self.EDG_retry_count ):
688     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
689     pass
690 nsmirnov 1.5
691 slacapra 1.97 if ( self.EDG_shallow_retry_count ):
692 slacapra 1.98 to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
693 slacapra 1.97 pass
694    
695 slacapra 1.81 to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
696     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
697 nsmirnov 1.1
698 slacapra 1.102 #TaskName
699 slacapra 1.81 dir = string.split(common.work_space.topDir(), '/')
700     taskName = dir[len(dir)-2]
701    
702     xml.write(str(title))
703 corvo 1.107 xml.write('<task name="' +str(taskName)+'" sub_path="' + common.work_space.bossCache() + '">\n')
704 slacapra 1.81 xml.write(jt_string)
705 spiga 1.88
706     if (to_write != ''):
707     xml.write('<extraTags\n')
708     xml.write(to_write)
709     xml.write('/>\n')
710     pass
711 slacapra 1.81
712     xml.write('<iterator>\n')
713     xml.write('\t<iteratorRule name="ITR1">\n')
714     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
715     xml.write('\t</iteratorRule>\n')
716     xml.write('\t<iteratorRule name="ITR2">\n')
717     for arg in argsList:
718     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
719     pass
720     xml.write('\t</iteratorRule>\n')
721     #print jobList
722     xml.write('\t<iteratorRule name="ITR3">\n')
723     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
724     xml.write('\t</iteratorRule>\n')
725    
726     '''
727 spiga 1.88 indy: here itr4
728 slacapra 1.81 '''
729    
730 fanzago 1.14
731 slacapra 1.81 xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
732     xml.write(jt_string)
733 fanzago 1.14
734 slacapra 1.81 #executable
735 nsmirnov 1.1
736 slacapra 1.81 """
737     INDY
738 spiga 1.88 script depends on jobType: it should be probably get in a different way
739 slacapra 1.81 """
740 nsmirnov 1.1 script = job.scriptFilename()
741 slacapra 1.81 xml.write('<program>\n')
742     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
743     xml.write(jt_string)
744 corvo 1.107
745 slacapra 1.81 xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
746     xml.write('<program_types> crabjob </program_types>\n')
747 corvo 1.107 inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
748 nsmirnov 1.1
749     if inp_sandbox != None:
750     for fl in inp_sandbox:
751 slacapra 1.81 inp_box = inp_box + '' + fl + ','
752 nsmirnov 1.1 pass
753     pass
754    
755 fanzago 1.40 if (not jbt.additional_inbox_files == []):
756 spiga 1.82 inp_box = inp_box + ','
757 fanzago 1.40 for addFile in jbt.additional_inbox_files:
758     addFile = os.path.abspath(addFile)
759 slacapra 1.81 inp_box = inp_box+''+addFile+','
760 fanzago 1.40 pass
761 nsmirnov 1.1
762     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
763 slacapra 1.81 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
764     xml.write(inp_box)
765    
766     base = jbt.name()
767     stdout = base + '__ITR3_.stdout'
768     stderr = base + '__ITR3_.stderr'
769 fanzago 1.14
770 slacapra 1.81 xml.write('<stderr> ' + stderr + '</stderr>\n')
771     xml.write('<stdout> ' + stdout + '</stdout>\n')
772 fanzago 1.14
773    
774 slacapra 1.81 out_box = stdout + ',' + \
775     stderr + ',.BrokerInfo,'
776    
777     """
778 fanzago 1.30 if int(self.return_data) == 1:
779 fanzago 1.14 if out_sandbox != None:
780     for fl in out_sandbox:
781 slacapra 1.81 out_box = out_box + '' + fl + ','
782 fanzago 1.14 pass
783 nsmirnov 1.1 pass
784     pass
785 slacapra 1.81 """
786 nsmirnov 1.1
787 slacapra 1.81 """
788     INDY
789 spiga 1.88 something similar should be also done for infiles (if it makes sense!)
790     """
791 slacapra 1.104 # Stuff to be returned _always_ via sandbox
792     for fl in jbt.output_file_sandbox:
793     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
794     pass
795     pass
796    
797     # via sandbox iif required return_data
798 slacapra 1.81 if int(self.return_data) == 1:
799     for fl in jbt.output_file:
800     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
801 fanzago 1.48 pass
802 slacapra 1.81 pass
803 slacapra 1.62
804 slacapra 1.81 if out_box[-1] == ',' : out_box = out_box[:-1]
805     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
806     xml.write(out_box)
807    
808     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
809 fanzago 1.48
810 slacapra 1.81 xml.write('</program>\n')
811     xml.write('</chain>\n')
812 nsmirnov 1.1
813 slacapra 1.81 xml.write('</iterator>\n')
814     xml.write('</task>\n')
815 slacapra 1.51
816 slacapra 1.81 xml.close()
817 spiga 1.88
818    
819 nsmirnov 1.1 return
820 slacapra 1.18
821     def checkProxy(self):
822     """
823     Function to check the Globus proxy.
824     """
825     if (self.proxyValid): return
826 slacapra 1.103
827     ### Just return if asked to do so
828     if (self.dontCheckProxy):
829     self.proxyValid=1
830     return
831    
832 slacapra 1.18 timeleft = -999
833 slacapra 1.50 minTimeLeft=10*3600 # in seconds
834 slacapra 1.51
835     minTimeLeftServer = 100 # in hours
836    
837 slacapra 1.50 mustRenew = 0
838 fanzago 1.69 timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
839 slacapra 1.50 timeLeftServer = -999
840 fanzago 1.53 if not timeLeftLocal or int(timeLeftLocal) <= 0 or not isInt(timeLeftLocal):
841 slacapra 1.50 mustRenew = 1
842     else:
843 fanzago 1.69 timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
844 slacapra 1.50 if not timeLeftServer or not isInt(timeLeftServer):
845     mustRenew = 1
846 slacapra 1.54 elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
847 slacapra 1.50 mustRenew = 1
848     pass
849     pass
850    
851 slacapra 1.51 if mustRenew:
852 gutsche 1.94 common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
853 spiga 1.91 cmd = 'voms-proxy-init -voms '+self.VO
854     if self.group:
855     cmd += ':/'+self.VO+'/'+self.group
856 slacapra 1.79 if self.role:
857 spiga 1.91 cmd += '/role='+self.role
858 gutsche 1.94 cmd += ' -valid 192:00'
859 slacapra 1.18 try:
860 slacapra 1.32 # SL as above: damn it!
861 spiga 1.91 common.logger.debug(10,cmd)
862 slacapra 1.19 out = os.system(cmd)
863     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
864 slacapra 1.18 except:
865     msg = "Unable to create a valid proxy!\n"
866     raise CrabException(msg)
867     pass
868 slacapra 1.51
869     ## now I do have a voms proxy valid, and I check the myproxy server
870     renewProxy = 0
871     cmd = 'myproxy-info -d -s '+self.proxyServer
872     cmd_out = runCommand(cmd,0,20)
873     if not cmd_out:
874     common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
875     renewProxy = 1
876     else:
877     # if myproxy exist but not long enough, renew
878     reTime = re.compile( r'timeleft: (\d+)' )
879     #print "<"+str(reTime.search( cmd_out ).group(1))+">"
880     if reTime.match( cmd_out ):
881     time = reTime.search( line ).group(1)
882     if time < minTimeLeftServer:
883     renewProxy = 1
884     common.logger.message('No credential delegation will expire in '+time+' hours: renew it')
885     pass
886     pass
887    
888     # if not, create one.
889     if renewProxy:
890     cmd = 'myproxy-init -d -n -s '+self.proxyServer
891     out = os.system(cmd)
892     if (out>0):
893 fanzago 1.53 raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")
894 slacapra 1.51 pass
895    
896     # cache proxy validity
897 slacapra 1.18 self.proxyValid=1
898     return
899 spiga 1.49
900 slacapra 1.18 def configOpt_(self):
901     edg_ui_cfg_opt = ' '
902     if self.edg_config:
903 slacapra 1.51 edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
904 slacapra 1.18 if self.edg_config_vo:
905 slacapra 1.51 edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
906 slacapra 1.18 return edg_ui_cfg_opt