ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.68.2.3 by spiga, Thu Apr 22 14:40:14 2010 UTC vs.
Revision 1.79 by fanzago, Wed Jul 21 10:12:52 2010 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2   import sys, os
3 + try:
4 +    import json
5 + except:    
6 +    import simplejson as json
7   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
8   from ProdCommon.Storage.SEAPI.SBinterface import *
9   from ProdCommon.Storage.SEAPI.Exceptions import *
# Line 17 | Line 21 | class cmscp:
21             $3 if needed: file name (the output file name)
22             $5 remote SE (complete endpoint)
23             $6 srm version
24 <           --lfn $LFNBaseName
24 >           --for_lfn $LFNBaseName
25          output:
26               return 0 if all ok
27               return 60307 if srmcp failed
28               return 60303 if file already exists in the SE
29          """
26
27        #set default
30          self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
31 <                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
31 >                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'' }
32          self.debug = 0
33 +        #### for fallback copy
34          self.local_stage = 0
35          self.params.update( args )
36 +        ## timeout needed for subprocess command of SEAPI
37 +        ## they should be a bit higher then the corresponding passed by command line  
38 +        ## default values
39 +        self.subprocesstimeout = { \
40 +                                   'copy':   3600, \
41 +                                   'exists': 1200, \
42 +                                   'delete': 1200, \
43 +                                   'size':   1200 \
44 +                                 }
45  
46          return
47  
# Line 59 | Line 71 | class cmscp:
71                  file_to_copy.append(self.params['inputFileList'])
72              self.params['inputFileList'] = file_to_copy
73  
74 <        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
74 >        #if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
75 >        if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
76          
77          ## TO DO:
78          #### add check for outFiles
# Line 76 | Line 89 | class cmscp:
89          # stage out from WN
90          if self.params['middleware'] :
91              results = self.stager(self.params['middleware'],self.params['inputFileList'])
92 +            self.writeJsonFile(results)
93              self.finalReport(results)
94          # Local interaction with SE
95          else:
96              results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
97 +            self.writeJsonFile(results)
98              return results
99 +            
100 +    def writeJsonFile( self, results ):
101 +        """
102 +        write a json file containing copy results for each file
103 +        """
104 +        if self.debug:
105 +            print 'in writeJsonFile() : \n'
106 +            print "---->>>> in writeJsonFile results =  ", results
107 +        fp = open('resultCopyFile', 'w')
108 +        json.dump(results, fp)
109 +        fp.close()
110 +        if self.debug:
111 +            print '    reading resultCopyFile : \n'
112 +            lp = open('resultCopyFile', "r")
113 +            inputDict = json.load(lp)
114 +            lp.close()
115 +            print "    inputDict = ", inputDict
116 +        return
117  
118      def checkLcgUtils( self ):
119          """
# Line 111 | Line 144 | class cmscp:
144          if self.debug:
145              print 'setProtocol() :\n'
146              print '\tmiddleware =  %s utils \n'%middleware
147 <        
147 >
148          lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
149                  'srmv2':'-b -D srmv2  -t 2400 --verbose'}
150          if self.checkLcgUtils() >= 17:
151 <            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose',
152 <                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose'}
151 >            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
152 >                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
153  
154          srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
155                  'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
156          rfioOpt=''
157  
158 +        ## FEDE for hadoop ###
159 +        hadoopOpt = ''
160 +        ####
161 +
162          supported_protocol = None
163          if middleware.lower() in ['osg','lcg','condor','sge']:
164              supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
# Line 132 | Line 169 | class cmscp:
169              supported_protocol = [('rfio',rfioOpt),('local','')]
170          elif middleware.lower() in ['arc']:
171              supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
172 +        ## FEDE for hadoop ###
173 +        elif middleware.lower() in ['hadoop']:
174 +            supported_protocol = [('hadoop', hadoopOpt)]
175 +        ####    
176          else:
177              ## here we can add support for any kind of protocol,
178              ## maybe some local schedulers need something dedicated
# Line 139 | Line 180 | class cmscp:
180          return supported_protocol
181  
182  
183 <    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
183 >    def checkCopy (self, copy_results, len_list_files, prot):
184          """
185          Checks the status of copy and update result dictionary
186          """
187 +        
188          list_retry = []
147        list_retry_localSE = []
189          list_not_existing = []
190 +        list_already_existing = []
191 +        list_fallback = []
192          list_ok = []
193          
194          if self.debug:
# Line 156 | Line 199 | class cmscp:
199                  list_ok.append(file)
200                  reason = 'Copy succedeed with %s utils'%prot
201                  dict['reason'] = reason
202 +            elif er_code == '60308':
203 +                list_fallback.append( file )
204 +                reason = 'Copy succedeed with %s utils'%prot
205 +                dict['reason'] = reason
206              elif er_code == '60302':
207                  list_not_existing.append( file )
208 <            elif er_code == '10041':
208 >            elif er_code == '60303':
209 >                list_already_existing.append( file )
210 >            else :    
211                  list_retry.append( file )
163            ## WHAT TO DO IN GENERAL FAILURE CONDITION
164            else:
165                list_retry_localSE.append( file )
212                  
213              if self.debug:
214                  print "\t file %s \n"%file
215                  print "\t dict['erCode'] %s \n"%dict['erCode']
216                  print "\t dict['reason'] %s \n"%dict['reason']
217                  
218 <            if (lfn != '') and (se != ''):
173 <                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
174 <            else:
175 <                upDict = self.updateReport(file, er_code, dict['reason'])
218 >            upDict = self.updateReport(file, er_code, dict['reason'])
219  
220              copy_results.update(upDict)
221          
# Line 180 | Line 223 | class cmscp:
223          if len(list_ok) != 0:
224              msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
225          if len(list_ok) != len_list_files :
226 <            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
227 <            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
226 >            if len(list_fallback)!=0:
227 >                msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
228 >            if len(list_retry)!=0:
229 >                msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
230 >            if len(list_not_existing)!=0:
231 >                msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
232 >            if len(list_already_existing)!=0:
233 >                msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
234          if self.debug : print msg
235          
236 <        return copy_results, list_ok, list_retry, list_retry_localSE
236 >        return copy_results, list_ok, list_retry, list_fallback
237 >        
238 >    def check_for_retry_localSE (self, copy_results):
239 >        """
240 >        Checks the status of copy and create the list of file to copy to CloseSE
241 >        """
242 >        list_retry_localSE = []
243 >
244 >        if self.debug:
245 >            print 'in check_for_retry_localSE() :\n'
246 >            print "\t results in check local = ", copy_results
247 >        for file, dict in copy_results.iteritems():
248 >            er_code = dict['erCode']
249 >            if er_code != '0' and  er_code != '60302' and er_code != '60308':
250 >                list_retry_localSE.append( file )
251 >                
252 >            if self.debug:
253 >                print "\t file %s \n"%file
254 >                print "\t dict['erCode'] %s \n"%dict['erCode']
255 >                print "\t dict['reason'] %s \n"%dict['reason']
256 >                
257 >        return list_retry_localSE
258 >
259          
260      def LocalCopy(self, list_retry, results):
261          """
# Line 195 | Line 266 | class cmscp:
266              print '\t list_retry %s utils \n'%list_retry
267              print '\t len(list_retry) %s \n'%len(list_retry)
268                  
269 <        list_files = list_retry  
270 <        self.params['inputFilesList']=list_files
271 <        
269 >        list_files = list_retry
270 >        self.params['inputFileList']=list_files
271 >
272          ### copy backup
273          from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
274          siteCfg = loadSiteLocalConfig()
# Line 210 | Line 281 | class cmscp:
281          ##### to be improved ###############
282          if (implName == 'rfcp'):
283              self.params['middleware']='lsf'
284 +        #### FEDE for hadoop    
285 +        if implName == 'hadoop':
286 +            self.params['middleware'] = 'hadoop'
287          ####################################    
288                    
289          self.params['protocol']=implName
# Line 221 | Line 295 | class cmscp:
295              print '\t catalog %s \n'%catalog
296              print "\t self.params['protocol'] %s \n"%self.params['protocol']            
297              print '\t tfc %s '%tfc
298 <            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
298 >            print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
299                  
300 <        #if (str(self.params['lfn']).find("/store/") != -1):
301 <        #    temp = str(self.params['lfn']).split("/store/")
302 <        #    self.params['lfn']= "/store/temp/" + temp[1]
229 <        if (str(self.params['lfn']).find("/store/") == 0):
230 <            temp = str(self.params['lfn']).replace("/store/","/store/temp/",1)
231 <            self.params['lfn']= temp
300 >        if (str(self.params['for_lfn']).find("/store/") == 0):
301 >            temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
302 >            self.params['for_lfn']= temp
303          
304 <        if ( self.params['lfn'][-1] != '/' ) : self.params['lfn'] = self.params['lfn'] + '/'
304 >        if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
305              
306          file_backup=[]
307 <        for input in self.params['inputFilesList']:
308 <            file = self.params['lfn'] + os.path.basename(input)
307 >        for input in self.params['inputFileList']:
308 >            file = self.params['for_lfn'] + os.path.basename(input)
309              surl = tfc.matchLFN(tfc.preferredProtocol, file)
310              file_backup.append(surl)
311              if self.debug:
312 <                print '\t lfn %s \n'%self.params['lfn']
312 >                print '\t for_lfn %s \n'%self.params['for_lfn']
313                  print '\t file %s \n'%file
314                  print '\t surl %s \n'%surl
315                      
316          destination=os.path.dirname(file_backup[0])
317          if ( destination[-1] != '/' ) : destination = destination + '/'
318 +      
319          self.params['destination']=destination
320 +        
321 +        self.params['se_name']=seName
322              
323          if self.debug:
324              print "\t self.params['destination']%s \n"%self.params['destination']
# Line 257 | Line 331 | class cmscp:
331              if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
332                  results.update(localCopy_results)
333              else:
334 <                localCopy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
334 >                localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), prot)
335                  results.update(localCopy_results)
336 <                if len(list_ok) == len(list_files) :
336 >                if len(list_fallback) == len(list_files) :
337                      break
338                  if len(list_retry):
339                      list_files = list_retry
# Line 281 | Line 355 | class cmscp:
355          
356          results={}
357          for prot, opt in self.setProtocol( middleware ):
358 <            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
358 >            if self.debug:
359 >                print '\tTrying the stage out with %s utils \n'%prot
360 >                print '\tand options %s\n'%opt
361 >                
362              copy_results = self.copy( list_files, prot, opt )
363              if copy_results.keys() == [''] or copy_results.keys() == '' :
364                  results.update(copy_results)
365              else:
366 <                copy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(copy_results, len(list_files), prot)
366 >                copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
367                  results.update(copy_results)
368                  if len(list_ok) == len(list_files) :
369                      break
370 <                if len(list_retry):
370 >                if len(list_retry):
371                      list_files = list_retry
372                  else: break
373 <                
373 >
374          if self.local_stage:
375 +            list_retry_localSE = self.check_for_retry_localSE(results)
376              if len(list_retry_localSE):
377 +                if self.debug:
378 +                    print "\t list_retry_localSE %s \n"%list_retry_localSE
379                  results = self.LocalCopy(list_retry_localSE, results)
380 <            
380 >
381          if self.debug:
382              print "\t results %s \n"%results
383          return results
# Line 311 | Line 391 | class cmscp:
391          self.dest_prot = protocol
392          if not self.params['source'] : self.source_prot = 'local'
393          Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
394 <        if not self.params['destination'] : self.dest_prot = 'local'
395 <        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
394 >        if not self.params['destination'] :
395 >            self.dest_prot = 'local'
396 >            Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
397 >        else:    
398 >            Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
399  
400          if self.debug :
401              msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
402              msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
403 +            msg += '\t(destinationDir=%s,  protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
404              print msg
405  
406          return Source_SE, Destination_SE
# Line 326 | Line 410 | class cmscp:
410          Make the real file copy using SE API
411          """
412          msg = ""
413 +        results = {}
414          if self.debug :
415              msg  = 'copy() :\n'
416              msg += '\tusing %s protocol\n'%protocol
# Line 336 | Line 421 | class cmscp:
421              for filetocopy in list_file:
422                  results.update( self.updateReport(filetocopy, '-1', str(ex)))
423              return results
424 +            
425 +        prot = Destination_SE.protocol
426 +        self.hostname=Destination_SE.hostname
427  
428          # create remote dir
429 <        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
429 >        ### FEDE for hadoop
430 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop']:
431              try:
432                  self.createDir( Destination_SE, Destination_SE.protocol )
433              except OperationException, ex:
# Line 369 | Line 458 | class cmscp:
458                  results.update( self.updateReport(filetocopy, '-1', msg))
459              return results
460  
461 <        results = {}
461 >        self.hostname = Destination_SE.hostname
462 >        
463          ## loop over the complete list of files
464          for filetocopy in list_file:
465              if self.debug : print '\tStart real copy for %s'%filetocopy
# Line 432 | Line 522 | class cmscp:
522              raise Exception(msg)
523          except AlreadyExistsException, ex:
524              if self.debug: print "\tThe directory already exist"
525 <            pass            
525 >            pass
526 >        except Exception, ex:    
527 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
528 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
529 >            raise Exception(msg)
530          return msg
531  
532      def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
# Line 446 | Line 540 | class cmscp:
540          f_tocopy=filetocopy
541          if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
542          try:
543 <            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
543 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
544              if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
545          except OperationException, ex:
546              msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
# Line 481 | Line 575 | class cmscp:
575          f_tocopy=filetocopy
576          if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
577          try:
578 <            check = sbi_dest.checkExists( f_tocopy, opt=option )
578 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
579              if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
580          except OperationException, ex:
581              msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
# Line 534 | Line 628 | class cmscp:
628          ErCode = '0'
629          msg = ''
630  
631 <        if  self.params['option'].find('space_token')>0:
631 >        if  self.params['option'].find('space_token')>=0:
632              space_token=self.params['option'].split('=')[1]
633              if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
634              if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
635          try:
636 <            sbi.copy( source_file , dest_file , opt = option)
636 >            sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
637          except TransferException, ex:
638              msg  = "Problem copying %s file" % filetocopy
639              msg += str(ex)
# Line 580 | Line 674 | class cmscp:
674                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
675                  dbgmsg += '\t'+str(ex.output)+'\n'
676                  print dbgmsg
677 +        except SEAPITimeout, ex:
678 +            ErCode = '60317'
679 +            msg  = "Problem copying %s file" % filetocopy
680 +            msg += str(ex)
681 +            if self.debug :
682 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
683 +                dbgmsg += '\t'+str(ex.output)+'\n'
684 +                print dbgmsg
685 +
686          if ErCode == '0' and protocol.find('srmv') == 0:
687              remote_file_size = -1
688              local_file_size = os.path.getsize( source_file )
689              try:
690 <                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
690 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
691                  if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
692              except TransferException, ex:
693                  msg  = "Problem checking the size of %s file" % filetocopy
# Line 620 | Line 723 | class cmscp:
723          f_tocopy=filetocopy
724          if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
725          try:
726 <            sbi_dest.delete( f_tocopy, opt=option )
726 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
727              if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
728          except OperationException, ex:
729              msg  ='ERROR: problems removing partially staged file %s'%filetocopy
# Line 633 | Line 736 | class cmscp:
736  
737          return
738  
739 <    def updateReport(self, file, erCode, reason, lfn='', se='' ):
739 >    #def updateReport(self, file, erCode, reason, lfn='', se='' ):
740 >    def updateReport(self, file, erCode, reason):
741          """
742          Update the final stage out infos
743          """
744          jobStageInfo={}
745          jobStageInfo['erCode']=erCode
746          jobStageInfo['reason']=reason
747 <        jobStageInfo['lfn']=lfn
748 <        jobStageInfo['se']=se
747 >        if not self.params['for_lfn']: self.params['for_lfn']=''
748 >        if not self.params['se_name']: self.params['se_name']=''
749 >        if not self.hostname: self.hostname=''
750 >        if (erCode != '0') and (erCode != '60308'):
751 >           jobStageInfo['for_lfn']='/copy_problem/'
752 >        else:  
753 >            jobStageInfo['for_lfn']=self.params['for_lfn']
754 >        jobStageInfo['se_name']=self.params['se_name']
755 >        jobStageInfo['endpoint']=self.hostname
756  
757          report = { file : jobStageInfo}
758          return report
# Line 660 | Line 771 | class cmscp:
771                  reason = " ".join(reason.split("'"))
772              reason="'%s'"%reason
773              if file:
774 <                if dict['lfn']=='':
774 >                if dict['for_lfn']=='':
775                      lfn = '${LFNBaseName}'+os.path.basename(file)
776                      se  = '$SE'
777                      LFNBaseName = '$LFNBaseName'
778                  else:
779 <                    lfn = dict['lfn']+os.path.basename(file)
780 <                    se = dict['se']
779 >                    lfn = dict['for_lfn']+os.path.basename(file)
780 >                    se = dict['se_name']
781                      LFNBaseName = os.path.dirname(lfn)
782                      if (LFNBaseName[-1] != '/'):
783                          LFNBaseName = LFNBaseName + '/'
784 <                #dict['lfn'] # to be implemented
784 >
785 >                
786                  txt += 'echo "Report for File: '+file+'"\n'
787                  txt += 'echo "LFN: '+lfn+'"\n'
788                  txt += 'echo "StorageElement: '+se+'"\n'
789                  txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
790                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
679                txt += 'export LFNBaseName='+LFNBaseName+'\n'
680                txt += 'export SE='+se+'\n'
791  
682                txt += 'export endpoint='+self.params['destination']+'\n'
792                  
793                  if dict['erCode'] != '0':
794                      cmscp_exit_status = dict['erCode']
# Line 749 | Line 858 | if __name__ == '__main__' :
858  
859      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
860                    "protocol=","option=", "middleware=", "srm_version=", \
861 <                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
861 >                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
862      try:
863          opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
864      except getopt.GetoptError, err:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines