ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.20 by spiga, Tue Oct 28 17:40:55 2008 UTC vs.
Revision 1.32 by mcinquil, Mon Dec 8 15:40:47 2008 UTC

# Line 1 | Line 1
1 < #!/usr/bin/env python
1 > ##!/usr/bin/env python
2  
3   import sys, os
4   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
# Line 26 | Line 26 | class cmscp:
26          """
27  
28          #set default
29 <        self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
29 >        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
30                             "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
31          self.debug = 0
32  
# Line 71 | Line 71 | class cmscp:
71          Check if running on UI (no $middleware) or
72          on WN (on the Grid), and take different action
73          """
74
74          self.processOptions()
75 +        if self.debug: print 'calling run() : \n'
76          # stage out from WN
77          if self.params['middleware'] :
78             results = self.stager(self.params['middleware'],self.params['inputFileList'])
79             self.finalReport(results)
80          # Local interaction with SE
81          else:
82 <           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'] )
82 >           results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
83             return results
84  
85      def setProtocol( self, middleware ):
# Line 97 | Line 97 | class cmscp:
97          supported_protocol = None
98          if middleware.lower() in ['osg','lcg','condor']:
99              supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
100 <                                  (self.params['srm_version'],srmOpt[self.params['srm_version']])]
100 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
101          elif middleware.lower() in ['lsf','caf']:
102              supported_protocol = [('rfio',rfioOpt)]
103          else:
# Line 106 | Line 106 | class cmscp:
106              pass
107          return supported_protocol
108  
109 + #   def checkCopy(self, copy_results, list_files):
110 +        """
111 +        #results={}
112 +        list_retry = []
113 +        list_existing = []
114 +        list_ok = []
115 +        if copy_results.keys() == '':
116 +            self.results.update(copy_results)
117 +        else:
118 +            for file, dict in copy_results.iteritems():
119 +                er_code = dict['erCode']
120 +                if er_code == '0':
121 +                    list_ok.append(file)
122 +                    reason = 'Copy succedeed with %s utils'%prot
123 +                    upDict = self.updateReport(file, er_code, reason)
124 +                    copy_results.update(upDict)
125 +                elif er_code == '60303': list_existing.append( file )
126 +                else: list_retry.append( file )
127 +            results.update(copy_results)
128 +            if len(list_ok) != 0:
129 +                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
130 +                if self.debug : print msg
131 +            if len(list_ok) == len(list_files) :
132 +                msg = 'Copy of  all files succedeed\n'
133 +                #break
134 +            else:
135 +                if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
136 +                #if len(list_retry): list_files = list_retry
137 +        return list_retry, results        
138 +        
139 +        """
140      def stager( self, middleware, list_files ):
141          """
142          Implement the logic for remote stage out
143          """
144 +
145 +        if self.debug: print 'stager() :\n'
146          results={}
147          for prot, opt in self.setProtocol( middleware ):
148 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
148 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
149              copy_results = self.copy( list_files, prot, opt )
150 +            ######## to define a new function checkCopy ################
151 +            #list_retry, self.results = self.checkCopy(copy_results, list_files)
152 +        #def checkCopy (self, copy_results):
153 +        #    """
154 +        #    """
155 +        #    results={}
156              list_retry = []
157              list_existing = []
158              list_ok = []
159 <            if copy_results.keys() == '':
159 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
160                  results.update(copy_results)
161              else:
162                  for file, dict in copy_results.iteritems():
# Line 127 | Line 166 | class cmscp:
166                          reason = 'Copy succedeed with %s utils'%prot
167                          upDict = self.updateReport(file, er_code, reason)
168                          copy_results.update(upDict)
169 <                    elif er_code == '60303': list_existing.append( file )
169 >                    elif er_code in ['60303','60302']: list_existing.append( file )
170                      else: list_retry.append( file )
171                  results.update(copy_results)
172 +                msg = ''
173                  if len(list_ok) != 0:
174 <                    msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
135 <                    if self.debug : print msg
174 >                    msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
175                  if len(list_ok) == len(list_files) :
176                      break
177                  else:
178 <                    if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
178 >                    if self.debug: msg += '\tCopy of %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
179                      if len(list_retry): list_files = list_retry
180                      else: break
181 <
181 >                if self.debug : print msg
182 >            """
183 >            if len(list_retry):
184 >               list_files = list_retry
185 >            #def backupCopy(list_retry)
186 >               print "in backup"
187 >               self.params['inputFilesList']=list_files
188 >               ### copy backup
189 >               from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
190 >               siteCfg = loadSiteLocalConfig()
191 >               #print siteCfg
192 >               seName = siteCfg.localStageOut.get("se-name", None)
193 >               #print  "seName = ", seName
194 >               self.params['destination']=seName
195 >               #catalog = siteCfg.localStageOut.get("catalog", None)
196 >               #print "catalog = ", catalog
197 >               implName = siteCfg.localStageOut.get("command", None)
198 >               print "implName = ", implName
199 >               if (implName == 'srm'):
200 >                  implName='srmv2'
201 >               self.params['protocol']=implName
202 >               tfc = siteCfg.trivialFileCatalog()
203 >               #print "tfc = ", tfc
204 >               print " self.params['inputFilesList'] = ", self.params['inputFilesList']
205 >               file_backup=[]
206 >               for input in self.params['inputFilesList']:
207 >                   ### to add the correct lfn, passed as argument of cmscp function (--lfn xxxx)
208 >                   file = '/store/'+input
209 >                   pfn = tfc.matchLFN(tfc.preferredProtocol, file)
210 >                   print "pfn = ", pfn
211 >                   file_backup.append(pfn)
212 >               self.params['inputFilesList'] = file_backup
213 >               print "#########################################"
214 >               print "self.params['inputFilesList'] = ", self.params['inputFilesList']
215 >               print "self.params['protocol'] = ", self.params['protocol']
216 >               print "self.params['option'] = ", self.params['option']
217 >               self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'])
218 >               print "#########################################"
219 >               ###list_retry, self.results = checkCopy(copy_results)
220 >                   #check is something fails and created related dict
221 >                   #        backup = self.analyzeResults(results)
222 >                   #        if backup :
223 >                   #            msg = 'WARNING: backup logic is under implementation\n'
224 >                   #            #backupDict = self.backup()
225 >                   #            ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
226 >                   #            results.update(backupDict)
227 >                   #            print msg
228 >            """
229          #### TODO Daniele
230          #check is something fails and created related dict
231    #      backup = self.analyzeResults(results)
# Line 156 | Line 242 | class cmscp:
242          """
243          Instantiate storage interface
244          """
245 +        if self.debug : print 'initializeApi() :\n'  
246          self.source_prot = protocol
247          self.dest_prot = protocol
248          if not self.params['source'] : self.source_prot = 'local'
# Line 164 | Line 251 | class cmscp:
251          Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
252  
253          if self.debug :
254 <            print '(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
255 <            print '(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
254 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
255 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
256 >            print msg
257  
258          return Source_SE, Destination_SE
259  
# Line 174 | Line 262 | class cmscp:
262          Make the real file copy using SE API
263          """
264          if self.debug :
265 <            print 'copy(): using %s protocol'%protocol
265 >            msg  = 'copy() :\n'
266 >            msg += '\tusing %s protocol\n'%protocol
267 >            print msg
268          try:
269              Source_SE, Destination_SE = self.initializeApi( protocol )
270          except Exception, ex:
271              return self.updateReport('', '-1', str(ex))
272  
273          # create remote dir
274 <        if protocol in ['gridftp','rfio']:
274 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
275              try:
276 <                self.createDir( Destination_SE, protocol )
276 >                self.createDir( Destination_SE, Destination_SE.protocol )
277              except Exception, ex:
278                  return self.updateReport('', '60316', str(ex))
279  
# Line 193 | Line 283 | class cmscp:
283              sbi_dest = SBinterface(Destination_SE)
284              sbi_source = SBinterface(Source_SE)
285          except ProtocolMismatch, ex:
286 <            msg = str(ex)+'\n'
287 <            msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
288 <            return self.updateReport('', '-1', str(ex))
286 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
287 >            msg += str(ex)
288 >            return self.updateReport('', '-1', msg)
289  
290          results = {}
291          ## loop over the complete list of files
292          for filetocopy in list_file:
293 <            if self.debug : print 'start real copy for %s'%filetocopy
293 >            if self.debug : print '\tStart real copy for %s'%filetocopy
294              try :
295                  ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy )
296              except Exception, ex:
# Line 208 | Line 298 | class cmscp:
298                  msg = str(ex)  
299              if ErCode == '0':
300                  ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
301 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
301 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
302              results.update( self.updateReport(filetocopy, ErCode, msg))
303          return results
304  
# Line 217 | Line 307 | class cmscp:
307          """
308          Create the storage interface.
309          """
310 +        if self.debug : print 'storageInterface():\n'
311          try:
312              interface = SElement( FullPath(endpoint), protocol )
313          except ProtocolUnknown, ex:
314 <            msg = ''
315 <            if self.debug : msg = str(ex)+'\n'
225 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
314 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
315 >            msg += str(ex)
316              raise Exception(msg)
317  
318          return interface
# Line 232 | Line 322 | class cmscp:
322          Create remote dir for gsiftp REALLY TEMPORARY
323          this should be transparent at SE API level.
324          """
325 +        if self.debug : print 'createDir():\n'
326          msg = ''
327          try:
328              action = SBinterface( Destination_SE )
329              action.createDir()
330 <            if self.debug: msg+= "The directory has been created using protocol %s\n"%protocol
330 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
331          except TransferException, ex:
332 <            msg = str(ex)
332 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
333 >            msg += str(ex)
334              if self.debug :
335 <                msg += str(ex.detail)+'\n'
336 <                msg += str(ex.output)+'\n'
337 <            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
338 <            raise Exceptions(msg)
335 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
336 >                dbgmsg += '\t'+str(ex.output)+'\n'
337 >                print dbgmsg
338 >            raise Exception(msg)
339          except OperationException, ex:
340 <            msg = str(ex)
341 <            if self.debug : msg += str(ex.detail)+'\n'
342 <            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
343 <
340 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
341 >            msg += str(ex)
342 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
343 >            raise Exception(msg)
344 >        except MissingDestination, ex:
345 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
346 >            msg += str(ex)
347 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
348 >            raise Exception(msg)
349 >        except AlreadyExistsException, ex:
350 >            if self.debug: print "\tThe directory already exist"
351 >            pass            
352          return msg
353  
354      def checkFileExist( self, sbi_source, sbi_dest, filetocopy ):
# Line 256 | Line 356 | class cmscp:
356          Check both if source file exist AND
357          if destination file ALREADY exist.
358          """
359 +        if self.debug : print 'checkFileExist():\n'
360          ErCode = '0'
361          msg = ''
362          f_tocopy=filetocopy
363          if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
364          try:
365              checkSource = sbi_source.checkExists( f_tocopy )
366 +            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
367          except OperationException, ex:
368 <            msg = str(ex)
368 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
369 >            msg += str(ex)
370              if self.debug :
371 <                msg += str(ex.detail)+'\n'
372 <                msg += str(ex.output)+'\n'
373 <            msg +='ERROR: problems checkig if source file %s exist'%filetocopy
371 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
372 >                dbgmsg += '\t'+str(ex.output)+'\n'
373 >                print dbgmsg
374              raise Exception(msg)
375          except WrongOption, ex:
376 <            msg = str(ex)
376 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
377 >            msg += str(ex)
378              if self.debug :
379 <                msg += str(ex.detail)+'\n'
380 <                msg += str(ex.output)+'\n'
381 <            msg +='ERROR problems checkig if source file % exist'%filetocopy
379 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
380 >                dbgmsg += '\t'+str(ex.output)+'\n'
381 >                print dbgmsg
382 >            raise Exception(msg)
383 >        except MissingDestination, ex:
384 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
385 >            msg += str(ex)
386 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
387              raise Exception(msg)
388          if not checkSource :
389              ErCode = '60302'
390              msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
391              return ErCode, msg
283
392          f_tocopy=filetocopy
393          if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
394          try:
395              check = sbi_dest.checkExists( f_tocopy )
396 +            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
397          except OperationException, ex:
398 <            msg = str(ex)
398 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
399 >            msg += str(ex)
400              if self.debug :
401 <                msg += str(ex.detail)+'\n'
402 <                msg += str(ex.output)+'\n'
403 <            msg +='ERROR: problems checkig if file %s already exist'%filetocopy
401 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
402 >                dbgmsg += '\t'+str(ex.output)+'\n'
403 >                print dbgmsg
404              raise Exception(msg)
405          except WrongOption, ex:
406 <            msg = str(ex)
406 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
407 >            msg += str(ex)
408              if self.debug :
409 <                msg += str(ex.detail)+'\n'
410 <                msg += str(ex.output)+'\n'
411 <            msg +='ERROR problems checkig if file % already exist'%filetocopy
409 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
410 >                msg += '\t'+str(ex.output)+'\n'
411 >            raise Exception(msg)
412 >        except MissingDestination, ex:
413 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
414 >            msg += str(ex)
415 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
416              raise Exception(msg)
417          if check :
418              ErCode = '60303'
# Line 309 | Line 424 | class cmscp:
424          """
425          call the copy API.
426          """
427 +        if self.debug : print 'makeCopy():\n'
428          path = os.path.dirname(filetocopy)
429          file_name =  os.path.basename(filetocopy)
430          source_file = filetocopy
# Line 316 | Line 432 | class cmscp:
432          if self.params['source'] == '' and path == '':
433              source_file = os.path.abspath(filetocopy)
434          elif self.params['destination'] =='':
435 <            dest_file = os.path.join(os.getcwd(),file_name)
435 >            destDir = self.params.get('destinationDir',os.getcwd())
436 >            dest_file = os.path.join(destDir,file_name)
437          elif self.params['source'] != '' and self.params['destination'] != '' :
438              source_file = file_name
439  
# Line 326 | Line 443 | class cmscp:
443          try:
444              sbi.copy( source_file , dest_file , opt = option)
445          except TransferException, ex:
446 <            msg = str(ex)
446 >            msg  = "Problem copying %s file" % filetocopy
447 >            msg += str(ex)
448              if self.debug :
449 <                msg += str(ex.detail)+'\n'
450 <                msg += str(ex.output)+'\n'
451 <            msg += "Problem copying %s file" % filetocopy
449 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
450 >                dbgmsg += '\t'+str(ex.output)+'\n'
451 >                print dbgmsg
452              ErCode = '60307'
453          except WrongOption, ex:
454 <            msg = str(ex)
454 >            msg  = "Problem copying %s file" % filetocopy
455 >            msg += str(ex)
456              if self.debug :
457 <                msg += str(ex.detail)+'\n'
458 <                msg += str(ex.output)+'\n'
459 <            msg += "Problem copying %s file" % filetocopy
457 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
458 >                dbgmsg += '\t'+str(ex.output)+'\n'
459 >                print dbsmsg
460              ErCode = '60307'
461 <        if ErCode == '0' and protocol.find('srm') == 0:
461 >        if ErCode == '0' and protocol.find('srmv') == 0:
462              remote_file_size = -1
463              local_file_size = os.path.getsize( source_file )
464              try:
465                  remote_file_size = sbi_dest.getSize( dest_file )
466 +                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
467              except TransferException, ex:
468 <                msg = str(ex)
468 >                msg  = "Problem checking the size of %s file" % filetocopy
469 >                msg += str(ex)
470                  if self.debug :
471 <                    msg += str(ex.detail)+'\n'
472 <                    msg += str(ex.output)+'\n'
473 <                msg += "Problem checking the size of %s file" % filetocopy
471 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
472 >                    dbgmsg += '\t'+str(ex.output)+'\n'
473 >                    print dbgmsg
474                  ErCode = '60307'
475              except WrongOption, ex:
476 <                msg = str(ex)
476 >                msg  = "Problem checking the size of %s file" % filetocopy
477 >                msg += str(ex)
478                  if self.debug :
479 <                    msg += str(ex.detail)+'\n'
480 <                    msg += str(ex.output)+'\n'
481 <                msg += "Problem checking the size of %s file" % filetocopy
479 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
480 >                    dbgmsg += '\t'+str(ex.output)+'\n'
481 >                    print dbgmsg
482                  ErCode = '60307'
483              if local_file_size != remote_file_size:
484                  msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
485                  ErCode = '60307'
486  
487 +        if ErCode != '0':
488 +            try :
489 +                self.removeFile( sbi_dest, dest_file )
490 +            except Exception, ex:
491 +                msg += '\n'+str(ex)  
492          return ErCode, msg
493  
494 +    def removeFile( self, sbi_dest, filetocopy ):
495 +        """  
496 +        """  
497 +        if self.debug : print 'removeFile():\n'
498 +        f_tocopy=filetocopy
499 +        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
500 +        try:
501 +            sbi_dest.delete( f_tocopy )
502 +            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
503 +        except OperationException, ex:
504 +            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
505 +            msg += str(ex)
506 +            if self.debug :
507 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
508 +                dbgmsg += '\t'+str(ex.output)+'\n'
509 +                print dbgmsg
510 +            raise Exception(msg)
511 +
512 +        return
513 +
514      def backup(self):
515          """
516          Check infos from TFC using existing api obtaining:
# Line 411 | Line 558 | class cmscp:
558                      cmscp_exit_status = dict['erCode']
559                      cmscp_exit_status = dict['erCode']
560              else:
561 +                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
562                  cmscp_exit_status = dict['erCode']
563                  cmscp_exit_status = dict['erCode']
564          txt += '\n'
# Line 459 | Line 607 | if __name__ == '__main__' :
607      import getopt
608  
609      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
610 <                  "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
610 >                  "protocol=","option=", "middleware=", "srm_version=", \
611 >                  "destinationDir=","debug", "help"]
612      try:
613          opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
614      except getopt.GetoptError, err:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines