ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.56.2.3 by spiga, Tue Sep 29 16:16:12 2009 UTC vs.
Revision 1.93 by fanzago, Thu Sep 13 11:14:31 2012 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2   import sys, os
3 + import time, random
4 + try:
5 +    import json
6 + except:    
7 +    import simplejson as json
8   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
9   from ProdCommon.Storage.SEAPI.SBinterface import *
10   from ProdCommon.Storage.SEAPI.Exceptions import *
# Line 17 | Line 22 | class cmscp:
22             $3 if needed: file name (the output file name)
23             $5 remote SE (complete endpoint)
24             $6 srm version
25 <           --lfn $LFNBaseName
25 >           --for_lfn $LFNBaseName
26          output:
27               return 0 if all ok
28               return 60307 if srmcp failed
29               return 60303 if file already exists in the SE
30          """
26
27        #set default
31          self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
32 <                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
32 >                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'', "surl_for_grid":''}
33          self.debug = 0
34 +        #### for fallback copy
35          self.local_stage = 0
36          self.params.update( args )
37 +        ## timeout needed for subprocess command of SEAPI
38 +        ## they should be a bit higher then the corresponding passed by command line  
39 +        ## default values
40 +        self.subprocesstimeout = { \
41 +                                   'copy':   3600, \
42 +                                   'exists': 1200, \
43 +                                   'delete': 1200, \
44 +                                   'size':   1200 \
45 +                                 }
46  
47          return
48  
# Line 59 | Line 72 | class cmscp:
72                  file_to_copy.append(self.params['inputFileList'])
73              self.params['inputFileList'] = file_to_copy
74  
75 <        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
75 >        if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
76          
64        ## TO DO:
65        #### add check for outFiles
66        #### add map {'inFileNAME':'outFileNAME'} to change out name
67
77  
78      def run( self ):
79          """
# Line 76 | Line 85 | class cmscp:
85          # stage out from WN
86          if self.params['middleware'] :
87              results = self.stager(self.params['middleware'],self.params['inputFileList'])
88 +            self.writeJsonFile(results)
89              self.finalReport(results)
90          # Local interaction with SE
91          else:
92              results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
93 +            self.writeJsonFile(results)
94              return results
95 +            
96 +    def writeJsonFile( self, results ):
97 +        """
98 +        write a json file containing copy results for each file
99 +        """
100 +        if self.debug:
101 +            print 'in writeJsonFile() : \n'
102 +            print "---->>>> in writeJsonFile results =  ", results
103 +        jsonOut = "resultCopyFile"
104 +        if os.getenv("RUNTIME_AREA"):
105 +            jsonOut = "%s/resultCopyFile"%os.getenv("RUNTIME_AREA")
106 +        fp = open(jsonOut, 'w')
107 +        json.dump(results, fp)
108 +        fp.close()
109 +        if self.debug:
110 +            print '    reading resultCopyFile : \n'
111 +            lp = open(jsonOut, "r")
112 +            inputDict = json.load(lp)
113 +            lp.close()
114 +            print "    inputDict = ", inputDict
115 +        return
116  
117      def checkLcgUtils( self ):
118          """
# Line 111 | Line 143 | class cmscp:
143          if self.debug:
144              print 'setProtocol() :\n'
145              print '\tmiddleware =  %s utils \n'%middleware
146 <        
146 >
147          lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
148                  'srmv2':'-b -D srmv2  -t 2400 --verbose'}
149          if self.checkLcgUtils() >= 17:
150 <            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 240 --connect-timeout 240 --verbose',
151 <                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 240 --connect-timeout 240 --verbose'}
150 >            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
151 >                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
152  
153          srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
154 <                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 '}
154 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
155          rfioOpt=''
156 +        #### FEDE FOR XROOTD #########
157 +        xrootdOpt=''
158 +        #############################
159  
160          supported_protocol = None
161          if middleware.lower() in ['osg','lcg','condor','sge']:
162 <            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
163 <                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
164 <        elif middleware.lower() in ['lsf','caf']:
162 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']])]#,\
163 >                               #  (self.params['srm_version'],srmOpt[self.params['srm_version']])]
164 >        #elif middleware.lower() in ['lsf','caf']:
165 >        elif middleware.lower() in ['lsf']:
166              supported_protocol = [('rfio',rfioOpt)]
167 +        elif middleware.lower() in ['pbs']:
168 +            supported_protocol = [('rfio',rfioOpt),('local','')]
169          elif middleware.lower() in ['arc']:
170              supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
171 +        #### FEDE FOR XROOTD ##########
172 +        elif middleware.lower() in ['caf']:
173 +            if self.params['protocol']:
174 +                supported_protocol = [(self.params['protocol'], '')]
175 +            else:
176 +                supported_protocol = [('rfio',rfioOpt)]
177 +            #######################################    
178          else:
179              ## here we can add support for any kind of protocol,
180              ## maybe some local schedulers need something dedicated
# Line 137 | Line 182 | class cmscp:
182          return supported_protocol
183  
184  
185 <    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
185 >    def checkCopy (self, copy_results, len_list_files, prot):
186          """
187          Checks the status of copy and update result dictionary
188          """
189 +        
190          list_retry = []
145        list_retry_localSE = []
191          list_not_existing = []
192 +        list_already_existing = []
193 +        list_fallback = []
194          list_ok = []
195          
196          if self.debug:
197              print 'in checkCopy() :\n'
198 +        
199          for file, dict in copy_results.iteritems():
200              er_code = dict['erCode']
201              if er_code == '0':
202                  list_ok.append(file)
203                  reason = 'Copy succedeed with %s utils'%prot
204                  dict['reason'] = reason
205 +            elif er_code == '60308':
206 +                list_fallback.append( file )
207 +                reason = 'Copy succedeed with %s utils'%prot
208 +                dict['reason'] = reason
209              elif er_code == '60302':
210                  list_not_existing.append( file )
211 <            elif er_code == '10041':
211 >            elif er_code == '60303':
212 >                list_already_existing.append( file )
213 >            else :    
214                  list_retry.append( file )
215 <            ## WHAT TO DO IN GENERAL FAILURE CONDITION
162 <            else:
163 <                list_retry_localSE.append( file )
164 <                
215 >          
216              if self.debug:
217                  print "\t file %s \n"%file
218                  print "\t dict['erCode'] %s \n"%dict['erCode']
219                  print "\t dict['reason'] %s \n"%dict['reason']
220                  
221 <            if (lfn != '') and (se != ''):
171 <                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
172 <            else:
173 <                upDict = self.updateReport(file, er_code, dict['reason'])
221 >            upDict = self.updateReport(file, er_code, dict['reason'])
222  
223              copy_results.update(upDict)
224          
# Line 178 | Line 226 | class cmscp:
226          if len(list_ok) != 0:
227              msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
228          if len(list_ok) != len_list_files :
229 <            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
230 <            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
229 >            if len(list_fallback)!=0:
230 >                msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
231 >            if len(list_retry)!=0:
232 >                msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
233 >            if len(list_not_existing)!=0:
234 >                msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
235 >            if len(list_already_existing)!=0:
236 >                msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
237          if self.debug : print msg
238 +        return copy_results, list_ok, list_retry, list_fallback
239          
240 <        return copy_results, list_ok, list_retry, list_retry_localSE
240 >    def check_for_retry_localSE (self, copy_results):
241 >        """
242 >        Checks the status of copy and create the list of file to copy to CloseSE
243 >        """
244 >        list_retry_localSE = []
245 >
246 >        if self.debug:
247 >            print 'in check_for_retry_localSE() :\n'
248 >            print "\t results in check local = ", copy_results
249 >        for file, dict in copy_results.iteritems():
250 >            er_code = dict['erCode']
251 >            if er_code != '0' and  er_code != '60302' and er_code != '60308':
252 >                list_retry_localSE.append( file )
253 >                
254 >            if self.debug:
255 >                print "\t file %s \n"%file
256 >                print "\t dict['erCode'] %s \n"%dict['erCode']
257 >                print "\t dict['reason'] %s \n"%dict['reason']
258 >                
259 >        return list_retry_localSE
260 >
261          
262      def LocalCopy(self, list_retry, results):
263          """
# Line 193 | Line 268 | class cmscp:
268              print '\t list_retry %s utils \n'%list_retry
269              print '\t len(list_retry) %s \n'%len(list_retry)
270                  
271 <        list_files = list_retry  
272 <        self.params['inputFilesList']=list_files
273 <        
271 >        list_files = list_retry  
272 >        self.params['inputFileList']=list_files
273 >
274          ### copy backup
275          from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
276          siteCfg = loadSiteLocalConfig()
202        seName = siteCfg.localStageOut.get("se-name", None)
277          catalog = siteCfg.localStageOut.get("catalog", None)
204        implName = siteCfg.localStageOut.get("command", None)
205        if (implName == 'srm'):
206           implName='srmv1'
207           self.params['srm_version']=implName
208        ##### to be improved ###############
209        if (implName == 'rfcp'):
210            self.params['middleware']='lsf'
211        ####################################    
212                  
213        self.params['protocol']=implName
278          tfc = siteCfg.trivialFileCatalog()
279 <            
279 >        seName = siteCfg.localStageOutSEName()
280 >        if seName is None:
281 >            seName = ""
282 >        option = siteCfg.localStageOutOption()
283 >        if option is None:
284 >            option = ""
285 >        implName = siteCfg.localStageOutCommand()
286 >        if implName is None:
287 >            implName = ""
288 >
289 >        if (implName == 'srm'):
290 >           protocol = 'srmv2'
291 >        elif (implName == 'srmv2-lcg'):
292 >           protocol = 'srm-lcg'
293 >           option = option + ' -b -D srmv2 '
294 >        elif (implName == 'rfcp-CERN'):
295 >           protocol = 'rfio'
296 >        elif (implName == 'rfcp'):
297 >           protocol = 'rfio'
298 >        elif (implName == 'cp'):
299 >           protocol = 'local'
300 >        else: protocol = implName
301 >        
302 >        self.params['protocol']=protocol
303 >        self.params['option']=option
304 >
305          if self.debug:
306              print '\t siteCFG %s \n'%siteCfg
218            print '\t seName %s \n'%seName
307              print '\t catalog %s \n'%catalog
220            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
308              print '\t tfc %s '%tfc
309 <            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
309 >            print '\t fallback seName %s \n'%seName
310 >            print "\t fallback protocol %s \n"%protocol            
311 >            print "\t fallback option %s \n"%option
312 >            print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
313                  
314 +        if (str(self.params['for_lfn']).find("/store/") == 0):
315 +            temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
316 +            self.params['for_lfn']= temp
317 +        
318 +        if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
319 +            
320          file_backup=[]
321 <        for input in self.params['inputFilesList']:
322 <            file = self.params['lfn'] + os.path.basename(input)
321 >        file_backup_surlgrid=[]
322 >        for input in self.params['inputFileList']:
323 >            file = self.params['for_lfn'] + os.path.basename(input)
324              surl = tfc.matchLFN(tfc.preferredProtocol, file)
325 +            
326 +            ###### FEDE TEST_FOR_SURL_GRID
327 +            surl_for_grid = tfc.matchLFN('srmv2', file)
328 +            if (surl_for_grid == None):
329 +                surl_for_grid = tfc.matchLFN('srmv2-lcg', file)
330 +                if (surl_for_grid == None):
331 +                    surl_for_grid = tfc.matchLFN('srm', file)
332 +            if surl_for_grid:
333 +                file_backup_surlgrid.append(surl_for_grid)
334 +            ######
335 +
336              file_backup.append(surl)
337              if self.debug:
338 <                print '\t lfn %s \n'%self.params['lfn']
338 >                print '\t for_lfn %s \n'%self.params['for_lfn']
339                  print '\t file %s \n'%file
340                  print '\t surl %s \n'%surl
341                      
342          destination=os.path.dirname(file_backup[0])
235        ### FEDE added check for final /
343          if ( destination[-1] != '/' ) : destination = destination + '/'
237        #####################################
344          self.params['destination']=destination
345 <            
345 >
346 >        self.params['se_name']=seName
347 >
348 >        ######
349 >        if (len(file_backup_surlgrid)>0) :
350 >            surl_for_grid=os.path.dirname(file_backup_surlgrid[0])
351 >            if ( surl_for_grid[-1] != '/' ) : surl_for_grid = surl_for_grid + '/'
352 >        else:
353 >            surl_for_grid=''
354 >
355 >        print "surl_for_grid = ", surl_for_grid    
356 >        self.params['surl_for_grid']=surl_for_grid
357 >        #####
358 >
359          if self.debug:
360 <            print "\t self.params['destination']%s \n"%self.params['destination']
361 <            print "\t self.params['protocol'] %s \n"%self.params['protocol']
362 <            print "\t self.params['option']%s \n"%self.params['option']
363 <              
364 <        for prot, opt in self.setProtocol( self.params['middleware'] ):
365 <            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
366 <            localCopy_results = self.copy( self.params['inputFileList'], prot, opt )
367 <            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
368 <                results.update(localCopy_results)
369 <            else:
370 <                localCopy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
371 <                results.update(localCopy_results)
372 <                if len(list_ok) == len(list_files) :
373 <                    break
374 <                if len(list_retry):
256 <                    list_files = list_retry
257 <                else: break
258 <            if self.debug:
259 <                print "\t localCopy_results = %s \n"%localCopy_results
260 <        
360 >            print '\tIn LocalCopy trying the stage out with: \n'
361 >            print "\tself.params['destination'] %s \n"%self.params['destination']
362 >            print "\tself.params['protocol'] %s \n"%self.params['protocol']
363 >            print "\tself.params['option'] %s \n"%self.params['option']
364 >
365 >        localCopy_results = self.copy( self.params['inputFileList'], self.params['protocol'], self.params['option'], backup='yes' )
366 >          
367 >        if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
368 >            results.update(localCopy_results)
369 >        else:
370 >            localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), self.params['protocol'])
371 >                
372 >            results.update(localCopy_results)
373 >        if self.debug:
374 >            print "\t localCopy_results = %s \n"%localCopy_results
375          return results        
376  
377      def stager( self, middleware, list_files ):
# Line 272 | Line 386 | class cmscp:
386          
387          results={}
388          for prot, opt in self.setProtocol( middleware ):
389 <            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
389 >            if self.debug:
390 >                print '\tTrying the stage out with %s utils \n'%prot
391 >                print '\tand options %s\n'%opt
392 >                
393              copy_results = self.copy( list_files, prot, opt )
394              if copy_results.keys() == [''] or copy_results.keys() == '' :
395                  results.update(copy_results)
396              else:
397 <                copy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(copy_results, len(list_files), prot)
397 >                copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
398                  results.update(copy_results)
399                  if len(list_ok) == len(list_files) :
400                      break
401 <                if len(list_retry):
401 >                if len(list_retry):
402                      list_files = list_retry
403 +                    #### FEDE added ramdom time before the retry copy with other protocol
404 +                    sec =  240 * random.random()
405 +                    sec = sec + 60
406 +                    time.sleep(sec)
407                  else: break
408 <                
408 >
409          if self.local_stage:
410 +            list_retry_localSE = self.check_for_retry_localSE(results)
411              if len(list_retry_localSE):
412 +                if self.debug:
413 +                    print "\t list_retry_localSE %s \n"%list_retry_localSE
414                  results = self.LocalCopy(list_retry_localSE, results)
415 <            
415 >
416          if self.debug:
417              print "\t results %s \n"%results
418          return results
# Line 302 | Line 426 | class cmscp:
426          self.dest_prot = protocol
427          if not self.params['source'] : self.source_prot = 'local'
428          Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
429 <        if not self.params['destination'] : self.dest_prot = 'local'
430 <        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
429 >        if not self.params['destination'] :
430 >            self.dest_prot = 'local'
431 >            Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
432 >        else:    
433 >            Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
434  
435          if self.debug :
436              msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
437              msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
438 +            msg += '\t(destinationDir=%s,  protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
439              print msg
440  
441          return Source_SE, Destination_SE
442  
443 <    def copy( self, list_file, protocol, options ):
443 >    def copy( self, list_file, protocol, options, backup='no' ):
444          """
445          Make the real file copy using SE API
446          """
447          msg = ""
448 +        results = {}
449          if self.debug :
450              msg  = 'copy() :\n'
451              msg += '\tusing %s protocol\n'%protocol
452 +            msg += '\tusing %s options\n'%options
453 +            msg += '\tlist_file %s\n'%list_file
454              print msg
455          try:
456              Source_SE, Destination_SE = self.initializeApi( protocol )
457          except Exception, ex:
458 <            return self.updateReport('', '-1', str(ex))
458 >            for filetocopy in list_file:
459 >                results.update( self.updateReport(filetocopy, '-1', str(ex)))
460 >            return results
461  
462 <        # create remote dir
463 <        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
462 >        self.hostname = Destination_SE.hostname
463 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop','local']:
464              try:
465                  self.createDir( Destination_SE, Destination_SE.protocol )
466              except OperationException, ex:
467 <                return self.updateReport('', '60316', str(ex))
467 >                for filetocopy in list_file:
468 >                    results.update( self.updateReport(filetocopy, '60316', str(ex)))
469 >                return results
470              ## when the client commands are not found (wrong env or really missing)
471              except MissingCommand, ex:
472                  msg = "ERROR %s %s" %(str(ex), str(ex.detail))
473 <                return self.updateReport('', '10041', msg)
474 <
473 >                for filetocopy in list_file:
474 >                    results.update( self.updateReport(filetocopy, '10041', msg))
475 >                return results
476 >            except Exception, ex:
477 >                msg = "ERROR %s" %(str(ex))
478 >                for filetocopy in list_file:
479 >                    results.update( self.updateReport(filetocopy, '-1', msg))
480 >                return results
481          ## prepare for real copy  ##
482          try :
483              sbi = SBinterface( Source_SE, Destination_SE )
# Line 345 | Line 486 | class cmscp:
486          except ProtocolMismatch, ex:
487              msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
488              msg += str(ex)
489 <            return self.updateReport('', '-1', msg)
489 >            for filetocopy in list_file:
490 >                results.update( self.updateReport(filetocopy, '-1', msg))
491 >            return results
492  
493 <        results = {}
493 >        
494          ## loop over the complete list of files
495          for filetocopy in list_file:
496              if self.debug : print '\tStart real copy for %s'%filetocopy
# Line 358 | Line 501 | class cmscp:
501                  msg = str(ex)  
502              if ErCode == '0':
503                  ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
504 +                if (ErCode == '0') and (backup == 'yes'):
505 +                    ErCode = '60308'
506              if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
507              results.update( self.updateReport(filetocopy, ErCode, msg))
508          return results
# Line 408 | Line 553 | class cmscp:
553              raise Exception(msg)
554          except AlreadyExistsException, ex:
555              if self.debug: print "\tThe directory already exist"
556 <            pass            
556 >            pass
557 >        except Exception, ex:    
558 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
559 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
560 >            raise Exception(msg)
561          return msg
562  
563      def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
# Line 420 | Line 569 | class cmscp:
569          ErCode = '0'
570          msg = ''
571          f_tocopy=filetocopy
572 <        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
572 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
573          try:
574 <            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
575 <            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
574 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
575 >            if self.debug : print '\tCheck for local file %s existance executed \n'%f_tocopy  
576          except OperationException, ex:
577 <            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
577 >            msg  ='ERROR: problems checking source file %s existance'%filetocopy
578              msg += str(ex)
579              if self.debug :
580                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
# Line 433 | Line 582 | class cmscp:
582                  print dbgmsg
583              raise Exception(msg)
584          except WrongOption, ex:
585 <            msg  ='ERROR problems checkig if source file % exist'%filetocopy
585 >            msg  ='ERROR: problems checking source file %s existance'%filetocopy
586              msg += str(ex)
587              if self.debug :
588                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
# Line 441 | Line 590 | class cmscp:
590                  print dbgmsg
591              raise Exception(msg)
592          except MissingDestination, ex:
593 <            msg  ='ERROR problems checkig if source file % exist'%filetocopy
593 >            msg  ='ERROR: problems checking source file %s existance'%filetocopy
594              msg += str(ex)
595              if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
596              raise Exception(msg)
# Line 452 | Line 601 | class cmscp:
601              return ErCode, msg
602          if not checkSource :
603              ErCode = '60302'
604 <            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
604 >            msg = "ERROR file %s does not exist"%os.path.basename(filetocopy)
605              return ErCode, msg
606 <        f_tocopy=filetocopy
458 <        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
606 >        f_tocopy=os.path.basename(filetocopy)
607          try:
608 <            check = sbi_dest.checkExists( f_tocopy, opt=option )
609 <            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
608 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
609 >            if self.debug : print '\tCheck for remote file %s existance executed \n'%f_tocopy  
610 >            if self.debug : print '\twith exit code = %s \n'%check  
611          except OperationException, ex:
612 <            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
612 >            msg  = 'ERROR: problems checking if file %s already exist'%filetocopy
613              msg += str(ex)
614              if self.debug :
615                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
# Line 468 | Line 617 | class cmscp:
617                  print dbgmsg
618              raise Exception(msg)
619          except WrongOption, ex:
620 <            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
620 >            msg  = 'ERROR problems checking if file % already exists'%filetocopy
621              msg += str(ex)
622              if self.debug :
623                  msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
624                  msg += '\t'+str(ex.output)+'\n'
625              raise Exception(msg)
626          except MissingDestination, ex:
627 <            msg  ='ERROR problems checkig if source file % exist'%filetocopy
627 >            msg  ='ERROR problems checking if destination file % exists'%filetocopy
628              msg += str(ex)
629              if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
630              raise Exception(msg)
# Line 510 | Line 659 | class cmscp:
659          ErCode = '0'
660          msg = ''
661  
662 <        if  self.params['option'].find('space_token'):
663 <            space_tocken=self.params['option'].split('=')[1]
664 <            if protocol == 'srmv2': option = '%s -space_tocken=%s'%(option,space_tocken)
665 <            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_tocken)
662 >        copy_option = option
663 >        if  self.params['option'].find('space_token')>=0:
664 >            space_token=self.params['option'].split('=')[1]
665 >            if protocol == 'srmv2': copy_option = '%s -space_token=%s'%(option,space_token)
666 >            if protocol == 'srm-lcg': copy_option = '%s -S %s'%(option,space_token)
667          try:
668 <            sbi.copy( source_file , dest_file , opt = option)
668 >            sbi.copy( source_file , dest_file , opt = copy_option, tout = self.subprocesstimeout['copy'])
669          except TransferException, ex:
670              msg  = "Problem copying %s file" % filetocopy
671              msg += str(ex)
# Line 530 | Line 680 | class cmscp:
680              if self.debug :
681                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
682                  dbgmsg += '\t'+str(ex.output)+'\n'
683 <                print dbgmsg
683 >                print dbgmsg
684 >            ### FEDE added for savannah 97460###    
685 >            ErCode = '60307'
686 >            ####################################
687          except SizeZeroException, ex:
688              msg  = "Problem copying %s file" % filetocopy
689              msg += str(ex)
# Line 556 | Line 709 | class cmscp:
709                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
710                  dbgmsg += '\t'+str(ex.output)+'\n'
711                  print dbgmsg
712 +        except SEAPITimeout, ex:
713 +            ErCode = '60317'
714 +            msg  = "Problem copying %s file" % filetocopy
715 +            msg += str(ex)
716 +            if self.debug :
717 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
718 +                dbgmsg += '\t'+str(ex.output)+'\n'
719 +                print dbgmsg
720 +
721          if ErCode == '0' and protocol.find('srmv') == 0:
722              remote_file_size = -1
723              local_file_size = os.path.getsize( source_file )
724              try:
725 <                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
725 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
726                  if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
727              except TransferException, ex:
728                  msg  = "Problem checking the size of %s file" % filetocopy
# Line 596 | Line 758 | class cmscp:
758          f_tocopy=filetocopy
759          if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
760          try:
761 <            sbi_dest.delete( f_tocopy, opt=option )
761 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
762              if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
763          except OperationException, ex:
764              msg  ='ERROR: problems removing partially staged file %s'%filetocopy
# Line 609 | Line 771 | class cmscp:
771  
772          return
773  
774 <    def updateReport(self, file, erCode, reason, lfn='', se='' ):
774 >    def updateReport(self, file, erCode, reason):
775          """
776          Update the final stage out infos
777          """
778          jobStageInfo={}
779          jobStageInfo['erCode']=erCode
780          jobStageInfo['reason']=reason
781 <        jobStageInfo['lfn']=lfn
782 <        jobStageInfo['se']=se
783 <
781 >        if not self.params['for_lfn']: self.params['for_lfn']=''
782 >        if not self.params['se_name']: self.params['se_name']=''
783 >        if not self.hostname: self.hostname=''
784 >        if (erCode != '0') and (erCode != '60308'):
785 >           jobStageInfo['for_lfn']='/copy_problem/'
786 >        else:  
787 >            jobStageInfo['for_lfn']=self.params['for_lfn']
788 >        jobStageInfo['se_name']=self.params['se_name']
789 >        jobStageInfo['endpoint']=self.hostname
790 >        ### ADDING SURLFORGRID FOR COPYDATA
791 >        if not self.params['surl_for_grid']: self.params['surl_for_grid']=''
792 >        jobStageInfo['surl_for_grid']=self.params['surl_for_grid']
793 >        #####
794          report = { file : jobStageInfo}
795          return report
796  
# Line 627 | Line 799 | class cmscp:
799          It a list of LFNs for each SE where data are stored.
800          allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
801          """
802 <        outFile = open('cmscpReport.sh',"a")
802 >        
803 >        outFile = 'cmscpReport.sh'
804 >        if os.getenv("RUNTIME_AREA"):
805 >            #print "RUNTIME_AREA = ", os.getenv("RUNTIME_AREA")
806 >            outFile = "%s/cmscpReport.sh"%os.getenv("RUNTIME_AREA")
807 >            #print "--->>> outFile = ", outFile
808 >        fp = open(outFile, "w")
809 >
810          cmscp_exit_status = 0
811 <        txt = ''
811 >        txt = '#!/bin/bash\n'
812          for file, dict in results.iteritems():
813              reason = str(dict['reason'])
814              if str(reason).find("'") > -1:
815                  reason = " ".join(reason.split("'"))
816              reason="'%s'"%reason
817              if file:
818 <                if dict['lfn']=='':
819 <                    lfn = '$LFNBaseName/'+os.path.basename(file)
818 >                if dict['for_lfn']=='':
819 >                    lfn = '${LFNBaseName}'+os.path.basename(file)
820                      se  = '$SE'
821 +                    LFNBaseName = '$LFNBaseName'
822                  else:
823 <                    lfn = dict['lfn']+os.path.basename(file)
824 <                    se = dict['se']
825 <                #dict['lfn'] # to be implemented
823 >                    lfn = dict['for_lfn']+os.path.basename(file)
824 >                    se = dict['se_name']
825 >                    LFNBaseName = os.path.dirname(lfn)
826 >                    if (LFNBaseName[-1] != '/'):
827 >                        LFNBaseName = LFNBaseName + '/'
828 >
829 >                
830                  txt += 'echo "Report for File: '+file+'"\n'
831                  txt += 'echo "LFN: '+lfn+'"\n'
832                  txt += 'echo "StorageElement: '+se+'"\n'
833                  txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
834                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
651                #txt += 'export LFNBaseName='+lfn+'\n'
652                txt += 'export SE='+se+'\n'
653                ### FEDE per CopyData ####
835  
655                txt += 'export endpoint='+self.params['destination']+'\n'
836                  
837                  if dict['erCode'] != '0':
838                      cmscp_exit_status = dict['erCode']
839              else:
840                  txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
841                  cmscp_exit_status = dict['erCode']
662                cmscp_exit_status = dict['erCode']
842          txt += '\n'
843          txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
844 <        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
845 <        outFile.write(str(txt))
846 <        outFile.close()
844 >        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
845 >        fp.write(str(txt))
846 >        fp.close()
847 >        if self.debug:
848 >            print '--- reading cmscpReport.sh: \n'
849 >            lp = open(outFile, "r")
850 >            content = lp.read()
851 >            lp.close()
852 >            print "    content = ", content
853 >            print '--- end reading cmscpReport.sh'
854          return
855  
856  
# Line 723 | Line 909 | if __name__ == '__main__' :
909  
910      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
911                    "protocol=","option=", "middleware=", "srm_version=", \
912 <                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
912 >                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
913      try:
914          opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
915      except getopt.GetoptError, err:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines