ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.133.2.4
Committed: Fri Oct 12 12:56:30 2007 UTC (17 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: CRAB_1_5_4_SLC3_start
CVS Tags: CRAB_1_5_4_SLC3_pre3, CRAB_1_5_4_SLC3_pre2
Changes since 1.133.2.3: +22 -9 lines
Log Message:
add check if both return_data and copy_data are set to 1: in that case raise an exception. SchedulerCondor_g.py

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.50 from EdgConfig import *
6 gutsche 1.130 from BlackWhiteListParser import BlackWhiteListParser
7 nsmirnov 1.1 import common
8    
9 slacapra 1.113 import os, sys, time
10 nsmirnov 1.1
11     class SchedulerEdg(Scheduler):
def __init__(self):
    """Initialise the base Scheduler under the name "EDG" and set self.states."""
    Scheduler.__init__(self, "EDG")
    # Field names stored in self.states, kept in their original order.
    self.states = [
        "Acl", "cancelReason", "cancelling", "ce_node", "children",
        "children_hist", "children_num", "children_states", "condorId",
        "condor_jdl",
        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
        "lastUpdateTime", "localId", "location", "matched_jdl",
        "network_server",
        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
        "stateEnterTime", "stateEnterTimes", "subjob_failed",
        "user tags", "status", "status_code", "hierarchy",
    ]
23    
def configure(self, cfg_params):
    """
    Load the scheduler settings from cfg_params and validate them.

    cfg_params is indexed with "SECTION.key" strings; a missing key is
    expected to raise KeyError, in which case the default is used.

    Raises CrabException when:
      * copy_data is 1 but storage_element and/or storage_path is missing,
      * return_data and copy_data are both 0 or both 1,
      * publish_data is 1 without publish_data_name, without a "CN="
        field in the proxy identity, or with copy_data set to 0,
      * the EDG_WL_LOCATION environment variable is not set.
    """
    def option(key, default):
        # Optional setting: use ``default`` when the key is absent.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    # init BlackWhiteListParser
    self.blackWhiteListParser = BlackWhiteListParser(cfg_params)

    self.proxyValid = 0
    self.dontCheckProxy = int(option("EDG.dont_check_proxy", 0))

    # A KeyError escaping rb_configure() is handled like a missing EDG.rb
    # entry, as it always was.
    try:
        self.rb_param_file = self.rb_configure(cfg_params["EDG.rb"])
    except KeyError:
        self.rb_param_file = ''

    self.proxyServer = option("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5, 'Setting myproxy server to ' + self.proxyServer)

    self.group = option("EDG.group", None)
    self.role = option("EDG.role", None)

    self.EDG_ce_black_list = option('EDG.ce_black_list', '')
    self.EDG_ce_white_list = option('EDG.ce_white_list', '')
    self.VO = option('EDG.virtual_organization', 'cms')

    self.copy_input_data = option("USER.copy_input_data", 0)
    self.return_data = option('USER.return_data', 0)

    self.copy_data = option("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        try:
            self.SE = cfg_params['USER.storage_element']
            self.SE_PATH = cfg_params['USER.storage_path']
        except KeyError:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    if int(self.return_data) == 0 and int(self.copy_data) == 0:
        msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    if int(self.return_data) == 1 and int(self.copy_data) == 1:
        msg = 'Error: return_data and copy_data cannot be set both to 1\n'
        msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    ########### FEDE FOR DBS2 ##############################
    try:
        self.publish_data = cfg_params["USER.publish_data"]
        self.checkProxy()
        if int(self.publish_data) == 1:
            try:
                self.publish_data_name = cfg_params['USER.publish_data_name']
            except KeyError:
                msg = "Error. The [USER] section does not have 'publish_data_name'"
                raise CrabException(msg)

            msg = "Error. Problem with voms-proxy-info -identity command"
            try:
                identity = runCommand("voms-proxy-info -identity")
                fields = identity.split('/')
            except Exception:
                raise CrabException(msg)

            # The identity is split on '/'; the last field starting with
            # "CN=" wins, with the prefix and all blanks removed.
            user_grid_name = None
            for field in fields:
                if field.startswith('CN='):
                    user_grid_name = field.replace('CN=', '').replace(' ', '').strip()
            if user_grid_name is None:
                # Without a CN field UserGridName would stay undefined and
                # wsCopyOutput() would fail later with an AttributeError.
                raise CrabException(msg)
            self.UserGridName = user_grid_name
    except KeyError:
        self.publish_data = 0

    if int(self.copy_data) == 0 and int(self.publish_data) == 1:
        msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)
    #################################################

    self.EDG_requirements = option('EDG.requirements', '')

    extra_jdl = option('EDG.additional_jdl_parameters', None)
    if extra_jdl is None:
        self.EDG_addJdlParam = []
    else:
        self.EDG_addJdlParam = extra_jdl.split(',')

    self.EDG_retry_count = option('EDG.retry_count', '')
    self.EDG_shallow_retry_count = option('EDG.shallow_retry_count', '')
    self.EDG_clock_time = option('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = option('EDG.max_cpu_time', '')

    # Add EDG_WL_LOCATION to the python path
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    sys.path.append(os.path.join(path, "lib"))
    sys.path.append(os.path.join(path, "lib", "python"))

    self._taskId = option('taskId', '')
    self.jobtypeName = option('CRAB.jobtype', '')
    # The fallback used to be stored in self.scheduler, leaving
    # self.schedulerName (read by createXMLSchScript) undefined.
    self.schedulerName = option('CRAB.scheduler', '')
213    
214 fanzago 1.10
def rb_configure(self, RB):
    """
    Build the broker parameter text for the broker identified by RB.

    Stores the values returned by EdgConfig(RB).config() and .configVO()
    in self.edg_config / self.edg_config_vo and returns (and stores in
    self.rb_param_file) the "RBconfig"/"RBconfigVO" lines, or '' when
    either value is empty.
    """
    # Reset first so the attributes are defined even if EdgConfig raises.
    self.edg_config = ''
    self.edg_config_vo = ''
    self.rb_param_file = ''

    broker_cfg = EdgConfig(RB)
    self.edg_config = broker_cfg.config()
    self.edg_config_vo = broker_cfg.configVO()

    if not self.edg_config or self.edg_config_vo == '':
        return self.rb_param_file

    config_line = 'RBconfig = "' + self.edg_config + '";\n'
    config_vo_line = 'RBconfigVO = "' + self.edg_config_vo + '";\n'
    self.rb_param_file = config_line + config_vo_line
    return self.rb_param_file
228    
229    
def sched_parameter(self):
    """
    Write one 'sched_param_<i>.clad' file per group of jobs.

    A new group starts at every job whose block (common.jobDB.block)
    differs from the previous job's.  Each file, created in
    common.work_space.shareDir(), holds a 'Requirements = ...;' line
    followed by the broker configuration (self.rb_param_file) and the
    additional JDL parameters, when present.  self.param is left set to
    the name of the last file written.  Nothing is returned.
    """
    index = int(common.jobDB.nJobs()) - 1
    jbt = common.job_list[index].type()

    # Index of the first job of every run of jobs sharing a block.
    lastBlock = -1
    first = []
    for n in range(common.jobDB.nJobs()):
        currBlock = common.jobDB.block(n)
        if currBlock != lastBlock:
            lastBlock = currBlock
            first.append(n)

    # Collect the individual conditions and join them once with ' && '.
    # The previous string-appending code started from '' but tested for
    # ' ', so an empty job-type requirement produced a leading '&&'.
    clauses = []
    base_req = jbt.getRequirements()
    if base_req.strip():
        clauses.append(base_req)

    if self.EDG_requirements:
        clauses.append(self.EDG_requirements)

    if self.EDG_ce_white_list:
        allowed = ['(RegExp("' + ce.strip() + '", other.GlueCEUniqueId))'
                   for ce in self.EDG_ce_white_list.split(',')]
        clauses.append('(' + ' || '.join(allowed) + ')')

    if self.EDG_ce_black_list:
        for ce in self.EDG_ce_black_list.split(','):
            clauses.append('(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))')

    if self.EDG_clock_time:
        clauses.append('other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time)

    if self.EDG_cpu_time:
        clauses.append('other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time)

    for i, first_job in enumerate(first):
        self.param = 'sched_param_' + str(i) + '.clad'

        group_clauses = list(clauses)
        for arg in self.findSites_(first_job):
            group_clauses.append('anyMatch(other.storage.CloseSEs, (' + str(arg) + '))')

        param_file = open(common.work_space.shareDir() + '/' + self.param, 'w')
        try:
            param_file.write('Requirements = ' + ' && '.join(group_clauses) + ';\n')

            if self.rb_param_file != '':
                param_file.write(self.rb_param_file)

            for p in self.EDG_addJdlParam:
                param_file.write(p)
        finally:
            # Close the file even when a write fails.
            param_file.close()
306 spiga 1.88
307 fanzago 1.13
def wsSetupEnvironment(self):
    """
    Return the scheduler-specific part of the job wrapper script.

    The returned shell text strips the wrapper arguments, writes the
    monitoring identifiers (built from self._taskId) to
    $RUNTIME_AREA/$repo, sets $middleware to OSG or LCG, exports VO
    (self.VO) and sets $CE.
    """
    repo_tee = ' | tee -a $RUNTIME_AREA/$repo'

    # Lines shared by the two branches that set middleware=OSG.
    osg_branch = [
        ' middleware=OSG \n',
        ' if [ $GLOBUS_GRAM_JOB_CONTACT ]; then \n',
        ' SyncCE=`echo "echo $GLOBUS_GRAM_JOB_CONTACT" | cut -d: -f2 | sed \'s/\\/\\///\'`;\n',
        ' echo "SyncCE=$SyncCE"' + repo_tee + ' ;\n',
        ' else\n',
        ' echo "not reporting SyncCE";\n',
        ' fi\n',
        ' echo "GridFlavour=`echo $middleware`"' + repo_tee + ' \n',
        ' echo "middleware =$middleware" \n',
    ]

    pieces = [
        '# strip arguments\n',
        'echo "strip arguments"\n',
        'args=("$@")\n',
        'nargs=$#\n',
        'shift $nargs\n',
        "# job number (first parameter for job wrapper)\n",
        "NJob=${args[0]}\n",

        '# job identification to DashBoard \n',
        'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n',
        'SyncGridJobId=`echo $EDG_WL_JOBID`\n',
        'MonitorID=`echo ' + self._taskId + '`\n',
        'echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        'echo "SyncGridJobId=`echo $SyncGridJobId`"' + repo_tee + ' \n',
        'echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',

        'echo "middleware discovery " \n',
        'if [ $GRID3_APP_DIR ]; then\n',
    ]
    pieces.extend(osg_branch)
    pieces.append('elif [ $OSG_APP ]; then \n')
    pieces.extend(osg_branch)
    pieces.extend([
        'elif [ $VO_CMS_SW_DIR ]; then \n',
        ' middleware=LCG \n',
        ' echo "SyncCE=`glite-brokerinfo getCE`"' + repo_tee + ' \n',
        ' echo "GridFlavour=`echo $middleware`"' + repo_tee + ' \n',
        ' echo "middleware =$middleware" \n',
        'else \n',
        ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n',
        ' echo "JOB_EXIT_STATUS = 10030" \n',
        ' echo "JobExitCode=10030"' + repo_tee + ' \n',
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        ' echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',
        ' exit 1 \n',
        'fi \n',

        '# report first time to DashBoard \n',
        'dumpStatus $RUNTIME_AREA/$repo \n',
        'rm -f $RUNTIME_AREA/$repo \n',
        'echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        'echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',

        '\n\n',

        'export VO=' + self.VO + '\n',

        # CE name: from the broker info on LCG, from OSG_JOB_CONTACT on OSG.
        'if [ $middleware == LCG ]; then\n',
        ' CloseCEs=`glite-brokerinfo getCE`\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n',
        ' else \n',
        ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n',
        ' echo "JOB_EXIT_STATUS = 10099" \n',
        ' echo "JobExitCode=10099"' + repo_tee + ' \n',
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`"' + repo_tee + ' \n',
        ' echo "MonitorID=`echo $MonitorID`"' + repo_tee + '\n',
        ' exit 1 \n',
        ' fi \n',
        'fi \n',
    ])
    return ''.join(pieces)
447 fanzago 1.15
def wsCopyInput(self):
    """
    Return the job-script fragment that copies input data from SE to WN.

    Returns '' when self.copy_input_data is disabled.  The flag may be a
    number or a configuration string: it is interpreted numerically when
    possible, so the string '0' disables the copy (a plain truthiness
    test treated '0' as enabled).  Values that are not numeric fall back
    to their truthiness.
    """
    flag = self.copy_input_data
    try:
        enabled = int(flag) != 0
    except (TypeError, ValueError):
        enabled = bool(flag)
    if not enabled:
        return ''

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    lines = [
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n',
        ' #\n',
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        ' #\n',
        ' # Copy Input Data from SE to this WN\n',
        ' #\n',
        ### changed by georgia (put a loop copying more than one input files per jobs)
        ' for input_file in $cur_file_list \n',
        ' do \n',
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n',
        ' copy_input_exit_status=$?\n',
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n',
        ' if [ $copy_input_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying to WN" \n',
        ' else \n',
        ' echo "input copied into WN" \n',
        ' fi \n',
        ' done \n',
        ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
        ' for file in $cur_pu_list \n',
        ' do \n',
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n',
        ' copy_input_pu_exit_status=$?\n',
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n',
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n',
        ' echo "Problems with copying pu to WN" \n',
        ' else \n',
        ' echo "input pu files copied into WN" \n',
        ' fi \n',
        ' done \n',
        ' \n',
        ' ### Check SCRATCH space available on WN : \n',
        ' df -h \n',
        'fi \n',
    ]
    return ''.join(lines)
495    
def wsCopyOutput(self):
    """
    Return the job-script fragment that copies the output files to the SE.

    Only a comment header is returned unless int(self.copy_data) == 1.
    Otherwise the fragment exports SE and SE_PATH and runs cmscp on every
    file in $file_list.  With publish_data == 1 the path is extended with
    '<UserGridName>/<publish_data_name>_${PSETHASH}/'.

    Side effect: a trailing '/' is appended to self.SE_PATH when missing.
    """
    header = '\n' + '#\n' + '# COPY OUTPUT FILE TO SE\n' + '#\n\n'
    if int(self.copy_data) != 1:
        return header

    chunks = [header]
    if self.SE:
        chunks.append('export SE=' + self.SE + '\n')
        chunks.append('echo "SE = $SE"\n')

    target_path = ''
    if self.SE_PATH:
        if not self.SE_PATH.endswith('/'):
            self.SE_PATH = self.SE_PATH + '/'
        target_path = self.SE_PATH

    if int(self.publish_data) == 1:
        chunks.append('### publish_data = 1 so the SE path where to copy the output is: \n')
        target_path = target_path + self.UserGridName + '/' + self.publish_data_name + '_${PSETHASH}/'

    chunks.append('export SE_PATH=' + target_path + '\n')
    chunks.extend([
        'echo "SE_PATH = $SE_PATH"\n',

        'echo "####################################################"\n',
        'echo "# Copy output files from WN = `hostname` to SE = $SE"\n',
        'echo "####################################################"\n',

        # 60302: nothing to copy, the status is passed through unchanged.
        'if [ $output_exit_status -eq 60302 ]; then\n',
        ' echo "--> No output file to copy to $SE"\n',
        ' copy_exit_status=$output_exit_status\n',
        ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n',
        'else\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to copy output file to $SE"\n',
        ' cmscp $out_file ${SE} ${SE_PATH} $out_file $middleware\n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' echo "Problem copying $out_file to $SE $SE_PATH"\n',
        ' echo "StageOutExitStatus = $copy_exit_status " | tee -a $RUNTIME_AREA/$repo\n',
        ' copy_exit_status=60307\n',
        ' else\n',
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n',
        ' fi\n',
        ' done\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' SE=""\n',
        ' echo "SE = $SE"\n',
        ' SE_PATH=""\n',
        ' echo "SE_PATH = $SE_PATH"\n',
        ' fi\n',
        'fi\n',
        'exit_status=$copy_exit_status\n',
    ])
    return ''.join(chunks)
558 nsmirnov 1.1
def loggingInfo(self, id):
    """
    Return the output of 'edg-job-get-logging-info -v 2' for job ``id``.

    self.checkProxy() is called before the command is run.
    """
    self.checkProxy()
    return runCommand('edg-job-get-logging-info -v 2 ' + id)
567    
def queryDetailedStatus(self, id):
    """Return the output of 'edg-job-status' for the job with ``id``."""
    return runCommand('edg-job-status ' + id)
573    
def findSites_(self, n):
    """
    Return the storage-element condition for job ``n`` as a list.

    The list is empty when the first destination of the job is the empty
    string.  Otherwise it holds a single string: the destinations left
    after the black list and white list checks, each written as
    'target.GlueSEUniqueID=="<site>"' and joined with ' || '.  When no
    site is left the condition is 'target.GlueSEUniqueID=="NONE"'.
    """
    sites = common.jobDB.destination(n)

    if len(sites) > 0 and sites[0] == "":
        return []

    replicas = self.blackWhiteListParser.checkBlackList(sites, n)
    if len(replicas) != 0:
        replicas = self.blackWhiteListParser.checkWhiteList(replicas, n)

    if len(replicas) == 0:
        # The old code always cut the last four characters (meant to drop
        # a trailing ' || '), which turned this fallback into the broken
        # expression 'target.GlueSEUniqueID=="NO'.
        return ['target.GlueSEUniqueID=="NONE"']

    conditions = ['target.GlueSEUniqueID=="' + site + '"' for site in replicas]
    return [' || '.join(conditions)]
601    
602     def createXMLSchScript(self, nj, argsList):
603 spiga 1.88
604 slacapra 1.80 """
605     Create a XML-file for BOSS4.
606     """
607     # job = common.job_list[nj]
608     """
609     INDY
610 spiga 1.88 [begin] FIX-ME:
611     I would pass jobType instead of job
612 slacapra 1.80 """
613     index = nj - 1
614     job = common.job_list[index]
615     jbt = job.type()
616    
617     inp_sandbox = jbt.inputSandbox(index)
618 slacapra 1.113 #out_sandbox = jbt.outputSandbox(index)
619 nsmirnov 1.5 """
620 spiga 1.88 [end] FIX-ME
621 nsmirnov 1.5 """
622 slacapra 1.6
623 slacapra 1.81
624     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
625     jt_string = ''
626    
627     xml_fname = str(self.jobtypeName)+'.xml'
628     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
629    
630     #TaskName
631     dir = string.split(common.work_space.topDir(), '/')
632     taskName = dir[len(dir)-2]
633    
634     to_write = ''
635    
636     req=' '
637     req = req + jbt.getRequirements()
638    
639     if self.EDG_requirements:
640     if (req == ' '):
641     req = req + self.EDG_requirements
642     else:
643     req = req + ' && ' + self.EDG_requirements
644     if self.EDG_ce_white_list:
645     ce_white_list = string.split(self.EDG_ce_white_list,',')
646     for i in range(len(ce_white_list)):
647     if i == 0:
648     if (req == ' '):
649     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
650     else:
651     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
652     pass
653     else:
654     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
655     req = req + ')'
656    
657     if self.EDG_ce_black_list:
658     ce_black_list = string.split(self.EDG_ce_black_list,',')
659     for ce in ce_black_list:
660     if (req == ' '):
661     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
662     else:
663     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
664     pass
665     if self.EDG_clock_time:
666     if (req == ' '):
667     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
668     else:
669     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
670    
671     if self.EDG_cpu_time:
672     if (req == ' '):
673     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
674     else:
675     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
676    
677     if ( self.EDG_retry_count ):
678     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
679     pass
680 nsmirnov 1.5
681 slacapra 1.97 if ( self.EDG_shallow_retry_count ):
682 slacapra 1.98 to_write = to_write + 'ShallowRetryCount = "'+self.EDG_shallow_retry_count+'"\n'
683 slacapra 1.97 pass
684    
685 slacapra 1.81 to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
686     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
687 nsmirnov 1.1
688 slacapra 1.102 #TaskName
689 slacapra 1.81 dir = string.split(common.work_space.topDir(), '/')
690     taskName = dir[len(dir)-2]
691    
692     xml.write(str(title))
693 spiga 1.116 #xml.write('<task name="' +str(taskName)+'" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache">\n')
694    
695     #xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + '" task_info="' + os.path.expandvars('X509_USER_PROXY') + '">\n')
696 fanzago 1.133.2.1 x509_cmd = 'ls /tmp/x509up_u`id -u`'
697     x509=runCommand(x509_cmd).strip()
698     xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + ' task_info="' + str(x509) + '">\n')
699 slacapra 1.81 xml.write(jt_string)
700 spiga 1.88
701     if (to_write != ''):
702     xml.write('<extraTags\n')
703     xml.write(to_write)
704     xml.write('/>\n')
705     pass
706 slacapra 1.81
707     xml.write('<iterator>\n')
708     xml.write('\t<iteratorRule name="ITR1">\n')
709     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
710     xml.write('\t</iteratorRule>\n')
711     xml.write('\t<iteratorRule name="ITR2">\n')
712     for arg in argsList:
713     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
714     pass
715     xml.write('\t</iteratorRule>\n')
716     #print jobList
717     xml.write('\t<iteratorRule name="ITR3">\n')
718     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
719     xml.write('\t</iteratorRule>\n')
720    
721     '''
722 spiga 1.88 indy: here itr4
723 slacapra 1.81 '''
724    
725 spiga 1.116 xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.schedulerName)+'">\n')
726     # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
727 slacapra 1.81 xml.write(jt_string)
728 fanzago 1.14
729 slacapra 1.81 #executable
730 nsmirnov 1.1
731 slacapra 1.81 """
732     INDY
733 spiga 1.88 script depends on jobType: it should be probably get in a different way
734 slacapra 1.81 """
735 nsmirnov 1.1 script = job.scriptFilename()
736 slacapra 1.81 xml.write('<program>\n')
737     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
738     xml.write(jt_string)
739 corvo 1.107
740 slacapra 1.81 xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
741     xml.write('<program_types> crabjob </program_types>\n')
742 corvo 1.107 inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
743 nsmirnov 1.1
744     if inp_sandbox != None:
745     for fl in inp_sandbox:
746 slacapra 1.81 inp_box = inp_box + '' + fl + ','
747 nsmirnov 1.1 pass
748     pass
749    
750 corvo 1.115 # if (not jbt.additional_inbox_files == []):
751 corvo 1.112 # inp_box = inp_box + ','
752 corvo 1.115 # for addFile in jbt.additional_inbox_files:
753     # #addFile = os.path.abspath(addFile)
754     # inp_box = inp_box+''+addFile+','
755     # pass
756 nsmirnov 1.1
757     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
758 slacapra 1.81 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
759     xml.write(inp_box)
760    
761     base = jbt.name()
762     stdout = base + '__ITR3_.stdout'
763     stderr = base + '__ITR3_.stderr'
764 fanzago 1.14
765 slacapra 1.81 xml.write('<stderr> ' + stderr + '</stderr>\n')
766     xml.write('<stdout> ' + stdout + '</stdout>\n')
767 fanzago 1.14
768    
769 slacapra 1.81 out_box = stdout + ',' + \
770     stderr + ',.BrokerInfo,'
771    
772     """
773 fanzago 1.30 if int(self.return_data) == 1:
774 fanzago 1.14 if out_sandbox != None:
775     for fl in out_sandbox:
776 slacapra 1.81 out_box = out_box + '' + fl + ','
777 fanzago 1.14 pass
778 nsmirnov 1.1 pass
779     pass
780 slacapra 1.81 """
781 nsmirnov 1.1
782 slacapra 1.81 """
783     INDY
784 spiga 1.88 something similar should be also done for infiles (if it makes sense!)
785     """
786 slacapra 1.104 # Stuff to be returned _always_ via sandbox
787     for fl in jbt.output_file_sandbox:
788     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
789     pass
790     pass
791    
792     # via sandbox iif required return_data
793 slacapra 1.81 if int(self.return_data) == 1:
794     for fl in jbt.output_file:
795     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
796 fanzago 1.48 pass
797 slacapra 1.81 pass
798 slacapra 1.62
799 slacapra 1.81 if out_box[-1] == ',' : out_box = out_box[:-1]
800     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
801     xml.write(out_box)
802    
803     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
804 fanzago 1.48
805 slacapra 1.81 xml.write('</program>\n')
806     xml.write('</chain>\n')
807 nsmirnov 1.1
808 slacapra 1.81 xml.write('</iterator>\n')
809     xml.write('</task>\n')
810 slacapra 1.51
811 slacapra 1.81 xml.close()
812 spiga 1.88
813    
814 nsmirnov 1.1 return
815 slacapra 1.18
def checkProxy(self):
    """
    Function to check the Globus proxy.

    Makes sure a usable VOMS proxy exists and that a credential is
    delegated to the myproxy server, creating either one when needed.
    A successful check is cached in self.proxyValid, so later calls
    return immediately.

    Reads self.proxyValid, self.dontCheckProxy, self.VO, self.group,
    self.role and self.proxyServer.

    Raises CrabException when voms-proxy-init or myproxy-init fails.
    """
    if self.proxyValid:
        return

    ### Just return if asked to do so
    if self.dontCheckProxy == 1:
        self.proxyValid = 1
        return

    # Function-scope import so the regex below does not depend on a
    # star-import elsewhere in the file providing 're'.
    import re

    minTimeLeft = 10*3600 # in seconds

    def toSeconds(text):
        # The commands hand back text.  Convert it once, up front, so the
        # threshold tests below compare numbers with numbers; None marks
        # empty or non-numeric output.
        try:
            return int(text)
        except (TypeError, ValueError):
            return None

    # Renew unless both the proxy lifetime and the attribute-certificate
    # lifetime are known and at least minTimeLeft seconds long.
    mustRenew = 1
    timeLeftLocal = toSeconds(runCommand('voms-proxy-info -timeleft 2>/dev/null'))
    if timeLeftLocal is not None and timeLeftLocal > 0:
        timeLeftServer = toSeconds(runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1'))
        if timeLeftServer is not None \
               and timeLeftLocal >= minTimeLeft \
               and timeLeftServer >= minTimeLeft:
            mustRenew = 0

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 192h\n")
        cmd = 'voms-proxy-init -voms '+self.VO
        if self.group:
            cmd += ':/'+self.VO+'/'+self.group
        if self.role:
            cmd += '/role='+self.role
        cmd += ' -valid 192:00'
        msg = "Unable to create a valid proxy!\n"
        try:
            common.logger.debug(10,cmd)
            out = os.system(cmd)
        except Exception:
            # Report any failure to launch the command as a CRAB error,
            # but let KeyboardInterrupt/SystemExit propagate.
            raise CrabException(msg)
        if out > 0:
            raise CrabException(msg)

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s '+self.proxyServer
    cmd_out = runCommand(cmd,0,20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        ## minimum time: 4 days
        minTime = 4 * 24 * 3600
        ## regex to extract the right information
        myproxyRE = re.compile("timeleft: (?P<hours>[\\d]*):(?P<minutes>[\\d]*):(?P<seconds>[\\d]*)")
        for row in cmd_out.split("\n"):
            g = myproxyRE.search(row)
            if not g:
                continue
            hours = g.group("hours")
            minutes = g.group("minutes")
            seconds = g.group("seconds")
            timeleft = int(hours)*3600 + int(minutes)*60 + int(seconds)
            if timeleft < minTime:
                renewProxy = 1
                common.logger.message('Your proxy will expire in:\n\t'+hours+' hours '+minutes+' minutes '+seconds+' seconds\n')
                common.logger.message('Need to renew it:')

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s '+self.proxyServer
        out = os.system(cmd)
        if out > 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid = 1
    return
901 spiga 1.49
def configOpt_(self):
    """
    Build the configuration-file options string from self.edg_config
    and self.edg_config_vo.

    Returns ' ' when neither attribute is set; otherwise the '-c' and/or
    '--config-vo' options, each padded with surrounding blanks.
    """
    pieces = [' ']
    if self.edg_config:
        # the main configuration file replaces the lone blank
        pieces = [' -c ', self.edg_config, ' ']
    if self.edg_config_vo:
        # the VO configuration is appended to whatever is there
        pieces.extend([' --config-vo ', self.edg_config_vo, ' '])
    return ''.join(pieces)
909 spiga 1.121
def submitTout(self, list):
    """
    Return the submission timeout.

    The value is the constant 120; the 'list' argument is accepted but
    not inspected.
    """
    timeout = 120
    return timeout
912    
913