ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.68.2.3 by spiga, Thu Apr 22 14:40:14 2010 UTC vs.
Revision 1.94 by belforte, Wed Jan 9 14:59:47 2013 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2   import sys, os
3 + import time, random
4 + try:
5 +    import json
6 + except:    
7 +    import simplejson as json
8   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
9   from ProdCommon.Storage.SEAPI.SBinterface import *
10   from ProdCommon.Storage.SEAPI.Exceptions import *
11 + from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
12 +
13  
14  
15   class cmscp:
# Line 17 | Line 24 | class cmscp:
24             $3 if needed: file name (the output file name)
25             $5 remote SE (complete endpoint)
26             $6 srm version
27 <           --lfn $LFNBaseName
27 >           --for_lfn $LFNBaseName
28          output:
29               return 0 if all ok
30               return 60307 if srmcp failed
31               return 60303 if file already exists in the SE
32          """
26
27        #set default
33          self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
34 <                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
34 >                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'', "surl_for_grid":''}
35          self.debug = 0
36 +        #### for fallback copy
37          self.local_stage = 0
38          self.params.update( args )
39 +        ## timeout needed for subprocess command of SEAPI
40 +        ## they should be a bit higher then the corresponding passed by command line  
41 +        ## default values
42 +        self.subprocesstimeout = { \
43 +                                   'copy':   3600, \
44 +                                   'exists': 1200, \
45 +                                   'delete': 1200, \
46 +                                   'size':   1200 \
47 +                                 }
48  
49          return
50  
# Line 59 | Line 74 | class cmscp:
74                  file_to_copy.append(self.params['inputFileList'])
75              self.params['inputFileList'] = file_to_copy
76  
77 <        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
77 >        if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
78          
64        ## TO DO:
65        #### add check for outFiles
66        #### add map {'inFileNAME':'outFileNAME'} to change out name
67
79  
80      def run( self ):
81          """
# Line 76 | Line 87 | class cmscp:
87          # stage out from WN
88          if self.params['middleware'] :
89              results = self.stager(self.params['middleware'],self.params['inputFileList'])
90 +            self.writeJsonFile(results)
91              self.finalReport(results)
92          # Local interaction with SE
93          else:
94              results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
95 +            self.writeJsonFile(results)
96              return results
97 +            
98 +    def writeJsonFile( self, results ):
99 +        """
100 +        write a json file containing copy results for each file
101 +        """
102 +        if self.debug:
103 +            print 'in writeJsonFile() : \n'
104 +            print "---->>>> in writeJsonFile results =  ", results
105 +        jsonOut = "resultCopyFile"
106 +        if os.getenv("RUNTIME_AREA"):
107 +            jsonOut = "%s/resultCopyFile"%os.getenv("RUNTIME_AREA")
108 +        fp = open(jsonOut, 'w')
109 +        json.dump(results, fp)
110 +        fp.close()
111 +        if self.debug:
112 +            print '    reading resultCopyFile : \n'
113 +            lp = open(jsonOut, "r")
114 +            inputDict = json.load(lp)
115 +            lp.close()
116 +            print "    inputDict = ", inputDict
117 +        return
118  
119      def checkLcgUtils( self ):
120          """
# Line 111 | Line 145 | class cmscp:
145          if self.debug:
146              print 'setProtocol() :\n'
147              print '\tmiddleware =  %s utils \n'%middleware
148 <        
148 >
149          lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
150                  'srmv2':'-b -D srmv2  -t 2400 --verbose'}
151          if self.checkLcgUtils() >= 17:
152 <            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose',
153 <                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose'}
152 >            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
153 >                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
154  
155          srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
156                  'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
157          rfioOpt=''
158 +        #### FEDE FOR XROOTD #########
159 +        xrootdOpt=''
160 +        #############################
161  
162          supported_protocol = None
163          if middleware.lower() in ['osg','lcg','condor','sge']:
164 <            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
165 <                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
166 <        elif middleware.lower() in ['lsf','caf']:
164 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']])]#,\
165 >                               #  (self.params['srm_version'],srmOpt[self.params['srm_version']])]
166 >        #elif middleware.lower() in ['lsf','caf']:
167 >        elif middleware.lower() in ['lsf']:
168              supported_protocol = [('rfio',rfioOpt)]
169          elif middleware.lower() in ['pbs']:
170              supported_protocol = [('rfio',rfioOpt),('local','')]
171          elif middleware.lower() in ['arc']:
172              supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
173 +        #### FEDE FOR XROOTD ##########
174 +        elif middleware.lower() in ['caf']:
175 +            if self.params['protocol']:
176 +                supported_protocol = [(self.params['protocol'], '')]
177 +            else:
178 +                supported_protocol = [('rfio',rfioOpt)]
179 +            #######################################    
180          else:
181              ## here we can add support for any kind of protocol,
182              ## maybe some local schedulers need something dedicated
183              pass
184 +        
185 +        # force lstore protocol for Vanderbilt, waiting for better
186 +        # long term solution
187 +        if os.path.exists('/usr/local/cms-stageout') and \
188 +           ( os.uname()[1].endswith('.vampire') or \
189 +             os.uname()[1].endwith('vanderbilt.edu') ):
190 +            print "*** I am at Vanderbilt. Trying lstore first ***"
191 +            supported_protocol.insert(0, ('lstore','') )
192 +            
193          return supported_protocol
194  
195  
196 <    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
196 >    def checkCopy (self, copy_results, len_list_files, prot):
197          """
198          Checks the status of copy and update result dictionary
199          """
200 +        
201          list_retry = []
147        list_retry_localSE = []
202          list_not_existing = []
203 +        list_already_existing = []
204 +        list_fallback = []
205          list_ok = []
206          
207          if self.debug:
208              print 'in checkCopy() :\n'
209 +        
210          for file, dict in copy_results.iteritems():
211              er_code = dict['erCode']
212              if er_code == '0':
213                  list_ok.append(file)
214                  reason = 'Copy succedeed with %s utils'%prot
215                  dict['reason'] = reason
216 +            elif er_code == '60308':
217 +                list_fallback.append( file )
218 +                reason = 'Copy succedeed with %s utils'%prot
219 +                dict['reason'] = reason
220              elif er_code == '60302':
221                  list_not_existing.append( file )
222 <            elif er_code == '10041':
222 >            elif er_code == '60303':
223 >                list_already_existing.append( file )
224 >            else :    
225                  list_retry.append( file )
226 <            ## WHAT TO DO IN GENERAL FAILURE CONDITION
164 <            else:
165 <                list_retry_localSE.append( file )
166 <                
226 >          
227              if self.debug:
228                  print "\t file %s \n"%file
229                  print "\t dict['erCode'] %s \n"%dict['erCode']
230                  print "\t dict['reason'] %s \n"%dict['reason']
231                  
232 <            if (lfn != '') and (se != ''):
173 <                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
174 <            else:
175 <                upDict = self.updateReport(file, er_code, dict['reason'])
232 >            upDict = self.updateReport(file, er_code, dict['reason'])
233  
234              copy_results.update(upDict)
235          
# Line 180 | Line 237 | class cmscp:
237          if len(list_ok) != 0:
238              msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
239          if len(list_ok) != len_list_files :
240 <            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
241 <            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
240 >            if len(list_fallback)!=0:
241 >                msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
242 >            if len(list_retry)!=0:
243 >                msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
244 >            if len(list_not_existing)!=0:
245 >                msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
246 >            if len(list_already_existing)!=0:
247 >                msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
248          if self.debug : print msg
249 +        return copy_results, list_ok, list_retry, list_fallback
250          
251 <        return copy_results, list_ok, list_retry, list_retry_localSE
251 >    def check_for_retry_localSE (self, copy_results):
252 >        """
253 >        Checks the status of copy and create the list of file to copy to CloseSE
254 >        """
255 >        list_retry_localSE = []
256 >
257 >        if self.debug:
258 >            print 'in check_for_retry_localSE() :\n'
259 >            print "\t results in check local = ", copy_results
260 >        for file, dict in copy_results.iteritems():
261 >            er_code = dict['erCode']
262 >            if er_code != '0' and  er_code != '60302' and er_code != '60308':
263 >                list_retry_localSE.append( file )
264 >                
265 >            if self.debug:
266 >                print "\t file %s \n"%file
267 >                print "\t dict['erCode'] %s \n"%dict['erCode']
268 >                print "\t dict['reason'] %s \n"%dict['reason']
269 >                
270 >        return list_retry_localSE
271 >
272          
273      def LocalCopy(self, list_retry, results):
274          """
# Line 195 | Line 279 | class cmscp:
279              print '\t list_retry %s utils \n'%list_retry
280              print '\t len(list_retry) %s \n'%len(list_retry)
281                  
282 <        list_files = list_retry  
283 <        self.params['inputFilesList']=list_files
284 <        
282 >        list_files = list_retry  
283 >        self.params['inputFileList']=list_files
284 >
285          ### copy backup
286          from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
287          siteCfg = loadSiteLocalConfig()
204        seName = siteCfg.localStageOut.get("se-name", None)
288          catalog = siteCfg.localStageOut.get("catalog", None)
206        implName = siteCfg.localStageOut.get("command", None)
207        if (implName == 'srm'):
208           implName='srmv1'
209           self.params['srm_version']=implName
210        ##### to be improved ###############
211        if (implName == 'rfcp'):
212            self.params['middleware']='lsf'
213        ####################################    
214                  
215        self.params['protocol']=implName
289          tfc = siteCfg.trivialFileCatalog()
290 <            
290 >        seName = siteCfg.localStageOutSEName()
291 >        if seName is None:
292 >            seName = ""
293 >        option = siteCfg.localStageOutOption()
294 >        if option is None:
295 >            option = ""
296 >        implName = siteCfg.localStageOutCommand()
297 >        if implName is None:
298 >            implName = ""
299 >
300 >        if (implName == 'srm'):
301 >           protocol = 'srmv2'
302 >        elif (implName == 'srmv2-lcg'):
303 >           protocol = 'srm-lcg'
304 >           option = option + ' -b -D srmv2 '
305 >        elif (implName == 'rfcp-CERN'):
306 >           protocol = 'rfio'
307 >        elif (implName == 'rfcp'):
308 >           protocol = 'rfio'
309 >        elif (implName == 'cp'):
310 >           protocol = 'local'
311 >        else: protocol = implName
312 >        
313 >        self.params['protocol']=protocol
314 >        self.params['option']=option
315 >
316          if self.debug:
317              print '\t siteCFG %s \n'%siteCfg
220            print '\t seName %s \n'%seName
318              print '\t catalog %s \n'%catalog
222            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
319              print '\t tfc %s '%tfc
320 <            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
320 >            print '\t fallback seName %s \n'%seName
321 >            print "\t fallback protocol %s \n"%protocol            
322 >            print "\t fallback option %s \n"%option
323 >            print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
324                  
325 <        #if (str(self.params['lfn']).find("/store/") != -1):
326 <        #    temp = str(self.params['lfn']).split("/store/")
327 <        #    self.params['lfn']= "/store/temp/" + temp[1]
229 <        if (str(self.params['lfn']).find("/store/") == 0):
230 <            temp = str(self.params['lfn']).replace("/store/","/store/temp/",1)
231 <            self.params['lfn']= temp
325 >        if (str(self.params['for_lfn']).find("/store/") == 0):
326 >            temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
327 >            self.params['for_lfn']= temp
328          
329 <        if ( self.params['lfn'][-1] != '/' ) : self.params['lfn'] = self.params['lfn'] + '/'
329 >        if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
330              
331          file_backup=[]
332 <        for input in self.params['inputFilesList']:
333 <            file = self.params['lfn'] + os.path.basename(input)
332 >        file_backup_surlgrid=[]
333 >        for input in self.params['inputFileList']:
334 >            file = self.params['for_lfn'] + os.path.basename(input)
335              surl = tfc.matchLFN(tfc.preferredProtocol, file)
336 +            
337 +            ###### FEDE TEST_FOR_SURL_GRID
338 +            surl_for_grid = tfc.matchLFN('srmv2', file)
339 +            if (surl_for_grid == None):
340 +                surl_for_grid = tfc.matchLFN('srmv2-lcg', file)
341 +                if (surl_for_grid == None):
342 +                    surl_for_grid = tfc.matchLFN('srm', file)
343 +            if surl_for_grid:
344 +                file_backup_surlgrid.append(surl_for_grid)
345 +            ######
346 +
347              file_backup.append(surl)
348              if self.debug:
349 <                print '\t lfn %s \n'%self.params['lfn']
349 >                print '\t for_lfn %s \n'%self.params['for_lfn']
350                  print '\t file %s \n'%file
351                  print '\t surl %s \n'%surl
352                      
353          destination=os.path.dirname(file_backup[0])
354          if ( destination[-1] != '/' ) : destination = destination + '/'
355          self.params['destination']=destination
356 <            
356 >
357 >        self.params['se_name']=seName
358 >
359 >        ######
360 >        if (len(file_backup_surlgrid)>0) :
361 >            surl_for_grid=os.path.dirname(file_backup_surlgrid[0])
362 >            if ( surl_for_grid[-1] != '/' ) : surl_for_grid = surl_for_grid + '/'
363 >        else:
364 >            surl_for_grid=''
365 >
366 >        print "surl_for_grid = ", surl_for_grid    
367 >        self.params['surl_for_grid']=surl_for_grid
368 >        #####
369 >
370          if self.debug:
371 <            print "\t self.params['destination']%s \n"%self.params['destination']
372 <            print "\t self.params['protocol'] %s \n"%self.params['protocol']
373 <            print "\t self.params['option']%s \n"%self.params['option']
374 <              
375 <        for prot, opt in self.setProtocol( self.params['middleware'] ):
376 <            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
377 <            localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
378 <            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
379 <                results.update(localCopy_results)
380 <            else:
381 <                localCopy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
382 <                results.update(localCopy_results)
383 <                if len(list_ok) == len(list_files) :
384 <                    break
385 <                if len(list_retry):
265 <                    list_files = list_retry
266 <                else: break
267 <            if self.debug:
268 <                print "\t localCopy_results = %s \n"%localCopy_results
269 <        
371 >            print '\tIn LocalCopy trying the stage out with: \n'
372 >            print "\tself.params['destination'] %s \n"%self.params['destination']
373 >            print "\tself.params['protocol'] %s \n"%self.params['protocol']
374 >            print "\tself.params['option'] %s \n"%self.params['option']
375 >
376 >        localCopy_results = self.copy( self.params['inputFileList'], self.params['protocol'], self.params['option'], backup='yes' )
377 >          
378 >        if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
379 >            results.update(localCopy_results)
380 >        else:
381 >            localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), self.params['protocol'])
382 >                
383 >            results.update(localCopy_results)
384 >        if self.debug:
385 >            print "\t localCopy_results = %s \n"%localCopy_results
386          return results        
387  
388      def stager( self, middleware, list_files ):
# Line 281 | Line 397 | class cmscp:
397          
398          results={}
399          for prot, opt in self.setProtocol( middleware ):
400 <            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
400 >            if self.debug:
401 >                print '\tTrying the stage out with %s utils \n'%prot
402 >                print '\tand options %s\n'%opt
403 >                
404              copy_results = self.copy( list_files, prot, opt )
405              if copy_results.keys() == [''] or copy_results.keys() == '' :
406                  results.update(copy_results)
407              else:
408 <                copy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(copy_results, len(list_files), prot)
408 >                copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
409                  results.update(copy_results)
410                  if len(list_ok) == len(list_files) :
411                      break
412 <                if len(list_retry):
412 >                if len(list_retry):
413                      list_files = list_retry
414 +                    #### FEDE added ramdom time before the retry copy with other protocol
415 +                    sec =  240 * random.random()
416 +                    sec = sec + 60
417 +                    time.sleep(sec)
418                  else: break
419 <                
419 >
420          if self.local_stage:
421 +            list_retry_localSE = self.check_for_retry_localSE(results)
422              if len(list_retry_localSE):
423 +                if self.debug:
424 +                    print "\t list_retry_localSE %s \n"%list_retry_localSE
425                  results = self.LocalCopy(list_retry_localSE, results)
426 <            
426 >
427          if self.debug:
428              print "\t results %s \n"%results
429          return results
# Line 311 | Line 437 | class cmscp:
437          self.dest_prot = protocol
438          if not self.params['source'] : self.source_prot = 'local'
439          Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
440 <        if not self.params['destination'] : self.dest_prot = 'local'
441 <        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
440 >        if not self.params['destination'] :
441 >            self.dest_prot = 'local'
442 >            Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
443 >        else:    
444 >            Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
445  
446          if self.debug :
447              msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
448              msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
449 +            msg += '\t(destinationDir=%s,  protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
450              print msg
451  
452          return Source_SE, Destination_SE
# Line 326 | Line 456 | class cmscp:
456          Make the real file copy using SE API
457          """
458          msg = ""
459 +        results = {}
460          if self.debug :
461              msg  = 'copy() :\n'
462              msg += '\tusing %s protocol\n'%protocol
463 +            msg += '\tusing %s options\n'%options
464 +            msg += '\tlist_file %s\n'%list_file
465              print msg
466          try:
467              Source_SE, Destination_SE = self.initializeApi( protocol )
# Line 337 | Line 470 | class cmscp:
470                  results.update( self.updateReport(filetocopy, '-1', str(ex)))
471              return results
472  
473 <        # create remote dir
474 <        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
473 >        self.hostname = Destination_SE.hostname
474 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop','lstore','local']:
475              try:
476                  self.createDir( Destination_SE, Destination_SE.protocol )
477              except OperationException, ex:
# Line 356 | Line 489 | class cmscp:
489                  for filetocopy in list_file:
490                      results.update( self.updateReport(filetocopy, '-1', msg))
491                  return results
359
492          ## prepare for real copy  ##
493          try :
494              sbi = SBinterface( Source_SE, Destination_SE )
# Line 369 | Line 501 | class cmscp:
501                  results.update( self.updateReport(filetocopy, '-1', msg))
502              return results
503  
504 <        results = {}
504 >        
505          ## loop over the complete list of files
506          for filetocopy in list_file:
507              if self.debug : print '\tStart real copy for %s'%filetocopy
# Line 432 | Line 564 | class cmscp:
564              raise Exception(msg)
565          except AlreadyExistsException, ex:
566              if self.debug: print "\tThe directory already exist"
567 <            pass            
567 >            pass
568 >        except Exception, ex:    
569 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
570 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
571 >            raise Exception(msg)
572          return msg
573  
574      def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
# Line 444 | Line 580 | class cmscp:
580          ErCode = '0'
581          msg = ''
582          f_tocopy=filetocopy
583 <        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
583 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
584          try:
585 <            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
586 <            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
585 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
586 >            if self.debug : print '\tCheck for local file %s existance executed \n'%f_tocopy  
587          except OperationException, ex:
588 <            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
588 >            msg  ='ERROR: problems checking source file %s existance'%filetocopy
589              msg += str(ex)
590              if self.debug :
591                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
# Line 457 | Line 593 | class cmscp:
593                  print dbgmsg
594              raise Exception(msg)
595          except WrongOption, ex:
596 <            msg  ='ERROR problems checkig if source file % exist'%filetocopy
596 >            msg  ='ERROR: problems checking source file %s existance'%filetocopy
597              msg += str(ex)
598              if self.debug :
599                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
# Line 465 | Line 601 | class cmscp:
601                  print dbgmsg
602              raise Exception(msg)
603          except MissingDestination, ex:
604 <            msg  ='ERROR problems checkig if source file % exist'%filetocopy
604 >            msg  ='ERROR: problems checking source file %s existance'%filetocopy
605              msg += str(ex)
606              if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
607              raise Exception(msg)
# Line 476 | Line 612 | class cmscp:
612              return ErCode, msg
613          if not checkSource :
614              ErCode = '60302'
615 <            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
615 >            msg = "ERROR file %s does not exist"%os.path.basename(filetocopy)
616              return ErCode, msg
617 <        f_tocopy=filetocopy
482 <        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
617 >        f_tocopy=os.path.basename(filetocopy)
618          try:
619 <            check = sbi_dest.checkExists( f_tocopy, opt=option )
620 <            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
619 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
620 >            if self.debug : print '\tCheck for remote file %s existance executed \n'%f_tocopy  
621 >            if self.debug : print '\twith exit code = %s \n'%check  
622          except OperationException, ex:
623 <            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
623 >            msg  = 'ERROR: problems checking if file %s already exist'%filetocopy
624              msg += str(ex)
625              if self.debug :
626                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
# Line 492 | Line 628 | class cmscp:
628                  print dbgmsg
629              raise Exception(msg)
630          except WrongOption, ex:
631 <            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
631 >            msg  = 'ERROR problems checking if file % already exists'%filetocopy
632              msg += str(ex)
633              if self.debug :
634                  msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
635                  msg += '\t'+str(ex.output)+'\n'
636              raise Exception(msg)
637          except MissingDestination, ex:
638 <            msg  ='ERROR problems checkig if source file % exist'%filetocopy
638 >            msg  ='ERROR problems checking if destination file % exists'%filetocopy
639              msg += str(ex)
640              if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
641              raise Exception(msg)
# Line 534 | Line 670 | class cmscp:
670          ErCode = '0'
671          msg = ''
672  
673 <        if  self.params['option'].find('space_token')>0:
673 >        copy_option = option
674 >        if  self.params['option'].find('space_token')>=0:
675              space_token=self.params['option'].split('=')[1]
676 <            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
677 <            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
676 >            if protocol == 'srmv2': copy_option = '%s -space_token=%s'%(option,space_token)
677 >            if protocol == 'srm-lcg': copy_option = '%s -S %s'%(option,space_token)
678          try:
679 <            sbi.copy( source_file , dest_file , opt = option)
679 >            sbi.copy( source_file , dest_file , opt = copy_option, tout = self.subprocesstimeout['copy'])
680          except TransferException, ex:
681              msg  = "Problem copying %s file" % filetocopy
682              msg += str(ex)
# Line 554 | Line 691 | class cmscp:
691              if self.debug :
692                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
693                  dbgmsg += '\t'+str(ex.output)+'\n'
694 <                print dbgmsg
694 >                print dbgmsg
695 >            ### FEDE added for savannah 97460###    
696 >            ErCode = '60307'
697 >            ####################################
698          except SizeZeroException, ex:
699              msg  = "Problem copying %s file" % filetocopy
700              msg += str(ex)
# Line 580 | Line 720 | class cmscp:
720                  dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
721                  dbgmsg += '\t'+str(ex.output)+'\n'
722                  print dbgmsg
723 +        except SEAPITimeout, ex:
724 +            ErCode = '60317'
725 +            msg  = "Problem copying %s file" % filetocopy
726 +            msg += str(ex)
727 +            if self.debug :
728 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
729 +                dbgmsg += '\t'+str(ex.output)+'\n'
730 +                print dbgmsg
731 +
732          if ErCode == '0' and protocol.find('srmv') == 0:
733              remote_file_size = -1
734              local_file_size = os.path.getsize( source_file )
735              try:
736 <                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
736 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
737                  if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
738              except TransferException, ex:
739                  msg  = "Problem checking the size of %s file" % filetocopy
# Line 620 | Line 769 | class cmscp:
769          f_tocopy=filetocopy
770          if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
771          try:
772 <            sbi_dest.delete( f_tocopy, opt=option )
772 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
773              if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
774          except OperationException, ex:
775              msg  ='ERROR: problems removing partially staged file %s'%filetocopy
# Line 633 | Line 782 | class cmscp:
782  
783          return
784  
785 <    def updateReport(self, file, erCode, reason, lfn='', se='' ):
785 >    def updateReport(self, file, erCode, reason):
786          """
787          Update the final stage out infos
788          """
789          jobStageInfo={}
790          jobStageInfo['erCode']=erCode
791          jobStageInfo['reason']=reason
792 <        jobStageInfo['lfn']=lfn
793 <        jobStageInfo['se']=se
794 <
792 >        if not self.params['for_lfn']: self.params['for_lfn']=''
793 >        if not self.params['se_name']: self.params['se_name']=''
794 >        if not self.hostname: self.hostname=''
795 >        if (erCode != '0') and (erCode != '60308'):
796 >           jobStageInfo['for_lfn']='/copy_problem/'
797 >        else:  
798 >            jobStageInfo['for_lfn']=self.params['for_lfn']
799 >        jobStageInfo['se_name']=self.params['se_name']
800 >        jobStageInfo['endpoint']=self.hostname
801 >        ### ADDING SURLFORGRID FOR COPYDATA
802 >        if not self.params['surl_for_grid']: self.params['surl_for_grid']=''
803 >        jobStageInfo['surl_for_grid']=self.params['surl_for_grid']
804 >        #####
805          report = { file : jobStageInfo}
806          return report
807  
# Line 651 | Line 810 | class cmscp:
810          It a list of LFNs for each SE where data are stored.
811          allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
812          """
813 <        outFile = open('cmscpReport.sh',"a")
813 >        
814 >        outFile = 'cmscpReport.sh'
815 >        if os.getenv("RUNTIME_AREA"):
816 >            #print "RUNTIME_AREA = ", os.getenv("RUNTIME_AREA")
817 >            outFile = "%s/cmscpReport.sh"%os.getenv("RUNTIME_AREA")
818 >            #print "--->>> outFile = ", outFile
819 >        fp = open(outFile, "w")
820 >
821          cmscp_exit_status = 0
822 <        txt = ''
822 >        txt = '#!/bin/bash\n'
823          for file, dict in results.iteritems():
824              reason = str(dict['reason'])
825              if str(reason).find("'") > -1:
826                  reason = " ".join(reason.split("'"))
827              reason="'%s'"%reason
828              if file:
829 <                if dict['lfn']=='':
829 >                if dict['for_lfn']=='':
830                      lfn = '${LFNBaseName}'+os.path.basename(file)
831                      se  = '$SE'
832                      LFNBaseName = '$LFNBaseName'
833                  else:
834 <                    lfn = dict['lfn']+os.path.basename(file)
835 <                    se = dict['se']
834 >                    lfn = dict['for_lfn']+os.path.basename(file)
835 >                    se = dict['se_name']
836                      LFNBaseName = os.path.dirname(lfn)
837                      if (LFNBaseName[-1] != '/'):
838                          LFNBaseName = LFNBaseName + '/'
839 <                #dict['lfn'] # to be implemented
839 >
840 >                
841                  txt += 'echo "Report for File: '+file+'"\n'
842                  txt += 'echo "LFN: '+lfn+'"\n'
843                  txt += 'echo "StorageElement: '+se+'"\n'
844                  txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
845                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
679                txt += 'export LFNBaseName='+LFNBaseName+'\n'
680                txt += 'export SE='+se+'\n'
846  
682                txt += 'export endpoint='+self.params['destination']+'\n'
847                  
848                  if dict['erCode'] != '0':
849                      cmscp_exit_status = dict['erCode']
# Line 689 | Line 853 | class cmscp:
853          txt += '\n'
854          txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
855          txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
856 <        outFile.write(str(txt))
857 <        outFile.close()
856 >        fp.write(str(txt))
857 >        fp.close()
858 >        if self.debug:
859 >            print '--- reading cmscpReport.sh: \n'
860 >            lp = open(outFile, "r")
861 >            content = lp.read()
862 >            lp.close()
863 >            print "    content = ", content
864 >            print '--- end reading cmscpReport.sh'
865          return
866  
867  
# Line 749 | Line 920 | if __name__ == '__main__' :
920  
921      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
922                    "protocol=","option=", "middleware=", "srm_version=", \
923 <                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
923 >                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
924      try:
925          opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
926      except getopt.GetoptError, err:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines