ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.109
Committed: Wed Dec 13 11:44:29 2006 UTC (18 years, 4 months ago) by corvo
Content type: text/x-python
Branch: MAIN
Changes since 1.108: +2 -3 lines
Log Message:
Changed Boss cache dir

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.50 from EdgConfig import *
6 nsmirnov 1.1 import common
7    
8 corvo 1.107 import os, sys, time, gzip
9 nsmirnov 1.1
10     class SchedulerEdg(Scheduler):
def __init__(self):
    """
    Initialise the base Scheduler under the name "EDG" and store the
    list of status field names in self.states.
    """
    Scheduler.__init__(self, "EDG")
    # presumably the fields reported by the EDG job-status query -- confirm
    self.states = [
        "Acl",
        "cancelReason",
        "cancelling",
        "ce_node",
        "children",
        "children_hist",
        "children_num",
        "children_states",
        "condorId",
        "condor_jdl",
        "cpuTime",
        "destination",
        "done_code",
        "exit_code",
        "expectFrom",
        "expectUpdate",
        "globusId",
        "jdl",
        "jobId",
        "jobtype",
        "lastUpdateTime",
        "localId",
        "location",
        "matched_jdl",
        "network_server",
        "owner",
        "parent_job",
        "reason",
        "resubmitted",
        "rsl",
        "seed",
        "stateEnterTime",
        "stateEnterTimes",
        "subjob_failed",
        "user tags",
        "status",
        "status_code",
        "hierarchy",
    ]
    return
23    
def configure(self, cfg_params):
    """
    Read the scheduler settings from cfg_params and store them on self.

    cfg_params is indexed with "SECTION.key" strings and is expected to
    raise KeyError for entries that are not set.  Optional entries fall
    back to the defaults written below; EDG.lfc_host,
    EDG.lcg_catalog_type and EDG.lfc_home are mandatory.

    Also appends $EDG_WL_LOCATION/lib and $EDG_WL_LOCATION/lib/python to
    sys.path.

    Raises CrabException when a mandatory entry is missing, when
    return_data and copy_data are both 0, when register_data is 1
    without copy_data, or when EDG_WL_LOCATION is not set.
    """
    def optional(key, default):
        # Value of an optional entry, or default when it is absent.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def required(key, msg):
        # Value of a mandatory entry; log msg and abort when it is absent.
        try:
            return cfg_params[key]
        except KeyError:
            common.logger.message(msg)
            raise CrabException(msg)

    # Resource broker: the call stays inside the try so that a KeyError
    # raised by rb_configure is still treated as "no RB configured".
    try:
        RB = cfg_params["EDG.rb"]
        self.rb_param_file = self.rb_configure(RB)
    except KeyError:
        self.rb_param_file = ''

    self.proxyServer = optional("EDG.proxy_server", 'myproxy.cern.ch')
    # NOTE(review): the listing this was recovered from lost its
    # indentation; the debug line is assumed to run for both the
    # configured and the default server -- confirm.
    common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

    self.group = optional("EDG.group", None)
    self.role = optional("EDG.role", None)
    self.LCG_version = optional("EDG.lcg_version", '2')
    self.EDG_ce_black_list = optional('EDG.ce_black_list', '')
    self.EDG_ce_white_list = optional('EDG.ce_white_list', '')
    self.VO = optional('EDG.virtual_organization', 'cms')
    self.copy_input_data = optional("USER.copy_input_data", 0)
    self.return_data = optional('USER.return_data', 0)

    # Copying the output needs a destination storage element and path.
    self.copy_data = optional("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        msg = "Error. The [USER] section does not have 'storage_element'"
        msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
        self.SE = required('USER.storage_element', msg)
        self.SE_PATH = required('USER.storage_path', msg)

    if int(self.return_data) == 0 and int(self.copy_data) == 0:
        msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    msg = "Error. The [EDG] section does not have 'lfc_host' value"
    msg = msg + " it's necessary to know the LFC host name"
    self.lfc_host = required('EDG.lfc_host', msg)

    msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
    msg = msg + " it's necessary to know the catalog type"
    self.lcg_catalog_type = required('EDG.lcg_catalog_type', msg)

    msg = "Error. The [EDG] section does not have 'lfc_home' value"
    msg = msg + " it's necessary to know the home catalog dir"
    self.lfc_home = required('EDG.lfc_home', msg)

    # Registering the output needs the logical file name directory.
    self.register_data = optional("USER.register_data", 0)
    if int(self.register_data) == 1:
        msg = "Error. The [USER] section does not have 'lfn_dir' value"
        msg = msg + " it's necessary for LFC registration"
        self.LFN = required('USER.lfn_dir', msg)

    if int(self.copy_data) == 0 and int(self.register_data) == 1:
        msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    self.EDG_requirements = optional('EDG.requirements', '')

    # Comma-separated list of raw JDL lines.
    try:
        self.EDG_addJdlParam = cfg_params['EDG.additional_jdl_parameters'].split(',')
    except KeyError:
        self.EDG_addJdlParam = []

    self.EDG_retry_count = optional('EDG.retry_count', '')
    self.EDG_shallow_retry_count = optional('EDG.shallow_retry_count', '')
    self.EDG_clock_time = optional('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = optional('EDG.max_cpu_time', '')

    # Add EDG_WL_LOCATION to the python path
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    sys.path.append(os.path.join(path, "lib"))
    sys.path.append(os.path.join(path, "lib", "python"))

    self.proxyValid = 0

    self._taskId = optional('taskId', '')
    self.jobtypeName = optional('CRAB.jobtype', '')
    # The default must land on schedulerName: that is the attribute the
    # XML writer reads, a separate "scheduler" attribute is never used.
    self.schedulerName = optional('CRAB.scheduler', '')
    self.dontCheckProxy = optional("EDG.dont_check_proxy", 0)

    return
175    
176 fanzago 1.10
def rb_configure(self, RB):
    """
    Build the resource-broker lines for the scheduler parameter file.

    Stores the values of EdgConfig(RB).config() and .configVO() in
    self.edg_config and self.edg_config_vo.  When the first is non-empty
    and the second is not the empty string, self.rb_param_file becomes
    the two 'RBconfig' / 'RBconfigVO' lines; otherwise it stays ''.
    Returns self.rb_param_file.
    """
    # Reset first so the attributes exist even if EdgConfig fails.
    self.edg_config = ''
    self.edg_config_vo = ''
    self.rb_param_file = ''

    broker = EdgConfig(RB)
    self.edg_config = broker.config()
    self.edg_config_vo = broker.configVO()

    have_both = self.edg_config and self.edg_config_vo != ''
    if have_both:
        pieces = ['RBconfig = "', self.edg_config, '";\nRBconfigVO = "', self.edg_config_vo, '";']
        self.rb_param_file = ''.join(pieces)
    return self.rb_param_file
190    
191    
def sched_parameter(self):
    """
    Write one sched_param_<i>.clad file per data block into the share
    directory.

    Every file contains the 'Requirements' expression (job-type
    requirements, EDG.requirements, CE white and black lists, wall-clock
    and CPU limits, plus the CloseSE constraint of that block's own
    sites), followed by the resource-broker lines and the additional JDL
    parameters.  self.param is left holding the name of the last file
    written.  Returns nothing.
    """
    def also(expr, clause):
        # AND clause onto expr; a blank expr gets no leading ' && '.
        if expr.strip():
            return expr + ' && ' + clause
        return expr + clause

    index = int(common.jobDB.nJobs()) - 1
    job = common.job_list[index]
    jbt = job.type()

    # Index of the first job of every run of consecutive jobs that share
    # the same block.
    lastBlock = -1
    first = []
    for n in range(common.jobDB.nJobs()):
        currBlock = common.jobDB.block(n)
        if currBlock != lastBlock:
            lastBlock = currBlock
            first.append(n)

    req = '' + jbt.getRequirements()

    if self.EDG_requirements:
        req = also(req, self.EDG_requirements)

    if self.EDG_ce_white_list:
        # Any one of the white-listed CEs is acceptable.
        allowed = ['(RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                   for ce in self.EDG_ce_white_list.split(',')]
        req = also(req, '(' + ' || '.join(allowed) + ')')

    if self.EDG_ce_black_list:
        # Every black-listed CE must be excluded.
        for ce in self.EDG_ce_black_list.split(','):
            req = also(req, '(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))')

    if self.EDG_clock_time:
        req = also(req, 'other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time)

    if self.EDG_cpu_time:
        req = also(req, 'other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time)

    for i, firstJob in enumerate(first):
        self.param = 'sched_param_' + str(i) + '.clad'

        # Start from the shared expression for every block: the site
        # constraint of one block must not leak into the next file.
        blockReq = req
        for arg in self.findSites_(firstJob):
            blockReq = also(blockReq, 'anyMatch(other.storage.CloseSEs, (' + str(arg) + '))')

        param_file = open(common.work_space.shareDir() + '/' + self.param, 'w')
        try:
            param_file.write('Requirements = ' + blockReq + ';\n')

            if self.rb_param_file != '':
                param_file.write(self.rb_param_file)

            for p in self.EDG_addJdlParam:
                param_file.write(p)
        finally:
            # Close the file even when a write fails.
            param_file.close()
267 spiga 1.88
268 fanzago 1.13
269 nsmirnov 1.2 def wsSetupEnvironment(self):
270     """
271     Returns part of a job script which does scheduler-specific work.
272     """
# The method only assembles shell text in txt and returns it; nothing is
# executed here.  It reads self._taskId, self.copy_data, self.SE,
# self.SE_PATH, self.VO, self.lcg_catalog_type, self.lfc_host,
# self.lfc_home, self.register_data and self.LFN.
# NOTE(review): this listing lost the Python indentation, so the nesting of
# the `if` statements below cannot be read off the text -- confirm against
# the repository copy before restructuring.
273 spiga 1.49 txt = ''
# Shell: save the wrapper arguments in the args array and clear "$@".
274 mkirn 1.75 txt += '# strip arguments\n'
275     txt += 'echo "strip arguments"\n'
276     txt += 'args=("$@")\n'
277     txt += 'nargs=$#\n'
278     txt += 'shift $nargs\n'
279 gutsche 1.60 txt += "# job number (first parameter for job wrapper)\n"
280 mkirn 1.75 #txt += "NJob=$1\n"
281     txt += "NJob=${args[0]}\n"
282 gutsche 1.60
# Shell: identifiers appended to $RUNTIME_AREA/$repo; MonitorID carries
# self._taskId.
283     txt += '# job identification to DashBoard \n'
284 gutsche 1.64 txt += 'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n'
285     txt += 'SyncGridJobId=`echo $EDG_WL_JOBID`\n'
286     txt += 'MonitorID=`echo ' + self._taskId + '`\n'
287     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
288     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
289     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
290 gutsche 1.60
# Shell: middleware is OSG when GRID3_APP_DIR or OSG_APP is set, LCG when
# VO_CMS_SW_DIR is set; otherwise exit code 10030 is reported and the
# script exits with status 1.
291 spiga 1.49 txt += 'echo "middleware discovery " \n'
292 gutsche 1.84 txt += 'if [ $GRID3_APP_DIR ]; then\n'
293 spiga 1.49 txt += ' middleware=OSG \n'
294 gutsche 1.60 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
295     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
296 spiga 1.49 txt += ' echo "middleware =$middleware" \n'
297     txt += 'elif [ $OSG_APP ]; then \n'
298     txt += ' middleware=OSG \n'
299 gutsche 1.60 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
300     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
301 spiga 1.49 txt += ' echo "middleware =$middleware" \n'
302 gutsche 1.84 txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
303     txt += ' middleware=LCG \n'
304     txt += ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
305     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
306     txt += ' echo "middleware =$middleware" \n'
307 spiga 1.49 txt += 'else \n'
308 gutsche 1.60 txt += ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
309     txt += ' echo "JOB_EXIT_STATUS = 10030" \n'
310     txt += ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
311     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
312 gutsche 1.64 txt += ' rm -f $RUNTIME_AREA/$repo \n'
313     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
314     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
315 gutsche 1.60 txt += ' exit 1 \n'
316     txt += 'fi \n'
317    
# Shell: dump the status file, then recreate it with the two monitor ids.
318     txt += '# report first time to DashBoard \n'
319     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
320 gutsche 1.64 txt += 'rm -f $RUNTIME_AREA/$repo \n'
321     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
322     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
323    
324 spiga 1.49 txt += '\n\n'
325    
# Export SE and SE_PATH when the output is copied.  Side effect: a missing
# trailing '/' is appended to self.SE_PATH itself, not just to the export.
326 fanzago 1.30 if int(self.copy_data) == 1:
327 fanzago 1.14 if self.SE:
328     txt += 'export SE='+self.SE+'\n'
329 fanzago 1.15 txt += 'echo "SE = $SE"\n'
330 fanzago 1.14 if self.SE_PATH:
331     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
332     txt += 'export SE_PATH='+self.SE_PATH+'\n'
333 fanzago 1.15 txt += 'echo "SE_PATH = $SE_PATH"\n'
334 spiga 1.49
# NOTE(review): cannot tell from this listing whether the VO export and the
# LFC block below are still under the copy_data check -- confirm.
335 fanzago 1.38 txt += 'export VO='+self.VO+'\n'
336 spiga 1.88 ### add some line for LFC catalog setting
# Shell (LCG only): export LCG_CATALOG_TYPE, LFC_HOST and LFC_HOME when the
# environment value differs from the configured one.
337 spiga 1.49 txt += 'if [ $middleware == LCG ]; then \n'
338     txt += ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
339     txt += ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
340     txt += ' fi\n'
341     txt += ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
342     txt += ' export LFC_HOST='+self.lfc_host+'\n'
343     txt += ' fi\n'
344     txt += ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
345     txt += ' export LFC_HOME='+self.lfc_home+'\n'
346     txt += ' fi\n'
347     txt += 'elif [ $middleware == OSG ]; then\n'
348     txt += ' echo "LFC catalog setting to be implemented for OSG"\n'
349 fanzago 1.38 txt += 'fi\n'
350     #####
# Shell (LCG only, register_data == 1): export LFN and run lfc-mkdir when
# lfc-ls on it fails.
351 fanzago 1.30 if int(self.register_data) == 1:
352 spiga 1.49 txt += 'if [ $middleware == LCG ]; then \n'
353     txt += ' export LFN='+self.LFN+'\n'
354     txt += ' lfc-ls $LFN\n'
355     txt += ' result=$?\n'
356     txt += ' echo $result\n'
357 fanzago 1.36 ### creation of LFN dir in LFC catalog, under /grid/cms dir
358 spiga 1.49 txt += ' if [ $result != 0 ]; then\n'
359     txt += ' lfc-mkdir $LFN\n'
360     txt += ' result=$?\n'
361     txt += ' echo $result\n'
362     txt += ' fi\n'
363     txt += 'elif [ $middleware == OSG ]; then\n'
364     txt += ' echo " Files registration to be implemented for OSG"\n'
365 fanzago 1.36 txt += 'fi\n'
366     txt += '\n'
367 spiga 1.49
# NOTE(review): configure() assigns self.LFN only when register_data is 1,
# so the two checks below are presumably nested under the register_data
# branch above -- confirm.
368     if self.VO:
369     txt += 'export VO='+self.VO+'\n'
370     if self.LFN:
371     txt += 'if [ $middleware == LCG ]; then \n'
372     txt += ' export LFN='+self.LFN+'\n'
373     txt += 'fi\n'
374     txt += '\n'
375    
# Shell: set CE from `edg-brokerinfo getCE` (LCG, text before the first ':')
# or from OSG_JOB_CONTACT (OSG, text before the first '/'); an OSG node
# without OSG_JOB_CONTACT reports exit code 10099 and exits with status 1.
376     txt += 'if [ $middleware == LCG ]; then\n'
377     txt += ' CloseCEs=`edg-brokerinfo getCE`\n'
378     txt += ' echo "CloseCEs = $CloseCEs"\n'
379     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
380     txt += ' echo "CE = $CE"\n'
381     txt += 'elif [ $middleware == OSG ]; then \n'
382     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
383 slacapra 1.81 txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
384 spiga 1.49 txt += ' else \n'
385 gutsche 1.60 txt += ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n'
386     txt += ' echo "JOB_EXIT_STATUS = 10099" \n'
387     txt += ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n'
388     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
389 gutsche 1.64 txt += ' rm -f $RUNTIME_AREA/$repo \n'
390     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
391     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
392 spiga 1.49 txt += ' exit 1 \n'
393     txt += ' fi \n'
394     txt += 'fi \n'
395    
396 nsmirnov 1.2 return txt
397 fanzago 1.15
def wsCopyInput(self):
    """
    Return the job-script fragment that copies input data from the SE
    to the worker node, or '' when copy_input_data is switched off.

    The flag is interpreted numerically when possible, so the
    configuration string '0' disables the copy just like the integer 0;
    values that are not numbers fall back to their truth value.
    """
    flag = self.copy_input_data
    try:
        enabled = int(flag) != 0
    except (TypeError, ValueError):
        enabled = bool(flag)
    if not enabled:
        return ''

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    txt = (
        'if [ $middleware == OSG ]; then\n'
        ' #\n'
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
        ' #\n'
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
        'elif [ $middleware == LCG ]; then \n'
        ' #\n'
        ' # Copy Input Data from SE to this WN\n'
        ' #\n'
        # one lcg-cp per input file of the job
        ' for input_file in $cur_file_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
        ' copy_input_exit_status=$?\n'
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
        ' if [ $copy_input_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying to WN" \n'
        ' else \n'
        ' echo "input copied into WN" \n'
        ' fi \n'
        ' done \n'
        # copy a set of PU ntuples (same for each job)
        ' for file in $cur_pu_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
        ' copy_input_pu_exit_status=$?\n'
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying pu to WN" \n'
        ' else \n'
        ' echo "input pu files copied into WN" \n'
        ' fi \n'
        ' done \n'
        ' \n'
        ' ### Check SCRATCH space available on WN : \n'
        ' df -h \n'
        'fi \n'
    )
    return txt
445    
def wsCopyOutput(self):
    """
    Write a CopyResults part of a job script, e.g.
    to copy produced output into a storage element.

    Returns '' unless copy_data is 1.  For every file in $file_list the
    generated shell tries srmcp first and falls back to lcg-cp when
    srmcp fails; lcg-cp is run with --verbose when the logger debug
    level is 5 or higher.
    """
    if int(self.copy_data) != 1:
        return ''

    # The two debug levels differ only in the --verbose flag.
    if common.logger.debugLevel() >= 5:
        lcg_cp = 'lcg-cp --vo $VO -t 2400 --verbose'
    else:
        lcg_cp = 'lcg-cp --vo $VO -t 2400'

    txt = (
        '#\n'
        '# Copy output to SE = $SE\n'
        '#\n'
        ' if [ $middleware == OSG ]; then\n'
        ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
        ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
        ' source $OSG_APP/glite/setup_glite_ui.sh\n'
        ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
        ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
        ' fi \n'
    )

    # First attempt: srmcp.  The backslash-backtick pairs are written as
    # '\\`' so the shell still receives '\`' without relying on an
    # unrecognised Python escape sequence.
    txt += (
        ' for out_file in $file_list ; do\n'
        ' echo "Trying to copy output file to $SE using srmcp"\n'
        ' echo "mkdir -p $HOME/.srmconfig"\n'
        ' mkdir -p $HOME/.srmconfig\n'
        ' if [ $middleware == LCG ]; then\n'
        ' echo "srmcp -retry_num 3 -retry_timeout 480000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
        ' exitstring=`srmcp -retry_num 3 -retry_timeout 480000 file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
        ' elif [ $middleware == OSG ]; then\n'
        ' echo "srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
        ' exitstring=`srmcp -retry_num 3 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
        ' fi \n'
        ' copy_exit_status=$?\n'
        ' echo "COPY_EXIT_STATUS for srmcp = $copy_exit_status"\n'
        ' echo "STAGE_OUT = $copy_exit_status"\n'
    )

    # Second attempt, only when srmcp failed: lcg-cp over gsiftp.
    txt += (
        ' if [ $copy_exit_status -ne 0 ]; then\n'
        ' echo "Possible problem with SE = $SE"\n'
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "srmcp failed, attempting lcg-cp."\n'
    )
    txt += ' echo "' + lcg_cp + ' file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
    txt += ' exitstring=`' + lcg_cp + ' file://\\`pwd\\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
    txt += (
        ' copy_exit_status=$?\n'
        ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
        ' echo "STAGE_OUT = $copy_exit_status"\n'
        ' if [ $copy_exit_status -ne 0 ]; then\n'
        ' echo "Problems with SE = $SE"\n'
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "srmcp and lcg-cp both failed!"\n'
        ' else\n'
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "output copied into $SE/$SE_PATH directory"\n'
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "lcg-cp succeeded"\n'
        ' fi\n'
        ' else\n'
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "output copied into $SE/$SE_PATH directory"\n'
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
        ' echo "srmcp succeeded"\n'
        ' fi\n'
        ' done\n'
    )
    return txt
515    
def wsRegisterOutput(self):
    """
    Return the job-script fragment that registers the output files in
    the LFC catalogue, or '' when register_data is not 1.

    On LCG the generated shell registers each file of $file_list with
    lcg-rf (sfn first, then srm) when the earlier copy succeeded, and
    otherwise uploads it to the first close SE with lcg-cr.  On OSG it
    only prints a notice.
    """
    if int(self.register_data) != 1:
        return ''

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    script_lines = [
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Register output to LFC deactivated in OSG mode\n',
        ' #\n',
        ' echo "Register output to LFC deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        '#\n',
        '# Register output to LFC\n',
        '#\n',
        # the copy step succeeded: register the copied files
        ' if [ $copy_exit_status -eq 0 ]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to register the output file into LFC"\n',
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1"\n',
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1 \n',
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration to LFC" \n',
        ' echo "Try with srm protocol" \n',
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1"\n',
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1 \n',
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration into LFC" \n',
        ' fi \n',
        ' else \n',
        ' echo "output registered to LFC"\n',
        ' fi \n',
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' done\n',
        # the copy step failed: copy and register on the close SE instead
        ' else \n',
        ' echo "Trying to copy output file to CloseSE"\n',
        ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n',
        ' for out_file in $file_list ; do\n',
        ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1" \n',
        ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1 \n',
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with CloseSE or Catalog" \n',
        ' else \n',
        ' echo "The program was successfully executed"\n',
        ' echo "SE = $CLOSE_SE"\n',
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n',
        ' fi \n',
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' done\n',
        ' fi \n',
        ' exit_status=$register_exit_status\n',
        'fi \n',
    ]
    return ''.join(script_lines)
579 nsmirnov 1.1
def loggingInfo(self, id):
    """
    Return what runCommand gives back for
    'edg-job-get-logging-info -v 2 <id>'; self.checkProxy() is called
    first.
    """
    self.checkProxy()
    return runCommand('edg-job-get-logging-info -v 2 ' + id)
588    
def queryDetailedStatus(self, id):
    """ Return what runCommand gives back for 'edg-job-status <id>'. """
    return runCommand('edg-job-status ' + id)
594    
595 slacapra 1.80 ##### FEDE ######
def findSites_(self, n):
    """
    Return the storage-element constraint for job n.

    The result is a list holding at most one string of the form
    'target.GlueSEUniqueID=="a" || target.GlueSEUniqueID=="b"', built
    from common.jobDB.destination(n).  The list is empty when the first
    destination is "Any", when the destination list is [""], or when
    there are no destinations at all, so callers never receive an empty
    expression.
    """
    sites = common.jobDB.destination(n)
    if len(sites) > 0 and sites[0] == "Any":
        return []
    if not sites or sites == [""]:
        return []
    clauses = ['target.GlueSEUniqueID=="' + site + '"' for site in sites]
    return [' || '.join(clauses)]
609    
610     def createXMLSchScript(self, nj, argsList):
611 spiga 1.88
612 slacapra 1.80 """
613     Create a XML-file for BOSS4.
614     """
615     # job = common.job_list[nj]
616     """
617     INDY
618 spiga 1.88 [begin] FIX-ME:
619     I would pass jobType instead of job
620 slacapra 1.80 """
621     index = nj - 1
622     job = common.job_list[index]
623     jbt = job.type()
624    
625     inp_sandbox = jbt.inputSandbox(index)
626     out_sandbox = jbt.outputSandbox(index)
627 nsmirnov 1.5 """
628 spiga 1.88 [end] FIX-ME
629 nsmirnov 1.5 """
630 slacapra 1.6
631 slacapra 1.81
632     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
633     jt_string = ''
634    
635     xml_fname = str(self.jobtypeName)+'.xml'
636     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
637    
638     #TaskName
639     dir = string.split(common.work_space.topDir(), '/')
640     taskName = dir[len(dir)-2]
641    
642     to_writeReq = ''
643     to_write = ''
644    
645     req=' '
646     req = req + jbt.getRequirements()
647    
648     if self.EDG_requirements:
649     if (req == ' '):
650     req = req + self.EDG_requirements
651     else:
652     req = req + ' && ' + self.EDG_requirements
653     if self.EDG_ce_white_list:
654     ce_white_list = string.split(self.EDG_ce_white_list,',')
655     for i in range(len(ce_white_list)):
656     if i == 0:
657     if (req == ' '):
658     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
659     else:
660     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
661     pass
662     else:
663     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
664     req = req + ')'
665    
666     if self.EDG_ce_black_list:
667     ce_black_list = string.split(self.EDG_ce_black_list,',')
668     for ce in ce_black_list:
669     if (req == ' '):
670     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
671     else:
672     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
673     pass
674     if self.EDG_clock_time:
675     if (req == ' '):
676     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
677     else:
678     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
679    
680     if self.EDG_cpu_time:
681     if (req == ' '):
682     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
683     else:
684     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
685    
686     if ( self.EDG_retry_count ):
687     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
688     pass
689 nsmirnov 1.5
690 slacapra 1.97 if ( self.EDG_shallow_retry_count ):
691 slacapra 1.98 to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
692 slacapra 1.97 pass
693    
694 slacapra 1.81 to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
695     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
696 nsmirnov 1.1
697 slacapra 1.102 #TaskName
698 slacapra 1.81 dir = string.split(common.work_space.topDir(), '/')
699     taskName = dir[len(dir)-2]
700    
701     xml.write(str(title))
702 corvo 1.107 xml.write('<task name="' +str(taskName)+'" sub_path="' + common.work_space.bossCache() + '">\n')
703 slacapra 1.81 xml.write(jt_string)
704 spiga 1.88
705     if (to_write != ''):
706     xml.write('<extraTags\n')
707     xml.write(to_write)
708     xml.write('/>\n')
709     pass
710 slacapra 1.81
711     xml.write('<iterator>\n')
712     xml.write('\t<iteratorRule name="ITR1">\n')
713     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
714     xml.write('\t</iteratorRule>\n')
715     xml.write('\t<iteratorRule name="ITR2">\n')
716     for arg in argsList:
717     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
718     pass
719     xml.write('\t</iteratorRule>\n')
720     #print jobList
721     xml.write('\t<iteratorRule name="ITR3">\n')
722     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
723     xml.write('\t</iteratorRule>\n')
724    
725     '''
726 spiga 1.88 indy: here itr4
727 slacapra 1.81 '''
728    
729 fanzago 1.14
730 slacapra 1.81 xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
731     xml.write(jt_string)
732 fanzago 1.14
733 slacapra 1.81 #executable
734 nsmirnov 1.1
735 slacapra 1.81 """
736     INDY
737 spiga 1.88 script depends on jobType: it should be probably get in a different way
738 slacapra 1.81 """
739 nsmirnov 1.1 script = job.scriptFilename()
740 slacapra 1.81 xml.write('<program>\n')
741     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
742     xml.write(jt_string)
743 corvo 1.107
744 slacapra 1.81 xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
745     xml.write('<program_types> crabjob </program_types>\n')
746 corvo 1.107 inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
747 nsmirnov 1.1
748     if inp_sandbox != None:
749     for fl in inp_sandbox:
750 slacapra 1.81 inp_box = inp_box + '' + fl + ','
751 nsmirnov 1.1 pass
752     pass
753    
754 fanzago 1.40 if (not jbt.additional_inbox_files == []):
755 spiga 1.82 inp_box = inp_box + ','
756 fanzago 1.40 for addFile in jbt.additional_inbox_files:
757     addFile = os.path.abspath(addFile)
758 slacapra 1.81 inp_box = inp_box+''+addFile+','
759 fanzago 1.40 pass
760 nsmirnov 1.1
761     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
762 slacapra 1.81 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
763     xml.write(inp_box)
764    
765     base = jbt.name()
766     stdout = base + '__ITR3_.stdout'
767     stderr = base + '__ITR3_.stderr'
768 fanzago 1.14
769 slacapra 1.81 xml.write('<stderr> ' + stderr + '</stderr>\n')
770     xml.write('<stdout> ' + stdout + '</stdout>\n')
771 fanzago 1.14
772    
773 slacapra 1.81 out_box = stdout + ',' + \
774     stderr + ',.BrokerInfo,'
775    
776     """
777 fanzago 1.30 if int(self.return_data) == 1:
778 fanzago 1.14 if out_sandbox != None:
779     for fl in out_sandbox:
780 slacapra 1.81 out_box = out_box + '' + fl + ','
781 fanzago 1.14 pass
782 nsmirnov 1.1 pass
783     pass
784 slacapra 1.81 """
785 nsmirnov 1.1
786 slacapra 1.81 """
787     INDY
788 spiga 1.88 something similar should be also done for infiles (if it makes sense!)
789     """
790 slacapra 1.104 # Stuff to be returned _always_ via sandbox
791     for fl in jbt.output_file_sandbox:
792     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
793     pass
794     pass
795    
796     # via sandbox iif required return_data
797 slacapra 1.81 if int(self.return_data) == 1:
798     for fl in jbt.output_file:
799     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
800 fanzago 1.48 pass
801 slacapra 1.81 pass
802 slacapra 1.62
803 slacapra 1.81 if out_box[-1] == ',' : out_box = out_box[:-1]
804     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
805     xml.write(out_box)
806    
807     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
808 fanzago 1.48
809 slacapra 1.81 xml.write('</program>\n')
810     xml.write('</chain>\n')
811 nsmirnov 1.1
812 slacapra 1.81 xml.write('</iterator>\n')
813     xml.write('</task>\n')
814 slacapra 1.51
815 slacapra 1.81 xml.close()
816 spiga 1.88
817    
818 nsmirnov 1.1 return
819 slacapra 1.18
def checkProxy(self):
    """
    Make sure a usable Grid credential exists, creating or renewing it if needed.

    Two checks are performed, in this order:

    1. The local VOMS proxy, queried with ``voms-proxy-info``.  If the value
       reported by ``-timeleft`` or by ``-actimeleft`` is missing, is not an
       integer, or is below 10 hours, a new proxy is created with
       ``voms-proxy-init`` (validity 192h).
    2. The credential delegated to the myproxy server ``self.proxyServer``,
       queried with ``myproxy-info``.  If nothing is returned, or fewer than
       100 hours are left, it is delegated again with ``myproxy-init``.

    A successful check is cached in ``self.proxyValid`` so that later calls
    return immediately.  Nothing is checked when ``self.dontCheckProxy`` is set.

    Reads ``self.VO``, ``self.group``, ``self.role`` and ``self.proxyServer``.

    Raises CrabException if the proxy cannot be created or delegated.
    """
    if self.proxyValid:
        return

    ### Just return if asked to do so
    if self.dontCheckProxy:
        self.proxyValid = 1
        return

    minTimeLeft = 10*3600  # in seconds

    minTimeLeftServer = 100  # in hours

    mustRenew = 0
    timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
    # isInt() has to be tested before int(): converting first would raise
    # ValueError on unexpected command output instead of asking for a renewal.
    if not timeLeftLocal or not isInt(timeLeftLocal) or int(timeLeftLocal) <= 0:
        mustRenew = 1
    else:
        timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
        if not timeLeftServer or not isInt(timeLeftServer):
            mustRenew = 1
        # Command output is text: convert it so the threshold test is numeric.
        elif int(timeLeftLocal) < minTimeLeft or int(timeLeftServer) < minTimeLeft:
            mustRenew = 1

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
        cmd = 'voms-proxy-init -voms '+self.VO
        if self.group:
            cmd += ':/'+self.VO+'/'+self.group
        if self.role:
            cmd += '/role='+self.role
        cmd += ' -valid 192:00'
        common.logger.debug(10, cmd)
        # os.system reports failure through its return status, so no
        # catch-all except clause is needed around it.
        out = os.system(cmd)
        if out > 0:
            raise CrabException("Unable to create a valid proxy!\n")

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s '+self.proxyServer
    cmd_out = runCommand(cmd, 0, 20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        # if myproxy exist but not long enough, renew
        # search() rather than match(): match() only succeeds when the
        # pattern sits at the very beginning of the command output.
        found = re.search(r'timeleft: (\d+)', cmd_out)
        if found:
            # first number after "timeleft: ", compared against a limit in hours
            hoursLeft = int(found.group(1))
            if hoursLeft < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('Credential delegation will expire in '+str(hoursLeft)+' hours: renew it')

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s '+self.proxyServer
        out = os.system(cmd)
        if out > 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid = 1
    return
898 spiga 1.49
def configOpt_(self):
    """
    Return the configuration options to add to a UI command line.

    The result is a single blank when neither ``self.edg_config`` nor
    ``self.edg_config_vo`` is set.  Otherwise it holds ``-c <edg_config>``
    and/or ``--config-vo <edg_config_vo>``, each padded with blanks.
    """
    # The lone blank is only a placeholder: the -c option replaces it,
    # while the --config-vo option is appended to whatever is there.
    parts = [' ']
    if self.edg_config:
        parts = [' -c ' + self.edg_config + ' ']
    if self.edg_config_vo:
        parts.append(' --config-vo ' + self.edg_config_vo + ' ')
    return ''.join(parts)