ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.1.2.3.2.1
Committed: Fri Sep 15 07:51:02 2006 UTC (18 years, 7 months ago) by spiga
Content type: text/x-python
Branch: CRAB_BOSS4_v1
Changes since 1.1.2.3: +272 -329 lines
Log Message:
merged with last changes

File Contents

# User Rev Content
1 spiga 1.1.2.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     from EdgConfig import *
6     import common
7    
8     import os, sys, time
9    
10     class SchedulerGlite(Scheduler):
def __init__(self):
    """Initialise the base Scheduler as "GLITE" and define the status field names."""
    Scheduler.__init__(self, "GLITE")
    # Field names in the positional order used by getStatusAttribute_,
    # which indexes the loaded status vector by a name's position here.
    self.states = [
        "Acl",
        "cancelReason",
        "cancelling",
        "ce_node",
        "children",
        "children_hist",
        "children_num",
        "children_states",
        "condorId",
        "condor_jdl",
        "cpuTime",
        "destination",
        "done_code",
        "exit_code",
        "expectFrom",
        "expectUpdate",
        "globusId",
        "jdl",
        "jobId",
        "jobtype",
        "lastUpdateTime",
        "localId",
        "location",
        "matched_jdl",
        "network_server",
        "owner",
        "parent_job",
        "reason",
        "resubmitted",
        "rsl",
        "seed",
        "stateEnterTime",
        "stateEnterTimes",
        "subjob_failed",
        "user tags",
        "status",
        "status_code",
        "hierarchy",
    ]
22    
def configure(self, cfg_params):
    """
    Load the scheduler settings from the configuration parameters.

    cfg_params is indexed with 'SECTION.key' strings; a missing key is
    expected to raise KeyError, in which case the documented default is
    used (or an error is raised for mandatory entries).

    Raises CrabException when:
      - copy_data is 1 but storage_element/storage_path are missing;
      - both return_data and copy_data are 0;
      - lfc_host, lcg_catalog_type or lfc_home are missing;
      - register_data is 1 but lfn_dir is missing;
      - register_data is 1 while copy_data is 0;
      - the EDG_WL_LOCATION environment variable is not set.
    """

    def option(key, default):
        # Optional entry: fall back to the default when the key is absent.
        try:
            return cfg_params[key]
        except KeyError:
            return default

    def required(key, msg):
        # Mandatory entry: log and abort when the key is absent.
        try:
            return cfg_params[key]
        except KeyError:
            common.logger.message(msg)
            raise CrabException(msg)

    try:
        RB = cfg_params["EDG.rb"]
        edgConfig = EdgConfig(RB)
        self.edg_config = edgConfig.config()
        self.edg_config_vo = edgConfig.configVO()
    except KeyError:
        self.edg_config = ''
        self.edg_config_vo = ''

    self.proxyServer = option("EDG.proxy_server", 'myproxy.cern.ch')
    common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)

    self.LCG_version = option("EDG.lcg_version", '2')
    self.EDG_requirements = option('EDG.requirements', '')
    self.EDG_retry_count = option('EDG.retry_count', '')
    self.EDG_ce_black_list = option('EDG.ce_black_list', '')
    self.EDG_ce_white_list = option('EDG.ce_white_list', '')
    self.VO = option('EDG.virtual_organization', 'cms')
    self.return_data = option('USER.return_data', 1)

    try:
        self.copy_input_data = common.analisys_common_info['copy_input_data']
    except KeyError:
        self.copy_input_data = 0

    self.copy_data = option("USER.copy_data", 0)
    if int(self.copy_data) == 1:
        try:
            self.SE = cfg_params['USER.storage_element']
            self.SE_PATH = cfg_params['USER.storage_path']
        except KeyError:
            msg = "Error. The [USER] section does not have 'storage_element'"
            msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
            common.logger.message(msg)
            raise CrabException(msg)

    if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
        msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
        msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
        raise CrabException(msg)

    self.lfc_host = required(
        'EDG.lfc_host',
        "Error. The [EDG] section does not have 'lfc_host' value"
        + " it's necessary to know the LFC host name")
    self.lcg_catalog_type = required(
        'EDG.lcg_catalog_type',
        "Error. The [EDG] section does not have 'lcg_catalog_type' value"
        + " it's necessary to know the catalog type")
    self.lfc_home = required(
        'EDG.lfc_home',
        "Error. The [EDG] section does not have 'lfc_home' value"
        + " it's necessary to know the home catalog dir")

    self.register_data = option("USER.register_data", 0)
    if int(self.register_data) == 1:
        self.LFN = required(
            'USER.lfn_dir',
            "Error. The [USER] section does not have 'lfn_dir' value"
            + " it's necessary for LCF registration")

    if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
        msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
        msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
        common.logger.message(msg)
        raise CrabException(msg)

    self.EDG_clock_time = option('EDG.max_wall_clock_time', '')
    self.EDG_cpu_time = option('EDG.max_cpu_time', '')

    # Each of these is read exactly once, and the default is stored on the
    # same attribute that later code reads (schedulerName, jobtypeName).
    self.schedulerName = option('CRAB.scheduler', '')
    self.jobtypeName = option('CRAB.jobtype', '')

    # Add EDG_WL_LOCATION to the python path
    try:
        path = os.environ['EDG_WL_LOCATION']
    except KeyError:
        msg = "Error: the EDG_WL_LOCATION variable is not set."
        raise CrabException(msg)

    sys.path.append(os.path.join(path, "lib"))
    sys.path.append(os.path.join(path, "lib", "python"))

    self.proxyValid=0

    self._taskId = option('taskId', '')

    return
171    
172    
def sched_parameter(self):
    """
    Write the scheduler-specific parameter file when RB settings exist.

    When self.edg_config is non-empty and self.edg_config_vo is not '',
    the file 'sched_param.clad' is written in the work space share
    directory (its name is stored in self.param) and 1 is returned.
    Otherwise nothing is written and 0 is returned.
    """
    if not (self.edg_config and self.edg_config_vo != ''):
        return 0

    self.param='sched_param.clad'
    param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
    try:
        param_file.write('RBconfig = "'+self.edg_config+'";\n')
        param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
    finally:
        # Release the handle even if one of the writes fails.
        param_file.close()
    return 1
186    
def wsSetupEnvironment(self):
    """
    Build the scheduler-specific environment section of the job script.

    The generated shell text strips the wrapper arguments, records the
    monitoring identifiers, detects the middleware flavour, exports the
    storage / VO / LFC settings taken from this object and finally
    determines the CE name. Returns the text as a single string.

    Side effect: when copy_data is 1 and self.SE_PATH does not end with
    '/', a trailing '/' is appended to self.SE_PATH.
    """
    # Fragment emitted after "middleware=OSG" in both OSG branches.
    osg_report = [
        ' middleware=OSG \n',
        ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "middleware =$middleware" \n',
    ]
    # Fragment that reports the failure, restarts the repo file and aborts.
    abort_job = [
        ' dumpStatus $RUNTIME_AREA/$repo \n',
        ' rm -f $RUNTIME_AREA/$repo \n',
        ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        ' exit 1 \n',
    ]

    script = [
        '# strip arguments\n',
        'echo "strip arguments"\n',
        'args=("$@")\n',
        'nargs=$#\n',
        'shift $nargs\n',
        "# job number (first parameter for job wrapper)\n",
        "NJob=${args[0]}\n",
        '# job identification to DashBoard \n',
        'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n',
        'SyncGridJobId=`echo $EDG_WL_JOBID`\n',
        'MonitorID=`echo ' + self._taskId + '`\n',
        'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n',
        'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
    ]

    # Middleware discovery.
    script += [
        'echo "middleware discovery " \n',
        'if [ $VO_CMS_SW_DIR ]; then\n',
        ' middleware=LCG \n',
        ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n',
        ' echo "middleware =$middleware" \n',
    ]
    for osg_test in ('elif [ $GRID3_APP_DIR ]; then\n', 'elif [ $OSG_APP ]; then \n'):
        script.append(osg_test)
        script.extend(osg_report)
    script += [
        'else \n',
        ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n',
        ' echo "JOB_EXIT_STATUS = 10030" \n',
        ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n',
    ]
    script.extend(abort_job)
    script.append('fi \n')

    script += [
        '# report first time to DashBoard \n',
        'dumpStatus $RUNTIME_AREA/$repo \n',
        'rm -f $RUNTIME_AREA/$repo \n',
        'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
        'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
        '\n\n',
    ]

    # Storage element settings are only exported when output is copied.
    if int(self.copy_data) == 1:
        if self.SE:
            script.append('export SE=' + self.SE + '\n')
            script.append('echo "SE = $SE"\n')
        if self.SE_PATH:
            if not self.SE_PATH.endswith('/'):
                self.SE_PATH = self.SE_PATH + '/'
            script.append('export SE_PATH=' + self.SE_PATH + '\n')
            script.append('echo "SE_PATH = $SE_PATH"\n')

    script.append('export VO=' + self.VO + '\n')

    # LFC catalog settings.
    script.append('if [ $middleware == LCG ]; then \n')
    for shell_var, wanted in (('LCG_CATALOG_TYPE', self.lcg_catalog_type),
                              ('LFC_HOST', self.lfc_host),
                              ('LFC_HOME', self.lfc_home)):
        script.append(" if [[ $" + shell_var + " != '" + wanted + "' ]]; then\n")
        script.append(' export ' + shell_var + '=' + wanted + '\n')
        script.append(' fi\n')
    script += [
        'elif [ $middleware == OSG ]; then\n',
        ' echo "LFC catalog setting to be implemented for OSG"\n',
        'fi\n',
    ]

    if int(self.register_data) == 1:
        # Make sure the LFN directory exists in the LFC catalog.
        script += [
            'if [ $middleware == LCG ]; then \n',
            ' export LFN=' + self.LFN + '\n',
            ' lfc-ls $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' if [ $result != 0 ]; then\n',
            ' lfc-mkdir $LFN\n',
            ' result=$?\n',
            ' echo $result\n',
            ' fi\n',
            'elif [ $middleware == OSG ]; then\n',
            ' echo " Files registration to be implemented for OSG"\n',
            'fi\n',
            '\n',
        ]
        if self.VO:
            script.append('export VO=' + self.VO + '\n')
        if self.LFN:
            script += [
                'if [ $middleware == LCG ]; then \n',
                ' export LFN=' + self.LFN + '\n',
                'fi\n',
                '\n',
            ]

    # CE name discovery.
    script += [
        'if [ $middleware == LCG ]; then\n',
        ' CloseCEs=`glite-brokerinfo getCE`\n',
        ' echo "CloseCEs = $CloseCEs"\n',
        ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n',
        ' echo "CE = $CE"\n',
        'elif [ $middleware == OSG ]; then \n',
        ' if [ $OSG_JOB_CONTACT ]; then \n',
        " CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\\/ '{print $1}'` \n",
        ' else \n',
        ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n',
        ' echo "JOB_EXIT_STATUS = 10099" \n',
        ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n',
    ]
    script.extend(abort_job)
    script += [
        ' fi \n',
        'fi \n',
    ]

    return ''.join(script)
315    
def wsCopyInput(self):
    """
    Build the job-script section that copies input data from the SE to the WN.

    The copy_input_data flag is re-read from common.analisys_common_info
    (0 when absent) and stored on self. Returns the shell text, or an
    empty string when the flag is not 1.
    """
    try:
        self.copy_input_data = common.analisys_common_info['copy_input_data']
    except KeyError:
        self.copy_input_data = 0

    if int(self.copy_input_data) != 1:
        return ''

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    osg_part = (
        'if [ $middleware == OSG ]; then\n'
        ' #\n'
        ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
        ' #\n'
        ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
    )
    lcg_header = (
        'elif [ $middleware == LCG ]; then \n'
        ' #\n'
        ' # Copy Input Data from SE to this WN\n'
        ' #\n'
    )
    # Loop copying every input file of the job.
    input_loop = (
        ' for input_file in $cur_file_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
        ' copy_input_exit_status=$?\n'
        ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
        ' if [ $copy_input_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying to WN" \n'
        ' else \n'
        ' echo "input copied into WN" \n'
        ' fi \n'
        ' done \n'
    )
    # Loop copying the set of PU ntuples (same for each job).
    pileup_loop = (
        ' for file in $cur_pu_list \n'
        ' do \n'
        ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
        ' copy_input_pu_exit_status=$?\n'
        ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
        ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
        ' echo "Problems with copying pu to WN" \n'
        ' else \n'
        ' echo "input pu files copied into WN" \n'
        ' fi \n'
        ' done \n'
    )
    footer = (
        ' \n'
        ' ### Check SCRATCH space available on WN : \n'
        ' df -h \n'
        'fi \n'
    )
    return osg_part + lcg_header + input_loop + pileup_loop + footer
365    
def wsCopyOutput(self):
    """
    Build the job-script section that copies the produced output to the SE.

    The generated shell loop tries lcg-cp first and falls back to srmcp,
    recording the stage-out status in $RUNTIME_AREA/$repo. Returns the
    shell text, or an empty string when copy_data is not 1.
    """
    if int(self.copy_data) != 1:
        return ''

    # Lines reported whenever a copy attempt fails.
    copy_failed = [
        ' echo "Problems with SE = $SE"\n',
        ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n',
    ]
    # Lines reported whenever a copy attempt succeeds.
    copy_done = [
        ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n',
        ' echo "output copied into $SE/$SE_PATH directory"\n',
        ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n',
    ]

    pieces = [
        '#\n',
        '# Copy output to SE = $SE\n',
        '#\n',
        ' if [ $middleware == OSG ]; then\n',
        ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n',
        ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n',
        ' source $OSG_APP/glite/setup_glite_ui.sh\n',
        ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n',
        ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n',
        ' fi \n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to copy output file to $SE using lcg-cp"\n',
        ' echo "lcg-cp --vo $VO -t 1200 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n',
        ' exitstring=`lcg-cp --vo $VO -t 1200 --verbose file://\\`pwd\\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
    ]
    pieces.extend(copy_failed)

    # Fallback: retry the transfer with srmcp.
    pieces += [
        ' echo "lcg-cp failed, attempting srmcp"\n',
        ' echo "mkdir -p $HOME/.srmconfig"\n',
        ' mkdir -p $HOME/.srmconfig\n',
        ' if [ $middleware == LCG ]; then\n',
        ' echo "srmcp -retry_num 5 -retry_timeout 240000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n',
        ' exitstring=`srmcp -retry_num 5 -retry_timeout 240000 file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n',
        ' elif [ $middleware == OSG ]; then\n',
        ' echo "srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n',
        ' exitstring=`srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////\\`pwd\\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n',
        ' fi \n',
        ' copy_exit_status=$?\n',
        ' echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n',
        ' echo "STAGE_OUT = $copy_exit_status"\n',
        ' if [ $copy_exit_status -ne 0 ]; then\n',
    ]
    pieces.extend(copy_failed)
    pieces += [
        ' echo "lcg-cp and srm failed"\n',
        ' echo "If storage_path in your config file contains a ? you may need a \\? instead."\n',
        ' else\n',
    ]
    pieces.extend(copy_done)
    pieces += [
        ' echo "srmcp succeeded"\n',
        ' fi\n',
        ' else\n',
    ]
    pieces.extend(copy_done)
    pieces += [
        ' echo "lcg-cp succeeded"\n',
        ' fi\n',
        ' done\n',
    ]
    return ''.join(pieces)
429    
def wsRegisterOutput(self):
    """
    Build the job-script section that registers the output into the LFC.

    In LCG mode the generated shell text registers each output file with
    lcg-rf (sfn first, then srm) when the previous copy succeeded, and
    otherwise copies and registers it on the close SE with lcg-cr.
    Returns the shell text, or an empty string when register_data is not 1.
    """
    if int(self.register_data) != 1:
        return ''

    # Status bookkeeping emitted after every registration attempt.
    report_status = [
        ' register_exit_status=$?\n',
        ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n',
        ' echo "STAGE_OUT = $register_exit_status"\n',
    ]
    # Closing lines of each per-file loop.
    end_of_loop = [
        ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n',
        ' done\n',
    ]

    ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
    chunks = [
        'if [ $middleware == OSG ]; then\n',
        ' #\n',
        ' # Register output to LFC deactivated in OSG mode\n',
        ' #\n',
        ' echo "Register output to LFC deactivated in OSG mode"\n',
        'elif [ $middleware == LCG ]; then \n',
        '#\n',
        '# Register output to LFC\n',
        '#\n',
        ' if [ $copy_exit_status -eq 0 ]; then\n',
        ' for out_file in $file_list ; do\n',
        ' echo "Trying to register the output file into LFC"\n',
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1"\n',
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1 \n',
    ]
    chunks.extend(report_status)
    chunks += [
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration to LFC" \n',
        ' echo "Try with srm protocol" \n',
        ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1"\n',
        ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1 \n',
    ]
    chunks.extend(report_status)
    chunks += [
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with the registration into LFC" \n',
        ' fi \n',
        ' else \n',
        ' echo "output registered to LFC"\n',
        ' fi \n',
    ]
    chunks.extend(end_of_loop)

    # The copy to the SE failed: store and register on the close SE instead.
    chunks += [
        ' else \n',
        ' echo "Trying to copy output file to CloseSE"\n',
        ' CLOSE_SE=`glite-brokerinfo getCloseSEs | head -1`\n',
        ' for out_file in $file_list ; do\n',
        ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1" \n',
        ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1 \n',
    ]
    chunks.extend(report_status)
    chunks += [
        ' if [ $register_exit_status -ne 0 ]; then \n',
        ' echo "Problems with CloseSE or Catalog" \n',
        ' else \n',
        ' echo "The program was successfully executed"\n',
        ' echo "SE = $CLOSE_SE"\n',
        ' echo "LFN for the file is LFN=${LFN}/$out_file"\n',
        ' fi \n',
    ]
    chunks.extend(end_of_loop)
    chunks += [
        ' fi \n',
        ' exit_status=$register_exit_status\n',
        'fi \n',
    ]
    return ''.join(chunks)
493    
def loggingInfo(self, id):
    """
    Return the output of 'glite-job-logging-info -v 2' for the job id.

    The proxy is checked first; the command is executed through runCommand.
    """
    self.checkProxy()
    return runCommand('glite-job-logging-info -v 2 ' + id)
503    
504    
def getExitStatus(self, id):
    """Return the 'exit_code' status attribute of the job with the given id."""
    exit_code = self.getStatusAttribute_(id, 'exit_code')
    return exit_code
507    
def queryStatus(self, id):
    """Return the 'status' status attribute of the job with the given id."""
    job_status = self.getStatusAttribute_(id, 'status')
    return job_status
510    
def queryDest(self, id):
    """Return the 'destination' status attribute of the job with the given id."""
    job_destination = self.getStatusAttribute_(id, 'destination')
    return job_destination
513    
514    
def getStatusAttribute_(self, id, attr):
    """
    Query one status attribute of the job with the given id.

    The Status class is loaded by name from
    'edg_wl_userinterface_common_LbWrapper' and queried directly, instead
    of going through the edg-job-status command.

    id   -- job identifier passed to Status.getStatus
    attr -- attribute name; its position in self.states selects the
            element of the loaded status vector

    Returns the attribute value, or None when the API reports an error
    (the error message is logged at debug level 5). Raises ValueError
    when attr is not listed in self.states.
    """
    self.checkProxy()
    Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
    # Bypass edg-job-status interfacing directly to C++ API
    level = 0
    # Instance of the Status class provided by LB API
    jobStat = Status()
    st = 0
    jobStat.getStatus(id, level)
    err, apiMsg = jobStat.get_error()
    if err:
        common.logger.debug(5,'Error caught' + apiMsg)
        return None
    # Load the status vector once and pick the requested field.
    # NOTE(review): assumes loadStatus has no side effects that later
    # code relies on -- confirm against the LB wrapper.
    return jobStat.loadStatus(st)[ self.states.index(attr) ]
538    
def queryDetailedStatus(self, id):
    """Return the output of the 'glite-job-status' command for the job id."""
    return runCommand('glite-job-status ' + id)
544    
545 spiga 1.1.2.3.2.1 ##### FEDE ######
def findSites_(self, n_tot_job):
    """
    Build one site-matching expression per job.

    For every job index in range(n_tot_job) the destinations returned by
    common.jobDB.destination are turned into
    'target.GlueSEUniqueID=="<site>"' clauses joined with ' || '.
    Returns the list of expressions; a job with no destinations gets ''.
    """
    expressions = []
    for job_index in range(n_tot_job):
        clauses = ['target.GlueSEUniqueID=="' + site + '"'
                   for site in common.jobDB.destination(job_index)]
        expressions.append(' || '.join(clauses))
    return expressions
566    
567     def createXMLSchScript(self, nj, argsList):
568     # def createXMLSchScript(self, nj):
569 spiga 1.1.2.1 """
570 spiga 1.1.2.3.2.1 Create a XML-file for BOSS4.
571 spiga 1.1.2.1 """
572 spiga 1.1.2.3.2.1 # job = common.job_list[nj]
573 spiga 1.1.2.2 """
574 spiga 1.1.2.3.2.1 INDY
575     [begin] da rivedere:
576     in particolare passerei il jobType ed eliminerei le dipendenze da job
577     """
578     index = nj - 1
579     job = common.job_list[index]
580 spiga 1.1.2.2 jbt = job.type()
581    
582 spiga 1.1.2.3.2.1 inp_sandbox = jbt.inputSandbox(index)
583     out_sandbox = jbt.outputSandbox(index)
584 spiga 1.1.2.1 """
585 spiga 1.1.2.3.2.1 [end] da rivedere
586 spiga 1.1.2.1 """
587 spiga 1.1.2.3.2.1
588 spiga 1.1.2.1
589     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
590     jt_string = ''
591    
592 spiga 1.1.2.2 xml_fname = str(self.jobtypeName)+'.xml'
593 spiga 1.1.2.1 xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
594    
595     #TaskName
596     dir = string.split(common.work_space.topDir(), '/')
597     taskName = dir[len(dir)-2]
598    
599     to_writeReq = ''
600     to_write = ''
601 spiga 1.1.2.3.2.1
602 spiga 1.1.2.1 req=' '
603 spiga 1.1.2.3.2.1 req = req + jbt.getRequirements(nj)
604    
605    
606     #sites = common.jobDB.destination(nj)
607     #if len(sites)>0 and sites[0]!="Any":
608     # req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
609     #req = req
610    
611 spiga 1.1.2.1 if self.EDG_requirements:
612     if (req == ' '):
613     req = req + self.EDG_requirements
614     else:
615     req = req + ' && ' + self.EDG_requirements
616     if self.EDG_ce_white_list:
617     ce_white_list = string.split(self.EDG_ce_white_list,',')
618     for i in range(len(ce_white_list)):
619     if i == 0:
620     if (req == ' '):
621     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
622     else:
623     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
624     pass
625     else:
626     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
627     req = req + ')'
628    
629     if self.EDG_ce_black_list:
630     ce_black_list = string.split(self.EDG_ce_black_list,',')
631     for ce in ce_black_list:
632     if (req == ' '):
633     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
634     else:
635     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
636     pass
637     if self.EDG_clock_time:
638     if (req == ' '):
639     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
640     else:
641     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
642    
643     if self.EDG_cpu_time:
644     if (req == ' '):
645     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
646     else:
647     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
648    
649 spiga 1.1.2.3.2.1 #if (req != ' '):
650     # req = req + '\n'
651     # to_writeReq = req
652    
653 spiga 1.1.2.1 if ( self.EDG_retry_count ):
654     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
655     pass
656    
657     to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
658     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
659    
660    
661     #TaskName
662     dir = string.split(common.work_space.topDir(), '/')
663     taskName = dir[len(dir)-2]
664    
665 spiga 1.1.2.3.2.1 xml.write(str(title))
666     xml.write('<task name="' +str(taskName)+'">\n')
667     xml.write(jt_string)
668 spiga 1.1.2.1
669 spiga 1.1.2.3.2.1 xml.write('<iterator>\n')
670    
671     #print str(nj)
672     # xml.write('\t<iteratorRule name="ITR1" rule="1:'+ str(nj) + '" />\n')
673     #print argsList
674     # xml.write('\t<iteratorRule name="ITR2" rule="'+ argsList + '" />\n')
675     #print jobList
676     # xml.write('\t<iteratorRule name="ITR3" rule="1:'+ str(nj) + ':6" />\n') #print str(nj)
677     xml.write('\t<iteratorRule name="ITR1">\n')
678     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
679     xml.write('\t</iteratorRule>\n')
680     xml.write('\t<iteratorRule name="ITR2">\n')
681     #print argsList
682     for arg in argsList:
683     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
684     pass
685     xml.write('\t</iteratorRule>\n')
686     #print jobList
687     xml.write('\t<iteratorRule name="ITR3">\n')
688     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
689     xml.write('\t</iteratorRule>\n')
690    
691     #### FEDE #####
692     '''
693     indy: qui sotto ci sta itr4
694     '''
695    
696     itr4=self.findSites_(nj)
697     #print "--->>> itr4 = ", itr4
698     if (itr4 != ''):
699     xml.write('\t<iteratorRule name="ITR4">\n')
700     #print argsList
701     for arg in itr4:
702     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
703 spiga 1.1.2.1 pass
704 spiga 1.1.2.3.2.1 xml.write('\t</iteratorRule>\n')
705     req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
706 spiga 1.1.2.1 pass
707 spiga 1.1.2.3.2.1 # print "--->>> req= ", req
708    
709     if (to_write != ''):
710     xml.write('<extraTags\n')
711     xml.write(to_write)
712     xml.write('/>\n')
713     pass
714    
715 spiga 1.1.2.2 xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
716 spiga 1.1.2.1 xml.write(jt_string)
717    
718 spiga 1.1.2.3.2.1 if (req != ' '):
719     req = req + '\n'
720     xml.write('<extraTags>\n')
721     xml.write('<Requirements>\n')
722     xml.write('<![CDATA[\n')
723     xml.write(req)
724     xml.write(']]>\n')
725     xml.write('</Requirements>\n')
726     xml.write('</extraTags>\n')
727     pass
728    
729 spiga 1.1.2.1 #executable
730 spiga 1.1.2.3.2.1
731     """
732     INDY
733     script dipende dal jobType: dovrebbe essere semplice tirarlo fuori in altro modo
734     """
735 spiga 1.1.2.1 script = job.scriptFilename()
736 spiga 1.1.2.3.2.1 xml.write('<program>\n')
737     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
738 spiga 1.1.2.1 xml.write(jt_string)
739    
740    
741     ### only one .sh JDL has arguments:
742     ### Fabio
743 spiga 1.1.2.3.2.1 # xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "EDG") +'"\n')
744     xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
745     xml.write('<program_types> crabjob </program_types>\n')
746     inp_box = script + ','
747 spiga 1.1.2.1
748     if inp_sandbox != None:
749     for fl in inp_sandbox:
750     inp_box = inp_box + '' + fl + ','
751     pass
752     pass
753    
754     # Marco (VERY TEMPORARY ML STUFF)
755     inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
756     os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
757     os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
758     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
759     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
760     # End Marco
761    
762     if (not jbt.additional_inbox_files == []):
763     inp_box = inp_box + ', '
764     for addFile in jbt.additional_inbox_files:
765     addFile = os.path.abspath(addFile)
766     inp_box = inp_box+''+addFile+','
767     pass
768    
769     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
770 spiga 1.1.2.3.2.1 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
771 spiga 1.1.2.1 xml.write(inp_box)
772    
773 spiga 1.1.2.3.2.1 base = jbt.name()
774     stdout = base + '__ITR3_.stdout'
775     stderr = base + '__ITR3_.stderr'
776 spiga 1.1.2.1
777 spiga 1.1.2.3.2.1 xml.write('<stderr> ' + stderr + '</stderr>\n')
778     xml.write('<stdout> ' + stdout + '</stdout>\n')
779    
780    
781     out_box = stdout + ',' + \
782     stderr + ',.BrokerInfo,'
783 spiga 1.1.2.1
784 spiga 1.1.2.3.2.1 """
785 spiga 1.1.2.1 if int(self.return_data) == 1:
786     if out_sandbox != None:
787     for fl in out_sandbox:
788     out_box = out_box + '' + fl + ','
789     pass
790     pass
791     pass
792 spiga 1.1.2.3.2.1 """
793    
794     """
795     INDY
796     qualcosa del genere andrebbe fatta per gli infiles
797     """
798     if int(self.return_data) == 1:
799     for fl in jbt.output_file:
800     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
801     pass
802     pass
803    
804 spiga 1.1.2.1 if out_box[-1] == ',' : out_box = out_box[:-1]
805 spiga 1.1.2.3.2.1 out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
806     xml.write(out_box)
807 spiga 1.1.2.1
808 spiga 1.1.2.3.2.1 xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
809 spiga 1.1.2.1
810 spiga 1.1.2.3.2.1 xml.write('</program>\n')
811 spiga 1.1.2.1 xml.write('</chain>\n')
812    
813 spiga 1.1.2.3.2.1 xml.write('</iterator>\n')
814     xml.write('</task>\n')
815 spiga 1.1.2.1
816     xml.close()
817     return
818    
def checkProxy(self):
    """
    Function to check the Globus proxy.

    Makes sure a local VOMS proxy exists with at least 10 hours of
    validity left (creating a 96h one otherwise), then makes sure a
    credential is delegated to the myproxy server ``self.proxyServer``
    (delegating one otherwise).  On success the result is cached in
    ``self.proxyValid`` so that later calls return immediately.

    Raises CrabException if the proxy cannot be created or delegated.
    """
    # Local import: 're' is not among the imports visible at the top of
    # this file, so do not rely on a star-import providing it.
    import re

    if self.proxyValid:
        return

    minTimeLeft = 10*3600  # in seconds
    minTimeLeftServer = 100  # in hours

    def timeLeft(cmd):
        """Run cmd; return its output as an int, or None if it is
        empty or not an integer."""
        out = runCommand(cmd)
        if not out:
            return None
        try:
            return int(out)
        except (TypeError, ValueError):
            return None

    mustRenew = 0
    timeLeftLocal = timeLeft('voms-proxy-info -timeleft 2>/dev/null')
    if timeLeftLocal is None or timeLeftLocal <= 0:
        mustRenew = 1
    else:
        timeLeftServer = timeLeft('voms-proxy-info -actimeleft 2>/dev/null | head -1')
        # Compare as integers: comparing the raw command output (a
        # string) with an int never gives a meaningful result.
        if timeLeftServer is None \
               or timeLeftLocal < minTimeLeft \
               or timeLeftServer < minTimeLeft:
            mustRenew = 1
            pass
        pass

    if mustRenew:
        common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
        cmd = 'voms-proxy-init -voms cms -valid 96:00'
        # os.system (not runCommand) keeps the child attached to the
        # caller's stdin/stdout -- presumably needed for the passphrase
        # prompt; TODO confirm.
        if os.system(cmd) != 0:
            raise CrabException("Unable to create a valid proxy!\n")
        pass

    ## now I do have a voms proxy valid, and I check the myproxy server
    renewProxy = 0
    cmd = 'myproxy-info -d -s '+self.proxyServer
    cmd_out = runCommand(cmd,0,20)
    if not cmd_out:
        common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
        renewProxy = 1
    else:
        # if myproxy exist but not long enough, renew.
        # search(), not match(): the 'timeleft:' field need not be at
        # the very beginning of the command output.
        found = re.search( r'timeleft: (\d+)', cmd_out )
        if found:
            hoursLeft = found.group(1)
            # The leading integer is treated as hours, as the
            # threshold above is expressed in hours.
            if int(hoursLeft) < minTimeLeftServer:
                renewProxy = 1
                common.logger.message('No credential delegation will expire in '+hoursLeft+' hours: renew it')
                pass
            pass
        pass

    # if not, create one.
    if renewProxy:
        cmd = 'myproxy-init -d -n -s '+self.proxyServer
        if os.system(cmd) != 0:
            raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")
        pass

    # cache proxy validity
    self.proxyValid=1
    return
884    
def configOpt_(self):
    """
    Build the configuration-file options for the EDG/gLite UI commands.

    Returns ' ' when neither configuration file is set; otherwise a
    string holding ' -c <edg_config> ' and/or
    ' --config-vo <edg_config_vo> ', in that order.
    """
    pieces = [' ']
    if self.edg_config:
        # A generic config file replaces the blank placeholder.
        pieces = [' -c ', self.edg_config, ' ']
    if self.edg_config_vo:
        # The VO-specific config file is appended to whatever is there.
        pieces.extend([' --config-vo ', self.edg_config_vo, ' '])
    return ''.join(pieces)