ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.46 by spiga, Sun Mar 8 10:09:44 2009 UTC vs.
Revision 1.68.2.3 by spiga, Thu Apr 22 14:40:14 2010 UTC

# Line 1 | Line 1
1 < ##!/usr/bin/env python
2 <
1 > #!/usr/bin/env python
2   import sys, os
3   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
4   from ProdCommon.Storage.SEAPI.SBinterface import *
# Line 83 | Line 82 | class cmscp:
82              results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
83              return results
84  
85 +    def checkLcgUtils( self ):
86 +        """
87 +        _checkLcgUtils_
88 +        check the lcg-utils version and report
89 +        """
90 +        import commands
91 +        cmd = "lcg-cp --version | grep lcg_util"
92 +        status, output = commands.getstatusoutput( cmd )
93 +        num_ver = -1
94 +        if output.find("not found") == -1 or status == 0:
95 +            temp = output.split("-")
96 +            version = ""
97 +            if len(temp) >= 2:
98 +                version = output.split("-")[1]
99 +                temp = version.split(".")
100 +                if len(temp) >= 1:
101 +                    num_ver = int(temp[0])*10
102 +                    num_ver += int(temp[1])
103 +        return num_ver
104 +
105      def setProtocol( self, middleware ):
106          """
107          define the allowed potocols based on $middlware
# Line 95 | Line 114 | class cmscp:
114          
115          lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
116                  'srmv2':'-b -D srmv2  -t 2400 --verbose'}
117 +        if self.checkLcgUtils() >= 17:
118 +            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose',
119 +                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose'}
120 +
121          srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
122 <                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 '}
122 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
123          rfioOpt=''
124  
125          supported_protocol = None
# Line 105 | Line 128 | class cmscp:
128                                   (self.params['srm_version'],srmOpt[self.params['srm_version']])]
129          elif middleware.lower() in ['lsf','caf']:
130              supported_protocol = [('rfio',rfioOpt)]
131 +        elif middleware.lower() in ['pbs']:
132 +            supported_protocol = [('rfio',rfioOpt),('local','')]
133 +        elif middleware.lower() in ['arc']:
134 +            supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
135          else:
136              ## here we can add support for any kind of protocol,
137              ## maybe some local schedulers need something dedicated
# Line 117 | Line 144 | class cmscp:
144          Checks the status of copy and update result dictionary
145          """
146          list_retry = []
147 +        list_retry_localSE = []
148          list_not_existing = []
149          list_ok = []
150          
# Line 130 | Line 158 | class cmscp:
158                  dict['reason'] = reason
159              elif er_code == '60302':
160                  list_not_existing.append( file )
161 <            else:
161 >            elif er_code == '10041':
162                  list_retry.append( file )
163 +            ## WHAT TO DO IN GENERAL FAILURE CONDITION
164 +            else:
165 +                list_retry_localSE.append( file )
166                  
167              if self.debug:
168                  print "\t file %s \n"%file
# Line 153 | Line 184 | class cmscp:
184              msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
185          if self.debug : print msg
186          
187 <        return copy_results, list_ok, list_retry
187 >        return copy_results, list_ok, list_retry, list_retry_localSE
188          
189      def LocalCopy(self, list_retry, results):
190          """
# Line 192 | Line 223 | class cmscp:
223              print '\t tfc %s '%tfc
224              print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
225                  
226 +        #if (str(self.params['lfn']).find("/store/") != -1):
227 +        #    temp = str(self.params['lfn']).split("/store/")
228 +        #    self.params['lfn']= "/store/temp/" + temp[1]
229 +        if (str(self.params['lfn']).find("/store/") == 0):
230 +            temp = str(self.params['lfn']).replace("/store/","/store/temp/",1)
231 +            self.params['lfn']= temp
232 +        
233 +        if ( self.params['lfn'][-1] != '/' ) : self.params['lfn'] = self.params['lfn'] + '/'
234 +            
235          file_backup=[]
236          for input in self.params['inputFilesList']:
237              file = self.params['lfn'] + os.path.basename(input)
# Line 203 | Line 243 | class cmscp:
243                  print '\t surl %s \n'%surl
244                      
245          destination=os.path.dirname(file_backup[0])
246 +        if ( destination[-1] != '/' ) : destination = destination + '/'
247          self.params['destination']=destination
248              
249          if self.debug:
# Line 212 | Line 253 | class cmscp:
253                
254          for prot, opt in self.setProtocol( self.params['middleware'] ):
255              if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
256 <            localCopy_results = self.copy( self.params['inputFileList'], prot, opt )
256 >            localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
257              if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
258                  results.update(localCopy_results)
259              else:
260 <                localCopy_results, list_ok, list_retry = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
260 >                localCopy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
261                  results.update(localCopy_results)
262                  if len(list_ok) == len(list_files) :
263                      break
# Line 245 | Line 286 | class cmscp:
286              if copy_results.keys() == [''] or copy_results.keys() == '' :
287                  results.update(copy_results)
288              else:
289 <                copy_results, list_ok, list_retry = self.checkCopy(copy_results, len(list_files), prot)
289 >                copy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(copy_results, len(list_files), prot)
290                  results.update(copy_results)
291                  if len(list_ok) == len(list_files) :
292                      break
# Line 254 | Line 295 | class cmscp:
295                  else: break
296                  
297          if self.local_stage:
298 <            if len(list_retry):
299 <                results = self.LocalCopy(list_retry, results)
298 >            if len(list_retry_localSE):
299 >                results = self.LocalCopy(list_retry_localSE, results)
300              
301          if self.debug:
302              print "\t results %s \n"%results
# Line 280 | Line 321 | class cmscp:
321  
322          return Source_SE, Destination_SE
323  
324 <    def copy( self, list_file, protocol, options ):
324 >    def copy( self, list_file, protocol, options, backup='no' ):
325          """
326          Make the real file copy using SE API
327          """
# Line 292 | Line 333 | class cmscp:
333          try:
334              Source_SE, Destination_SE = self.initializeApi( protocol )
335          except Exception, ex:
336 <            return self.updateReport('', '-1', str(ex))
336 >            for filetocopy in list_file:
337 >                results.update( self.updateReport(filetocopy, '-1', str(ex)))
338 >            return results
339  
340          # create remote dir
341          if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
342              try:
343                  self.createDir( Destination_SE, Destination_SE.protocol )
344 +            except OperationException, ex:
345 +                for filetocopy in list_file:
346 +                    results.update( self.updateReport(filetocopy, '60316', str(ex)))
347 +                return results
348 +            ## when the client commands are not found (wrong env or really missing)
349 +            except MissingCommand, ex:
350 +                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
351 +                for filetocopy in list_file:
352 +                    results.update( self.updateReport(filetocopy, '10041', msg))
353 +                return results
354              except Exception, ex:
355 <                return self.updateReport('', '60316', str(ex))
355 >                msg = "ERROR %s" %(str(ex))
356 >                for filetocopy in list_file:
357 >                    results.update( self.updateReport(filetocopy, '-1', msg))
358 >                return results
359  
360          ## prepare for real copy  ##
361          try :
# Line 309 | Line 365 | class cmscp:
365          except ProtocolMismatch, ex:
366              msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
367              msg += str(ex)
368 <            return self.updateReport('', '-1', msg)
368 >            for filetocopy in list_file:
369 >                results.update( self.updateReport(filetocopy, '-1', msg))
370 >            return results
371  
372          results = {}
373          ## loop over the complete list of files
# Line 318 | Line 376 | class cmscp:
376              try :
377                  ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
378              except Exception, ex:
379 <                ErCode = -1
379 >                ErCode = '60307'
380                  msg = str(ex)  
381              if ErCode == '0':
382                  ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
383 +                if (ErCode == '0') and (backup == 'yes'):
384 +                    ErCode = '60308'
385              if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
386              results.update( self.updateReport(filetocopy, ErCode, msg))
387          return results
# Line 409 | Line 469 | class cmscp:
469              msg += str(ex)
470              if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
471              raise Exception(msg)
472 +        ## when the client commands are not found (wrong env or really missing)
473 +        except MissingCommand, ex:
474 +            ErCode = '10041'
475 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
476 +            return ErCode, msg
477          if not checkSource :
478              ErCode = '60302'
479              msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
# Line 438 | Line 503 | class cmscp:
503              msg += str(ex)
504              if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
505              raise Exception(msg)
506 +        ## when the client commands are not found (wrong env or really missing)
507 +        except MissingCommand, ex:
508 +            ErCode = '10041'
509 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
510 +            return ErCode, msg
511          if check :
512              ErCode = '60303'
513              msg = "file %s already exist"%os.path.basename(filetocopy)
# Line 464 | Line 534 | class cmscp:
534          ErCode = '0'
535          msg = ''
536  
537 +        if  self.params['option'].find('space_token')>0:
538 +            space_token=self.params['option'].split('=')[1]
539 +            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
540 +            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
541          try:
542              sbi.copy( source_file , dest_file , opt = option)
543          except TransferException, ex:
# Line 480 | Line 554 | class cmscp:
554              if self.debug :
555                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
556                  dbgmsg += '\t'+str(ex.output)+'\n'
557 <                print dbsmsg
557 >                print dbgmsg
558          except SizeZeroException, ex:
559              msg  = "Problem copying %s file" % filetocopy
560              msg += str(ex)
561              if self.debug :
562                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
563                  dbgmsg += '\t'+str(ex.output)+'\n'
564 <                print dbsmsg
564 >                print dbgmsg
565 >            ErCode = '60307'
566 >        ## when the client commands are not found (wrong env or really missing)
567 >        except MissingCommand, ex:
568 >            ErCode = '10041'
569 >            msg  = "Problem copying %s file" % filetocopy
570 >            msg += str(ex)
571 >            if self.debug :
572 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
573 >                dbgmsg += '\t'+str(ex.output)+'\n'
574 >                print dbgmsg
575 >        except AuthorizationException, ex:
576              ErCode = '60307'
577 +            msg  = "Problem copying %s file" % filetocopy
578 +            msg += str(ex)
579 +            if self.debug :
580 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
581 +                dbgmsg += '\t'+str(ex.output)+'\n'
582 +                print dbgmsg
583          if ErCode == '0' and protocol.find('srmv') == 0:
584              remote_file_size = -1
585              local_file_size = os.path.getsize( source_file )
# Line 564 | Line 655 | class cmscp:
655          cmscp_exit_status = 0
656          txt = ''
657          for file, dict in results.iteritems():
658 +            reason = str(dict['reason'])
659 +            if str(reason).find("'") > -1:
660 +                reason = " ".join(reason.split("'"))
661 +            reason="'%s'"%reason
662              if file:
663                  if dict['lfn']=='':
664 <                    lfn = '$LFNBaseName/'+os.path.basename(file)
664 >                    lfn = '${LFNBaseName}'+os.path.basename(file)
665                      se  = '$SE'
666 +                    LFNBaseName = '$LFNBaseName'
667                  else:
668                      lfn = dict['lfn']+os.path.basename(file)
669                      se = dict['se']
670 <                    
670 >                    LFNBaseName = os.path.dirname(lfn)
671 >                    if (LFNBaseName[-1] != '/'):
672 >                        LFNBaseName = LFNBaseName + '/'
673                  #dict['lfn'] # to be implemented
674                  txt += 'echo "Report for File: '+file+'"\n'
675                  txt += 'echo "LFN: '+lfn+'"\n'
676                  txt += 'echo "StorageElement: '+se+'"\n'
677 <                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
677 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
678                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
679 <                #txt += 'export LFNBaseName='+lfn+'\n'
679 >                txt += 'export LFNBaseName='+LFNBaseName+'\n'
680                  txt += 'export SE='+se+'\n'
681 +
682 +                txt += 'export endpoint='+self.params['destination']+'\n'
683                  
684                  if dict['erCode'] != '0':
685                      cmscp_exit_status = dict['erCode']
686              else:
687 <                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
588 <                cmscp_exit_status = dict['erCode']
687 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
688                  cmscp_exit_status = dict['erCode']
689          txt += '\n'
690          txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
691 <        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
691 >        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
692          outFile.write(str(txt))
693          outFile.close()
694          return

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines