ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.142
Committed: Fri Oct 5 12:55:05 2007 UTC (17 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_0_0
Changes since 1.141: +74 -75 lines
Log Message:
removed data registration in lcg catalog

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.50 from EdgConfig import *
6 gutsche 1.130 from BlackWhiteListParser import BlackWhiteListParser
7 nsmirnov 1.1 import common
8    
9 slacapra 1.113 import os, sys, time
10 nsmirnov 1.1
11     class SchedulerEdg(Scheduler):
12     def __init__(self):
13     Scheduler.__init__(self,"EDG")
14 slacapra 1.8 self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
15     "children_hist","children_num","children_states","condorId","condor_jdl", \
16     "cpuTime","destination", "done_code","exit_code","expectFrom", \
17     "expectUpdate","globusId","jdl","jobId","jobtype", \
18     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
19     "owner","parent_job", "reason","resubmitted","rsl","seed",\
20     "stateEnterTime","stateEnterTimes","subjob_failed", \
21     "user tags" , "status" , "status_code","hierarchy"]
22 nsmirnov 1.1 return
23    
24     def configure(self, cfg_params):
25 spiga 1.128
26 gutsche 1.130 # init BlackWhiteListParser
27     self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
28 spiga 1.128
29 corvo 1.131 self.proxyValid=0
30     try: self.dontCheckProxy=int(cfg_params["EDG.dont_check_proxy"])
31     except KeyError: self.dontCheckProxy = 0
32    
33 slacapra 1.46 try:
34 fanzago 1.99 RB=cfg_params["EDG.rb"]
35     self.rb_param_file=self.rb_configure(RB)
36 slacapra 1.46 except KeyError:
37 fanzago 1.99 self.rb_param_file=''
38     pass
39 slacapra 1.51 try:
40     self.proxyServer = cfg_params["EDG.proxy_server"]
41     except KeyError:
42     self.proxyServer = 'myproxy.cern.ch'
43     common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)
44 spiga 1.49
45 slacapra 1.79 try:
46 spiga 1.91 self.group = cfg_params["EDG.group"]
47     except KeyError:
48     self.group = None
49    
50     try:
51 slacapra 1.79 self.role = cfg_params["EDG.role"]
52     except KeyError:
53     self.role = None
54    
55 fanzago 1.139 #try: self.LCG_version = cfg_params["EDG.lcg_version"]
56     #except KeyError: self.LCG_version = '2'
57 nsmirnov 1.1
58 fanzago 1.48 try:
59     self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
60     except KeyError:
61     self.EDG_ce_black_list = ''
62    
63     try:
64     self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
65     except KeyError: self.EDG_ce_white_list = ''
66    
67 fanzago 1.14 try: self.VO = cfg_params['EDG.virtual_organization']
68     except KeyError: self.VO = 'cms'
69    
70 slacapra 1.102 try: self.copy_input_data = cfg_params["USER.copy_input_data"]
71     except KeyError: self.copy_input_data = 0
72    
73 spiga 1.49 try: self.return_data = cfg_params['USER.return_data']
74 spiga 1.91 except KeyError: self.return_data = 0
75    
76 fanzago 1.14 try:
77     self.copy_data = cfg_params["USER.copy_data"]
78 fanzago 1.30 if int(self.copy_data) == 1:
79     try:
80     self.SE = cfg_params['USER.storage_element']
81     self.SE_PATH = cfg_params['USER.storage_path']
82     except KeyError:
83     msg = "Error. The [USER] section does not have 'storage_element'"
84     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
85     common.logger.message(msg)
86     raise CrabException(msg)
87     except KeyError: self.copy_data = 0
88    
89     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
90     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
91 corvo 1.122 msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
92 fanzago 1.30 raise CrabException(msg)
93 fanzago 1.38
94 fanzago 1.119 ########### FEDE FOR DBS2 ##############################
95     try:
96     self.publish_data = cfg_params["USER.publish_data"]
97 corvo 1.132 self.checkProxy()
98 fanzago 1.119 if int(self.publish_data) == 1:
99     try:
100     self.publish_data_name = cfg_params['USER.publish_data_name']
101     except KeyError:
102     msg = "Error. The [USER] section does not have 'publish_data_name'"
103 fanzago 1.123 raise CrabException(msg)
104     try:
105 slacapra 1.126 tmp = runCommand("voms-proxy-info -identity")
106     tmp = string.split(tmp,'/')
107     reCN=re.compile(r'CN=')
108     for t in tmp:
109     if reCN.match(t):
110 spiga 1.127 self.UserGridName=string.strip((t.replace('CN=','')).replace(' ',''))
111 slacapra 1.126
112     #self.UserGridName = string.strip(runCommand("voms-proxy-info -identity | awk -F\'CN\' \'{print $2$3$4}\' | tr -d \'=/ \'"))
113 fanzago 1.123 except:
114     msg = "Error. Problem with voms-proxy-info -identity command"
115 fanzago 1.119 raise CrabException(msg)
116     except KeyError: self.publish_data = 0
117    
118     if ( int(self.copy_data) == 0 and int(self.publish_data) == 1 ):
119     msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
120     msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
121     common.logger.message(msg)
122     raise CrabException(msg)
123     #################################################
124 fanzago 1.123
125 fanzago 1.142 #try:
126     # self.lfc_host = cfg_params['EDG.lfc_host']
127     #except KeyError:
128     # msg = "Error. The [EDG] section does not have 'lfc_host' value"
129     # msg = msg + " it's necessary to know the LFC host name"
130     # common.logger.message(msg)
131     # raise CrabException(msg)
132     #try:
133     # self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
134     #except KeyError:
135     # msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
136     # msg = msg + " it's necessary to know the catalog type"
137     # common.logger.message(msg)
138     # raise CrabException(msg)
139     #try:
140     # self.lfc_home = cfg_params['EDG.lfc_home']
141     #except KeyError:
142     # msg = "Error. The [EDG] section does not have 'lfc_home' value"
143     # msg = msg + " it's necessary to know the home catalog dir"
144     # common.logger.message(msg)
145     # raise CrabException(msg)
146 fanzago 1.30
147 fanzago 1.142 #try:
148     # self.register_data = cfg_params["USER.register_data"]
149     # if int(self.register_data) == 1:
150     # try:
151     # self.LFN = cfg_params['USER.lfn_dir']
152     # except KeyError:
153     # msg = "Error. The [USER] section does not have 'lfn_dir' value"
154     # msg = msg + " it's necessary for LCF registration"
155     # common.logger.message(msg)
156     # raise CrabException(msg)
157     #except KeyError: self.register_data = 0
158    
159     #if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
160     # msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
161     # msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
162     # common.logger.message(msg)
163     # raise CrabException(msg)
164 fanzago 1.14
165     try: self.EDG_requirements = cfg_params['EDG.requirements']
166     except KeyError: self.EDG_requirements = ''
167 slacapra 1.97
168 slacapra 1.101 try: self.EDG_addJdlParam = string.split(cfg_params['EDG.additional_jdl_parameters'],',')
169     except KeyError: self.EDG_addJdlParam = []
170    
171 fanzago 1.14 try: self.EDG_retry_count = cfg_params['EDG.retry_count']
172     except KeyError: self.EDG_retry_count = ''
173 slacapra 1.97
174     try: self.EDG_shallow_retry_count= cfg_params['EDG.shallow_retry_count']
175     except KeyError: self.EDG_shallow_retry_count = ''
176    
177 fanzago 1.14 try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
178     except KeyError: self.EDG_clock_time= ''
179 slacapra 1.97
180 fanzago 1.14 try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
181     except KeyError: self.EDG_cpu_time = ''
182    
183 fanzago 1.16 # Add EDG_WL_LOCATION to the python path
184    
185     try:
186     path = os.environ['EDG_WL_LOCATION']
187     except:
188     msg = "Error: the EDG_WL_LOCATION variable is not set."
189     raise CrabException(msg)
190    
191     libPath=os.path.join(path, "lib")
192     sys.path.append(libPath)
193     libPath=os.path.join(path, "lib", "python")
194     sys.path.append(libPath)
195 nsmirnov 1.1
196 slacapra 1.61 try:
197     self._taskId = cfg_params['taskId']
198     except:
199     self._taskId = ''
200 gutsche 1.60
201 slacapra 1.81 try: self.jobtypeName = cfg_params['CRAB.jobtype']
202     except KeyError: self.jobtypeName = ''
203    
204     try: self.schedulerName = cfg_params['CRAB.scheduler']
205     except KeyError: self.scheduler = ''
206    
207 nsmirnov 1.1 return
208    
209 fanzago 1.10
210 fanzago 1.99 def rb_configure(self, RB):
211     self.edg_config = ''
212     self.edg_config_vo = ''
213     self.rb_param_file = ''
214    
215     edgConfig = EdgConfig(RB)
216     self.edg_config = edgConfig.config()
217     self.edg_config_vo = edgConfig.configVO()
218    
219     if (self.edg_config and self.edg_config_vo != ''):
220 corvo 1.122 self.rb_param_file = 'RBconfig = "'+self.edg_config+'";\nRBconfigVO = "'+self.edg_config_vo+'";\n'
221 fanzago 1.100 #print "rb_param_file = ", self.rb_param_file
222 fanzago 1.99 return self.rb_param_file
223    
224    
225 fanzago 1.10 def sched_parameter(self):
226     """
227 spiga 1.88 Returns file with requirements and scheduler-specific parameters
228 fanzago 1.10 """
229 spiga 1.88 index = int(common.jobDB.nJobs()) - 1
230     job = common.job_list[index]
231     jbt = job.type()
232    
233 slacapra 1.95 lastBlock=-1
234 spiga 1.88 first = []
235     for n in range(common.jobDB.nJobs()):
236 slacapra 1.95 currBlock=common.jobDB.block(n)
237     if (currBlock!=lastBlock):
238     lastBlock = currBlock
239 spiga 1.88 first.append(n)
240    
241     req = ''
242     req = req + jbt.getRequirements()
243    
244     if self.EDG_requirements:
245     if (req == ' '):
246     req = req + self.EDG_requirements
247     else:
248     req = req + ' && ' + self.EDG_requirements
249 slacapra 1.95
250 spiga 1.88 if self.EDG_ce_white_list:
251     ce_white_list = string.split(self.EDG_ce_white_list,',')
252     for i in range(len(ce_white_list)):
253     if i == 0:
254     if (req == ' '):
255 spiga 1.106 req = req + '((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
256 spiga 1.88 else:
257 spiga 1.106 req = req + ' && ((RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
258 spiga 1.88 pass
259     else:
260 spiga 1.106 req = req + ' || (RegExp("' + string.strip(ce_white_list[i]) + '", other.GlueCEUniqueId))'
261 spiga 1.88 req = req + ')'
262    
263     if self.EDG_ce_black_list:
264     ce_black_list = string.split(self.EDG_ce_black_list,',')
265     for ce in ce_black_list:
266     if (req == ' '):
267 spiga 1.106 req = req + '(!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
268 spiga 1.88 else:
269 spiga 1.106 req = req + ' && (!RegExp("' + string.strip(ce) + '", other.GlueCEUniqueId))'
270 spiga 1.88 pass
271     if self.EDG_clock_time:
272     if (req == ' '):
273     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
274     else:
275     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
276    
277     if self.EDG_cpu_time:
278     if (req == ' '):
279     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
280     else:
281     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
282    
283     for i in range(len(first)): # Add loop DS
284 spiga 1.110 groupReq = req
285 spiga 1.88 self.param='sched_param_'+str(i)+'.clad'
286 fanzago 1.10 param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
287 spiga 1.88
288     itr4=self.findSites_(first[i])
289 slacapra 1.95 for arg in itr4:
290 spiga 1.110 groupReq = groupReq + ' && anyMatch(other.storage.CloseSEs, ('+str(arg)+'))'
291     param_file.write('Requirements = '+groupReq +';\n')
292 spiga 1.88
293 fanzago 1.99 if (self.rb_param_file != ''):
294     param_file.write(self.rb_param_file)
295 spiga 1.88
296 slacapra 1.101 if len(self.EDG_addJdlParam):
297     for p in self.EDG_addJdlParam:
298     param_file.write(p)
299    
300 fanzago 1.10 param_file.close()
301 spiga 1.88
302 fanzago 1.13
303 nsmirnov 1.2 def wsSetupEnvironment(self):
304     """
305     Returns part of a job script which does scheduler-specific work.
306     """
307 spiga 1.49 txt = ''
308 mkirn 1.75 txt += '# strip arguments\n'
309     txt += 'echo "strip arguments"\n'
310     txt += 'args=("$@")\n'
311     txt += 'nargs=$#\n'
312     txt += 'shift $nargs\n'
313 gutsche 1.60 txt += "# job number (first parameter for job wrapper)\n"
314 mkirn 1.75 #txt += "NJob=$1\n"
315     txt += "NJob=${args[0]}\n"
316 gutsche 1.60
317     txt += '# job identification to DashBoard \n'
318 gutsche 1.64 txt += 'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n'
319     txt += 'SyncGridJobId=`echo $EDG_WL_JOBID`\n'
320     txt += 'MonitorID=`echo ' + self._taskId + '`\n'
321     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
322     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
323     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
324 gutsche 1.60
325 spiga 1.49 txt += 'echo "middleware discovery " \n'
326 gutsche 1.84 txt += 'if [ $GRID3_APP_DIR ]; then\n'
327 spiga 1.49 txt += ' middleware=OSG \n'
328 gutsche 1.60 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
329     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
330 spiga 1.49 txt += ' echo "middleware =$middleware" \n'
331     txt += 'elif [ $OSG_APP ]; then \n'
332     txt += ' middleware=OSG \n'
333 gutsche 1.60 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
334     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
335 spiga 1.49 txt += ' echo "middleware =$middleware" \n'
336 gutsche 1.84 txt += 'elif [ $VO_CMS_SW_DIR ]; then \n'
337     txt += ' middleware=LCG \n'
338 spiga 1.128 # txt += ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
339     txt += ' echo "SyncCE=`glite-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
340 gutsche 1.84 txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
341     txt += ' echo "middleware =$middleware" \n'
342 spiga 1.49 txt += 'else \n'
343 gutsche 1.60 txt += ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
344     txt += ' echo "JOB_EXIT_STATUS = 10030" \n'
345     txt += ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
346     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
347 gutsche 1.64 txt += ' rm -f $RUNTIME_AREA/$repo \n'
348     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
349     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
350 gutsche 1.60 txt += ' exit 1 \n'
351     txt += 'fi \n'
352    
353     txt += '# report first time to DashBoard \n'
354     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
355 gutsche 1.64 txt += 'rm -f $RUNTIME_AREA/$repo \n'
356     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
357     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
358    
359 spiga 1.49 txt += '\n\n'
360    
361 fanzago 1.119 # if int(self.copy_data) == 1:
362     # if self.SE:
363     # txt += 'export SE='+self.SE+'\n'
364     # txt += 'echo "SE = $SE"\n'
365     # if self.SE_PATH:
366     # if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
367     # txt += 'export SE_PATH='+self.SE_PATH+'\n'
368     # txt += 'echo "SE_PATH = $SE_PATH"\n'
369 spiga 1.49
370 fanzago 1.38 txt += 'export VO='+self.VO+'\n'
371 spiga 1.88 ### add some line for LFC catalog setting
372 fanzago 1.142 #txt += 'if [ $middleware == LCG ]; then \n'
373     #txt += ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
374     #txt += ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
375     #txt += ' fi\n'
376     #txt += ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
377     #txt += ' export LFC_HOST='+self.lfc_host+'\n'
378     #txt += ' fi\n'
379     #txt += ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
380     #txt += ' export LFC_HOME='+self.lfc_home+'\n'
381     #txt += ' fi\n'
382     #txt += 'elif [ $middleware == OSG ]; then\n'
383     #txt += ' echo "LFC catalog setting to be implemented for OSG"\n'
384     #txt += 'fi\n'
385 fanzago 1.38 #####
386 fanzago 1.142 #if int(self.register_data) == 1:
387     # txt += 'if [ $middleware == LCG ]; then \n'
388     # txt += ' export LFN='+self.LFN+'\n'
389     # txt += ' lfc-ls $LFN\n'
390     # txt += ' result=$?\n'
391     # txt += ' echo $result\n'
392     # ### creation of LFN dir in LFC catalog, under /grid/cms dir
393     # txt += ' if [ $result != 0 ]; then\n'
394     # txt += ' lfc-mkdir $LFN\n'
395     # txt += ' result=$?\n'
396     # txt += ' echo $result\n'
397     # txt += ' fi\n'
398     # txt += 'elif [ $middleware == OSG ]; then\n'
399     # txt += ' echo " Files registration to be implemented for OSG"\n'
400     # txt += 'fi\n'
401     # txt += '\n'
402     # if self.VO:
403     # txt += 'export VO='+self.VO+'\n'
404     # if self.LFN:
405     # txt += 'if [ $middleware == LCG ]; then \n'
406     # txt += ' export LFN='+self.LFN+'\n'
407     # txt += 'fi\n'
408     # txt += '\n'
409 spiga 1.49
410     txt += 'if [ $middleware == LCG ]; then\n'
411 spiga 1.128 # txt += ' CloseCEs=`edg-brokerinfo getCE`\n'
412     txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
413 spiga 1.49 txt += ' echo "CloseCEs = $CloseCEs"\n'
414     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
415     txt += ' echo "CE = $CE"\n'
416     txt += 'elif [ $middleware == OSG ]; then \n'
417     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
418 slacapra 1.81 txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
419 spiga 1.49 txt += ' else \n'
420 gutsche 1.60 txt += ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n'
421     txt += ' echo "JOB_EXIT_STATUS = 10099" \n'
422     txt += ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n'
423     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
424 gutsche 1.64 txt += ' rm -f $RUNTIME_AREA/$repo \n'
425     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
426     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
427 spiga 1.49 txt += ' exit 1 \n'
428     txt += ' fi \n'
429     txt += 'fi \n'
430    
431 nsmirnov 1.2 return txt
432 fanzago 1.15
433 fanzago 1.39 def wsCopyInput(self):
434     """
435     Copy input data from SE to WN
436     """
437     txt = ''
438 slacapra 1.102 if not self.copy_input_data: return txt
439 slacapra 1.90
440 spiga 1.49 ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
441 slacapra 1.92 txt += 'if [ $middleware == OSG ]; then\n'
442     txt += ' #\n'
443     txt += ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
444     txt += ' #\n'
445     txt += ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
446     txt += 'elif [ $middleware == LCG ]; then \n'
447     txt += ' #\n'
448     txt += ' # Copy Input Data from SE to this WN\n'
449     txt += ' #\n'
450     ### changed by georgia (put a loop copying more than one input files per jobs)
451     txt += ' for input_file in $cur_file_list \n'
452     txt += ' do \n'
453     txt += ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
454     txt += ' copy_input_exit_status=$?\n'
455     txt += ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
456     txt += ' if [ $copy_input_exit_status -ne 0 ]; then \n'
457     txt += ' echo "Problems with copying to WN" \n'
458     txt += ' else \n'
459     txt += ' echo "input copied into WN" \n'
460     txt += ' fi \n'
461     txt += ' done \n'
462     ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
463     txt += ' for file in $cur_pu_list \n'
464     txt += ' do \n'
465     txt += ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
466     txt += ' copy_input_pu_exit_status=$?\n'
467     txt += ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
468     txt += ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
469     txt += ' echo "Problems with copying pu to WN" \n'
470     txt += ' else \n'
471     txt += ' echo "input pu files copied into WN" \n'
472     txt += ' fi \n'
473     txt += ' done \n'
474     txt += ' \n'
475     txt += ' ### Check SCRATCH space available on WN : \n'
476     txt += ' df -h \n'
477     txt += 'fi \n'
478 spiga 1.49
479 fanzago 1.39 return txt
480    
481 fanzago 1.14 def wsCopyOutput(self):
482     """
483     Write a CopyResults part of a job script, e.g.
484     to copy produced output into a storage element.
485     """
486 fanzago 1.134 txt = '\n'
487    
488     txt += '#\n'
489     txt += '# COPY OUTPUT FILE TO SE\n'
490     txt += '#\n\n'
491 fanzago 1.119
492     SE_PATH=''
493 fanzago 1.30 if int(self.copy_data) == 1:
494 slacapra 1.135 if self.SE:
495     txt += 'export SE='+self.SE+'\n'
496     txt += 'echo "SE = $SE"\n'
497     if self.SE_PATH:
498     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
499     SE_PATH=self.SE_PATH
500     if int(self.publish_data) == 1:
501     txt += '### publish_data = 1 so the SE path where to copy the output is: \n'
502     path_add = self.UserGridName + '/' + self.publish_data_name +'_${PSETHASH}/'
503     SE_PATH = SE_PATH + path_add
504     txt += 'export SE_PATH='+SE_PATH+'\n'
505     txt += 'echo "SE_PATH = $SE_PATH"\n'
506    
507     txt += 'echo "####################################################"\n'
508     txt += 'echo "# Copy output files from WN = `hostname` to SE = $SE"\n'
509     txt += 'echo "####################################################"\n'
510    
511     txt += 'for out_file in $file_list ; do\n'
512 fanzago 1.140 txt += ' echo "Trying to copy output file to $SE"\n'
513 slacapra 1.135 txt += ' cmscp $out_file ${SE} ${SE_PATH} $out_file $middleware\n'
514     txt += ' copy_exit_status=$?\n'
515 fanzago 1.140 txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
516 slacapra 1.135 txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
517    
518     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
519     txt += ' echo "Problem copying $out_file to $SE $SE_PATH"\n'
520     txt += ' echo "StageOutExitStatus = $copy_exit_status " | tee -a $RUNTIME_AREA/$repo\n'
521     txt += ' copy_exit_status=60307\n'
522    
523     txt += ' else\n'
524     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
525     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
526     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
527     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
528     txt += ' fi\n'
529     txt += 'done\n'
530     txt += 'if [ $copy_exit_status -ne 0 ]; then\n'
531     txt += ' SE=""\n'
532     txt += ' echo "SE = $SE"\n'
533     txt += ' SE_PATH=""\n'
534     txt += ' echo "SE_PATH = $SE_PATH"\n'
535     txt += 'fi\n'
536     txt += 'exit_status=$copy_exit_status\n'
537     pass
538 fanzago 1.14 return txt
539 nsmirnov 1.1
540 spiga 1.23 def loggingInfo(self, id):
541 slacapra 1.7 """
542     retrieve the logging info from logging and bookkeeping and return it
543     """
544 slacapra 1.18 self.checkProxy()
545 fanzago 1.24 cmd = 'edg-job-get-logging-info -v 2 ' + id
546 slacapra 1.32 cmd_out = runCommand(cmd)
547 slacapra 1.7 return cmd_out
548    
549 nsmirnov 1.1 def queryDetailedStatus(self, id):
550     """ Query a detailed status of the job with id """
551     cmd = 'edg-job-status '+id
552     cmd_out = runCommand(cmd)
553     return cmd_out
554    
555 spiga 1.88 def findSites_(self, n):
556     itr4 =[]
557 spiga 1.128
558 spiga 1.88 sites = common.jobDB.destination(n)
559 spiga 1.128
560     if len(sites)>0 and sites[0]=="":
561 spiga 1.88 return itr4
562 spiga 1.128
563 spiga 1.88 itr = ''
564     if sites != [""]:#CarlosDaniele
565 spiga 1.128 ##Addedd Daniele
566 gutsche 1.130 replicas = self.blackWhiteListParser.checkBlackList(sites,n)
567 spiga 1.128 if len(replicas)!=0:
568 gutsche 1.130 replicas = self.blackWhiteListParser.checkWhiteList(replicas,n)
569 spiga 1.128
570     if len(replicas)==0:
571     msg = 'No sites remaining that host any part of the requested data! Exiting... '
572     raise CrabException(msg)
573     #####
574     # for site in sites:
575     for site in replicas:
576 spiga 1.88 #itr = itr + 'target.GlueSEUniqueID=="'+site+'" || '
577     itr = itr + 'target.GlueSEUniqueID=="'+site+'" || '
578     itr = itr[0:-4]
579     itr4.append( itr )
580 slacapra 1.80 return itr4
581    
582     def createXMLSchScript(self, nj, argsList):
583 spiga 1.88
584 slacapra 1.80 """
585     Create a XML-file for BOSS4.
586     """
587     # job = common.job_list[nj]
588     """
589     INDY
590 spiga 1.88 [begin] FIX-ME:
591     I would pass jobType instead of job
592 slacapra 1.80 """
593     index = nj - 1
594     job = common.job_list[index]
595     jbt = job.type()
596    
597     inp_sandbox = jbt.inputSandbox(index)
598 slacapra 1.113 #out_sandbox = jbt.outputSandbox(index)
599 nsmirnov 1.5 """
600 spiga 1.88 [end] FIX-ME
601 nsmirnov 1.5 """
602 slacapra 1.6
603 slacapra 1.81
604     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
605     jt_string = ''
606    
607     xml_fname = str(self.jobtypeName)+'.xml'
608     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
609    
610     #TaskName
611     dir = string.split(common.work_space.topDir(), '/')
612     taskName = dir[len(dir)-2]
613    
614     to_write = ''
615    
616     req=' '
617     req = req + jbt.getRequirements()
618    
619     if self.EDG_requirements:
620     if (req == ' '):
621     req = req + self.EDG_requirements
622     else:
623     req = req + ' && ' + self.EDG_requirements
624     if self.EDG_ce_white_list:
625     ce_white_list = string.split(self.EDG_ce_white_list,',')
626     for i in range(len(ce_white_list)):
627     if i == 0:
628     if (req == ' '):
629     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
630     else:
631     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
632     pass
633     else:
634     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
635     req = req + ')'
636    
637     if self.EDG_ce_black_list:
638     ce_black_list = string.split(self.EDG_ce_black_list,',')
639     for ce in ce_black_list:
640     if (req == ' '):
641     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
642     else:
643     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
644     pass
645     if self.EDG_clock_time:
646     if (req == ' '):
647     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
648     else:
649     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
650    
651     if self.EDG_cpu_time:
652     if (req == ' '):
653     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
654     else:
655     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
656    
657     if ( self.EDG_retry_count ):
658     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
659     pass
660 nsmirnov 1.5
661 slacapra 1.97 if ( self.EDG_shallow_retry_count ):
662 slacapra 1.98 to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
663 slacapra 1.97 pass
664    
665 slacapra 1.81 to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
666     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
667 nsmirnov 1.1
668 slacapra 1.102 #TaskName
669 slacapra 1.81 dir = string.split(common.work_space.topDir(), '/')
670     taskName = dir[len(dir)-2]
671    
672     xml.write(str(title))
673 spiga 1.116 #xml.write('<task name="' +str(taskName)+'" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache">\n')
674    
675     #xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + '" task_info="' + os.path.expandvars('X509_USER_PROXY') + '">\n')
676 spiga 1.137 x509_cmd = 'ls /tmp/x509up_u`id -u`'
677     x509=runCommand(x509_cmd).strip()
678     xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + ' task_info="' + str(x509) + '">\n')
679 slacapra 1.81 xml.write(jt_string)
680 spiga 1.88
681     if (to_write != ''):
682     xml.write('<extraTags\n')
683     xml.write(to_write)
684     xml.write('/>\n')
685     pass
686 slacapra 1.81
687     xml.write('<iterator>\n')
688     xml.write('\t<iteratorRule name="ITR1">\n')
689     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
690     xml.write('\t</iteratorRule>\n')
691     xml.write('\t<iteratorRule name="ITR2">\n')
692     for arg in argsList:
693     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
694     pass
695     xml.write('\t</iteratorRule>\n')
696     #print jobList
697     xml.write('\t<iteratorRule name="ITR3">\n')
698     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
699     xml.write('\t</iteratorRule>\n')
700    
701     '''
702 spiga 1.88 indy: here itr4
703 slacapra 1.81 '''
704    
705 spiga 1.116 xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.schedulerName)+'">\n')
706     # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
707 slacapra 1.81 xml.write(jt_string)
708 fanzago 1.14
709 slacapra 1.81 #executable
710 nsmirnov 1.1
711 slacapra 1.81 """
712     INDY
713 spiga 1.88 script depends on jobType: it should be probably get in a different way
714 slacapra 1.81 """
715 nsmirnov 1.1 script = job.scriptFilename()
716 slacapra 1.81 xml.write('<program>\n')
717     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
718     xml.write(jt_string)
719 corvo 1.107
720 slacapra 1.81 xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
721     xml.write('<program_types> crabjob </program_types>\n')
722 corvo 1.107 inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
723 nsmirnov 1.1
724     if inp_sandbox != None:
725     for fl in inp_sandbox:
726 slacapra 1.81 inp_box = inp_box + '' + fl + ','
727 nsmirnov 1.1 pass
728     pass
729    
730 corvo 1.115 # if (not jbt.additional_inbox_files == []):
731 corvo 1.112 # inp_box = inp_box + ','
732 corvo 1.115 # for addFile in jbt.additional_inbox_files:
733     # #addFile = os.path.abspath(addFile)
734     # inp_box = inp_box+''+addFile+','
735     # pass
736 nsmirnov 1.1
737     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
738 slacapra 1.81 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
739     xml.write(inp_box)
740    
741     base = jbt.name()
742     stdout = base + '__ITR3_.stdout'
743     stderr = base + '__ITR3_.stderr'
744 fanzago 1.14
745 slacapra 1.81 xml.write('<stderr> ' + stderr + '</stderr>\n')
746     xml.write('<stdout> ' + stdout + '</stdout>\n')
747 fanzago 1.14
748    
749 slacapra 1.81 out_box = stdout + ',' + \
750     stderr + ',.BrokerInfo,'
751    
752     """
753 fanzago 1.30 if int(self.return_data) == 1:
754 fanzago 1.14 if out_sandbox != None:
755     for fl in out_sandbox:
756 slacapra 1.81 out_box = out_box + '' + fl + ','
757 fanzago 1.14 pass
758 nsmirnov 1.1 pass
759     pass
760 slacapra 1.81 """
761 nsmirnov 1.1
762 slacapra 1.81 """
763     INDY
764 spiga 1.88 something similar should be also done for infiles (if it makes sense!)
765     """
766 slacapra 1.104 # Stuff to be returned _always_ via sandbox
767     for fl in jbt.output_file_sandbox:
768     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
769     pass
770     pass
771    
772     # via sandbox iif required return_data
773 slacapra 1.81 if int(self.return_data) == 1:
774     for fl in jbt.output_file:
775     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
776 fanzago 1.48 pass
777 slacapra 1.81 pass
778 slacapra 1.62
779 slacapra 1.81 if out_box[-1] == ',' : out_box = out_box[:-1]
780     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
781     xml.write(out_box)
782    
783     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
784 fanzago 1.48
785 slacapra 1.81 xml.write('</program>\n')
786     xml.write('</chain>\n')
787 nsmirnov 1.1
788 slacapra 1.81 xml.write('</iterator>\n')
789     xml.write('</task>\n')
790 slacapra 1.51
791 slacapra 1.81 xml.close()
792 spiga 1.88
793    
794 nsmirnov 1.1 return
795 slacapra 1.18
def checkProxy(self):
    """
    Function to check the Globus proxy.

    Makes sure a usable VOMS proxy exists locally and that a credential
    is delegated to the myproxy server ``self.proxyServer``; either one
    is (re)created when missing or too short-lived.  A successful check
    is cached in ``self.proxyValid`` so later calls return immediately.

    Reads ``self.proxyValid``, ``self.dontCheckProxy``, ``self.VO``,
    ``self.group``, ``self.role`` and ``self.proxyServer``.

    Raises CrabException when ``voms-proxy-init`` or ``myproxy-init``
    does not complete successfully.
    """
    if self.proxyValid:
        return

    ### Just return if asked to do so
    if self.dontCheckProxy == 1:
        self.proxyValid = 1
        return

    minTimeLeft = 10*3600      # in seconds
    minTimeLeftServer = 100    # in hours

    mustRenew = 0
    timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
    # isInt() must be consulted before int(): converting first would raise
    # ValueError on non-numeric output instead of requesting a renewal.
    if not timeLeftLocal or not isInt(timeLeftLocal) or int(timeLeftLocal) <= 0:
        mustRenew = 1
    else:
        timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
        if not timeLeftServer or not isInt(timeLeftServer):
            mustRenew = 1
        # The command output is text: convert it before comparing with the
        # numeric threshold, otherwise the comparison is meaningless.
        elif int(timeLeftLocal) < minTimeLeft or int(timeLeftServer) < minTimeLeft:
            mustRenew = 1

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
        cmd = 'voms-proxy-init -voms '+self.VO
        if self.group:
            cmd += ':/'+self.VO+'/'+self.group
        if self.role:
            cmd += '/role='+self.role
        cmd += ' -valid 192:00'
        try:
            common.logger.debug(10, cmd)
            # Any non-zero status (including -1) means the proxy was not created.
            if os.system(cmd) != 0:
                raise CrabException("Unable to create a valid proxy!\n")
        except CrabException:
            raise
        except Exception:
            # Keep reporting every other failure as a CrabException.
            msg = "Unable to create a valid proxy!\n"
            raise CrabException(msg)

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s '+self.proxyServer
    cmd_out = runCommand(cmd, 0, 20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        # if myproxy exist but not long enough, renew
        reTime = re.compile( r'timeleft: (\d+)' )
        # search(), not match(): the "timeleft:" field need not be at the
        # very beginning of the command output.
        found = reTime.search( cmd_out )
        if found:
            # NOTE(review): the leading number is treated as hours, as the
            # threshold above is in hours -- confirm against myproxy-info.
            hoursLeft = int(found.group(1))
            if hoursLeft < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('Credential delegation will expire in '+str(hoursLeft)+' hours: renew it')

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s '+self.proxyServer
        if os.system(cmd) != 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid = 1
    return
873 spiga 1.49
def configOpt_(self):
    """
    Assemble the configuration options passed to the EDG UI commands.

    Returns ' -c <self.edg_config> ' when ``self.edg_config`` is set and a
    single blank otherwise, followed by ' --config-vo <self.edg_config_vo> '
    when ``self.edg_config_vo`` is set.
    """
    pieces = []
    if self.edg_config:
        pieces.append(' -c ' + self.edg_config + ' ')
    else:
        # no explicit configuration file: keep a single separating blank
        pieces.append(' ')
    if self.edg_config_vo:
        pieces.append(' --config-vo ' + self.edg_config_vo + ' ')
    return ''.join(pieces)
881 spiga 1.121
def submitTout(self, list):
    """
    Return the fixed value 120, whatever ``list`` contains.

    Judging by the name this is the timeout applied to a submission; the
    unit is not stated here (presumably seconds -- TODO confirm).
    """
    timeout = 120
    return timeout
884    
885