ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.22 by slacapra, Sat Nov 5 11:44:32 2005 UTC vs.
Revision 1.31 by slacapra, Tue Dec 6 13:18:48 2005 UTC

# Line 40 | Line 40 | class SchedulerEdg(Scheduler):
40          except KeyError: self.VO = 'cms'
41  
42          try: self.return_data = cfg_params['USER.return_data']
43 <        except KeyError: self.return_data = ''
43 >        except KeyError: self.return_data = 1
44  
45          try:
46              self.copy_data = cfg_params["USER.copy_data"]
47 <            try:
48 <                self.SE = cfg_params['USER.storage_element']
49 <                self.SE_PATH = cfg_params['USER.storage_path']
50 <            except KeyError:
51 <                msg = "Error. The [USER] section does not have 'storage_element'"
52 <                msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
53 <                common.logger.message(msg)
54 <                raise CrabException(msg)
55 <        except KeyError: self.copy_data = ''
56 <
47 >            if int(self.copy_data) == 1:
48 >                try:
49 >                    self.SE = cfg_params['USER.storage_element']
50 >                    self.SE_PATH = cfg_params['USER.storage_path']
51 >                except KeyError:
52 >                    msg = "Error. The [USER] section does not have 'storage_element'"
53 >                    msg = msg + " and/or 'storage_path' entries, necessary to copy the output"
54 >                    common.logger.message(msg)
55 >                    raise CrabException(msg)
56 >        except KeyError: self.copy_data = 0
57 >
58 >        if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
59 >           msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
60 >           msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
61 >           raise CrabException(msg)
62 >      
63          try:
64              self.register_data = cfg_params["USER.register_data"]
65 <            try:
66 <                 self.LFN = cfg_params['USER.lfn_dir']
67 <            except KeyError:
68 <                msg = "Error. The [USER] section does not have 'lfn_dir' value"
69 <                msg = msg + " it's necessary for RLS registration"
70 <                common.logger.message(msg)
71 <                raise CrabException(msg)
72 <        except KeyError: self.register_data= ''
65 >            if int(self.register_data) == 1:
66 >                try:
67 >                    self.LFN = cfg_params['USER.lfn_dir']
68 >                except KeyError:
69 >                    msg = "Error. The [USER] section does not have 'lfn_dir' value"
70 >                    msg = msg + " it's necessary for RLS registration"
71 >                    common.logger.message(msg)
72 >                    raise CrabException(msg)
73 >        except KeyError: self.register_data = 0
74 >
75 >        if ( int(self.copy_data) == 0 and int(self.register_data) == 1 ):
76 >           msg = 'Warning: register_data = 1 must be used with copy_data = 1\n'
77 >           msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
78 >           common.logger.message(msg)
79 >           raise CrabException(msg)
80  
81          try: self.EDG_requirements = cfg_params['EDG.requirements']
82          except KeyError: self.EDG_requirements = ''
# Line 115 | Line 128 | class SchedulerEdg(Scheduler):
128          """
129  
130          txt = ''
131 <        if self.copy_data:
131 >        if int(self.copy_data) == 1:
132             if self.SE:
133                txt += 'export SE='+self.SE+'\n'
134                txt += 'echo "SE = $SE"\n'
# Line 124 | Line 137 | class SchedulerEdg(Scheduler):
137                txt += 'export SE_PATH='+self.SE_PATH+'\n'
138                txt += 'echo "SE_PATH = $SE_PATH"\n'
139                                                                                                                                                              
140 <        if self.register_data:
140 >        if int(self.register_data) == 1:
141             if self.VO:
142                txt += 'export VO='+self.VO+'\n'
143             if self.LFN:
# Line 142 | Line 155 | class SchedulerEdg(Scheduler):
155          to copy produced output into a storage element.
156          """
157          txt = ''
158 <        if self.copy_data:
158 >        if int(self.copy_data) == 1:
159             copy = 'globus-url-copy file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file'
160             txt += '#\n'
161             txt += '#   Copy output to SE = $SE\n'
162             txt += '#\n'
150           #### per orca l'exit_status non e' affidabile.....
151           #txt += 'if [ $executable_exit_status -eq 0 ]; then\n'
163             txt += 'if [ $exe_result -eq 0 ]; then\n'
164             txt += '  for out_file in $file_list ; do\n'
165             txt += '    echo "Trying to copy output file to $SE "\n'
# Line 172 | Line 183 | class SchedulerEdg(Scheduler):
183          """
184  
185          txt = ''
186 <        if self.register_data:
186 >        if int(self.register_data) == 1:
187             txt += '#\n'
188             txt += '#  Register output to RLS\n'
189             txt += '#\n'
179           ### analogo
180           #txt += 'if [[ $executable_exit_status -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
190             txt += 'if [[ $exe_result -eq 0 && $copy_exit_status -eq 0 ]]; then\n'
191             txt += '   for out_file in $file_list ; do\n'
192             txt += '      echo "Trying to register the output file into RLS"\n'
# Line 205 | Line 214 | class SchedulerEdg(Scheduler):
214             txt += '   echo "Trying to copy output file to CloseSE"\n'
215             txt += '   CLOSE_SE=`edg-brokerinfo getCloseSEs | head -1`\n'
216             txt += '   for out_file in $file_list ; do\n'
217 <           txt += '      echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
218 <           txt += '      lcg-cr -v -l lfn:${LFN}/$out_file -d $SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
217 >           txt += '      echo "lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file" \n'
218 >           txt += '      lcg-cr -v -l lfn:${LFN}/$out_file -d $CLOSE_SE -P $LFN/$out_file --vo $VO file://`pwd`/$out_file 2>&1 \n'
219             txt += '      register_exit_status=$?\n'
220             txt += '      echo "REGISTER_EXIT_STATUS = $register_exit_status"\n'
221             txt += '      echo "STAGE_OUT = $register_exit_status"\n'
# Line 222 | Line 231 | class SchedulerEdg(Scheduler):
231             txt += '   echo "Problem with the executable"\n'
232             txt += 'fi \n'
233          return txt
225        #####################
234  
235 <    def loggingInfo(self, nj):
235 >    def loggingInfo(self, id):
236          """
237          retrieve the logging info from logging and bookkeeping and return it
238          """
239          self.checkProxy()
240 <        id = common.jobDB.jobId(nj)
241 <        cmd = 'edg-job-get-logging-info -v 2 ' + self.configOpt_() + id
242 <        cmd_out = runCommand(cmd)
240 >      #  id = common.jobDB.jobId(nj)
241 >        cmd = 'edg-job-get-logging-info -v 2 ' + id
242 >        cmd_out = os.popen(cmd)
243 >      #  cmd_out = runCommand(cmd)
244          return cmd_out
245  
246      def listMatch(self, nj):
# Line 241 | Line 250 | class SchedulerEdg(Scheduler):
250          self.checkProxy()
251          jdl = common.job_list[nj].jdlFilename()
252          cmd = 'edg-job-list-match ' + self.configOpt_() + jdl
253 <        cmd_out = runCommand(cmd)
253 >        # myCmd = os.popen(cmd)
254 >        # cmd_out = myCmd.readlines()
255 >        # myCmd.close()
256 >        cmd_out = runCommand(cmd,0,240)
257          return self.parseListMatch_(cmd_out, jdl)
258  
259      def parseListMatch_(self, out, jdl):
260 +        """
261 +        Parse the f* output of edg-list-match and produce something sensible
262 +        """
263          reComment = re.compile( r'^\**$' )
264          reEmptyLine = re.compile( r'^$' )
265          reVO = re.compile( r'Selected Virtual Organisation name.*' )
266 <        reCE = re.compile( r'CEId.*\n((.*:.*)\n)*' )
266 >        reLine = re.compile( r'.*')
267 >        reCE = re.compile( r'(.*:.*)')
268 >        reCEId = re.compile( r'CEId.*')
269          reNO = re.compile( r'No Computing Element matching' )
270          reRB = re.compile( r'Connecting to host' )
271          next = 0
272          CEs=[]
273          Match=0
274  
275 <        if reNO.match( out ):
276 <            common.logger.debug(5,out)
277 <            self.noMatchFound_(jdl)
278 <            Match=0
279 <            pass
280 <        if reVO.match( out ):
281 <            VO =reVO.match( out ).group()
282 <            common.logger.debug(5, 'VO           :'+VO)
283 <            pass
275 >        #print out
276 >        lines = reLine.findall(out)
277 >
278 >        i=0
279 >        CEs=[]
280 >        for line in lines:
281 >            string.strip(line)
282 >            #print line
283 >            if reNO.match( line ):
284 >                common.logger.debug(5,line)
285 >                return 0
286 >                pass
287 >            if reVO.match( line ):
288 >                VO =reVO.match( line ).group()
289 >                common.logger.debug(5,"VO "+VO)
290 >                pass
291  
292 <        if reRB.match( out ):
293 <            RB =reRB.match(out).group()
294 <            common.logger.debug(5, 'Using RB     :'+RB)
292 >            if reRB.match( line ):
293 >                RB = reRB.match(line).group()
294 >                common.logger.debug(5,"RB "+RB)
295 >                pass
296 >
297 >            if reCEId.search( line ):
298 >                for lineCE in lines[i:-1]:
299 >                    if reCE.match( lineCE ):
300 >                        CE = string.strip(reCE.search(lineCE).group(1))
301 >                        CEs.append(CE.split(':')[0])
302 >                        pass
303 >                    pass
304 >                pass
305 >            i=i+1
306              pass
307  
308 <        if reCE.search( out ):
309 <            groups=reCE.search(out).groups()
310 <            for CE in groups:
311 <                tmp = string.strip(CE)
277 <                CEs.append(tmp)
278 <                common.logger.debug(5, 'Matched CE   :'+tmp)
279 <                Match=Match+1
280 <            pass
308 >        common.logger.debug(5,"All CE :"+str(CEs))
309 >
310 >        sites = []
311 >        [sites.append(it) for it in CEs if not sites.count(it)]
312  
313 <        return Match
313 >        common.logger.debug(5,"All Sites :"+str(sites))
314 >        return len(sites)
315  
316      def noMatchFound_(self, jdl):
317          reReq = re.compile( r'Requirements' )
# Line 342 | Line 374 | class SchedulerEdg(Scheduler):
374          jobStat.getStatus(id, level)
375          err, apiMsg = jobStat.get_error()
376          if err:
377 <            print 'Error caught', apiMsg
346 <            common.log.message(apiMsg)
377 >            common.logger.debug(5,'Error caught' + apiMsg)
378              return None
379          else:
380              for i in range(len(self.states)):
# Line 447 | Line 478 | class SchedulerEdg(Scheduler):
478                      job.stdout() + '", "' + \
479                      job.stderr() + '", ".BrokerInfo",'
480  
481 <        if self.return_data :
481 >        if int(self.return_data) == 1:
482              if out_sandbox != None:
483                  for fl in out_sandbox:
484                      out_box = out_box + ' "' + fl + '",'
# Line 515 | Line 546 | class SchedulerEdg(Scheduler):
546                  raise CrabException(msg)
547              cmd = 'grid-proxy-info -timeleft'
548              cmd_out = runCommand(cmd,0)
518            #print cmd_out, time.time()
519            #time.time(cms_out)
549              pass
550          self.proxyValid=1
551          return

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines