ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.14 by fanzago, Tue Oct 4 18:19:33 2005 UTC vs.
Revision 1.29 by fanzago, Fri Nov 18 15:50:23 2005 UTC

# Line 4 | Line 4 | from crab_exceptions import *
4   from crab_util import *
5   import common
6  
7 < import os, sys, tempfile
7 > import os, sys, time
8  
9   class SchedulerEdg(Scheduler):
10      def __init__(self):
# Line 77 | Line 77 | class SchedulerEdg(Scheduler):
77          try: self.EDG_cpu_time = cfg_params['EDG.max_cpu_time']
78          except KeyError: self.EDG_cpu_time = ''
79  
80 < #        # Add EDG_WL_LOCATION to the python path
81 < #
82 < #        try:
83 < #           path = os.environ['EDG_WL_LOCATION']
84 < #       except:
85 < #           msg = "Error: the EDG_WL_LOCATION variable is not set."
86 < #           raise CrabException(msg)
87 < #
88 < #       libPath=os.path.join(path, "lib")
89 < #       sys.path.append(libPath)
90 < #       libPath=os.path.join(path, "lib", "python")
91 < #       sys.path.append(libPath)
80 >        # Add EDG_WL_LOCATION to the python path
81  
82 <        self.checkProxy_()
82 >        try:
83 >            path = os.environ['EDG_WL_LOCATION']
84 >        except:
85 >            msg = "Error: the EDG_WL_LOCATION variable is not set."
86 >            raise CrabException(msg)
87 >
88 >        libPath=os.path.join(path, "lib")
89 >        sys.path.append(libPath)
90 >        libPath=os.path.join(path, "lib", "python")
91 >        sys.path.append(libPath)
92 >
93 >        self.proxyValid=0
94          return
95      
96  
# Line 115 | Line 115 | class SchedulerEdg(Scheduler):
115          """
116  
117          txt = ''
118        ### FEDE ####
118          if self.copy_data:
119             if self.SE:
120                txt += 'export SE='+self.SE+'\n'
121 +              txt += 'echo "SE = $SE"\n'
122             if self.SE_PATH:
123                if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
124                txt += 'export SE_PATH='+self.SE_PATH+'\n'
125 +              txt += 'echo "SE_PATH = $SE_PATH"\n'
126                                                                                                                                                              
127          if self.register_data:
128             if self.VO:
# Line 129 | Line 130 | class SchedulerEdg(Scheduler):
130             if self.LFN:
131                txt += 'export LFN='+self.LFN+'\n'
132                txt += '\n'
132        ########
133          txt += 'CloseCEs=`edg-brokerinfo getCE`\n'
134          txt += 'echo "CloseCEs = $CloseCEs"\n'
135          txt += 'CE=`echo $CloseCEs | sed -e "s/:.*//"`\n'
136          txt += 'echo "CE = $CE"\n'
137          return txt
138 <    #### FEDE  
138 >
139      def wsCopyOutput(self):
140          """
141          Write a CopyResults part of a job script, e.g.
# Line 145 | Line 145 | class SchedulerEdg(Scheduler):
145          if self.copy_data:
146             copy = 'globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file'
147             txt += '#\n'
148 <           txt += '#   Copy output into user SE = $SE\n'
148 >           txt += '#   Copy output to SE = $SE\n'
149             txt += '#\n'
150 <           txt += 'copy_exit_status=1\n'
151 <           txt += 'if [ $executable_exit_status -eq 0 ]; then\n'
150 >           txt += 'if [ $exe_result -eq 0 ]; then\n'
151             txt += '  for out_file in $file_list ; do\n'
152             txt += '    echo "Trying to copy output file to $SE "\n'
153             txt += '    echo "'+copy+'"\n'
154             txt += '    '+copy+' 2>&1\n'
155             txt += '    copy_exit_status=$?\n'
156 <           txt += '    echo "copy_exit_status= $copy_exit_status"\n'
156 >           txt += '    echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
157 >           txt += '    echo "STAGE_OUT = $copy_exit_status"\n'
158             txt += '    if [ $copy_exit_status -ne 0 ]; then \n'
159             txt += '       echo "Problems with SE= $SE" \n'
160           txt += '       echo "output lost!"\n'
160             txt += '    else \n'
161             txt += '       echo "output copied into $SE/$SE_PATH directory"\n'
162             txt += '    fi \n'
163             txt += '  done\n'
164             txt += 'fi \n'
166           txt += 'echo "COPY_EXIT_STATUS=$copy_exit_status"\n'
165          return txt
166  
167      def wsRegisterOutput(self):
# Line 174 | Line 172 | class SchedulerEdg(Scheduler):
172          txt = ''
173          if self.register_data:
174             txt += '#\n'
175 <           txt += '#   Register output into RLS\n'
175 >           txt += '#  Register output to RLS\n'
176             txt += '#\n'
177 <           txt += 'register_exit_status=1\n'
180 <           txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
177 >           txt += 'if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
178             txt += '   for out_file in $file_list ; do\n'
179             txt += '      echo "Trying to register the output file into RLS"\n'
180             txt += '      echo "lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file"\n'
181             txt += '      lcg-rf -l $LFN/$out_file --vo $VO sfn://$SE$SE_PATH/$out_file 2>&1 \n'
182             txt += '      register_exit_status=$?\n'
183 <           txt += '      echo "register_exit_status= $register_exit_status"\n'
183 >           txt += '      echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
184 >           txt += '      echo "STAGE_OUT = $register_exit_status"\n'
185             txt += '      if [ $register_exit_status -ne 0 ]; then \n'
186 <           txt += '         echo "Problems with the registration into RLS" \n'
186 >           txt += '         echo "Problems with the registration to RLS" \n'
187             txt += '         echo "Try with srm protocol" \n'
188             txt += '         echo "lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file"\n'
189             txt += '         lcg-rf -l $LFN/$out_file --vo $VO srm://$SE$SE_PATH/$out_file 2>&1 \n'
190             txt += '         register_exit_status=$?\n'
191 <           txt += '         echo "register_exit_status= $register_exit_status"\n'
191 >           txt += '         echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
192 >           txt += '         echo "STAGE_OUT = $register_exit_status"\n'
193             txt += '         if [ $register_exit_status -ne 0 ]; then \n'
194             txt += '            echo "Problems with the registration into RLS" \n'
195             txt += '         fi \n'
196             txt += '      else \n'
197 <           txt += '         echo "output registered into RLS"\n'
197 >           txt += '         echo "output registered to RLS"\n'
198             txt += '      fi \n'
199             txt += '   done\n'
200 <           txt += 'elif [[ $executable_exit_status -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
200 >           txt += 'elif [[ $exe_result -eq 0 && $copy_exit_status -ne 0 ]]; then \n'
201             txt += '   echo "Trying to copy output file to CloseSE"\n'
202             txt += '   CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
203             txt += '   for out_file in $file_list ; do\n'
204 <           txt += '      echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
205 <           txt += '      lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
204 >           txt += '      echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
205 >           txt += '      lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
206             txt += '      register_exit_status=$?\n'
207 <           txt += '      echo "register_exit_status= $register_exit_status"\n'
207 >           txt += '      echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
208 >           txt += '      echo "STAGE_OUT = $register_exit_status"\n'
209             txt += '      if [ $register_exit_status -ne 0 ]; then \n'
210             txt += '         echo "Problems with CloseSE" \n'
211             txt += '      else \n'
212             txt += '         echo "The program was successfully executed"\n'
213 <           txt += '         echo "Output storage element used: $CLOSE_SE"\n'
214 <           txt += '         echo "the LFN for the file is LFN=${LFN}/$out_file"\n'
213 >           txt += '         echo "SE = $CLOSE_SE"\n'
214 >           txt += '         echo "LFN for the file is LFN=${LFN}/$out_file"\n'
215             txt += '      fi \n'
216             txt += '   done\n'
217             txt += 'else\n'
218 <           txt += '  echo "Problem with the executable"\n'
218 >           txt += '   echo "Problem with the executable"\n'
219             txt += 'fi \n'
220           txt += 'echo "REGISTER_EXIT_STATUS=$register_exit_status"\n'
220          return txt
222        #####################
221  
222 <    def loggingInfo(self, nj):
222 >    def loggingInfo(self, id):
223          """
224          retrieve the logging info from logging and bookkeeping and return it
225          """
226 <        id = common.jobDB.jobId(nj)
227 <        edg_ui_cfg_opt = ''
228 <        if self.edg_config:
229 <          edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
230 <        cmd = 'edg-job-get-logging-info -v 2 ' + edg_ui_cfg_opt + id
233 <        print cmd
234 <        myCmd = os.popen(cmd)
235 <        cmd_out = myCmd.readlines()
236 <        myCmd.close()
226 >        self.checkProxy()
227 >      #  id = common.jobDB.jobId(nj)
228 >        cmd = 'edg-job-get-logging-info -v 2 ' + id
229 >        cmd_out = os.popen(cmd)
230 >      #  cmd_out = runCommand(cmd)
231          return cmd_out
232  
233      def listMatch(self, nj):
234          """
235          Check the compatibility of available resources
236          """
237 +        self.checkProxy()
238          jdl = common.job_list[nj].jdlFilename()
239 <        edg_ui_cfg_opt = ''
245 <        if self.edg_config:
246 <          edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
247 <        if self.edg_config_vo:
248 <          edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
249 <        cmd = 'edg-job-list-match ' + edg_ui_cfg_opt + jdl
239 >        cmd = 'edg-job-list-match ' + self.configOpt_() + jdl
240          myCmd = os.popen(cmd)
241          cmd_out = myCmd.readlines()
242          myCmd.close()
243          return self.parseListMatch_(cmd_out, jdl)
244  
245      def parseListMatch_(self, out, jdl):
246 +
247          reComment = re.compile( r'^\**$' )
248          reEmptyLine = re.compile( r'^$' )
249          reVO = re.compile( r'Selected Virtual Organisation name.*' )
# Line 262 | Line 253 | class SchedulerEdg(Scheduler):
253          next = 0
254          CEs=[]
255          Match=0
256 +
257          for line in out:
258              line = line.strip()
259              if reComment.match( line ):
# Line 282 | Line 274 | class SchedulerEdg(Scheduler):
274                  continue
275              if next:
276                  CE=line.split(':')[0]
277 <                CEs.append(CE)
277 >                if (CEs.count(CE) > 0):
278 >                   pass
279 >                else:
280 >                   CEs.append(CE)  
281 >                   Match=Match+1
282                  common.logger.debug(5, 'Matched CE   :'+CE)
287                Match=Match+1
283                  pass
284              if reNO.match( line ):
285                  common.logger.debug(5,line)
# Line 317 | Line 312 | class SchedulerEdg(Scheduler):
312          Submit one EDG job.
313          """
314  
315 +        self.checkProxy()
316          jid = None
317          jdl = common.job_list[nj].jdlFilename()
318 <        id_tmp = tempfile.mktemp()
319 <        edg_ui_cfg_opt = ' '
324 <        if self.edg_config:
325 <          edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
326 <        if self.edg_config_vo:
327 <          edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
328 <        cmd = 'edg-job-submit -o ' + id_tmp + edg_ui_cfg_opt + jdl
318 >
319 >        cmd = 'edg-job-submit ' + self.configOpt_() + jdl
320          cmd_out = runCommand(cmd)
321          if cmd_out != None:
322 <            idfile = open(id_tmp)
323 <            jid_line = idfile.readline()
333 <            while jid_line[0] == '#':
334 <                jid_line = idfile.readline()
335 <                pass
336 <            jid = string.strip(jid_line)
337 <            os.unlink(id_tmp)
322 >            reSid = re.compile( r'https.+' )
323 >            jid = reSid.search(cmd_out).group()
324              pass
325          return jid
326  
# Line 351 | Line 337 | class SchedulerEdg(Scheduler):
337      def getStatusAttribute_(self, id, attr):
338          """ Query a status of the job with id """
339  
340 +        self.checkProxy()
341          hstates = {}
342          Status = importName('edg_wl_userinterface_common_LbWrapper', 'Status')
343          # Bypass edg-job-status interfacing directly to C++ API
# Line 362 | Line 349 | class SchedulerEdg(Scheduler):
349          jobStat.getStatus(id, level)
350          err, apiMsg = jobStat.get_error()
351          if err:
352 <            print 'Error caught', apiMsg
353 <            common.log.message(apiMsg)
352 >            #print 'Error caught', apiMsg
353 >            #common.log.message(apiMsg)
354 >            common.logger.debug(5,'Error caught' + apiMsg)
355              return None
356          else:
357              for i in range(len(self.states)):
# Line 384 | Line 372 | class SchedulerEdg(Scheduler):
372          Returns the name of directory with results.
373          """
374  
375 +        self.checkProxy()
376          cmd = 'edg-job-get-output --dir ' + common.work_space.resDir() + ' ' + id
377          cmd_out = runCommand(cmd)
378  
# Line 395 | Line 384 | class SchedulerEdg(Scheduler):
384  
385      def cancel(self, id):
386          """ Cancel the EDG job with id """
387 +        self.checkProxy()
388          cmd = 'edg-job-cancel --noint ' + id
389          cmd_out = runCommand(cmd)
390          return cmd_out
391  
402    def checkProxy_(self):
403        """
404        Function to check the Globus proxy.
405        """
406        cmd = 'grid-proxy-info -timeleft'
407        cmd_out = runCommand(cmd)
408        ok = 1
409        timeleft = -999
410        try: timeleft = int(cmd_out)
411        except ValueError: ok=0
412        except TypeError: ok=0
413        if timeleft < 1:  ok=0
414
415        if ok==0:
416            print "No valid proxy found !\n"
417            print "Creating a user proxy with default length of 100h\n"
418            msg = "Unable to create a valid proxy!\n"
419            if os.system("grid-proxy-init -valid 100:00"):
420                raise CrabException(msg)
421        return
422    
392      def createSchScript(self, nj):
393          """
394          Create a JDL-file for EDG.
# Line 532 | Line 501 | class SchedulerEdg(Scheduler):
501  
502          jdl.close()
503          return
504 +
505 +    def checkProxy(self):
506 +        """
507 +        Function to check the Globus proxy.
508 +        """
509 +        if (self.proxyValid): return
510 +        timeleft = -999
511 +        minTimeLeft=10 # in hours
512 +        cmd = 'grid-proxy-info -e -v '+str(minTimeLeft)+':00'
513 +        try: cmd_out = runCommand(cmd,0)
514 +        except: print cmd_out
515 +        if (cmd_out == None or cmd_out=='1'):
516 +            common.logger.message( "No valid proxy found or timeleft too short!\n Creating a user proxy with default length of 100h\n")
517 +            cmd = 'grid-proxy-init -valid 100:00'
518 +            try:
519 +                out = os.system(cmd)
520 +                if (out>0): raise CrabException("Unable to create a valid proxy!\n")
521 +            except:
522 +                msg = "Unable to create a valid proxy!\n"
523 +                raise CrabException(msg)
524 +            cmd = 'grid-proxy-info -timeleft'
525 +            cmd_out = runCommand(cmd,0)
526 +            #print cmd_out, time.time()
527 +            #time.time(cms_out)
528 +            pass
529 +        self.proxyValid=1
530 +        return
531 +    
532 +    def configOpt_(self):
533 +        edg_ui_cfg_opt = ' '
534 +        if self.edg_config:
535 +          edg_ui_cfg_opt = ' -c ' + self.edg_config + ' '
536 +        if self.edg_config_vo:
537 +          edg_ui_cfg_opt += ' --config-vo ' + self.edg_config_vo + ' '
538 +        return edg_ui_cfg_opt

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines