ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.20 by spiga, Tue Oct 28 17:40:55 2008 UTC vs.
Revision 1.87 by fanzago, Mon Mar 28 10:02:15 2011 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2
2   import sys, os
3 + import time, random
4 + try:
5 +    import json
6 + except:    
7 +    import simplejson as json
8   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
9   from ProdCommon.Storage.SEAPI.SBinterface import *
10   from ProdCommon.Storage.SEAPI.Exceptions import *
# Line 10 | Line 14 | class cmscp:
14      def __init__(self, args):
15          """
16          cmscp
17 <
14 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
17 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
18          including success checking  version also for CAF using rfcp command to copy the output to SE
19          input:
20             $1 middleware (CAF, LSF, LCG, OSG)
# Line 19 | Line 22 | class cmscp:
22             $3 if needed: file name (the output file name)
23             $5 remote SE (complete endpoint)
24             $6 srm version
25 +           --for_lfn $LFNBaseName
26          output:
27               return 0 if all ok
28               return 60307 if srmcp failed
29               return 60303 if file already exists in the SE
30          """
31 <
32 <        #set default
29 <        self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
30 <                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
31 >        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
32 >                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'', "surl_for_grid":''}
33          self.debug = 0
34 <
34 >        #### for fallback copy
35 >        self.local_stage = 0
36          self.params.update( args )
37 +        ## timeout needed for subprocess command of SEAPI
38 +        ## they should be a bit higher then the corresponding passed by command line  
39 +        ## default values
40 +        self.subprocesstimeout = { \
41 +                                   'copy':   3600, \
42 +                                   'exists': 1200, \
43 +                                   'delete': 1200, \
44 +                                   'size':   1200 \
45 +                                 }
46  
47          return
48  
# Line 38 | Line 50 | class cmscp:
50          """
51          check command line parameter
52          """
41
53          if 'help' in self.params.keys(): HelpOptions()
54          if 'debug' in self.params.keys(): self.debug = 1
55 +        if 'local_stage' in self.params.keys(): self.local_stage = 1
56  
57          # source and dest cannot be undefined at same time
58          if not self.params['source']  and not self.params['destination'] :
59              HelpOptions()
48
60          # if middleware is not defined --> protocol cannot be empty
61          if not self.params['middleware'] and not self.params['protocol'] :
62              HelpOptions()
63  
64          # input file must be defined
65 <        if not self.params['inputFileList'] : HelpOptions()
65 >        if not self.params['inputFileList'] :
66 >            HelpOptions()
67          else:
68              file_to_copy=[]
69              if self.params['inputFileList'].find(','):
# Line 60 | Line 72 | class cmscp:
72                  file_to_copy.append(self.params['inputFileList'])
73              self.params['inputFileList'] = file_to_copy
74  
75 <        ## TO DO:
76 <        #### add check for outFiles
65 <        #### add map {'inFileNAME':'outFileNAME'} to change out name
66 <
67 <        return
75 >        if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
76 >        
77  
78      def run( self ):
79          """
80          Check if running on UI (no $middleware) or
81          on WN (on the Grid), and take different action
82          """
74
83          self.processOptions()
84 +        if self.debug: print 'calling run() : \n'
85          # stage out from WN
86          if self.params['middleware'] :
87 <           results = self.stager(self.params['middleware'],self.params['inputFileList'])
88 <           self.finalReport(results)
87 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
88 >            self.writeJsonFile(results)
89 >            self.finalReport(results)
90          # Local interaction with SE
91          else:
92 <           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'] )
93 <           return results
92 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
93 >            self.writeJsonFile(results)
94 >            return results
95 >            
96 >    def writeJsonFile( self, results ):
97 >        """
98 >        write a json file containing copy results for each file
99 >        """
100 >        if self.debug:
101 >            print 'in writeJsonFile() : \n'
102 >            print "---->>>> in writeJsonFile results =  ", results
103 >        jsonOut = "resultCopyFile"
104 >        if os.getenv("RUNTIME_AREA")
105 >            jsonOut = "%s/resultCopyFile"%os.getenv("RUNTIME_AREA")
106 >        fp = open(jsonOut, 'w')
107 >        json.dump(results, fp)
108 >        fp.close()
109 >        if self.debug:
110 >            print '    reading resultCopyFile : \n'
111 >            lp = open(jsonOut, "r")
112 >            inputDict = json.load(lp)
113 >            lp.close()
114 >            print "    inputDict = ", inputDict
115 >        return
116 >
117 >    def checkLcgUtils( self ):
118 >        """
119 >        _checkLcgUtils_
120 >        check the lcg-utils version and report
121 >        """
122 >        import commands
123 >        cmd = "lcg-cp --version | grep lcg_util"
124 >        status, output = commands.getstatusoutput( cmd )
125 >        num_ver = -1
126 >        if output.find("not found") == -1 or status == 0:
127 >            temp = output.split("-")
128 >            version = ""
129 >            if len(temp) >= 2:
130 >                version = output.split("-")[1]
131 >                temp = version.split(".")
132 >                if len(temp) >= 1:
133 >                    num_ver = int(temp[0])*10
134 >                    num_ver += int(temp[1])
135 >        return num_ver
136  
137      def setProtocol( self, middleware ):
138          """
# Line 88 | Line 140 | class cmscp:
140          which depend on scheduler
141          """
142          # default To be used with "middleware"
143 +        if self.debug:
144 +            print 'setProtocol() :\n'
145 +            print '\tmiddleware =  %s utils \n'%middleware
146 +
147          lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
148                  'srmv2':'-b -D srmv2  -t 2400 --verbose'}
149 +        if self.checkLcgUtils() >= 17:
150 +            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
151 +                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
152 +
153          srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
154 <                'srmv2':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
154 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
155          rfioOpt=''
156  
157          supported_protocol = None
158 <        if middleware.lower() in ['osg','lcg','condor']:
158 >        if middleware.lower() in ['osg','lcg','condor','sge']:
159              supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
160 <                                  (self.params['srm_version'],srmOpt[self.params['srm_version']])]
160 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
161          elif middleware.lower() in ['lsf','caf']:
162              supported_protocol = [('rfio',rfioOpt)]
163 +        elif middleware.lower() in ['pbs']:
164 +            supported_protocol = [('rfio',rfioOpt),('local','')]
165 +        elif middleware.lower() in ['arc']:
166 +            supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
167          else:
168              ## here we can add support for any kind of protocol,
169              ## maybe some local schedulers need something dedicated
170              pass
171          return supported_protocol
172  
173 +
174 +    def checkCopy (self, copy_results, len_list_files, prot):
175 +        """
176 +        Checks the status of copy and update result dictionary
177 +        """
178 +        
179 +        list_retry = []
180 +        list_not_existing = []
181 +        list_already_existing = []
182 +        list_fallback = []
183 +        list_ok = []
184 +        
185 +        if self.debug:
186 +            print 'in checkCopy() :\n'
187 +        
188 +        for file, dict in copy_results.iteritems():
189 +            er_code = dict['erCode']
190 +            if er_code == '0':
191 +                list_ok.append(file)
192 +                reason = 'Copy succedeed with %s utils'%prot
193 +                dict['reason'] = reason
194 +            elif er_code == '60308':
195 +                list_fallback.append( file )
196 +                reason = 'Copy succedeed with %s utils'%prot
197 +                dict['reason'] = reason
198 +            elif er_code == '60302':
199 +                list_not_existing.append( file )
200 +            elif er_code == '60303':
201 +                list_already_existing.append( file )
202 +            else :    
203 +                list_retry.append( file )
204 +          
205 +            if self.debug:
206 +                print "\t file %s \n"%file
207 +                print "\t dict['erCode'] %s \n"%dict['erCode']
208 +                print "\t dict['reason'] %s \n"%dict['reason']
209 +                
210 +            upDict = self.updateReport(file, er_code, dict['reason'])
211 +
212 +            copy_results.update(upDict)
213 +        
214 +        msg = ''
215 +        if len(list_ok) != 0:
216 +            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
217 +        if len(list_ok) != len_list_files :
218 +            if len(list_fallback)!=0:
219 +                msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
220 +            if len(list_retry)!=0:
221 +                msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
222 +            if len(list_not_existing)!=0:
223 +                msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
224 +            if len(list_already_existing)!=0:
225 +                msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
226 +        if self.debug : print msg
227 +        return copy_results, list_ok, list_retry, list_fallback
228 +        
229 +    def check_for_retry_localSE (self, copy_results):
230 +        """
231 +        Checks the status of copy and create the list of file to copy to CloseSE
232 +        """
233 +        list_retry_localSE = []
234 +
235 +        if self.debug:
236 +            print 'in check_for_retry_localSE() :\n'
237 +            print "\t results in check local = ", copy_results
238 +        for file, dict in copy_results.iteritems():
239 +            er_code = dict['erCode']
240 +            if er_code != '0' and  er_code != '60302' and er_code != '60308':
241 +                list_retry_localSE.append( file )
242 +                
243 +            if self.debug:
244 +                print "\t file %s \n"%file
245 +                print "\t dict['erCode'] %s \n"%dict['erCode']
246 +                print "\t dict['reason'] %s \n"%dict['reason']
247 +                
248 +        return list_retry_localSE
249 +
250 +        
251 +    def LocalCopy(self, list_retry, results):
252 +        """
253 +        Tries the stage out to the CloseSE
254 +        """
255 +        if self.debug:
256 +            print 'in LocalCopy() :\n'
257 +            print '\t list_retry %s utils \n'%list_retry
258 +            print '\t len(list_retry) %s \n'%len(list_retry)
259 +                
260 +        list_files = list_retry  
261 +        self.params['inputFileList']=list_files
262 +
263 +        ### copy backup
264 +        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
265 +        siteCfg = loadSiteLocalConfig()
266 +        catalog = siteCfg.localStageOut.get("catalog", None)
267 +        tfc = siteCfg.trivialFileCatalog()
268 +        seName = siteCfg.localStageOutSEName()
269 +        if seName is None:
270 +            seName = ""
271 +        option = siteCfg.localStageOutOption()
272 +        if option is None:
273 +            option = ""
274 +        implName = siteCfg.localStageOutCommand()
275 +        if implName is None:
276 +            implName = ""
277 +
278 +        if (implName == 'srm'):
279 +           protocol = 'srmv2'
280 +        elif (implName == 'srmv2-lcg'):
281 +           protocol = 'srm-lcg'
282 +           option = option + ' -b -D srmv2 '
283 +        elif (implName == 'rfcp-CERN'):
284 +           protocol = 'rfio'
285 +        elif (implName == 'rfcp'):
286 +           protocol = 'rfio'
287 +        elif (implName == 'cp'):
288 +           protocol = 'local'
289 +        else: protocol = implName
290 +        
291 +        self.params['protocol']=protocol
292 +        self.params['option']=option
293 +
294 +        if self.debug:
295 +            print '\t siteCFG %s \n'%siteCfg
296 +            print '\t catalog %s \n'%catalog
297 +            print '\t tfc %s '%tfc
298 +            print '\t fallback seName %s \n'%seName
299 +            print "\t fallback protocol %s \n"%protocol            
300 +            print "\t fallback option %s \n"%option
301 +            print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
302 +                
303 +        if (str(self.params['for_lfn']).find("/store/") == 0):
304 +            temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
305 +            self.params['for_lfn']= temp
306 +        
307 +        if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
308 +            
309 +        file_backup=[]
310 +        file_backup_surlgrid=[]
311 +        for input in self.params['inputFileList']:
312 +            file = self.params['for_lfn'] + os.path.basename(input)
313 +            surl = tfc.matchLFN(tfc.preferredProtocol, file)
314 +            
315 +            ###### FEDE TEST_FOR_SURL_GRID
316 +            surl_for_grid = tfc.matchLFN('srmv2', file)
317 +            if (surl_for_grid == None):
318 +                surl_for_grid = tfc.matchLFN('srmv2-lcg', file)
319 +                if (surl_for_grid == None):
320 +                    surl_for_grid = tfc.matchLFN('srm', file)
321 +            if surl_for_grid:
322 +                file_backup_surlgrid.append(surl_for_grid)
323 +            ######
324 +
325 +            file_backup.append(surl)
326 +            if self.debug:
327 +                print '\t for_lfn %s \n'%self.params['for_lfn']
328 +                print '\t file %s \n'%file
329 +                print '\t surl %s \n'%surl
330 +                    
331 +        destination=os.path.dirname(file_backup[0])
332 +        if ( destination[-1] != '/' ) : destination = destination + '/'
333 +        self.params['destination']=destination
334 +
335 +        self.params['se_name']=seName
336 +
337 +        ######
338 +        if (len(file_backup_surlgrid)>0) :
339 +            surl_for_grid=os.path.dirname(file_backup_surlgrid[0])
340 +            if ( surl_for_grid[-1] != '/' ) : surl_for_grid = surl_for_grid + '/'
341 +        else:
342 +            surl_for_grid=''
343 +
344 +        print "surl_for_grid = ", surl_for_grid    
345 +        self.params['surl_for_grid']=surl_for_grid
346 +        #####
347 +
348 +        if self.debug:
349 +            print '\tIn LocalCopy trying the stage out with: \n'
350 +            print "\tself.params['destination'] %s \n"%self.params['destination']
351 +            print "\tself.params['protocol'] %s \n"%self.params['protocol']
352 +            print "\tself.params['option'] %s \n"%self.params['option']
353 +
354 +        localCopy_results = self.copy( self.params['inputFileList'], self.params['protocol'], self.params['option'], backup='yes' )
355 +          
356 +        if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
357 +            results.update(localCopy_results)
358 +        else:
359 +            localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), self.params['protocol'])
360 +                
361 +            results.update(localCopy_results)
362 +        if self.debug:
363 +            print "\t localCopy_results = %s \n"%localCopy_results
364 +        return results        
365 +
366      def stager( self, middleware, list_files ):
367          """
368          Implement the logic for remote stage out
369          """
370 +
371 +        if self.debug:
372 +            print 'stager() :\n'
373 +            print '\tmiddleware %s\n'%middleware
374 +            print '\tlist_files %s\n'%list_files
375 +        
376          results={}
377          for prot, opt in self.setProtocol( middleware ):
378 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
378 >            if self.debug:
379 >                print '\tTrying the stage out with %s utils \n'%prot
380 >                print '\tand options %s\n'%opt
381 >                
382              copy_results = self.copy( list_files, prot, opt )
383 <            list_retry = []
118 <            list_existing = []
119 <            list_ok = []
120 <            if copy_results.keys() == '':
383 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
384                  results.update(copy_results)
385              else:
386 <                for file, dict in copy_results.iteritems():
124 <                    er_code = dict['erCode']
125 <                    if er_code == '0':
126 <                        list_ok.append(file)
127 <                        reason = 'Copy succedeed with %s utils'%prot
128 <                        upDict = self.updateReport(file, er_code, reason)
129 <                        copy_results.update(upDict)
130 <                    elif er_code == '60303': list_existing.append( file )
131 <                    else: list_retry.append( file )
386 >                copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
387                  results.update(copy_results)
133                if len(list_ok) != 0:
134                    msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
135                    if self.debug : print msg
388                  if len(list_ok) == len(list_files) :
389                      break
390 <                else:
391 <                    if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
392 <                    if len(list_retry): list_files = list_retry
393 <                    else: break
394 <
395 <        #### TODO Daniele
396 <        #check is something fails and created related dict
397 <  #      backup = self.analyzeResults(results)
398 <
399 <  #      if backup :
400 <  #          msg = 'WARNING: backup logic is under implementation\n'
401 <  #          #backupDict = self.backup()
402 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
403 <  #          results.update(backupDict)
404 <  #          print msg
390 >                if len(list_retry):
391 >                    list_files = list_retry
392 >                    #### FEDE added ramdom time before the retry copy with other protocol
393 >                    sec =  240 * random.random()
394 >                    sec = sec + 60
395 >                    time.sleep(sec)
396 >                else: break
397 >
398 >        if self.local_stage:
399 >            list_retry_localSE = self.check_for_retry_localSE(results)
400 >            if len(list_retry_localSE):
401 >                if self.debug:
402 >                    print "\t list_retry_localSE %s \n"%list_retry_localSE
403 >                results = self.LocalCopy(list_retry_localSE, results)
404 >
405 >        if self.debug:
406 >            print "\t results %s \n"%results
407          return results
408  
409      def initializeApi(self, protocol ):
410          """
411          Instantiate storage interface
412          """
413 +        if self.debug : print 'initializeApi() :\n'  
414          self.source_prot = protocol
415          self.dest_prot = protocol
416          if not self.params['source'] : self.source_prot = 'local'
417          Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
418 <        if not self.params['destination'] : self.dest_prot = 'local'
419 <        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
418 >        if not self.params['destination'] :
419 >            self.dest_prot = 'local'
420 >            Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
421 >        else:    
422 >            Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
423  
424          if self.debug :
425 <            print '(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
426 <            print '(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
425 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
426 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
427 >            msg += '\t(destinationDir=%s,  protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
428 >            print msg
429  
430          return Source_SE, Destination_SE
431  
432 <    def copy( self, list_file, protocol, options ):
432 >    def copy( self, list_file, protocol, options, backup='no' ):
433          """
434          Make the real file copy using SE API
435          """
436 +        msg = ""
437 +        results = {}
438          if self.debug :
439 <            print 'copy(): using %s protocol'%protocol
439 >            msg  = 'copy() :\n'
440 >            msg += '\tusing %s protocol\n'%protocol
441 >            msg += '\tusing %s options\n'%options
442 >            msg += '\tlist_file %s\n'%list_file
443 >            print msg
444          try:
445              Source_SE, Destination_SE = self.initializeApi( protocol )
446          except Exception, ex:
447 <            return self.updateReport('', '-1', str(ex))
447 >            for filetocopy in list_file:
448 >                results.update( self.updateReport(filetocopy, '-1', str(ex)))
449 >            return results
450  
451 <        # create remote dir
452 <        if protocol in ['gridftp','rfio']:
451 >        self.hostname = Destination_SE.hostname
452 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop','local']:
453              try:
454 <                self.createDir( Destination_SE, protocol )
454 >                self.createDir( Destination_SE, Destination_SE.protocol )
455 >            except OperationException, ex:
456 >                for filetocopy in list_file:
457 >                    results.update( self.updateReport(filetocopy, '60316', str(ex)))
458 >                return results
459 >            ## when the client commands are not found (wrong env or really missing)
460 >            except MissingCommand, ex:
461 >                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
462 >                for filetocopy in list_file:
463 >                    results.update( self.updateReport(filetocopy, '10041', msg))
464 >                return results
465              except Exception, ex:
466 <                return self.updateReport('', '60316', str(ex))
467 <
466 >                msg = "ERROR %s" %(str(ex))
467 >                for filetocopy in list_file:
468 >                    results.update( self.updateReport(filetocopy, '-1', msg))
469 >                return results
470          ## prepare for real copy  ##
471          try :
472              sbi = SBinterface( Source_SE, Destination_SE )
473              sbi_dest = SBinterface(Destination_SE)
474              sbi_source = SBinterface(Source_SE)
475          except ProtocolMismatch, ex:
476 <            msg = str(ex)+'\n'
477 <            msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
478 <            return self.updateReport('', '-1', str(ex))
476 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
477 >            msg += str(ex)
478 >            for filetocopy in list_file:
479 >                results.update( self.updateReport(filetocopy, '-1', msg))
480 >            return results
481  
482 <        results = {}
482 >        
483          ## loop over the complete list of files
484          for filetocopy in list_file:
485 <            if self.debug : print 'start real copy for %s'%filetocopy
485 >            if self.debug : print '\tStart real copy for %s'%filetocopy
486              try :
487 <                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy )
487 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
488              except Exception, ex:
489 <                ErCode = -1
489 >                ErCode = '60307'
490                  msg = str(ex)  
491              if ErCode == '0':
492                  ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
493 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
493 >                if (ErCode == '0') and (backup == 'yes'):
494 >                    ErCode = '60308'
495 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
496              results.update( self.updateReport(filetocopy, ErCode, msg))
497          return results
498  
# Line 217 | Line 501 | class cmscp:
501          """
502          Create the storage interface.
503          """
504 +        if self.debug : print 'storageInterface():\n'
505          try:
506              interface = SElement( FullPath(endpoint), protocol )
507          except ProtocolUnknown, ex:
508 <            msg = ''
509 <            if self.debug : msg = str(ex)+'\n'
225 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
508 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
509 >            msg += str(ex)
510              raise Exception(msg)
511  
512          return interface
# Line 232 | Line 516 | class cmscp:
516          Create remote dir for gsiftp REALLY TEMPORARY
517          this should be transparent at SE API level.
518          """
519 +        if self.debug : print 'createDir():\n'
520          msg = ''
521          try:
522              action = SBinterface( Destination_SE )
523              action.createDir()
524 <            if self.debug: msg+= "The directory has been created using protocol %s\n"%protocol
524 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
525          except TransferException, ex:
526 <            msg = str(ex)
526 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
527 >            msg += str(ex)
528              if self.debug :
529 <                msg += str(ex.detail)+'\n'
530 <                msg += str(ex.output)+'\n'
531 <            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
532 <            raise Exceptions(msg)
529 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
530 >                dbgmsg += '\t'+str(ex.output)+'\n'
531 >                print dbgmsg
532 >            raise Exception(msg)
533          except OperationException, ex:
534 <            msg = str(ex)
535 <            if self.debug : msg += str(ex.detail)+'\n'
536 <            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
537 <
534 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
535 >            msg += str(ex)
536 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
537 >            raise Exception(msg)
538 >        except MissingDestination, ex:
539 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
540 >            msg += str(ex)
541 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
542 >            raise Exception(msg)
543 >        except AlreadyExistsException, ex:
544 >            if self.debug: print "\tThe directory already exist"
545 >            pass
546 >        except Exception, ex:    
547 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
548 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
549 >            raise Exception(msg)
550          return msg
551  
552 <    def checkFileExist( self, sbi_source, sbi_dest, filetocopy ):
552 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
553          """
554          Check both if source file exist AND
555          if destination file ALREADY exist.
556          """
557 +        if self.debug : print 'checkFileExist():\n'
558          ErCode = '0'
559          msg = ''
560          f_tocopy=filetocopy
561 <        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
561 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
562          try:
563 <            checkSource = sbi_source.checkExists( f_tocopy )
563 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
564 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
565          except OperationException, ex:
566 <            msg = str(ex)
566 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
567 >            msg += str(ex)
568              if self.debug :
569 <                msg += str(ex.detail)+'\n'
570 <                msg += str(ex.output)+'\n'
571 <            msg +='ERROR: problems checkig if source file %s exist'%filetocopy
569 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
570 >                dbgmsg += '\t'+str(ex.output)+'\n'
571 >                print dbgmsg
572              raise Exception(msg)
573          except WrongOption, ex:
574 <            msg = str(ex)
574 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
575 >            msg += str(ex)
576              if self.debug :
577 <                msg += str(ex.detail)+'\n'
578 <                msg += str(ex.output)+'\n'
579 <            msg +='ERROR problems checkig if source file % exist'%filetocopy
577 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
578 >                dbgmsg += '\t'+str(ex.output)+'\n'
579 >                print dbgmsg
580 >            raise Exception(msg)
581 >        except MissingDestination, ex:
582 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
583 >            msg += str(ex)
584 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
585              raise Exception(msg)
586 +        ## when the client commands are not found (wrong env or really missing)
587 +        except MissingCommand, ex:
588 +            ErCode = '10041'
589 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
590 +            return ErCode, msg
591          if not checkSource :
592              ErCode = '60302'
593              msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
594              return ErCode, msg
595 <
284 <        f_tocopy=filetocopy
285 <        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
595 >        f_tocopy=os.path.basename(filetocopy)
596          try:
597 <            check = sbi_dest.checkExists( f_tocopy )
597 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
598 >            if self.debug : print '\tCheck for remote file %s exist done \n'%f_tocopy  
599 >            if self.debug : print '\twith exit code = %s \n'%check  
600          except OperationException, ex:
601 <            msg = str(ex)
601 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
602 >            msg += str(ex)
603              if self.debug :
604 <                msg += str(ex.detail)+'\n'
605 <                msg += str(ex.output)+'\n'
606 <            msg +='ERROR: problems checkig if file %s already exist'%filetocopy
604 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
605 >                dbgmsg += '\t'+str(ex.output)+'\n'
606 >                print dbgmsg
607              raise Exception(msg)
608          except WrongOption, ex:
609 <            msg = str(ex)
609 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
610 >            msg += str(ex)
611              if self.debug :
612 <                msg += str(ex.detail)+'\n'
613 <                msg += str(ex.output)+'\n'
614 <            msg +='ERROR problems checkig if file % already exist'%filetocopy
612 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
613 >                msg += '\t'+str(ex.output)+'\n'
614 >            raise Exception(msg)
615 >        except MissingDestination, ex:
616 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
617 >            msg += str(ex)
618 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
619              raise Exception(msg)
620 +        ## when the client commands are not found (wrong env or really missing)
621 +        except MissingCommand, ex:
622 +            ErCode = '10041'
623 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
624 +            return ErCode, msg
625          if check :
626              ErCode = '60303'
627              msg = "file %s already exist"%os.path.basename(filetocopy)
# Line 309 | Line 632 | class cmscp:
632          """
633          call the copy API.
634          """
635 +        if self.debug : print 'makeCopy():\n'
636          path = os.path.dirname(filetocopy)
637          file_name =  os.path.basename(filetocopy)
638          source_file = filetocopy
# Line 316 | Line 640 | class cmscp:
640          if self.params['source'] == '' and path == '':
641              source_file = os.path.abspath(filetocopy)
642          elif self.params['destination'] =='':
643 <            dest_file = os.path.join(os.getcwd(),file_name)
643 >            destDir = self.params.get('destinationDir',os.getcwd())
644 >            dest_file = os.path.join(destDir,file_name)
645          elif self.params['source'] != '' and self.params['destination'] != '' :
646              source_file = file_name
647  
648          ErCode = '0'
649          msg = ''
650  
651 +        copy_option = option
652 +        if  self.params['option'].find('space_token')>=0:
653 +            space_token=self.params['option'].split('=')[1]
654 +            if protocol == 'srmv2': copy_option = '%s -space_token=%s'%(option,space_token)
655 +            if protocol == 'srm-lcg': copy_option = '%s -S %s'%(option,space_token)
656          try:
657 <            sbi.copy( source_file , dest_file , opt = option)
657 >            sbi.copy( source_file , dest_file , opt = copy_option, tout = self.subprocesstimeout['copy'])
658          except TransferException, ex:
659 <            msg = str(ex)
659 >            msg  = "Problem copying %s file" % filetocopy
660 >            msg += str(ex)
661              if self.debug :
662 <                msg += str(ex.detail)+'\n'
663 <                msg += str(ex.output)+'\n'
664 <            msg += "Problem copying %s file" % filetocopy
662 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
663 >                dbgmsg += '\t'+str(ex.output)+'\n'
664 >                print dbgmsg
665              ErCode = '60307'
666          except WrongOption, ex:
667 <            msg = str(ex)
667 >            msg  = "Problem copying %s file" % filetocopy
668 >            msg += str(ex)
669 >            if self.debug :
670 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
671 >                dbgmsg += '\t'+str(ex.output)+'\n'
672 >                print dbgmsg
673 >        except SizeZeroException, ex:
674 >            msg  = "Problem copying %s file" % filetocopy
675 >            msg += str(ex)
676              if self.debug :
677 <                msg += str(ex.detail)+'\n'
678 <                msg += str(ex.output)+'\n'
679 <            msg += "Problem copying %s file" % filetocopy
677 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
678 >                dbgmsg += '\t'+str(ex.output)+'\n'
679 >                print dbgmsg
680              ErCode = '60307'
681 <        if ErCode == '0' and protocol.find('srm') == 0:
681 >        ## when the client commands are not found (wrong env or really missing)
682 >        except MissingCommand, ex:
683 >            ErCode = '10041'
684 >            msg  = "Problem copying %s file" % filetocopy
685 >            msg += str(ex)
686 >            if self.debug :
687 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
688 >                dbgmsg += '\t'+str(ex.output)+'\n'
689 >                print dbgmsg
690 >        except AuthorizationException, ex:
691 >            ErCode = '60307'
692 >            msg  = "Problem copying %s file" % filetocopy
693 >            msg += str(ex)
694 >            if self.debug :
695 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
696 >                dbgmsg += '\t'+str(ex.output)+'\n'
697 >                print dbgmsg
698 >        except SEAPITimeout, ex:
699 >            ErCode = '60317'
700 >            msg  = "Problem copying %s file" % filetocopy
701 >            msg += str(ex)
702 >            if self.debug :
703 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
704 >                dbgmsg += '\t'+str(ex.output)+'\n'
705 >                print dbgmsg
706 >
707 >        if ErCode == '0' and protocol.find('srmv') == 0:
708              remote_file_size = -1
709              local_file_size = os.path.getsize( source_file )
710              try:
711 <                remote_file_size = sbi_dest.getSize( dest_file )
711 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
712 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
713              except TransferException, ex:
714 <                msg = str(ex)
714 >                msg  = "Problem checking the size of %s file" % filetocopy
715 >                msg += str(ex)
716                  if self.debug :
717 <                    msg += str(ex.detail)+'\n'
718 <                    msg += str(ex.output)+'\n'
719 <                msg += "Problem checking the size of %s file" % filetocopy
717 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
718 >                    dbgmsg += '\t'+str(ex.output)+'\n'
719 >                    print dbgmsg
720                  ErCode = '60307'
721              except WrongOption, ex:
722 <                msg = str(ex)
722 >                msg  = "Problem checking the size of %s file" % filetocopy
723 >                msg += str(ex)
724                  if self.debug :
725 <                    msg += str(ex.detail)+'\n'
726 <                    msg += str(ex.output)+'\n'
727 <                msg += "Problem checking the size of %s file" % filetocopy
725 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
726 >                    dbgmsg += '\t'+str(ex.output)+'\n'
727 >                    print dbgmsg
728                  ErCode = '60307'
729              if local_file_size != remote_file_size:
730                  msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
731                  ErCode = '60307'
732  
733 +        if ErCode != '0':
734 +            try :
735 +                self.removeFile( sbi_dest, dest_file, option )
736 +            except Exception, ex:
737 +                msg += '\n'+str(ex)  
738          return ErCode, msg
739  
740 <    def backup(self):
741 <        """
742 <        Check infos from TFC using existing api obtaining:
743 <        1)destination
744 <        2)protocol
745 <        """
746 <        return
740 >    def removeFile( self, sbi_dest, filetocopy, option ):
741 >        """  
742 >        """  
743 >        if self.debug : print 'removeFile():\n'
744 >        f_tocopy=filetocopy
745 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
746 >        try:
747 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
748 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
749 >        except OperationException, ex:
750 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
751 >            msg += str(ex)
752 >            if self.debug :
753 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
754 >                dbgmsg += '\t'+str(ex.output)+'\n'
755 >                print dbgmsg
756 >            raise Exception(msg)
757  
758 <    def updateReport(self, file, erCode, reason, lfn='', se='' ):
758 >        return
759 >
760 >    def updateReport(self, file, erCode, reason):
761          """
762          Update the final stage out infos
763          """
764          jobStageInfo={}
765          jobStageInfo['erCode']=erCode
766          jobStageInfo['reason']=reason
767 <        jobStageInfo['lfn']=lfn
768 <        jobStageInfo['se']=se
769 <
767 >        if not self.params['for_lfn']: self.params['for_lfn']=''
768 >        if not self.params['se_name']: self.params['se_name']=''
769 >        if not self.hostname: self.hostname=''
770 >        if (erCode != '0') and (erCode != '60308'):
771 >           jobStageInfo['for_lfn']='/copy_problem/'
772 >        else:  
773 >            jobStageInfo['for_lfn']=self.params['for_lfn']
774 >        jobStageInfo['se_name']=self.params['se_name']
775 >        jobStageInfo['endpoint']=self.hostname
776 >        ### ADDING SURLFORGRID FOR COPYDATA
777 >        if not self.params['surl_for_grid']: self.params['surl_for_grid']=''
778 >        jobStageInfo['surl_for_grid']=self.params['surl_for_grid']
779 >        #####
780          report = { file : jobStageInfo}
781          return report
782  
# Line 394 | Line 789 | class cmscp:
789          cmscp_exit_status = 0
790          txt = ''
791          for file, dict in results.iteritems():
792 +            reason = str(dict['reason'])
793 +            if str(reason).find("'") > -1:
794 +                reason = " ".join(reason.split("'"))
795 +            reason="'%s'"%reason
796              if file:
797 <                if dict['lfn']=='':
798 <                    lfn = '$LFNBaseName/'+os.path.basename(file)
797 >                if dict['for_lfn']=='':
798 >                    lfn = '${LFNBaseName}'+os.path.basename(file)
799                      se  = '$SE'
800 +                    LFNBaseName = '$LFNBaseName'
801                  else:
802 <                    lfn = dict['lfn']+os.path.basename(file)
803 <                    se = dict['se']
804 <                #dict['lfn'] # to be implemented
805 <                txt +=  'echo "Report for File: '+file+'"\n'
806 <                txt +=  'echo "LFN: '+lfn+'"\n'
807 <                txt +=  'echo "StorageElement: '+se+'"\n'
808 <                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
802 >                    lfn = dict['for_lfn']+os.path.basename(file)
803 >                    se = dict['se_name']
804 >                    LFNBaseName = os.path.dirname(lfn)
805 >                    if (LFNBaseName[-1] != '/'):
806 >                        LFNBaseName = LFNBaseName + '/'
807 >
808 >                
809 >                txt += 'echo "Report for File: '+file+'"\n'
810 >                txt += 'echo "LFN: '+lfn+'"\n'
811 >                txt += 'echo "StorageElement: '+se+'"\n'
812 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
813                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
814 +
815 +                
816                  if dict['erCode'] != '0':
817                      cmscp_exit_status = dict['erCode']
412                    cmscp_exit_status = dict['erCode']
818              else:
819 <                cmscp_exit_status = dict['erCode']
819 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
820                  cmscp_exit_status = dict['erCode']
821          txt += '\n'
822          txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
823 <        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
823 >        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
824          outFile.write(str(txt))
825          outFile.close()
826          return
# Line 424 | Line 829 | class cmscp:
829   def usage():
830  
831      msg="""
832 <    required parameters:
833 <    --source        :: REMOTE           :
834 <    --destination   :: REMOTE           :
430 <    --debug             :
431 <    --inFile :: absPath : or name NOT RELATIVE PATH
432 <    --outFIle :: onlyNAME : NOT YET SUPPORTED
832 >    cmscp:
833 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
834 >        including success checking  version also for CAF using rfcp command to copy the output to SE
835  
836 <    optional parameters
836 >    accepted parameters:
837 >       source           =
838 >       destination      =
839 >       inputFileList    =
840 >       outputFileList   =
841 >       protocol         =
842 >       option           =
843 >       middleware       =  
844 >       srm_version      =
845 >       destinationDir   =
846 >       lfn=             =
847 >       local_stage      =  activate stage fall back  
848 >       debug            =  activate verbose print out
849 >       help             =  print on line man and exit  
850 >    
851 >    mandatory:
852 >       * "source" and/or "destination" must always be defined
853 >       * either "middleware" or "protocol" must always be defined
854 >       * "inputFileList" must always be defined
855 >       * if "local_stage" = 1 also  "lfn" must be defined
856      """
857      print msg
858  
# Line 459 | Line 880 | if __name__ == '__main__' :
880      import getopt
881  
882      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
883 <                  "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
883 >                  "protocol=","option=", "middleware=", "srm_version=", \
884 >                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
885      try:
886          opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
887      except getopt.GetoptError, err:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines