ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.12 by spiga, Thu Oct 9 11:50:20 2008 UTC vs.
Revision 1.20 by spiga, Tue Oct 28 17:40:55 2008 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2  
3 < import sys, string
4 < import os, popen2
3 > import sys, os
4   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5   from ProdCommon.Storage.SEAPI.SBinterface import *
6   from ProdCommon.Storage.SEAPI.Exceptions import *
# Line 29 | Line 28 | class cmscp:
28          #set default
29          self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
30                             "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
31 <        self.debug = 0  
32 <
31 >        self.debug = 0
32 >
33          self.params.update( args )
34  
35          return
# Line 40 | Line 39 | class cmscp:
39          check command line parameter
40          """
41  
42 <        if 'help' in self.params.keys(): HelpOptions()        
43 <        if 'debug' in self.params.keys(): self.debug = 1        
42 >        if 'help' in self.params.keys(): HelpOptions()
43 >        if 'debug' in self.params.keys(): self.debug = 1
44  
45          # source and dest cannot be undefined at same time
46          if not self.params['source']  and not self.params['destination'] :
# Line 55 | Line 54 | class cmscp:
54          if not self.params['inputFileList'] : HelpOptions()
55          else:
56              file_to_copy=[]
57 <            if self.params['inputFileList'].find(','):
57 >            if self.params['inputFileList'].find(','):
58                  [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59              else:
60                  file_to_copy.append(self.params['inputFileList'])
# Line 80 | Line 79 | class cmscp:
79             self.finalReport(results)
80          # Local interaction with SE
81          else:
82 <           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.protocols['option'] )
82 >           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'] )
83             return results
84  
85      def setProtocol( self, middleware ):
# Line 89 | Line 88 | class cmscp:
88          which depend on scheduler
89          """
90          # default To be used with "middleware"
91 <        lcgOpt={'srmv1':'-b -D srmv1 --vo cms -t 2400 --verbose',
92 <                'srmv2':'-b -D srmv2 --vo cms -t 2400 --verbose'}
93 <        srmOpt={'srmv1':'-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
94 <                'srmv2':'-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -srm_protocol_version 2 '}
91 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
92 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
93 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
94 >                'srmv2':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
95          rfioOpt=''
96  
97 <        if middleware.lower() in ['osg','lcg']:
97 >        supported_protocol = None
98 >        if middleware.lower() in ['osg','lcg','condor']:
99              supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
100                                    (self.params['srm_version'],srmOpt[self.params['srm_version']])]
101          elif middleware.lower() in ['lsf','caf']:
102 <            supported_protocol = [('rfio',rfioOpt)]
102 >            supported_protocol = [('rfio',rfioOpt)]
103          else:
104              ## here we can add support for any kind of protocol,
105              ## maybe some local schedulers need something dedicated
# Line 110 | Line 110 | class cmscp:
110          """
111          Implement the logic for remote stage out
112          """
113        count=0
113          results={}
114          for prot, opt in self.setProtocol( middleware ):
115              if self.debug: print 'Trying stage out with %s utils \n'%prot
# Line 118 | Line 117 | class cmscp:
117              list_retry = []
118              list_existing = []
119              list_ok = []
120 <            for file, dict in copy_results.iteritems():
121 <                er_code = dict['erCode']
123 <                if er_code == '60307': list_retry.append( file )
124 <                elif er_code == '60303': list_existing.append( file )
125 <                else:
126 <                    list_ok.append(file)
127 <                    reason = 'Copy succedeed with %s utils'%prot
128 <                    upDict = self.updateReport(file, er_code, reason)
129 <                    copy_results.update(upDict)
130 <            results.update(copy_results)
131 <            if len(list_ok) != 0:
132 <                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
133 <                if self.debug : print msg
134 <            if len(list_ok) == len(list_files) :
135 <                break
120 >            if copy_results.keys() == '':
121 >                results.update(copy_results)
122              else:
123 <                if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
124 <                if len(list_retry): list_files = list_retry
125 <                else: break
126 <            count =+1
123 >                for file, dict in copy_results.iteritems():
124 >                    er_code = dict['erCode']
125 >                    if er_code == '0':
126 >                        list_ok.append(file)
127 >                        reason = 'Copy succedeed with %s utils'%prot
128 >                        upDict = self.updateReport(file, er_code, reason)
129 >                        copy_results.update(upDict)
130 >                    elif er_code == '60303': list_existing.append( file )
131 >                    else: list_retry.append( file )
132 >                results.update(copy_results)
133 >                if len(list_ok) != 0:
134 >                    msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
135 >                    if self.debug : print msg
136 >                if len(list_ok) == len(list_files) :
137 >                    break
138 >                else:
139 >                    if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
140 >                    if len(list_retry): list_files = list_retry
141 >                    else: break
142  
143          #### TODO Daniele
144          #check is something fails and created related dict
# Line 155 | Line 156 | class cmscp:
156          """
157          Instantiate storage interface
158          """
159 <        source_prot = protocol
160 <        dest_prot = protocol
161 <        if not self.params['source'] : source_prot = 'local'
162 <        Source_SE  = self.storageInterface( self.params['source'], source_prot )
163 <        if not self.params['destination'] : dest_prot = 'local'
164 <        Destination_SE = self.storageInterface( self.params['destination'], dest_prot )
159 >        self.source_prot = protocol
160 >        self.dest_prot = protocol
161 >        if not self.params['source'] : self.source_prot = 'local'
162 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
163 >        if not self.params['destination'] : self.dest_prot = 'local'
164 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
165  
166          if self.debug :
167 <            print '(source=%s,  protocol=%s)'%(self.params['source'], source_prot)
168 <            print '(destination=%s,  protocol=%s)'%(self.params['destination'], dest_prot)
167 >            print '(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
168 >            print '(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
169  
170          return Source_SE, Destination_SE
171  
# Line 174 | Line 175 | class cmscp:
175          """
176          if self.debug :
177              print 'copy(): using %s protocol'%protocol
178 <        Source_SE, Destination_SE = self.initializeApi( protocol )
178 >        try:
179 >            Source_SE, Destination_SE = self.initializeApi( protocol )
180 >        except Exception, ex:
181 >            return self.updateReport('', '-1', str(ex))
182  
183          # create remote dir
184          if protocol in ['gridftp','rfio']:
185 <            self.createDir( Destination_SE, protocol )
185 >            try:
186 >                self.createDir( Destination_SE, protocol )
187 >            except Exception, ex:
188 >                return self.updateReport('', '60316', str(ex))
189  
190          ## prepare for real copy  ##
191          try :
192              sbi = SBinterface( Source_SE, Destination_SE )
193              sbi_dest = SBinterface(Destination_SE)
194 +            sbi_source = SBinterface(Source_SE)
195          except ProtocolMismatch, ex:
196              msg = str(ex)+'\n'
197              msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
198 <            raise msg
198 >            return self.updateReport('', '-1', str(ex))
199  
200          results = {}
201          ## loop over the complete list of files
202          for filetocopy in list_file:
203              if self.debug : print 'start real copy for %s'%filetocopy
204 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
204 >            try :
205 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy )
206 >            except Exception, ex:
207 >                ErCode = -1
208 >                msg = str(ex)  
209              if ErCode == '0':
210 <                ErCode, msg = self.makeCopy( sbi, filetocopy , options )
210 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
211              if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
212              results.update( self.updateReport(filetocopy, ErCode, msg))
213          return results
# Line 211 | Line 223 | class cmscp:
223              msg = ''
224              if self.debug : msg = str(ex)+'\n'
225              msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
226 <            raise msg
226 >            raise Exception(msg)
227  
228          return interface
229  
# Line 220 | Line 232 | class cmscp:
232          Create remote dir for gsiftp REALLY TEMPORARY
233          this should be transparent at SE API level.
234          """
235 <        ErCode = '0'
224 <        msg = ''
235 >        msg = ''
236          try:
237              action = SBinterface( Destination_SE )
238              action.createDir()
239 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
239 >            if self.debug: msg+= "The directory has been created using protocol %s\n"%protocol
240          except TransferException, ex:
241              msg = str(ex)
242              if self.debug :
243                  msg += str(ex.detail)+'\n'
244                  msg += str(ex.output)+'\n'
245 <            msg_rep = "ERROR: problem with the directory creation using %s protocol \n"%protocol
246 <            ErCode = '60316'
236 <            res = self.updateReport('', ErCode, msg_rep )
237 <            self.finalReport( res )
238 <            raise msg
245 >            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
246 >            raise Exceptions(msg)
247          except OperationException, ex:
248              msg = str(ex)
249              if self.debug : msg += str(ex.detail)+'\n'
250 <            print msg
250 >            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
251  
252 <        return ErCode, msg
252 >        return msg
253  
254 <    def checkFileExist( self, sbi, filetocopy ):
254 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy ):
255          """
256 <        Check if file to copy already exist
256 >        Check both if source file exist AND
257 >        if destination file ALREADY exist.
258          """
259          ErCode = '0'
260          msg = ''
261 +        f_tocopy=filetocopy
262 +        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
263          try:
264 <            check = sbi.checkExists(filetocopy)
264 >            checkSource = sbi_source.checkExists( f_tocopy )
265          except OperationException, ex:
266              msg = str(ex)
267              if self.debug :
268                  msg += str(ex.detail)+'\n'
269                  msg += str(ex.output)+'\n'
270 <            msg +='problems checkig if file already exist'
271 <            raise msg
270 >            msg +='ERROR: problems checkig if source file %s exist'%filetocopy
271 >            raise Exception(msg)
272          except WrongOption, ex:
273              msg = str(ex)
274              if self.debug :
275                  msg += str(ex.detail)+'\n'
276                  msg += str(ex.output)+'\n'
277 <            raise msg
278 <        if check :    
277 >            msg +='ERROR problems checkig if source file % exist'%filetocopy
278 >            raise Exception(msg)
279 >        if not checkSource :
280 >            ErCode = '60302'
281 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
282 >            return ErCode, msg
283 >
284 >        f_tocopy=filetocopy
285 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
286 >        try:
287 >            check = sbi_dest.checkExists( f_tocopy )
288 >        except OperationException, ex:
289 >            msg = str(ex)
290 >            if self.debug :
291 >                msg += str(ex.detail)+'\n'
292 >                msg += str(ex.output)+'\n'
293 >            msg +='ERROR: problems checkig if file %s already exist'%filetocopy
294 >            raise Exception(msg)
295 >        except WrongOption, ex:
296 >            msg = str(ex)
297 >            if self.debug :
298 >                msg += str(ex.detail)+'\n'
299 >                msg += str(ex.output)+'\n'
300 >            msg +='ERROR problems checkig if file % already exist'%filetocopy
301 >            raise Exception(msg)
302 >        if check :
303              ErCode = '60303'
304 <            msg = "file %s already exist"%filetocopy
304 >            msg = "file %s already exist"%os.path.basename(filetocopy)
305  
306 <        return ErCode,msg
306 >        return ErCode, msg
307  
308 <    def makeCopy(self, sbi, filetocopy, option ):
308 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
309          """
310          call the copy API.
311          """
# Line 302 | Line 337 | class cmscp:
337              if self.debug :
338                  msg += str(ex.detail)+'\n'
339                  msg += str(ex.output)+'\n'
340 <            raise msg
341 <
342 <         ## TO BE IMPLEMENTED if NEEDED
343 <         ## NOTE: SE API Already available
344 <    #    if self.protocol.find('srm')  : self.checkSize( sbi, filetocopy )
345 <          
340 >            msg += "Problem copying %s file" % filetocopy
341 >            ErCode = '60307'
342 >        if ErCode == '0' and protocol.find('srm') == 0:
343 >            remote_file_size = -1
344 >            local_file_size = os.path.getsize( source_file )
345 >            try:
346 >                remote_file_size = sbi_dest.getSize( dest_file )
347 >            except TransferException, ex:
348 >                msg = str(ex)
349 >                if self.debug :
350 >                    msg += str(ex.detail)+'\n'
351 >                    msg += str(ex.output)+'\n'
352 >                msg += "Problem checking the size of %s file" % filetocopy
353 >                ErCode = '60307'
354 >            except WrongOption, ex:
355 >                msg = str(ex)
356 >                if self.debug :
357 >                    msg += str(ex.detail)+'\n'
358 >                    msg += str(ex.output)+'\n'
359 >                msg += "Problem checking the size of %s file" % filetocopy
360 >                ErCode = '60307'
361 >            if local_file_size != remote_file_size:
362 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
363 >                ErCode = '60307'
364 >
365          return ErCode, msg
366  
367      def backup(self):
# Line 339 | Line 393 | class cmscp:
393          outFile = open('cmscpReport.sh',"a")
394          cmscp_exit_status = 0
395          txt = ''
396 <        for file, dict in results.iteritems():
397 <            if file:
396 >        for file, dict in results.iteritems():
397 >            if file:
398                  if dict['lfn']=='':
399                      lfn = '$LFNBaseName/'+os.path.basename(file)
400                      se  = '$SE'
401                  else:
402 <                    lfn = dict['lfn']+os.pat.basename(file)
402 >                    lfn = dict['lfn']+os.path.basename(file)
403                      se = dict['se']
404                  #dict['lfn'] # to be implemented
405                  txt +=  'echo "Report for File: '+file+'"\n'
# Line 379 | Line 433 | def usage():
433  
434      optional parameters
435      """
436 <    print msg
436 >    print msg
437  
438 <    return
438 >    return
439  
440   def HelpOptions(opts=[]):
441      """
442      Check otps, print help if needed
443 <    prepare dict = { opt : value }  
443 >    prepare dict = { opt : value }
444      """
445      dict_args = {}
446      if len(opts):
447          for opt, arg in opts:
448 <            dict_args[opt.split('--')[1]] = arg
448 >            dict_args[opt.split('--')[1]] = arg
449              if opt in ('-h','-help','--help') :
450                  usage()
451                  sys.exit(0)
# Line 402 | Line 456 | def HelpOptions(opts=[]):
456  
457   if __name__ == '__main__' :
458  
459 <    import getopt
459 >    import getopt
460  
461      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
462                    "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
463 <    try:    
464 <        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
463 >    try:
464 >        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
465      except getopt.GetoptError, err:
466          print err
467          HelpOptions()
468          sys.exit(2)
469 <
469 >
470      dictArgs = HelpOptions(opts)
471      try:
472          cmscp_ = cmscp(dictArgs)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines