ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
Revision: 1.85
Committed: Wed Sep 27 16:17:07 2006 UTC (18 years, 7 months ago) by gutsche
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_1_3_0, CRAB_1_3_0_pre6
Changes since 1.84: +2 -2 lines
Log Message:
use $X509_CERT_DIR in srmcp command

File Contents

# User Rev Content
1 nsmirnov 1.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 slacapra 1.50 from EdgConfig import *
6 nsmirnov 1.1 import common
7    
8 slacapra 1.18 import os, sys, time
9 nsmirnov 1.1
10     class SchedulerEdg(Scheduler):
def __init__(self):
    """
    Register this scheduler under the name "EDG" and define the ordered
    list of job-status attribute names.
    """
    Scheduler.__init__(self, "EDG")
    # Order matters: getStatusAttribute_ uses the position of a name in this
    # list as the index into the vector returned by the LB API loadStatus().
    self.states = [
        "Acl", "cancelReason", "cancelling", "ce_node", "children",
        "children_hist", "children_num", "children_states", "condorId", "condor_jdl",
        "cpuTime", "destination", "done_code", "exit_code", "expectFrom",
        "expectUpdate", "globusId", "jdl", "jobId", "jobtype",
        "lastUpdateTime", "localId", "location", "matched_jdl", "network_server",
        "owner", "parent_job", "reason", "resubmitted", "rsl", "seed",
        "stateEnterTime", "stateEnterTimes", "subjob_failed",
        "user tags", "status", "status_code", "hierarchy",
    ]
22    
def configure(self, cfg_params):
    """
    Read the scheduler settings from cfg_params and store them on self.

    cfg_params is indexed with "SECTION.key" strings; a missing key is
    expected to raise KeyError. Optional keys fall back to built-in
    defaults. The following situations raise CrabException:
      - copy_data = 1 without USER.storage_element / USER.storage_path
      - return_data = 0 together with copy_data = 0
      - missing EDG.lfc_host, EDG.lcg_catalog_type or EDG.lfc_home
      - register_data = 1 without USER.lfn_dir, or with copy_data = 0
      - the EDG_WL_LOCATION environment variable is not set
    As a side effect the EDG_WL_LOCATION lib directories are appended to
    sys.path.
    """

    def optional(key, default):
        # Value stored under key, or default when the key is absent.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def required(key, msg):
        # Value stored under key; log msg and raise CrabException if absent.
        try:
            return cfg_params[key]
        except KeyError:
            common.logger.message(msg)
            raise CrabException(msg)

    # Resource broker configuration. The whole sequence stays inside the try
    # so that a KeyError raised while building the EdgConfig is treated like
    # a missing "EDG.rb" entry, as before.
    try:
        RB = cfg_params["EDG.rb"]
        edgConfig = EdgConfig(RB)
        self.edg_config = edgConfig.config()
        self.edg_config_vo = edgConfig.configVO()
    except KeyError:
        self.edg_config = ''
        self.edg_config_vo = ''

    self.proxyServer = optional("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)

    self.role = optional("EDG.role", None)
    self.LCG_version = optional("EDG.lcg_version", '2')
    # Each of these is read once; the original looked them up twice.
    self.EDG_requirements = optional('EDG.requirements', '')
    self.EDG_retry_count = optional('EDG.retry_count', '')
    self.EDG_ce_black_list = optional('EDG.ce_black_list', '')
    self.EDG_ce_white_list = optional('EDG.ce_white_list', '')
    self.VO = optional('EDG.virtual_organization', 'cms')
    self.return_data = optional('USER.return_data', 1)

    try:
        self.copy_input_data = common.analisys_common_info['copy_input_data']
    except KeyError:
        self.copy_input_data = 0

    # Copying the output needs both the storage element and the path on it.
    self.copy_data = optional("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        try:
            self.SE = cfg_params['USER.storage_element']
            self.SE_PATH = cfg_params['USER.storage_path']
        except KeyError:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
        msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    # LFC catalog settings are mandatory.
    self.lfc_host = required(
        'EDG.lfc_host',
        "Error. The [EDG] section does not have 'lfc_host' value"
        + " it's necessary to know the LFC host name")
    self.lcg_catalog_type = required(
        'EDG.lcg_catalog_type',
        "Error. The [EDG] section does not have 'lcg_catalog_type' value"
        + " it's necessary to know the catalog type")
    self.lfc_home = required(
        'EDG.lfc_home',
        "Error. The [EDG] section does not have 'lfc_home' value"
        + " it's necessary to know the home catalog dir")

    # Registration needs the LFN directory and only makes sense with copy_data = 1.
    self.register_data = optional("USER.register_data", 0)
    if int(self.register_data) == 1:
        self.LFN = required(
            'USER.lfn_dir',
            "Error. The [USER] section does not have 'lfn_dir' value"
            + " it's necessary for LCF registration")

    if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
        msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    self.EDG_clock_time = optional('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = optional('EDG.max_cpu_time', '')

    # Add EDG_WL_LOCATION to the python path
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    sys.path.append(os.path.join(path, "lib"))
    sys.path.append(os.path.join(path, "lib", "python"))

    self.proxyValid=0

    self._taskId = optional('taskId', '')
    self.jobtypeName = optional('CRAB.jobtype', '')

    # The fallback used to be stored in self.scheduler, which left
    # self.schedulerName undefined for createXMLSchScript.
    try:
        self.schedulerName = cfg_params['CRAB.scheduler']
    except KeyError:
        self.schedulerName = ''
        # Legacy attribute, kept in case other code reads it.
        self.scheduler = ''

    return
173    
174 fanzago 1.10
def sched_parameter(self):
    """
    Write the file with scheduler-specific parameters.

    When both the resource-broker configuration and its VO configuration
    are set, 'sched_param.clad' is written to the share directory and 1 is
    returned; otherwise nothing is written and 0 is returned.
    """
    if not (self.edg_config and self.edg_config_vo != ''):
        return 0

    self.param='sched_param.clad'
    param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
    try:
        param_file.write('RBconfig = "'+self.edg_config+'";\n')
        param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
    finally:
        # Close the handle even if a write fails.
        param_file.close()
    return 1
189 fanzago 1.13
def wsSetupEnvironment(self):
    """
    Returns the part of the job script which does the scheduler-specific
    environment setup: argument stripping, DashBoard identification,
    middleware discovery (OSG / LCG), storage and LFC catalog variables,
    and detection of the CE name.

    Side effect: when copy_data is 1 and SE_PATH is set, a trailing '/' is
    appended to self.SE_PATH if it is missing.
    """
    script = []

    # Argument handling and job identification for the DashBoard.
    script.extend([
        '# strip arguments\n',
        'echo "strip arguments"\n',
        'args=("$@")\n',
        'nargs=$#\n',
        'shift $nargs\n',
        "# job number (first parameter for job wrapper)\n",
        "NJob=${args[0]}\n",
        '# job identification to DashBoard \n',
        'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n',
        'SyncGridJobId=`echo $EDG_WL_JOBID`\n',
        'MonitorID=`echo ' + self._taskId + '`\n',
        'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n',
        'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
    ])

    # Middleware discovery; the job aborts with code 10030 if none matches.
    script.extend([
        'echo "middleware discovery " \n',
        'if [ $GRID3_APP_DIR ]; then\n',
        ' middleware=OSG \n',
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "middleware =$middleware" \n',
        'elif [ $OSG_APP ]; then \n',
        ' middleware=OSG \n',
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "middleware =$middleware" \n',
        'elif [ $VO_CMS_SW_DIR ]; then \n',
        ' middleware=LCG \n',
        ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "middleware =$middleware" \n',
        'else \n',
        ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n',
        ' echo "JOB_EXIT_STATUS = 10030" \n',
        ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n',
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1 \n',
        'fi \n',
    ])

    # First report to the DashBoard, then restart the report file.
    script.extend([
        '# report first time to DashBoard \n',
        'dumpStatus $RUNTIME_AREA/$repo \n',
        'rm -f $RUNTIME_AREA/$repo \n',
        'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        '\n\n',
    ])

    # Storage element variables, only needed when the output is copied.
    if int(self.copy_data) == 1:
        if self.SE:
            script.append('export SE='+self.SE+'\n')
            script.append('echo "SE = $SE"\n')
        if self.SE_PATH:
            if not self.SE_PATH.endswith('/'):
                self.SE_PATH = self.SE_PATH + '/'
            script.append('export SE_PATH='+self.SE_PATH+'\n')
            script.append('echo "SE_PATH = $SE_PATH"\n')

    # VO and LFC catalog settings.
    script.extend([
        'export VO='+self.VO+'\n',
        'if [ $middleware == LCG ]; then \n',
        ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n',
        ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n',
        ' fi\n',
        ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n',
        ' export LFC_HOST='+self.lfc_host+'\n',
        ' fi\n',
        ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n',
        ' export LFC_HOME='+self.lfc_home+'\n',
        ' fi\n',
        'elif [ $middleware == OSG ]; then\n',
        ' echo "LFC catalog setting to be implemented for OSG"\n',
        'fi\n',
    ])

    # Creation of the LFN directory in the LFC catalog when registering output.
    if int(self.register_data) == 1:
        script.extend([
            'if [ $middleware == LCG ]; then \n',
            ' export LFN='+self.LFN+'\n',
            ' lfc-ls $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' if [ $result != 0 ]; then\n',
            ' lfc-mkdir $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' fi\n',
            'elif [ $middleware == OSG ]; then\n',
            ' echo " Files registration to be implemented for OSG"\n',
            'fi\n',
            '\n',
        ])
        if self.VO:
            script.append('export VO='+self.VO+'\n')
        if self.LFN:
            script.extend([
                'if [ $middleware == LCG ]; then \n',
                ' export LFN='+self.LFN+'\n',
                'fi\n',
                '\n',
            ])

    # CE name; in OSG mode the job aborts with code 10099 if it cannot be set.
    script.extend([
        'if [ $middleware == LCG ]; then\n',
        ' CloseCEs=`edg-brokerinfo getCE`\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ \'{print $1}\'` \n',
        ' else \n',
        ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n',
        ' echo "JOB_EXIT_STATUS = 10099" \n',
        ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n',
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1 \n',
        ' fi \n',
        'fi \n',
    ])

    return ''.join(script)
318 fanzago 1.15
def wsCopyInput(self):
    """
    Returns the part of the job script which copies the input data from
    the SE to the WN, or an empty string when copy_input_data is not 1.

    The copy_input_data flag is re-read from common.analisys_common_info
    on every call (0 when absent) and stored on self.
    """
    try:
        self.copy_input_data = common.analisys_common_info['copy_input_data']
    except KeyError:
        self.copy_input_data = 0

    if int(self.copy_input_data) != 1:
        return ''

    # Input copy is deactivated in OSG mode (waiting for the LCG UI on OSG).
    # In LCG mode one loop copies the job's input files and a second loop
    # copies the set of PU ntuples.
    return (
        'if [ $middleware == OSG ]; then\n'
        ' #\n'
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
        ' #\n'
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
        'elif [ $middleware == LCG ]; then \n'
        ' #\n'
        ' # Copy Input Data from SE to this WN\n'
        ' #\n'
        ' for input_file in $cur_file_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
        ' copy_input_exit_status=$?\n'
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
        ' if [ $copy_input_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying to WN" \n'
        ' else \n'
        ' echo "input copied into WN" \n'
        ' fi \n'
        ' done \n'
        ' for file in $cur_pu_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
        ' copy_input_pu_exit_status=$?\n'
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying pu to WN" \n'
        ' else \n'
        ' echo "input pu files copied into WN" \n'
        ' fi \n'
        ' done \n'
        ' \n'
        ' ### Check SCRATCH space available on WN : \n'
        ' df -h \n'
        'fi \n'
    )
368    
def wsCopyOutput(self):
    """
    Returns the CopyResults part of the job script, which copies the
    produced output into the storage element, or an empty string when
    copy_data is not 1.

    The generated script tries lcg-cp for every output file and falls back
    to srmcp when lcg-cp fails. lcg-cp is run with --verbose when the
    logger debug level is 5 or higher.
    """
    if int(self.copy_data) != 1:
        return ''

    # The verbose and plain lcg-cp command lines differ only by this flag.
    if common.logger.debugLevel() >= 5:
        verbose_flag = '--verbose '
    else:
        verbose_flag = ''

    script = [
        '#\n',
        '# Copy output to SE = $SE\n',
        '#\n',
        ' if [ $middleware == OSG ]; then\n',
        ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n',
        ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n',
        ' source $OSG_APP/glite/setup_glite_ui.sh\n',
        ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n',
        ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n',
        ' fi \n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to copy output file to $SE using lcg-cp"\n',
        ' echo "lcg-cp --vo $VO -t 2400 ' + verbose_flag + 'file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n',
        ' exitstring=`lcg-cp --vo $VO -t 2400 ' + verbose_flag + 'file://\\`pwd\\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' echo "Possible problem with SE = $SE"\n',
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "lcg-cp failed. For verbose lcg-cp output, use command line option -debug 5."\n',
        ' echo "lcg-cp failed, attempting srmcp"\n',
        ' echo "mkdir -p $HOME/.srmconfig"\n',
        ' mkdir -p $HOME/.srmconfig\n',
        ' if [ $middleware == LCG ]; then\n',
        ' echo "srmcp -retry_num 5 -retry_timeout 480000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n',
        ' exitstring=`srmcp -retry_num 5 -retry_timeout 480000 file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n',
        ' elif [ $middleware == OSG ]; then\n',
        ' echo "srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n',
        ' exitstring=`srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $X509_CERT_DIR file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n',
        ' fi \n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
        ' echo "Problems with SE = $SE"\n',
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "lcg-cp and srm failed"\n',
        ' echo "If storage_path in your config file contains a ? you may need a \\? instead."\n',
        ' else\n',
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "srmcp succeeded"\n',
        ' fi\n',
        ' else\n',
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "lcg-cp succeeded"\n',
        ' fi\n',
        ' done\n',
    ]
    return ''.join(script)
437    
def wsRegisterOutput(self):
    """
    Returns the part of the job script which registers the output files
    into the LFC catalog, or an empty string when register_data is not 1.

    In the generated script registration is deactivated in OSG mode. In
    LCG mode, when the copy step succeeded the files are registered with
    lcg-rf (sfn first, then srm); otherwise they are copied to the close
    SE and registered with lcg-cr.
    """
    if int(self.register_data) != 1:
        return ''

    return (
        'if [ $middleware == OSG ]; then\n'
        ' #\n'
        ' # Register output to LFC deactivated in OSG mode\n'
        ' #\n'
        ' echo "Register output to LFC deactivated in OSG mode"\n'
        'elif [ $middleware == LCG ]; then \n'
        '#\n'
        '# Register output to LFC\n'
        '#\n'
        ' if [ $copy_exit_status -eq 0 ]; then\n'
        ' for out_file in $file_list ; do\n'
        ' echo "Trying to register the output file into LFC"\n'
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1"\n'
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1 \n'
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
        ' echo "Problems with the registration to LFC" \n'
        ' echo "Try with srm protocol" \n'
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1"\n'
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1 \n'
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
        ' echo "Problems with the registration into LFC" \n'
        ' fi \n'
        ' else \n'
        ' echo "output registered to LFC"\n'
        ' fi \n'
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        ' done\n'
        ' else \n'
        ' echo "Trying to copy output file to CloseSE"\n'
        ' CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
        ' for out_file in $file_list ; do\n'
        ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1" \n'
        ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1 \n'
        ' register_exit_status=$?\n'
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
        ' echo "STAGE_OUT = $register_exit_status"\n'
        ' if [ $register_exit_status -ne 0 ]; then \n'
        ' echo "Problems with CloseSE or Catalog" \n'
        ' else \n'
        ' echo "The program was successfully executed"\n'
        ' echo "SE = $CLOSE_SE"\n'
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n'
        ' fi \n'
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        ' done\n'
        ' fi \n'
        ' exit_status=$register_exit_status\n'
        'fi \n'
    )
501 nsmirnov 1.1
def loggingInfo(self, id):
    """
    Check the proxy, then run edg-job-get-logging-info (verbosity 2) for
    the job id and return the output of runCommand.
    """
    self.checkProxy()
    return runCommand('edg-job-get-logging-info -v 2 ' + id)
511    
def getExitStatus(self, id):
    """ Return the 'exit_code' status attribute of the job with id """
    exit_code = self.getStatusAttribute_(id, 'exit_code')
    return exit_code
514 fanzago 1.10
def queryStatus(self, id):
    """ Return the 'status' status attribute of the job with id """
    status = self.getStatusAttribute_(id, 'status')
    return status
517 fanzago 1.10
def queryDest(self, id):
    """ Return the 'destination' status attribute of the job with id """
    destination = self.getStatusAttribute_(id, 'destination')
    return destination
520    
521 slacapra 1.8
def getStatusAttribute_(self, id, attr):
    """
    Query one status attribute of the job with id.

    The status is read through the Status class of
    edg_wl_userinterface_common_LbWrapper (bypassing edg-job-status).
    attr must be one of the names in self.states: its position in that
    list is used as the index into the vector returned by loadStatus().

    Returns the attribute value, or None when the LB API reports an error.
    Raises ValueError when attr is not in self.states.
    """
    self.checkProxy()
    Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
    level = 0
    # Instance of the Status class provided by LB API
    jobStat = Status()
    st = 0
    jobStat.getStatus(id, level)
    err, apiMsg = jobStat.get_error()
    if err:
        common.logger.debug(5,'Error caught' + apiMsg)
        return None
    # Only the requested entry is read. The previous version also called
    # loadStatus() once per state name to fill a dict that was never used.
    return jobStat.loadStatus(st)[self.states.index(attr)]
545 nsmirnov 1.1
def queryDetailedStatus(self, id):
    """
    Run edg-job-status for the job id and return the output of runCommand.
    """
    return runCommand('edg-job-status '+id)
551    
552 slacapra 1.80 ##### FEDE ######
def findSites_(self, n_tot_job):
    """
    Build one requirement expression per job from the job destinations.

    For each job index in range(n_tot_job) the destinations are read from
    common.jobDB. Jobs whose first destination is "Any", or whose
    destination list is [""], contribute nothing. Every other job
    contributes the string obtained by joining one
    target.GlueSEUniqueID=="<site>" clause per destination with ' || '.

    Returns the list of these strings.
    """
    expressions = []
    for job_index in range(n_tot_job):
        destinations = common.jobDB.destination(job_index)
        if len(destinations) > 0 and destinations[0] == "Any":
            continue
        if destinations == [""]:
            continue
        clauses = ['target.GlueSEUniqueID=="' + se_name + '"' for se_name in destinations]
        expressions.append(' || '.join(clauses))
    return expressions
576    
577     def createXMLSchScript(self, nj, argsList):
578     # def createXMLSchScript(self, nj):
579     """
580     Create a XML-file for BOSS4.
581     """
582     # job = common.job_list[nj]
583     """
584     INDY
585     [begin] da rivedere:
586     in particolare passerei il jobType ed eliminerei le dipendenze da job
587     """
588     index = nj - 1
589     job = common.job_list[index]
590     jbt = job.type()
591    
592     inp_sandbox = jbt.inputSandbox(index)
593     out_sandbox = jbt.outputSandbox(index)
594 nsmirnov 1.5 """
595 slacapra 1.81 [end] da rivedere
596 nsmirnov 1.5 """
597 slacapra 1.6
598 slacapra 1.81
599     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
600     jt_string = ''
601    
602     xml_fname = str(self.jobtypeName)+'.xml'
603     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
604    
605     #TaskName
606     dir = string.split(common.work_space.topDir(), '/')
607     taskName = dir[len(dir)-2]
608    
609     to_writeReq = ''
610     to_write = ''
611    
612     req=' '
613     req = req + jbt.getRequirements()
614    
615    
616     #sites = common.jobDB.destination(nj)
617     #if len(sites)>0 and sites[0]!="Any":
618     # req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
619     #req = req
620    
621     if self.EDG_requirements:
622     if (req == ' '):
623     req = req + self.EDG_requirements
624     else:
625     req = req + ' && ' + self.EDG_requirements
626     if self.EDG_ce_white_list:
627     ce_white_list = string.split(self.EDG_ce_white_list,',')
628     for i in range(len(ce_white_list)):
629     if i == 0:
630     if (req == ' '):
631     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
632     else:
633     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
634     pass
635     else:
636     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
637     req = req + ')'
638    
639     if self.EDG_ce_black_list:
640     ce_black_list = string.split(self.EDG_ce_black_list,',')
641     for ce in ce_black_list:
642     if (req == ' '):
643     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
644     else:
645     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
646     pass
647     if self.EDG_clock_time:
648     if (req == ' '):
649     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
650     else:
651     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
652    
653     if self.EDG_cpu_time:
654     if (req == ' '):
655     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
656     else:
657     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
658    
659     if ( self.EDG_retry_count ):
660     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
661     pass
662 nsmirnov 1.5
663 slacapra 1.81 to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
664     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
665 nsmirnov 1.1
666    
667 slacapra 1.81 #TaskName
668     dir = string.split(common.work_space.topDir(), '/')
669     taskName = dir[len(dir)-2]
670    
671     xml.write(str(title))
672     xml.write('<task name="' +str(taskName)+'">\n')
673     xml.write(jt_string)
674    
675     xml.write('<iterator>\n')
676     xml.write('\t<iteratorRule name="ITR1">\n')
677     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
678     xml.write('\t</iteratorRule>\n')
679     xml.write('\t<iteratorRule name="ITR2">\n')
680     for arg in argsList:
681     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
682     pass
683     xml.write('\t</iteratorRule>\n')
684     #print jobList
685     xml.write('\t<iteratorRule name="ITR3">\n')
686     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
687     xml.write('\t</iteratorRule>\n')
688    
689     '''
690     indy: qui sotto ci sta itr4
691     '''
692    
693     itr4=self.findSites_(nj)
694     #print "--->>> itr4 = ", itr4
695 spiga 1.83 if (itr4 != []):
696 slacapra 1.81 xml.write('\t<iteratorRule name="ITR4">\n')
697     #print argsList
698     for arg in itr4:
699     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
700     pass
701     xml.write('\t</iteratorRule>\n')
702     req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
703     pass
704     # print "--->>> req= ", req
705 nsmirnov 1.1
706 slacapra 1.81 if (to_write != ''):
707     xml.write('<extraTags\n')
708     xml.write(to_write)
709     xml.write('/>\n')
710     pass
711 fanzago 1.14
712 slacapra 1.81 xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
713     xml.write(jt_string)
714 fanzago 1.14
715 slacapra 1.81 if (req != ' '):
716     req = req + '\n'
717     xml.write('<extraTags>\n')
718     xml.write('<Requirements>\n')
719     xml.write('<![CDATA[\n')
720     xml.write(req)
721     xml.write(']]>\n')
722     xml.write('</Requirements>\n')
723     xml.write('</extraTags>\n')
724     pass
725 nsmirnov 1.1
726 slacapra 1.81 #executable
727 nsmirnov 1.1
728 slacapra 1.81 """
729     INDY
730     script dipende dal jobType: dovrebbe essere semplice tirarlo fuori in altro modo
731     """
732 nsmirnov 1.1 script = job.scriptFilename()
733 slacapra 1.81 xml.write('<program>\n')
734     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
735     xml.write(jt_string)
736    
737    
738 fanzago 1.14 ### only one .sh JDL has arguments:
739 gutsche 1.56 ### Fabio
740 slacapra 1.81 # xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "EDG") +'"\n')
741     xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
742     xml.write('<program_types> crabjob </program_types>\n')
743     inp_box = script + ','
744 nsmirnov 1.1
745     if inp_sandbox != None:
746     for fl in inp_sandbox:
747 slacapra 1.81 inp_box = inp_box + '' + fl + ','
748 nsmirnov 1.1 pass
749     pass
750    
751 slacapra 1.81 inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
752     os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
753     os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
754     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
755     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
756 nsmirnov 1.1
757 fanzago 1.40 if (not jbt.additional_inbox_files == []):
758 spiga 1.82 inp_box = inp_box + ','
759 fanzago 1.40 for addFile in jbt.additional_inbox_files:
760     addFile = os.path.abspath(addFile)
761 slacapra 1.81 inp_box = inp_box+''+addFile+','
762 fanzago 1.40 pass
763 nsmirnov 1.1
764     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
765 slacapra 1.81 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
766     xml.write(inp_box)
767    
768     base = jbt.name()
769     stdout = base + '__ITR3_.stdout'
770     stderr = base + '__ITR3_.stderr'
771 fanzago 1.14
772 slacapra 1.81 xml.write('<stderr> ' + stderr + '</stderr>\n')
773     xml.write('<stdout> ' + stdout + '</stdout>\n')
774 fanzago 1.14
775    
776 slacapra 1.81 out_box = stdout + ',' + \
777     stderr + ',.BrokerInfo,'
778    
779     """
780 fanzago 1.30 if int(self.return_data) == 1:
781 fanzago 1.14 if out_sandbox != None:
782     for fl in out_sandbox:
783 slacapra 1.81 out_box = out_box + '' + fl + ','
784 fanzago 1.14 pass
785 nsmirnov 1.1 pass
786     pass
787 slacapra 1.81 """
788 nsmirnov 1.1
789 slacapra 1.81 """
790     INDY
791     qualcosa del genere andrebbe fatta per gli infiles
792     """
793     if int(self.return_data) == 1:
794     for fl in jbt.output_file:
795     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
796 fanzago 1.48 pass
797 slacapra 1.81 pass
798 slacapra 1.62
799 slacapra 1.81 if out_box[-1] == ',' : out_box = out_box[:-1]
800     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
801     xml.write(out_box)
802    
803     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
804 fanzago 1.48
805 slacapra 1.81 xml.write('</program>\n')
806     xml.write('</chain>\n')
807 nsmirnov 1.1
808 slacapra 1.81 xml.write('</iterator>\n')
809     xml.write('</task>\n')
810 slacapra 1.51
811 slacapra 1.81 xml.close()
812 nsmirnov 1.1 return
813 slacapra 1.18
def checkProxy(self):
    """
    Make sure a usable VOMS proxy and a myproxy delegation exist.

    Returns immediately when self.proxyValid is already set.  Otherwise:

    1. Query the local proxy with 'voms-proxy-info'.  If the output is
       missing, not an integer, non-positive, or below the 10 hour
       threshold, create a new proxy with 'voms-proxy-init' (using
       self.VO and, when set, self.role).
    2. Query the delegation on self.proxyServer with 'myproxy-info'.
       If nothing is delegated, or the reported 'timeleft' is below
       100 hours, delegate again with 'myproxy-init'.
    3. Set self.proxyValid to 1 so later calls are no-ops.

    Raises CrabException when 'voms-proxy-init' or 'myproxy-init' fails.
    """
    if self.proxyValid:
        return

    minTimeLeft = 10 * 3600      # in seconds
    minTimeLeftServer = 100      # in hours

    mustRenew = 0
    timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
    # Validate with isInt() *before* converting: calling int() first would
    # raise ValueError on unexpected output instead of asking for a renewal.
    # NOTE(review): assumes isInt() accepts exactly what int() can parse.
    if not timeLeftLocal or not isInt(timeLeftLocal) or int(timeLeftLocal) <= 0:
        mustRenew = 1
    else:
        timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
        if not timeLeftServer or not isInt(timeLeftServer):
            mustRenew = 1
        # runCommand output is text: convert before comparing with the
        # numeric threshold, otherwise the comparison is meaningless.
        elif int(timeLeftLocal) < minTimeLeft or int(timeLeftServer) < minTimeLeft:
            mustRenew = 1

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
        cmd = 'voms-proxy-init -voms '+self.VO+' -valid 96:00'
        if self.role:
            cmd = 'voms-proxy-init -voms '+self.VO+':/'+self.VO+'/role='+self.role+' -valid 96:00'
        msg = "Unable to create a valid proxy!\n"
        try:
            out = os.system(cmd)
        except Exception:
            # Keep the original contract: any failure to run the command
            # is reported to the caller as a CrabException.
            raise CrabException(msg)
        if out > 0:
            raise CrabException(msg)

    ## a valid voms proxy exists at this point: now check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s '+self.proxyServer
    cmd_out = runCommand(cmd,0,20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        # A delegation exists: renew it if it does not last long enough.
        # search() rather than match(): the 'timeleft:' entry is not
        # necessarily at the very beginning of the command output.
        reTime = re.compile( r'timeleft: (\d+)' )
        found = reTime.search( cmd_out )
        if found:
            # presumably the leading number is hours (see minTimeLeftServer)
            hoursLeft = found.group(1)
            if int(hoursLeft) < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('No credential delegation will expire in '+hoursLeft+' hours: renew it')

    # no (or too short) delegation: create one
    if renewProxy:
        cmd = 'myproxy-init -d -n -s '+self.proxyServer
        out = os.system(cmd)
        if out > 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")

    # cache proxy validity
    self.proxyValid = 1
    return
882 spiga 1.49
def configOpt_(self):
    """
    Build the option string pointing the EDG UI commands at the
    configured files: ' -c <edg_config> ' when self.edg_config is set,
    followed by ' --config-vo <edg_config_vo> ' when self.edg_config_vo
    is set.  With neither set the result is a single blank.
    """
    # Start from the single-blank default; a configured file replaces it.
    pieces = [' ']
    if self.edg_config:
        pieces = [' -c ' + self.edg_config + ' ']
    if self.edg_config_vo:
        pieces.append(' --config-vo ' + self.edg_config_vo + ' ')
    return ''.join(pieces)