ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerGlite.py (file contents):
Revision 1.1 by spiga, Wed Jul 19 14:58:22 2006 UTC vs.
Revision 1.1.2.1 by spiga, Wed Jul 19 14:58:22 2006 UTC

# Line 0 | Line 1
1 + from Scheduler import Scheduler
2 + from crab_logger import Logger
3 + from crab_exceptions import *
4 + from crab_util import *
5 + from EdgConfig import *
6 + import common
7 +
8 + import os, sys, time
9 +
10 + class SchedulerGlite(Scheduler):
11 +    def __init__(self):
12 +        Scheduler.__init__(self,"GLITE")
13 +        self.states = [ "Acl", "cancelReason", "cancelling","ce_node","children", \
14 +                      "children_hist","children_num","children_states","condorId","condor_jdl", \
15 +                      "cpuTime","destination", "done_code","exit_code","expectFrom", \
16 +                      "expectUpdate","globusId","jdl","jobId","jobtype", \
17 +                      "lastUpdateTime","localId","location", "matched_jdl","network_server", \
18 +                      "owner","parent_job", "reason","resubmitted","rsl","seed",\
19 +                      "stateEnterTime","stateEnterTimes","subjob_failed", \
20 +                      "user tags" , "status" , "status_code","hierarchy"]
21 +        return
22 +
23 +    def configure(self, cfg_params):
24 +
25 +        try:
26 +            RB = cfg_params["EDG.rb"]
27 +            edgConfig = EdgConfig(RB)
28 +            self.edg_config = edgConfig.config()
29 +            self.edg_config_vo = edgConfig.configVO()
30 +        except KeyError:
31 +            self.edg_config = ''
32 +            self.edg_config_vo = ''
33 +
34 +        try:
35 +            self.proxyServer = cfg_params["EDG.proxy_server"]
36 +        except KeyError:
37 +            self.proxyServer = 'myproxy.cern.ch'
38 +        common.logger.debug(5,'Setting myproxy server to '+self.proxyServer)
39 +
40 +        try: self.LCG_version = cfg_params["EDG.lcg_version"]
41 +        except KeyError: self.LCG_version = '2'
42 +
43 +        try: self.EDG_requirements = cfg_params['EDG.requirements']
44 +        except KeyError: self.EDG_requirements = ''
45 +
46 +        try: self.EDG_retry_count = cfg_params['EDG.retry_count']
47 +        except KeyError: self.EDG_retry_count = ''
48 +
49 +        try:
50 +            self.EDG_ce_black_list = cfg_params['EDG.ce_black_list']
51 +        except KeyError:
52 +            self.EDG_ce_black_list  = ''
53 +
54 +        try:
55 +            self.EDG_ce_white_list = cfg_params['EDG.ce_white_list']
56 +        except KeyError: self.EDG_ce_white_list = ''
57 +
58 +        try: self.VO = cfg_params['EDG.virtual_organization']
59 +        except KeyError: self.VO = 'cms'
60 +
61 +        try: self.return_data = cfg_params['USER.return_data']
62 +        except KeyError: self.return_data = 1
63 +
64 +        try:
65 +             self.copy_input_data = common.analisys_common_info['copy_input_data']
66 +        except KeyError: self.copy_input_data = 0
67 +
68 +        try:
69 +            self.copy_data = cfg_params["USER.copy_data"]
70 +            if int(self.copy_data) == 1:
71 +                try:
72 +                    self.SE = cfg_params['USER.storage_element']
73 +                    self.SE_PATH = cfg_params['USER.storage_path']
74 +                except KeyError:
75 +                    msg = "Error. The [USER] section does not have 'storage_element'"
76 +                    msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
77 +                    common.logger.message(msg)
78 +                    raise CrabException(msg)
79 +        except KeyError: self.copy_data = 0
80 +
81 +        if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
82 +           msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
83 +           msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
84 +           raise CrabException(msg)
85 +
86 +        try:
87 +            self.lfc_host = cfg_params['EDG.lfc_host']
88 +        except KeyError:
89 +            msg = "Error. The [EDG] section does not have 'lfc_host' value"
90 +            msg = msg + " it's necessary to know the LFC host name"
91 +            common.logger.message(msg)
92 +            raise CrabException(msg)
93 +        try:
94 +            self.lcg_catalog_type = cfg_params['EDG.lcg_catalog_type']
95 +        except KeyError:
96 +            msg = "Error. The [EDG] section does not have 'lcg_catalog_type' value"
97 +            msg = msg + " it's necessary to know the catalog type"
98 +            common.logger.message(msg)
99 +            raise CrabException(msg)
100 +        try:
101 +            self.lfc_home = cfg_params['EDG.lfc_home']
102 +        except KeyError:
103 +            msg = "Error. The [EDG] section does not have 'lfc_home' value"
104 +            msg = msg + " it's necessary to know the home catalog dir"
105 +            common.logger.message(msg)
106 +            raise CrabException(msg)
107 +      
108 +        try:
109 +            self.register_data = cfg_params["USER.register_data"]
110 +            if int(self.register_data) == 1:
111 +                try:
112 +                    self.LFN = cfg_params['USER.lfn_dir']
113 +                except KeyError:
114 +                    msg = "Error. The [USER] section does not have 'lfn_dir' value"
115 +                    msg = msg + " it's necessary for LCF registration"
116 +                    common.logger.message(msg)
117 +                    raise CrabException(msg)
118 +        except KeyError: self.register_data = 0
119 +
120 +        if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
121 +           msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
122 +           msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
123 +           common.logger.message(msg)
124 +           raise CrabException(msg)
125 +
126 +        try: self.EDG_requirements = cfg_params['EDG.requirements']
127 +        except KeyError: self.EDG_requirements = ''
128 +                                                                                                                                                            
129 +        try: self.EDG_retry_count = cfg_params['EDG.retry_count']
130 +        except KeyError: self.EDG_retry_count = ''
131 +                                                                                                                                                            
132 +        try: self.EDG_clock_time = cfg_params['EDG.max_wall_clock_time']
133 +        except KeyError: self.EDG_clock_time= ''
134 +                                                                                                                                                            
135 +        try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
136 +        except KeyError: self.EDG_cpu_time = ''
137 +
138 +        # Add EDG_WL_LOCATION to the python path
139 +        try:
140 +            path = os.environ['EDG_WL_LOCATION']
141 +        except:
142 +            msg = "Error: the EDG_WL_LOCATION variable is not set."
143 +            raise CrabException(msg)
144 +
145 +        libPath=os.path.join(path, "lib")
146 +        sys.path.append(libPath)
147 +        libPath=os.path.join(path, "lib", "python")
148 +        sys.path.append(libPath)
149 +
150 +        self.proxyValid=0
151 +        return
152 +    
153 +
154 +    def sched_parameter(self):
155 +        """
156 +        Returns file with scheduler-specific parameters
157 +        """
158 +        if (self.edg_config and self.edg_config_vo != ''):
159 +            self.param='sched_param.clad'
160 +            param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
161 +            param_file.write('RBconfig = "'+self.edg_config+'";\n')  
162 +            param_file.write('RBconfigVO = "'+self.edg_config_vo+'";')
163 +            param_file.close()  
164 +            return 1
165 +        else:
166 +            return 0
167 +
168 +    def wsSetupEnvironment(self):
169 +        """
170 +        Returns part of a job script which does scheduler-specific work.
171 +        """
172 +        txt = ''
173 +        txt += 'echo "middleware discovery " \n'
174 +        txt += 'if [ $VO_CMS_SW_DIR ]; then\n'
175 +        txt += '    middleware=LCG \n'
176 +        txt += '    echo "middleware =$middleware" \n'
177 +        txt += 'elif [ $GRID3_APP_DIR ]; then\n'
178 +        txt += '    middleware=OSG \n'
179 +        txt += '    echo "middleware =$middleware" \n'
180 +        txt += 'elif [ $OSG_APP ]; then \n'
181 +        txt += '    middleware=OSG \n'
182 +        txt += '    echo "middleware =$middleware" \n'
183 +        txt += 'else \n'
184 +        txt += '    echo "SET_CMS_ENV 1 ==> middleware not identified" \n'
185 +        txt += '    echo "JOB_EXIT_STATUS = 1"\n'
186 +        txt += '    exit 1\n'
187 +        txt += 'fi\n'
188 +
189 +        txt += '\n\n'
190 +
191 +        ### OLI: removed to move DashBoard reporting header to front of wrapper script
192 +        # txt += 'if [ $middleware == LCG ]; then \n'
193 +        # txt += '    echo "SyncGridJobId=`echo $EDG_WL_JOBID`" | tee -a $RUNTIME_AREA/$repo\n'
194 +        # txt += 'fi\n'
195 +
196 +        if int(self.copy_data) == 1:
197 +           if self.SE:
198 +              txt += 'export SE='+self.SE+'\n'
199 +              txt += 'echo "SE = $SE"\n'
200 +           if self.SE_PATH:
201 +              if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
202 +              txt += 'export SE_PATH='+self.SE_PATH+'\n'
203 +              txt += 'echo "SE_PATH = $SE_PATH"\n'
204 +
205 +        txt += 'export VO='+self.VO+'\n'
206 +        ### FEDE: add some line for LFC catalog setting
207 +        txt += 'if [ $middleware == LCG ]; then \n'
208 +        txt += '    if [[ $LCG_CATALOG_TYPE != \''+self.lcg_catalog_type+'\' ]]; then\n'
209 +        txt += '        export LCG_CATALOG_TYPE='+self.lcg_catalog_type+'\n'
210 +        txt += '    fi\n'
211 +        txt += '    if [[ $LFC_HOST != \''+self.lfc_host+'\' ]]; then\n'
212 +        txt += '        export LFC_HOST='+self.lfc_host+'\n'
213 +        txt += '    fi\n'
214 +        txt += '    if [[ $LFC_HOME != \''+self.lfc_home+'\' ]]; then\n'
215 +        txt += '        export LFC_HOME='+self.lfc_home+'\n'
216 +        txt += '    fi\n'
217 +        txt += 'elif [ $middleware == OSG ]; then\n'
218 +        txt += '    echo "LFC catalog setting to be implemented for OSG"\n'
219 +        txt += 'fi\n'
220 +        #####
221 +        if int(self.register_data) == 1:
222 +           txt += 'if [ $middleware == LCG ]; then \n'
223 +           txt += '    export LFN='+self.LFN+'\n'
224 +           txt += '    lfc-ls $LFN\n'
225 +           txt += '    result=$?\n'
226 +           txt += '    echo $result\n'
227 +           ### creation of LFN dir in LFC catalog, under /grid/cms dir  
228 +           txt += '    if [ $result != 0 ]; then\n'
229 +           txt += '       lfc-mkdir $LFN\n'
230 +           txt += '       result=$?\n'
231 +           txt += '       echo $result\n'
232 +           txt += '    fi\n'
233 +           txt += 'elif [ $middleware == OSG ]; then\n'
234 +           txt += '    echo " Files registration to be implemented for OSG"\n'
235 +           txt += 'fi\n'
236 +           txt += '\n'
237 +
238 +           if self.VO:
239 +              txt += 'export VO='+self.VO+'\n'
240 +           if self.LFN:
241 +              txt += 'if [ $middleware == LCG ]; then \n'
242 +              txt += '    export LFN='+self.LFN+'\n'
243 +              txt += 'fi\n'
244 +              txt += '\n'
245 +
246 +        txt += 'if [ $middleware == LCG ]; then\n'
247 +        txt += '    CloseCEs=`glite-brokerinfo getCE`\n'
248 +        txt += '    echo "CloseCEs = $CloseCEs"\n'
249 +        txt += '    CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
250 +        txt += '    echo "CE = $CE"\n'
251 +        txt += 'elif [ $middleware == OSG ]; then \n'
252 +        txt += '    if [ $OSG_JOB_CONTACT ]; then \n'
253 +        txt += '        CE=`echo $OSG_JOB_CONTACT | /usr/bin/awk -F\/ \'{print $1}\'` \n'
254 +        txt += '    else \n'
255 +        txt += '        echo "SET_ENV 1 ==> ERROR in setting CE name - OSG mode -" \n'
256 +        txt += '        exit 1 \n'
257 +        txt += '    fi \n'
258 +        txt += 'fi \n'
259 +
260 +        return txt
261 +
262 +    def wsCopyInput(self):
263 +        """
264 +        Copy input data from SE to WN    
265 +        """
266 +        txt = ''
267 +        try:
268 +            self.copy_input_data = common.analisys_common_info['copy_input_data']
269 +            #print "self.copy_input_data = ", self.copy_input_data
270 +        except KeyError: self.copy_input_data = 0
271 +        if int(self.copy_input_data) == 1:
272 +        ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
273 +           txt += 'if [ $middleware == OSG ]; then\n'
274 +           txt += '   #\n'
275 +           txt += '   #   Copy Input Data from SE to this WN deactivated in OSG mode\n'
276 +           txt += '   #\n'
277 +           txt += '   echo "Copy Input Data from SE to this WN deactivated in OSG mode"\n'
278 +           txt += 'elif [ $middleware == LCG ]; then \n'
279 +           txt += '   #\n'
280 +           txt += '   #   Copy Input Data from SE to this WN\n'
281 +           txt += '   #\n'
282 + ### changed by georgia (put a loop copying more than one input files per jobs)          
283 +           txt += '   for input_file in $cur_file_list \n'
284 +           txt += '   do \n'
285 +           txt += '    lcg-cp --vo $VO lfn:$input_lfn/$input_file file:`pwd`/$input_file 2>&1\n'
286 +           txt += '    copy_input_exit_status=$?\n'
287 +           txt += '    echo "COPY_INPUT_EXIT_STATUS = $copy_input_exit_status"\n'
288 +           txt += '    if [ $copy_input_exit_status -ne 0 ]; then \n'
289 +           txt += '       echo "Problems with copying to WN" \n'
290 +           txt += '    else \n'
291 +           txt += '       echo "input copied into WN" \n'
292 +           txt += '    fi \n'
293 +           txt += '   done \n'
294 + ### copy a set of PU ntuples (same for each jobs -- but accessed randomly)
295 +           txt += '   for file in $cur_pu_list \n'
296 +           txt += '   do \n'
297 +           txt += '    lcg-cp --vo $VO lfn:$pu_lfn/$file file:`pwd`/$file 2>&1\n'
298 +           txt += '    copy_input_exit_status=$?\n'
299 +           txt += '    echo "COPY_INPUT_PU_EXIT_STATUS = $copy_input_pu_exit_status"\n'
300 +           txt += '    if [ $copy_input_pu_exit_status -ne 0 ]; then \n'
301 +           txt += '       echo "Problems with copying pu to WN" \n'
302 +           txt += '    else \n'
303 +           txt += '       echo "input pu files copied into WN" \n'
304 +           txt += '    fi \n'
305 +           txt += '   done \n'
306 +           txt += '   \n'
307 +           txt += '   ### Check SCRATCH space available on WN : \n'
308 +           txt += '   df -h \n'
309 +           txt += 'fi \n'
310 +          
311 +        return txt
312 +
313 +    def wsCopyOutput(self):
314 +        """
315 +        Write a CopyResults part of a job script, e.g.
316 +        to copy produced output into a storage element.
317 +        """
318 +        txt = ''
319 +        if int(self.copy_data) == 1:
320 +           txt += '#\n'
321 +           txt += '#   Copy output to SE = $SE\n'
322 +           txt += '#\n'
323 +           txt += 'if [ $exe_result -eq 0 ]; then\n'
324 +           txt += '    for out_file in $file_list ; do\n'
325 +           txt += '        echo "Trying to copy output file to $SE "\n'
326 +           ## OLI_Daniele globus-* for OSG, lcg-* for LCG
327 +           txt += '        if [ $middleware == OSG ]; then\n'
328 +           txt += '           echo "globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
329 +           txt += '           copy_exit_status=`globus-url-copy file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
330 +           #txt += '           exitstring=`globus-url-copy file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
331 +           txt += '        elif [ $middleware == LCG ]; then \n'
332 +           txt += '           echo "lcg-cp --vo cms -t 1200 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
333 +           txt += '           copy_exit_status=`lcg-cp --vo cms -t 1200 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
334 +           #txt += '           exitstring=`lcg-cp --vo cms -t 30 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
335 +           txt += '        fi \n'
336 +           #txt += '        copy_exit_status=$?\n'
337 +           txt += '        echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
338 +           txt += '        echo "STAGE_OUT = $copy_exit_status"\n'
339 +           txt += '        if [ $copy_exit_status -ne 0 ]; then\n'
340 +           txt += '            echo "Problems with SE = $SE"\n'
341 +           txt += '            echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
342 +           txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
343 +           txt += '        else\n'
344 +           txt += '            echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
345 +           txt += '            echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
346 +           txt += '            echo "output copied into $SE/$SE_PATH directory"\n'
347 +           txt += '            echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
348 +           txt += '         fi\n'
349 +           txt += '     done\n'
350 +           txt += 'fi\n'
351 +        return txt
352 +
353 +    def wsRegisterOutput(self):
354 +        """
355 +        Returns part of a job script which does scheduler-specific work.
356 +        """
357 +
358 +        txt = ''
359 +        if int(self.register_data) == 1:
360 +        ## OLI_Daniele deactivate for OSG (wait for LCG UI installed on OSG)
361 +           txt += 'if [ $middleware == OSG ]; then\n'
362 +           txt += '   #\n'
363 +           txt += '   #   Register output to LFC deactivated in OSG mode\n'
364 +           txt += '   #\n'
365 +           txt += '   echo "Register output to LFC deactivated in OSG mode"\n'
366 +           txt += 'elif [ $middleware == LCG ]; then \n'
367 +           txt += '#\n'
368 +           txt += '#  Register output to LFC\n'
369 +           txt += '#\n'
370 +           txt += '   if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
371 +           txt += '      for out_file in $file_list ; do\n'
372 +           txt += '         echo "Trying to register the output file into LFC"\n'
373 +           txt += '         echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n'
374 +           txt += '         lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n'
375 +           txt += '         register_exit_status=$?\n'
376 +           txt += '         echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
377 +           txt += '         echo "STAGE_OUT = $register_exit_status"\n'
378 +           txt += '         if [ $register_exit_status -ne 0 ]; then \n'
379 +           txt += '            echo "Problems with the registration to LFC" \n'
380 +           txt += '            echo "Try with srm protocol" \n'
381 +           txt += '            echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n'
382 +           txt += '            lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n'
383 +           txt += '            register_exit_status=$?\n'
384 +           txt += '            echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
385 +           txt += '            echo "STAGE_OUT = $register_exit_status"\n'
386 +           txt += '            if [ $register_exit_status -ne 0 ]; then \n'
387 +           txt += '               echo "Problems with the registration into LFC" \n'
388 +           txt += '            fi \n'
389 +           txt += '         else \n'
390 +           txt += '            echo "output registered to LFC"\n'
391 +           txt += '         fi \n'
392 +           txt += '         echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
393 +           txt += '      done\n'
394 +           txt += '   elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
395 +           txt += '      echo "Trying to copy output file to CloseSE"\n'
396 +           txt += '      CLOSE_SE=`glite-brokerinfo getCloseSEs | head -1`\n'
397 +           txt += '      for out_file in $file_list ; do\n'
398 +           txt += '         echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
399 +           txt += '         lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
400 +           txt += '         register_exit_status=$?\n'
401 +           txt += '         echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
402 +           txt += '         echo "STAGE_OUT = $register_exit_status"\n'
403 +           txt += '         if [ $register_exit_status -ne 0 ]; then \n'
404 +           txt += '            echo "Problems with CloseSE" \n'
405 +           txt += '         else \n'
406 +           txt += '            echo "The program was successfully executed"\n'
407 +           txt += '            echo "SE = $CLOSE_SE"\n'
408 +           txt += '            echo "LFN for the file is LFN=${LFN}/$out_file"\n'
409 +           txt += '         fi \n'
410 +           txt += '         echo "StageOutExitStatus = $register_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
411 +           txt += '      done\n'
412 +           txt += '   else\n'
413 +           txt += '      echo "Problem with the executable"\n'
414 +           txt += '   fi \n'
415 +           txt += 'fi \n'
416 +        return txt
417 +
418 +    def loggingInfo(self, id):
419 +        """
420 +        retrieve the logging info from logging and bookkeeping and return it
421 +        """
422 +        self.checkProxy()
423 +        cmd = 'glite-job-logging-info -v 2 ' + id
424 +        #cmd_out = os.popen(cmd)
425 +        cmd_out = runCommand(cmd)
426 +        return cmd_out
427 +
428 +    def listMatch(self, nj):
429 +        """
430 +        Check the compatibility of available resources
431 +        """
432 +        self.checkProxy()
433 +        jdl = common.job_list[nj].jdlFilename()
434 +        cmd = 'glite-job-list-match ' + self.configOpt_() + jdl
435 +        cmd_out = runCommand(cmd,0,10)
436 +        if not cmd_out:
437 +            raise CrabException("ERROR: "+cmd+" failed!")
438 +
439 +        return self.parseListMatch_(cmd_out, jdl)
440 +
441 +    def parseListMatch_(self, out, jdl):
442 +        """
443 +        Parse the f* output of edg-list-match and produce something sensible
444 +        """
445 +        reComment = re.compile( r'^\**$' )
446 +        reEmptyLine = re.compile( r'^$' )
447 +        reVO = re.compile( r'Selected Virtual Organisation name.*' )
448 +        reLine = re.compile( r'.*')
449 +        reCE = re.compile( r'(.*:.*)')
450 +        reCEId = re.compile( r'CEId.*')
451 +        reNO = re.compile( r'No Computing Element matching' )
452 +        reRB = re.compile( r'Connecting to host' )
453 +        next = 0
454 +        CEs=[]
455 +        Match=0
456 +
457 +        #print out
458 +        lines = reLine.findall(out)
459 +
460 +        i=0
461 +        CEs=[]
462 +        for line in lines:
463 +            string.strip(line)
464 +            #print line
465 +            if reNO.match( line ):
466 +                common.logger.debug(5,line)
467 +                return 0
468 +                pass
469 +            if reVO.match( line ):
470 +                VO =reVO.match( line ).group()
471 +                common.logger.debug(5,"VO "+VO)
472 +                pass
473 +
474 +            if reRB.match( line ):
475 +                RB = reRB.match(line).group()
476 +                common.logger.debug(5,"RB "+RB)
477 +                pass
478 +
479 +            if reCEId.search( line ):
480 +                for lineCE in lines[i:-1]:
481 +                    if reCE.match( lineCE ):
482 +                        CE = string.strip(reCE.search(lineCE).group(1))
483 +                        CEs.append(CE.split(':')[0])
484 +                        pass
485 +                    pass
486 +                pass
487 +            i=i+1
488 +            pass
489 +
490 +        common.logger.debug(5,"All CE :"+str(CEs))
491 +
492 +        sites = []
493 +        [sites.append(it) for it in CEs if not sites.count(it)]
494 +
495 +        common.logger.debug(5,"All Sites :"+str(sites))
496 +        return len(sites)
497 +
498 +    def noMatchFound_(self, jdl):
499 +        reReq = re.compile( r'Requirements' )
500 +        reString = re.compile( r'"\S*"' )
501 +        f = file(jdl,'r')
502 +        for line in f.readlines():
503 +            line= line.strip()
504 +            if reReq.match(line):
505 +                for req in reString.findall(line):
506 +                    if re.search("VO",req):
507 +                        common.logger.message( "SW required: "+req)
508 +                        continue
509 +                    if re.search('"\d+',req):
510 +                        common.logger.message("Other req  : "+req)
511 +                        continue
512 +                    common.logger.message( "CE required: "+req)
513 +                break
514 +            pass
515 +        raise CrabException("No compatible resources found!")
516 +
517 +    def submit(self, nj):
518 +        """
519 +        Submit one EDG job.
520 +        """
521 +
522 +        self.checkProxy()
523 +        jid = None
524 +        jdl = common.job_list[nj].jdlFilename()
525 +
526 +        cmd = 'glite-job-submit ' + self.configOpt_() + jdl
527 +        cmd_out = runCommand(cmd)
528 +        if cmd_out != None:
529 +            reSid = re.compile( r'https.+' )
530 +            jid = reSid.search(cmd_out).group()
531 +            pass
532 +        return jid
533 +
534 +    def resubmit(self, nj_list):
535 +        """
536 +        Prepare jobs to be submit
537 +        """
538 +        return
539 +
540 +    def getExitStatus(self, id):
541 +        return self.getStatusAttribute_(id, 'exit_code')
542 +
543 +    def queryStatus(self, id):
544 +        return self.getStatusAttribute_(id, 'status')
545 +
546 +    def queryDest(self, id):  
547 +        return self.getStatusAttribute_(id, 'destination')
548 +
549 +
550 +    def getStatusAttribute_(self, id, attr):
551 +        """ Query a status of the job with id """
552 +
553 +        self.checkProxy()
554 +        hstates = {}
555 +        Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
556 +        # Bypass edg-job-status interfacing directly to C++ API
557 +        # Job attribute vector to retrieve status without edg-job-status
558 +        level = 0
559 +        # Instance of the Status class provided by LB API
560 +        jobStat = Status()
561 +        st = 0
562 +        jobStat.getStatus(id, level)
563 +        err, apiMsg = jobStat.get_error()
564 +        if err:
565 +            common.logger.debug(5,'Error caught' + apiMsg)
566 +            return None
567 +        else:
568 +            for i in range(len(self.states)):
569 +                # Fill an hash table with all information retrieved from LB API
570 +                hstates[ self.states[i] ] = jobStat.loadStatus(st)[i]
571 +            result = jobStat.loadStatus(st)[ self.states.index(attr) ]
572 +            return result
573 +
574 +    def queryDetailedStatus(self, id):
575 +        """ Query a detailed status of the job with id """
576 +        cmd = 'glite-job-status '+id
577 +        cmd_out = runCommand(cmd)
578 +        return cmd_out
579 +
580 +    def getOutput(self, id):
581 +        """
582 +        Get output for a finished job with id.
583 +        Returns the name of directory with results.
584 +        """
585 +
586 +        self.checkProxy()
587 +        cmd = 'glite-job-get-output --dir ' + common.work_space.resDir() + ' ' + id
588 +        cmd_out = runCommand(cmd)
589 +
590 +        # Determine the output directory name
591 +        dir = common.work_space.resDir()
592 +        dir += os.environ['USER']
593 +        dir += '_' + os.path.basename(id)
594 +        return dir
595 +
596 +    def cancel(self, id):
597 +        """ Cancel the EDG job with id """
598 +        self.checkProxy()
599 +        cmd = 'glite-job-cancel --noint ' + id
600 +        cmd_out = runCommand(cmd)
601 +        return cmd_out
602 +
603 +
604 +    def createXMLSchScript(self, nj):
605 +        """
606 +        Create a XML-file for BOSS4.
607 +        """
608 +        job = common.job_list[nj]
609 +        jbt = job.type()
610 +        inp_sandbox = jbt.inputSandbox(nj)
611 +        out_sandbox = jbt.outputSandbox(nj)
612 +        
613 +        title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
614 +        jt_string = ''
615 +        
616 +        xml_fname = 'orca.xml'
617 +        xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
618 +
619 +        #TaskName  
620 +        dir = string.split(common.work_space.topDir(), '/')
621 +        taskName = dir[len(dir)-2]
622 +  
623 +        to_writeReq = ''
624 +        to_write = ''
625 +        req=' '
626 +        req = req + jbt.getRequirements()
627 +        if self.EDG_requirements:
628 +            if (req == ' '):
629 +                req = req + self.EDG_requirements
630 +            else:
631 +                req = req +  ' && ' + self.EDG_requirements
632 +        if self.EDG_ce_white_list:
633 +            ce_white_list = string.split(self.EDG_ce_white_list,',')
634 +            for i in range(len(ce_white_list)):
635 +                if i == 0:
636 +                    if (req == ' '):
637 +                        req = req + '((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
638 +                    else:
639 +                        req = req +  ' && ((RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
640 +                    pass
641 +                else:
642 +                    req = req +  ' || (RegExp("' + ce_white_list[i] + '", other.GlueCEUniqueId))'
643 +            req = req + ')'
644 +        
645 +        if self.EDG_ce_black_list:
646 +            ce_black_list = string.split(self.EDG_ce_black_list,',')
647 +            for ce in ce_black_list:
648 +                if (req == ' '):
649 +                    req = req + '(!RegExp("' + ce + '", other.GlueCEUniqueId))'
650 +                else:
651 +                    req = req +  ' && (!RegExp("' + ce + '", other.GlueCEUniqueId))'
652 +                pass
653 +        if self.EDG_clock_time:
654 +            if (req == ' '):
655 +                req = req + 'other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
656 +            else:
657 +                req = req + ' && other.GlueCEPolicyMaxWallClockTime>='+self.EDG_clock_time
658 +
659 +        if self.EDG_cpu_time:
660 +            if (req == ' '):
661 +                req = req + ' other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
662 +            else:
663 +                req = req + ' && other.GlueCEPolicyMaxCPUTime>='+self.EDG_cpu_time
664 +        if (req != ' '):
665 +            req = req + '\n'
666 +            to_writeReq = req
667 +                                                                                                                                                            
668 +
669 +        if ( self.EDG_retry_count ):              
670 +            to_write = to_write + 'RetryCount = "'+self.EDG_retry_count+'"\n'
671 +            pass
672 +
673 +        to_write = to_write + 'MyProxyServer = "&quot;' + self.proxyServer + '&quot;"\n'
674 +        to_write = to_write + 'VirtualOrganisation = "&quot;' + self.VO + '&quot;"\n'
675 +
676 +
677 +        #TaskName  
678 +        dir = string.split(common.work_space.topDir(), '/')
679 +        taskName = dir[len(dir)-2]
680 +
681 +        if nj == 0:
682 +            xml.write(str(title))
683 +            xml.write('<task name="' +str(taskName)+'">\n')
684 +            xml.write(jt_string)
685 +            #Here it must pass the extra Tags.. (into Task & out of chain)  
686 +            if (to_writeReq != ''):
687 +                xml.write('<extraTags>\n')
688 +                xml.write('<Requirements>\n')
689 +                xml.write('<![CDATA[\n')
690 +                xml.write(to_writeReq)
691 +                xml.write(']]>\n')
692 +                xml.write('</Requirements>\n')
693 +                xml.write('</extraTags>\n')
694 +                pass
695 +
696 +            if (to_write != ''):
697 +                xml.write('<extraTags\n')
698 +                xml.write(to_write)
699 +                xml.write('/>\n')
700 +                pass
701 +            pass
702 +  
703 +        xml.write('<chain scheduler="glite">\n')
704 +        xml.write(jt_string)
705 +
706 +        #executable
707 +        script = job.scriptFilename()
708 +        xml.write('<program exec="' + os.path.basename(script) +'"\n')
709 +        xml.write(jt_string)
710 +    
711 +          
712 +        ### only one .sh  JDL has arguments:
713 +        ### Fabio
714 +        xml.write('args = "' + str(nj+1)+' '+ jbt.getJobTypeArguments(nj, "GLITE") +'"\n')
715 +        xml.write('program_types="crabjob"\n')
716 +        inp_box = 'infiles="'
717 +        inp_box = inp_box + '' + script + ','
718 +
719 +        if inp_sandbox != None:
720 +            for fl in inp_sandbox:
721 +                inp_box = inp_box + '' + fl + ','
722 +                pass
723 +            pass
724 +
725 +        # Marco (VERY TEMPORARY ML STUFF)
726 +        inp_box = inp_box + os.path.abspath(os.environ['CRABDIR']+'/python/'+'report.py') + ',' +\
727 +                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'DashboardAPI.py') + ','+\
728 +                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'Logger.py') + ','+\
729 +                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'ProcInfo.py') + ','+\
730 +                  os.path.abspath(os.environ['CRABDIR']+'/python/'+'apmon.py')
731 +        # End Marco
732 +
733 +        if (not jbt.additional_inbox_files == []):
734 +            inp_box = inp_box + ', '
735 +            for addFile in jbt.additional_inbox_files:
736 +                addFile = os.path.abspath(addFile)
737 +                inp_box = inp_box+''+addFile+','
738 +                pass
739 +
740 +        if inp_box[-1] == ',' : inp_box = inp_box[:-1]
741 +        inp_box = inp_box + ' "\n'
742 +        xml.write(inp_box)
743 +
744 +        xml.write('stderr="' + job.stdout() + '"\n')
745 +        xml.write('stdout="' + job.stderr() + '"\n')
746 +        
747 +        
748 +        if job.stdout() == job.stderr():
749 +          out_box = 'outfiles="' + \
750 +                    job.stdout() + ',.BrokerInfo,'
751 +        else:
752 +          out_box = 'outfiles="' + \
753 +                    job.stdout() + ',' + \
754 +                    job.stderr() + ',.BrokerInfo,'
755 +
756 +        if int(self.return_data) == 1:
757 +            if out_sandbox != None:
758 +                for fl in out_sandbox:
759 +                    out_box = out_box + '' + fl + ','
760 +                    pass
761 +                pass
762 +            pass
763 +                                                                                                                                                            
764 +        if out_box[-1] == ',' : out_box = out_box[:-1]
765 +        out_box = out_box + '"'
766 +        xml.write(out_box+'\n')
767 +
768 +        xml.write('group="'+taskName+'"\n')
769 +        xml.write('BossAttr="[crabjob.INTERNAL_ID=' + str(nj+1) +']"\n')
770 +
771 +        
772 +      
773 +        xml.write('/>\n')
774 +        xml.write('</chain>\n')
775 +
776 +        if int(nj) == int(common.jobDB.nJobs()-1): xml.write('</task>\n')
777 +
778 +
779 +        xml.close()
780 +
781 +        return
782 +
783 +
784 +
785 +    def checkProxy(self):
786 +        """
787 +        Function to check the Globus proxy.
788 +        """
789 +        if (self.proxyValid): return
790 +        timeleft = -999
791 +        minTimeLeft=10*3600 # in seconds
792 +
793 +        minTimeLeftServer = 100 # in hours
794 +
795 +        #cmd = 'voms-proxy-info -exists -valid '+str(minTimeLeft)+':00'
796 +        #cmd = 'voms-proxy-info -timeleft'
797 +        mustRenew = 0
798 +        timeLeftLocal = runCommand('voms-proxy-info -timeleft 2>/dev/null')
799 +        timeLeftServer = -999
800 +        if not timeLeftLocal or int(timeLeftLocal) <= 0 or not isInt(timeLeftLocal):
801 +            mustRenew = 1
802 +        else:
803 +            timeLeftServer = runCommand('voms-proxy-info -actimeleft 2>/dev/null | head -1')
804 +            if not timeLeftServer or not isInt(timeLeftServer):
805 +                mustRenew = 1
806 +            elif timeLeftLocal<minTimeLeft or timeLeftServer<minTimeLeft:
807 +                mustRenew = 1
808 +            pass
809 +        pass
810 +
811 +        if mustRenew:
812 +            common.logger.message( "No valid proxy found or remaining time of validity of already existing proxy shorter than 10 hours!\n Creating a user proxy with default length of 96h\n")
813 +            cmd = 'voms-proxy-init -voms cms -valid 96:00'
814 +            try:
815 +                # SL as above: damn it!
816 +                out = os.system(cmd)
817 +                if (out>0): raise CrabException("Unable to create a valid proxy!\n")
818 +            except:
819 +                msg = "Unable to create a valid proxy!\n"
820 +                raise CrabException(msg)
821 +            # cmd = 'grid-proxy-info -timeleft'
822 +            # cmd_out = runCommand(cmd,0,20)
823 +            pass
824 +
825 +        ## now I do have a voms proxy valid, and I check the myproxy server
826 +        renewProxy = 0
827 +        cmd = 'myproxy-info -d -s '+self.proxyServer
828 +        cmd_out = runCommand(cmd,0,20)
829 +        if not cmd_out:
830 +            common.logger.message('No credential delegated to myproxy server '+self.proxyServer+' will do now')
831 +            renewProxy = 1
832 +        else:
833 +            # if myproxy exist but not long enough, renew
834 +            reTime = re.compile( r'timeleft: (\d+)' )
835 +            #print "<"+str(reTime.search( cmd_out ).group(1))+">"
836 +            if reTime.match( cmd_out ):
837 +                time = reTime.search( line ).group(1)
838 +                if time < minTimeLeftServer:
839 +                    renewProxy = 1
840 +                    common.logger.message('No credential delegation will expire in '+time+' hours: renew it')
841 +                pass
842 +            pass
843 +        
844 +        # if not, create one.
845 +        if renewProxy:
846 +            cmd = 'myproxy-init -d -n -s '+self.proxyServer
847 +            out = os.system(cmd)
848 +            if (out>0):
849 +                raise CrabException("Unable to delegate the proxy to myproxyserver "+self.proxyServer+" !\n")
850 +            pass
851 +
852 +        # cache proxy validity
853 +        self.proxyValid=1
854 +        return
855 +
856 +    def configOpt_(self):
857 +        edg_ui_cfg_opt = ' '
858 +        if self.edg_config:
859 +            edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
860 +        if self.edg_config_vo:
861 +            edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
862 +        return edg_ui_cfg_opt

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines