ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.1.2.3.2.2
Committed: Mon Sep 18 14:37:22 2006 UTC (18 years, 7 months ago) by fanzago
Content type: text/x-python
Branch: CRAB_BOSS4_v1
Changes since 1.1.2.3.2.1: +2 -25 lines
Log Message:
SchedulerEdg and SchedulerGLite are similar, we can remove one of this,taking care of scheduler name

File Contents

# User Rev Content
1 spiga 1.1.2.1 from Scheduler import Scheduler
2     from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5     from EdgConfig import *
6     import common
7    
8     import os, sys, time
9    
10     class SchedulerGlite(Scheduler):
11     def __init__(self):
12     Scheduler.__init__(self,"GLITE")
13     self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
14     "children_hist","children_num","children_states","condorId","condor_jdl", \
15     "cpuTime","destination", "done_code","exit_code","expectFrom", \
16     "expectUpdate","globusId","jdl","jobId","jobtype", \
17     "lastUpdateTime","localId","location", "matched_jdl","network_server", \
18     "owner","parent_job", "reason","resubmitted","rsl","seed",\
19     "stateEnterTime","stateEnterTimes","subjob_failed", \
20     "user tags" , "status" , "status_code","hierarchy"]
21     return
22    
23     def configure(self, cfg_params):
24    
25     try:
26     RB = cfg_params["EDG.rb"]
27     edgConfig = EdgConfig(RB)
28     self.edg_config = edgConfig.config()
29     self.edg_config_vo = edgConfig.configVO()
30     except KeyError:
31     self.edg_config = ''
32     self.edg_config_vo = ''
33    
34     try:
35     self.proxyServer = cfg_params["EDG.proxy_server"]
36     except KeyError:
37     self.proxyServer = 'myproxy.cern.ch'
38     common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)
39    
40     try: self.LCG_version = cfg_params["EDG.lcg_version"]
41     except KeyError: self.LCG_version = '2'
42    
43     try: self.EDG_requirements = cfg_params['EDG.requirements']
44     except KeyError: self.EDG_requirements = ''
45    
46     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
47     except KeyError: self.EDG_retry_count = ''
48    
49     try:
50     self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
51     except KeyError:
52     self.EDG_ce_black_list = ''
53    
54     try:
55     self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
56     except KeyError: self.EDG_ce_white_list = ''
57    
58     try: self.VO = cfg_params['EDG.virtual_organization']
59     except KeyError: self.VO = 'cms'
60    
61     try: self.return_data = cfg_params['USER.return_data']
62     except KeyError: self.return_data = 1
63    
64     try:
65     self.copy_input_data = common.analisys_common_info['copy_input_data']
66     except KeyError: self.copy_input_data = 0
67    
68     try:
69     self.copy_data = cfg_params["USER.copy_data"]
70     if int(self.copy_data) == 1:
71     try:
72     self.SE = cfg_params['USER.storage_element']
73     self.SE_PATH = cfg_params['USER.storage_path']
74     except KeyError:
75     msg = "Error. The [USER] section does not have 'storage_element'"
76     msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
77     common.logger.message(msg)
78     raise CrabException(msg)
79     except KeyError: self.copy_data = 0
80    
81     if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
82     msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
83     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
84     raise CrabException(msg)
85    
86     try:
87     self.lfc_host = cfg_params['EDG.lfc_host']
88     except KeyError:
89     msg = "Error. The [EDG] section does not have 'lfc_host' value"
90     msg = msg + " it's necessary to know the LFC host name"
91     common.logger.message(msg)
92     raise CrabException(msg)
93     try:
94     self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
95     except KeyError:
96     msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
97     msg = msg + " it's necessary to know the catalog type"
98     common.logger.message(msg)
99     raise CrabException(msg)
100     try:
101     self.lfc_home = cfg_params['EDG.lfc_home']
102     except KeyError:
103     msg = "Error. The [EDG] section does not have 'lfc_home' value"
104     msg = msg + " it's necessary to know the home catalog dir"
105     common.logger.message(msg)
106     raise CrabException(msg)
107    
108     try:
109     self.register_data = cfg_params["USER.register_data"]
110     if int(self.register_data) == 1:
111     try:
112     self.LFN = cfg_params['USER.lfn_dir']
113     except KeyError:
114     msg = "Error. The [USER] section does not have 'lfn_dir' value"
115     msg = msg + " it's necessary for LCF registration"
116     common.logger.message(msg)
117     raise CrabException(msg)
118     except KeyError: self.register_data = 0
119    
120     if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
121     msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
122     msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
123     common.logger.message(msg)
124     raise CrabException(msg)
125    
126     try: self.EDG_requirements = cfg_params['EDG.requirements']
127     except KeyError: self.EDG_requirements = ''
128    
129     try: self.EDG_retry_count = cfg_params['EDG.retry_count']
130     except KeyError: self.EDG_retry_count = ''
131    
132     try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
133     except KeyError: self.EDG_clock_time= ''
134    
135     try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
136     except KeyError: self.EDG_cpu_time = ''
137    
138     # Add EDG_WL_LOCATION to the python path
139     try:
140     path = os.environ['EDG_WL_LOCATION']
141     except:
142     msg = "Error: the EDG_WL_LOCATION variable is not set."
143     raise CrabException(msg)
144    
145     libPath=os.path.join(path, "lib")
146     sys.path.append(libPath)
147     libPath=os.path.join(path, "lib", "python")
148     sys.path.append(libPath)
149    
150     self.proxyValid=0
151 spiga 1.1.2.3.2.1
152     try:
153     self._taskId = cfg_params['taskId']
154     except:
155     self._taskId = ''
156    
157     try: self.jobtypeName = cfg_params['CRAB.jobtype']
158     except KeyError: self.jobtypeName = ''
159    
160     try: self.schedulerName = cfg_params['CRAB.scheduler']
161     except KeyError: self.scheduler = ''
162    
163 spiga 1.1.2.1 return
164    
165    
166     def sched_parameter(self):
167     """
168     Returns file with scheduler-specific parameters
169     """
170     if (self.edg_config and self.edg_config_vo != ''):
171     self.param='sched_param.clad'
172     param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
173     param_file.write('RBconfig = "'+self.edg_config+'";\n')
174     param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
175     param_file.close()
176     return 1
177     else:
178     return 0
179    
180     def wsSetupEnvironment(self):
181     """
182     Returns part of a job script which does scheduler-specific work.
183     """
184     txt = ''
185 spiga 1.1.2.3.2.1 txt += '# strip arguments\n'
186     txt += 'echo "strip arguments"\n'
187     txt += 'args=("$@")\n'
188     txt += 'nargs=$#\n'
189     txt += 'shift $nargs\n'
190     txt += "# job number (first parameter for job wrapper)\n"
191     #txt += "NJob=$1\n"
192     txt += "NJob=${args[0]}\n"
193    
194     txt += '# job identification to DashBoard \n'
195     txt += 'MonitorJobID=`echo ${NJob}_$EDG_WL_JOBID`\n'
196     txt += 'SyncGridJobId=`echo $EDG_WL_JOBID`\n'
197     txt += 'MonitorID=`echo ' + self._taskId + '`\n'
198     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
199     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
200     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
201    
202 spiga 1.1.2.1 txt += 'echo "middleware discovery " \n'
203     txt += 'if [ $VO_CMS_SW_DIR ]; then\n'
204     txt += ' middleware=LCG \n'
205 spiga 1.1.2.3.2.1 txt += ' echo "SyncCE=`edg-brokerinfo getCE`" | tee -a $RUNTIME_AREA/$repo \n'
206     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
207 spiga 1.1.2.1 txt += ' echo "middleware =$middleware" \n'
208     txt += 'elif [ $GRID3_APP_DIR ]; then\n'
209     txt += ' middleware=OSG \n'
210 spiga 1.1.2.3.2.1 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
211     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
212 spiga 1.1.2.1 txt += ' echo "middleware =$middleware" \n'
213     txt += 'elif [ $OSG_APP ]; then \n'
214     txt += ' middleware=OSG \n'
215 spiga 1.1.2.3.2.1 txt += ' echo "SyncCE=`echo $EDG_WL_LOG_DESTINATION`" | tee -a $RUNTIME_AREA/$repo \n'
216     txt += ' echo "GridFlavour=`echo $middleware`" | tee -a $RUNTIME_AREA/$repo \n'
217 spiga 1.1.2.1 txt += ' echo "middleware =$middleware" \n'
218     txt += 'else \n'
219 spiga 1.1.2.3.2.1 txt += ' echo "SET_CMS_ENV 10030 ==> middleware not identified" \n'
220     txt += ' echo "JOB_EXIT_STATUS = 10030" \n'
221     txt += ' echo "JobExitCode=10030" | tee -a $RUNTIME_AREA/$repo \n'
222     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
223     txt += ' rm -f $RUNTIME_AREA/$repo \n'
224     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
225     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
226     txt += ' exit 1 \n'
227     txt += 'fi \n'
228    
229     txt += '# report first time to DashBoard \n'
230     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
231     txt += 'rm -f $RUNTIME_AREA/$repo \n'
232     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
233     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
234    
235 spiga 1.1.2.1 txt += '\n\n'
236    
237     if int(self.copy_data) == 1:
238     if self.SE:
239     txt += 'export SE='+self.SE+'\n'
240     txt += 'echo "SE = $SE"\n'
241     if self.SE_PATH:
242     if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
243     txt += 'export SE_PATH='+self.SE_PATH+'\n'
244     txt += 'echo "SE_PATH = $SE_PATH"\n'
245    
246     txt += 'export VO='+self.VO+'\n'
247     ### FEDE: add some line for LFC catalog setting
248     txt += 'if [ $middleware == LCG ]; then \n'
249     txt += ' if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
250     txt += ' export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
251     txt += ' fi\n'
252     txt += ' if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
253     txt += ' export LFC_HOST='+self.lfc_host+'\n'
254     txt += ' fi\n'
255     txt += ' if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
256     txt += ' export LFC_HOME='+self.lfc_home+'\n'
257     txt += ' fi\n'
258     txt += 'elif [ $middleware == OSG ]; then\n'
259     txt += ' echo "LFC catalog setting to be implemented for OSG"\n'
260     txt += 'fi\n'
261     #####
262     if int(self.register_data) == 1:
263     txt += 'if [ $middleware == LCG ]; then \n'
264     txt += ' export LFN='+self.LFN+'\n'
265     txt += ' lfc-ls $LFN\n'
266     txt += ' result=$?\n'
267     txt += ' echo $result\n'
268     ### creation of LFN dir in LFC catalog, under /grid/cms dir
269     txt += ' if [ $result != 0 ]; then\n'
270     txt += ' lfc-mkdir $LFN\n'
271     txt += ' result=$?\n'
272     txt += ' echo $result\n'
273     txt += ' fi\n'
274     txt += 'elif [ $middleware == OSG ]; then\n'
275     txt += ' echo " Files registration to be implemented for OSG"\n'
276     txt += 'fi\n'
277     txt += '\n'
278    
279     if self.VO:
280     txt += 'export VO='+self.VO+'\n'
281     if self.LFN:
282     txt += 'if [ $middleware == LCG ]; then \n'
283     txt += ' export LFN='+self.LFN+'\n'
284     txt += 'fi\n'
285     txt += '\n'
286    
287     txt += 'if [ $middleware == LCG ]; then\n'
288     txt += ' CloseCEs=`glite-brokerinfo getCE`\n'
289     txt += ' echo "CloseCEs = $CloseCEs"\n'
290     txt += ' CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
291     txt += ' echo "CE = $CE"\n'
292     txt += 'elif [ $middleware == OSG ]; then \n'
293     txt += ' if [ $OSG_JOB_CONTACT ]; then \n'
294     txt += ' CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
295     txt += ' else \n'
296 spiga 1.1.2.3.2.1 txt += ' echo "SET_CMS_ENV 10099 ==> OSG mode: ERROR in setting CE name from OSG_JOB_CONTACT" \n'
297     txt += ' echo "JOB_EXIT_STATUS = 10099" \n'
298     txt += ' echo "JobExitCode=10099" | tee -a $RUNTIME_AREA/$repo \n'
299     txt += ' dumpStatus $RUNTIME_AREA/$repo \n'
300     txt += ' rm -f $RUNTIME_AREA/$repo \n'
301     txt += ' echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
302     txt += ' echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
303 spiga 1.1.2.1 txt += ' exit 1 \n'
304     txt += ' fi \n'
305     txt += 'fi \n'
306    
307     return txt
308    
309     def wsCopyInput(self):
310     """
311     Copy input data from SE to WN
312     """
313     txt = ''
314     try:
315     self.copy_input_data = common.analisys_common_info['copy_input_data']
316     except KeyError: self.copy_input_data = 0
317     if int(self.copy_input_data) == 1:
318     ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
319     txt += 'if [ $middleware == OSG ]; then\n'
320     txt += ' #\n'
321     txt += ' # Copy Input Data from SE to this WN deactivated in OSG mode\n'
322     txt += ' #\n'
323     txt += ' echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
324     txt += 'elif [ $middleware == LCG ]; then \n'
325     txt += ' #\n'
326     txt += ' # Copy Input Data from SE to this WN\n'
327     txt += ' #\n'
328 spiga 1.1.2.3.2.1 ### changed by georgia (put a loop copying more than one input files per jobs)
329 spiga 1.1.2.1 txt += ' for input_file in $cur_file_list \n'
330     txt += ' do \n'
331 spiga 1.1.2.3.2.1 txt += ' lcg-cp --vo $VO --verbose -t 1200 lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
332     txt += ' copy_input_exit_status=$?\n'
333     txt += ' echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
334     txt += ' if [ $copy_input_exit_status -ne 0 ]; then \n'
335     txt += ' echo "Problems with copying to WN" \n'
336     txt += ' else \n'
337     txt += ' echo "input copied into WN" \n'
338     txt += ' fi \n'
339 spiga 1.1.2.1 txt += ' done \n'
340 spiga 1.1.2.3.2.1 ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
341 spiga 1.1.2.1 txt += ' for file in $cur_pu_list \n'
342     txt += ' do \n'
343 spiga 1.1.2.3.2.1 txt += ' lcg-cp --vo $VO --verbose -t 1200 lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
344     txt += ' copy_input_pu_exit_status=$?\n'
345     txt += ' echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
346     txt += ' if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
347     txt += ' echo "Problems with copying pu to WN" \n'
348     txt += ' else \n'
349     txt += ' echo "input pu files copied into WN" \n'
350     txt += ' fi \n'
351 spiga 1.1.2.1 txt += ' done \n'
352     txt += ' \n'
353     txt += ' ### Check SCRATCH space available on WN : \n'
354     txt += ' df -h \n'
355     txt += 'fi \n'
356    
357     return txt
358    
359     def wsCopyOutput(self):
360     """
361     Write a CopyResults part of a job script, e.g.
362     to copy produced output into a storage element.
363     """
364     txt = ''
365     if int(self.copy_data) == 1:
366     txt += '#\n'
367     txt += '# Copy output to SE = $SE\n'
368     txt += '#\n'
369 spiga 1.1.2.3.2.1 txt += ' if [ $middleware == OSG ]; then\n'
370     txt += ' echo "X509_USER_PROXY = $X509_USER_PROXY"\n'
371     txt += ' echo "source $OSG_APP/glite/setup_glite_ui.sh"\n'
372     txt += ' source $OSG_APP/glite/setup_glite_ui.sh\n'
373     txt += ' export X509_CERT_DIR=$OSG_APP/glite/etc/grid-security/certificates\n'
374     txt += ' echo "export X509_CERT_DIR=$X509_CERT_DIR"\n'
375     txt += ' fi \n'
376 spiga 1.1.2.1 txt += ' for out_file in $file_list ; do\n'
377 spiga 1.1.2.3.2.1 txt += ' echo "Trying to copy output file to $SE using lcg-cp"\n'
378     txt += ' echo "lcg-cp --vo $VO -t 1200 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
379     txt += ' exitstring=`lcg-cp --vo $VO -t 1200 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
380     txt += ' copy_exit_status=$?\n'
381     txt += ' echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
382 spiga 1.1.2.1 txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
383     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
384 fanzago 1.1.2.3.2.2 txt += ' echo "Possible problem with SE = $SE"\n'
385 spiga 1.1.2.1 txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
386     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
387 spiga 1.1.2.3.2.1 txt += ' echo "lcg-cp failed, attempting srmcp"\n'
388     txt += ' echo "mkdir -p $HOME/.srmconfig"\n'
389     txt += ' mkdir -p $HOME/.srmconfig\n'
390     txt += ' if [ $middleware == LCG ]; then\n'
391     txt += ' echo "srmcp -retry_num 5 -retry_timeout 240000 file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
392     txt += ' exitstring=`srmcp -retry_num 5 -retry_timeout 240000 file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
393     txt += ' elif [ $middleware == OSG ]; then\n'
394     txt += ' echo "srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
395     txt += ' exitstring=`srmcp -retry_num 5 -retry_timeout 240000 -x509_user_trusted_certificates $OSG_APP/glite/etc/grid-security/certificates file:////\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
396     txt += ' fi \n'
397     txt += ' copy_exit_status=$?\n'
398     txt += ' echo "COPY_EXIT_STATUS for srm = $copy_exit_status"\n'
399     txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
400     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
401     txt += ' echo "Problems with SE = $SE"\n'
402     txt += ' echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
403     txt += ' echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
404     txt += ' echo "lcg-cp and srm failed"\n'
405     txt += ' echo "If storage_path in your config file contains a ? you may need a \? instead."\n'
406     txt += ' else\n'
407     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
408     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
409     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
410     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
411     txt += ' echo "srmcp succeeded"\n'
412     txt += ' fi\n'
413 spiga 1.1.2.1 txt += ' else\n'
414     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
415     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
416     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
417     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
418 spiga 1.1.2.3.2.1 txt += ' echo "lcg-cp succeeded"\n'
419 spiga 1.1.2.1 txt += ' fi\n'
420     txt += ' done\n'
421     return txt
422    
423     def wsRegisterOutput(self):
424     """
425     Returns part of a job script which does scheduler-specific work.
426     """
427    
428     txt = ''
429     if int(self.register_data) == 1:
430     ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
431     txt += 'if [ $middleware == OSG ]; then\n'
432     txt += ' #\n'
433     txt += ' # Register output to LFC deactivated in OSG mode\n'
434     txt += ' #\n'
435     txt += ' echo "Register output to LFC deactivated in OSG mode"\n'
436     txt += 'elif [ $middleware == LCG ]; then \n'
437     txt += '#\n'
438     txt += '# Register output to LFC\n'
439     txt += '#\n'
440 spiga 1.1.2.3.2.1 txt += ' if [ $copy_exit_status -eq 0 ]; then\n'
441 spiga 1.1.2.1 txt += ' for out_file in $file_list ; do\n'
442     txt += ' echo "Trying to register the output file into LFC"\n'
443 spiga 1.1.2.3.2.1 txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1"\n'
444     txt += ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 sfn://$SE$SE_PATH/$out_file 2>&1 \n'
445 spiga 1.1.2.1 txt += ' register_exit_status=$?\n'
446     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
447     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
448     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
449     txt += ' echo "Problems with the registration to LFC" \n'
450     txt += ' echo "Try with srm protocol" \n'
451 spiga 1.1.2.3.2.1 txt += ' echo "lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1"\n'
452     txt += ' lcg-rf -l $LFN/$out_file --vo $VO -t 1200 srm://$SE$SE_PATH/$out_file 2>&1 \n'
453 spiga 1.1.2.1 txt += ' register_exit_status=$?\n'
454     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
455     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
456     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
457     txt += ' echo "Problems with the registration into LFC" \n'
458     txt += ' fi \n'
459     txt += ' else \n'
460     txt += ' echo "output registered to LFC"\n'
461     txt += ' fi \n'
462     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
463     txt += ' done\n'
464 spiga 1.1.2.3.2.1 txt += ' else \n'
465 spiga 1.1.2.1 txt += ' echo "Trying to copy output file to CloseSE"\n'
466     txt += ' CLOSE_SE=`glite-brokerinfo getCloseSEs | head -1`\n'
467     txt += ' for out_file in $file_list ; do\n'
468 spiga 1.1.2.3.2.1 txt += ' echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1" \n'
469     txt += ' lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://$RUNTIME_AREA/$out_file 2>&1 \n'
470 spiga 1.1.2.1 txt += ' register_exit_status=$?\n'
471     txt += ' echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
472     txt += ' echo "STAGE_OUT = $register_exit_status"\n'
473     txt += ' if [ $register_exit_status -ne 0 ]; then \n'
474 spiga 1.1.2.3.2.1 txt += ' echo "Problems with CloseSE or Catalog" \n'
475 spiga 1.1.2.1 txt += ' else \n'
476     txt += ' echo "The program was successfully executed"\n'
477     txt += ' echo "SE = $CLOSE_SE"\n'
478     txt += ' echo "LFN for the file is LFN=${LFN}/$out_file"\n'
479     txt += ' fi \n'
480     txt += ' echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
481     txt += ' done\n'
482     txt += ' fi \n'
483 spiga 1.1.2.3.2.1 txt += ' exit_status=$register_exit_status\n'
484 spiga 1.1.2.1 txt += 'fi \n'
485     return txt
486    
487     def loggingInfo(self, id):
488     """
489     retrieve the logging info from logging and bookkeeping and return it
490     """
491     self.checkProxy()
492     cmd = 'glite-job-logging-info -v 2 ' + id
493     #cmd_out = os.popen(cmd)
494     cmd_out = runCommand(cmd)
495     return cmd_out
496    
497     def getExitStatus(self, id):
498     return self.getStatusAttribute_(id, 'exit_code')
499    
500     def queryStatus(self, id):
501     return self.getStatusAttribute_(id, 'status')
502    
503     def queryDest(self, id):
504     return self.getStatusAttribute_(id, 'destination')
505    
506    
507     def getStatusAttribute_(self, id, attr):
508     """ Query a status of the job with id """
509    
510     self.checkProxy()
511     hstates = {}
512     Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
513     # Bypass edg-job-status interfacing directly to C++ API
514     # Job attribute vector to retrieve status without edg-job-status
515     level = 0
516     # Instance of the Status class provided by LB API
517     jobStat = Status()
518     st = 0
519     jobStat.getStatus(id, level)
520     err, apiMsg = jobStat.get_error()
521     if err:
522     common.logger.debug(5,'Error caught' + apiMsg)
523     return None
524     else:
525     for i in range(len(self.states)):
526     # Fill an hash table with all information retrieved from LB API
527     hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
528 fanzago 1.1.2.3.2.2 result = jobStat.loadStatus(st)[self.states.index(attr)]
529 spiga 1.1.2.1 return result
530    
531     def queryDetailedStatus(self, id):
532     """ Query a detailed status of the job with id """
533     cmd = 'glite-job-status '+id
534     cmd_out = runCommand(cmd)
535     return cmd_out
536    
537 spiga 1.1.2.3.2.1 ##### FEDE ######
538     def findSites_(self, n_tot_job):
539     itr4 = []
540     # print "n_tot_job = ", n_tot_job
541     for n in range(n_tot_job):
542     sites = common.jobDB.destination(n)
543     #job = common.job_list[n]
544     #jbt = job.type()
545     # print "common.jobDB.destination(n) = ", common.jobDB.destination(n)
546     # print "sites = ", sites
547     itr = ''
548     for site in sites:
549     #itr = itr + 'target.GlueSEUniqueID=="'+site+'" || '
550     itr = itr + 'target.GlueSEUniqueID=="'+site+'" || '
551     pass
552     # remove last ||
553     itr = itr[0:-4]
554     itr4.append( itr )
555     # remove last ,
556     # print "itr4 = ", itr4
557     return itr4
558    
559     def createXMLSchScript(self, nj, argsList):
560     # def createXMLSchScript(self, nj):
561 spiga 1.1.2.1 """
562 spiga 1.1.2.3.2.1 Create a XML-file for BOSS4.
563 spiga 1.1.2.1 """
564 spiga 1.1.2.3.2.1 # job = common.job_list[nj]
565 spiga 1.1.2.2 """
566 spiga 1.1.2.3.2.1 INDY
567     [begin] da rivedere:
568     in particolare passerei il jobType ed eliminerei le dipendenze da job
569     """
570     index = nj - 1
571     job = common.job_list[index]
572 spiga 1.1.2.2 jbt = job.type()
573    
574 spiga 1.1.2.3.2.1 inp_sandbox = jbt.inputSandbox(index)
575     out_sandbox = jbt.outputSandbox(index)
576 spiga 1.1.2.1 """
577 spiga 1.1.2.3.2.1 [end] da rivedere
578 spiga 1.1.2.1 """
579 spiga 1.1.2.3.2.1
580 spiga 1.1.2.1
581     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
582     jt_string = ''
583    
584 spiga 1.1.2.2 xml_fname = str(self.jobtypeName)+'.xml'
585 spiga 1.1.2.1 xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
586    
587     #TaskName
588     dir = string.split(common.work_space.topDir(), '/')
589     taskName = dir[len(dir)-2]
590    
591     to_writeReq = ''
592     to_write = ''
593 spiga 1.1.2.3.2.1
594 spiga 1.1.2.1 req=' '
595 spiga 1.1.2.3.2.1 req = req + jbt.getRequirements(nj)
596    
597    
598     #sites = common.jobDB.destination(nj)
599     #if len(sites)>0 and sites[0]!="Any":
600     # req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
601     #req = req
602    
603 spiga 1.1.2.1 if self.EDG_requirements:
604     if (req == ' '):
605     req = req + self.EDG_requirements
606     else:
607     req = req + ' && ' + self.EDG_requirements
608     if self.EDG_ce_white_list:
609     ce_white_list = string.split(self.EDG_ce_white_list,',')
610     for i in range(len(ce_white_list)):
611     if i == 0:
612     if (req == ' '):
613     req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
614     else:
615     req = req + ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
616     pass
617     else:
618     req = req + ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
619     req = req + ')'
620    
621     if self.EDG_ce_black_list:
622     ce_black_list = string.split(self.EDG_ce_black_list,',')
623     for ce in ce_black_list:
624     if (req == ' '):
625     req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
626     else:
627     req = req + ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
628     pass
629     if self.EDG_clock_time:
630     if (req == ' '):
631     req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
632     else:
633     req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
634    
635     if self.EDG_cpu_time:
636     if (req == ' '):
637     req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
638     else:
639     req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
640 spiga 1.1.2.3.2.1
641 spiga 1.1.2.1 if ( self.EDG_retry_count ):
642     to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
643     pass
644    
645     to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
646     to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
647    
648    
649     #TaskName
650     dir = string.split(common.work_space.topDir(), '/')
651     taskName = dir[len(dir)-2]
652    
653 spiga 1.1.2.3.2.1 xml.write(str(title))
654     xml.write('<task name="' +str(taskName)+'">\n')
655     xml.write(jt_string)
656 spiga 1.1.2.1
657 spiga 1.1.2.3.2.1 xml.write('<iterator>\n')
658     xml.write('\t<iteratorRule name="ITR1">\n')
659     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
660     xml.write('\t</iteratorRule>\n')
661     xml.write('\t<iteratorRule name="ITR2">\n')
662     for arg in argsList:
663     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
664     pass
665     xml.write('\t</iteratorRule>\n')
666     #print jobList
667     xml.write('\t<iteratorRule name="ITR3">\n')
668     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
669     xml.write('\t</iteratorRule>\n')
670    
671     '''
672     indy: qui sotto ci sta itr4
673     '''
674    
675     itr4=self.findSites_(nj)
676     #print "--->>> itr4 = ", itr4
677     if (itr4 != ''):
678     xml.write('\t<iteratorRule name="ITR4">\n')
679     #print argsList
680     for arg in itr4:
681     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
682 spiga 1.1.2.1 pass
683 spiga 1.1.2.3.2.1 xml.write('\t</iteratorRule>\n')
684     req = req + ' && anyMatch(other.storage.CloseSEs, (_ITR4_))'
685 spiga 1.1.2.1 pass
686 spiga 1.1.2.3.2.1 # print "--->>> req= ", req
687    
688     if (to_write != ''):
689     xml.write('<extraTags\n')
690     xml.write(to_write)
691     xml.write('/>\n')
692     pass
693    
694 spiga 1.1.2.2 xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
695 spiga 1.1.2.1 xml.write(jt_string)
696    
697 spiga 1.1.2.3.2.1 if (req != ' '):
698     req = req + '\n'
699     xml.write('<extraTags>\n')
700     xml.write('<Requirements>\n')
701     xml.write('<![CDATA[\n')
702     xml.write(req)
703     xml.write(']]>\n')
704     xml.write('</Requirements>\n')
705     xml.write('</extraTags>\n')
706     pass
707    
708 spiga 1.1.2.1 #executable
709 spiga 1.1.2.3.2.1
710     """
711     INDY
712     script dipende dal jobType: dovrebbe essere semplice tirarlo fuori in altro modo
713     """
714 spiga 1.1.2.1 script = job.scriptFilename()
715 spiga 1.1.2.3.2.1 xml.write('<program>\n')
716     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
717 spiga 1.1.2.1 xml.write(jt_string)
718    
719    
720     ### only one .sh JDL has arguments:
721     ### Fabio
722 spiga 1.1.2.3.2.1 # xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "EDG") +'"\n')
723     xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
724     xml.write('<program_types> crabjob </program_types>\n')
725     inp_box = script + ','
726 spiga 1.1.2.1
727     if inp_sandbox != None:
728     for fl in inp_sandbox:
729     inp_box = inp_box + '' + fl + ','
730     pass
731     pass
732    
733     inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
734     os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
735     os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
736     os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
737     os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
738    
739     if (not jbt.additional_inbox_files == []):
740     inp_box = inp_box + ', '
741     for addFile in jbt.additional_inbox_files:
742     addFile = os.path.abspath(addFile)
743     inp_box = inp_box+''+addFile+','
744     pass
745    
746     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
747 spiga 1.1.2.3.2.1 inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
748 spiga 1.1.2.1 xml.write(inp_box)
749    
750 spiga 1.1.2.3.2.1 base = jbt.name()
751     stdout = base + '__ITR3_.stdout'
752     stderr = base + '__ITR3_.stderr'
753 spiga 1.1.2.1
754 spiga 1.1.2.3.2.1 xml.write('<stderr> ' + stderr + '</stderr>\n')
755     xml.write('<stdout> ' + stdout + '</stdout>\n')
756    
757    
758     out_box = stdout + ',' + \
759     stderr + ',.BrokerInfo,'
760 spiga 1.1.2.1
761 spiga 1.1.2.3.2.1 """
762 spiga 1.1.2.1 if int(self.return_data) == 1:
763     if out_sandbox != None:
764     for fl in out_sandbox:
765     out_box = out_box + '' + fl + ','
766     pass
767     pass
768     pass
769 spiga 1.1.2.3.2.1 """
770    
771     """
772     INDY
773     qualcosa del genere andrebbe fatta per gli infiles
774     """
775     if int(self.return_data) == 1:
776     for fl in jbt.output_file:
777     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
778     pass
779     pass
780    
781 spiga 1.1.2.1 if out_box[-1] == ',' : out_box = out_box[:-1]
782 spiga 1.1.2.3.2.1 out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
783     xml.write(out_box)
784 spiga 1.1.2.1
785 spiga 1.1.2.3.2.1 xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
786 spiga 1.1.2.1
787 spiga 1.1.2.3.2.1 xml.write('</program>\n')
788 spiga 1.1.2.1 xml.write('</chain>\n')
789    
790 spiga 1.1.2.3.2.1 xml.write('</iterator>\n')
791     xml.write('</task>\n')
792 spiga 1.1.2.1
793     xml.close()
794     return
795    
796     def checkProxy(self):
797     """
798     Function to check the Globus proxy.
799     """
800     if (self.proxyValid): return
801     timeleft = -999
802     minTimeLeft=10*3600 # in seconds
803    
804     minTimeLeftServer = 100 # in hours
805    
806     mustRenew = 0
807     timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
808     timeLeftServer = -999
809     if not timeLeftLocal or int(timeLeftLocal) <= 0 or not isInt(timeLeftLocal):
810     mustRenew = 1
811     else:
812     timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
813     if not timeLeftServer or not isInt(timeLeftServer):
814     mustRenew = 1
815     elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
816     mustRenew = 1
817     pass
818     pass
819    
820     if mustRenew:
821     common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
822     cmd = 'voms-proxy-init -voms cms -valid 96:00'
823     try:
824     # SL as above: damn it!
825     out = os.system(cmd)
826     if (out>0): raise CrabException("Unable to create a valid proxy!\n")
827     except:
828     msg = "Unable to create a valid proxy!\n"
829     raise CrabException(msg)
830     pass
831    
832     ## now I do have a voms proxy valid, and I check the myproxy server
833     renewProxy = 0
834     cmd = 'myproxy-info -d -s '+self.proxyServer
835     cmd_out = runCommand(cmd,0,20)
836     if not cmd_out:
837     common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
838     renewProxy = 1
839     else:
840     # if myproxy exist but not long enough, renew
841     reTime = re.compile( r'timeleft: (\d+)' )
842     if reTime.match( cmd_out ):
843     time = reTime.search( line ).group(1)
844     if time < minTimeLeftServer:
845     renewProxy = 1
846     common.logger.message('No credential delegation will expire in '+time+' hours: renew it')
847     pass
848     pass
849    
850     # if not, create one.
851     if renewProxy:
852     cmd = 'myproxy-init -d -n -s '+self.proxyServer
853     out = os.system(cmd)
854     if (out>0):
855     raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")
856     pass
857    
858     # cache proxy validity
859     self.proxyValid=1
860     return
861    
862     def configOpt_(self):
863     edg_ui_cfg_opt = ' '
864     if self.edg_config:
865     edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
866     if self.edg_config_vo:
867     edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
868     return edg_ui_cfg_opt