ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.20 by spiga, Tue Oct 28 17:40:55 2008 UTC vs.
Revision 1.76 by mcinquil, Mon Jun 21 15:41:53 2010 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2
2   import sys, os
3 + try:
4 +    import json
5 + except:    
6 +    import simplejson as json
7   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
8   from ProdCommon.Storage.SEAPI.SBinterface import *
9   from ProdCommon.Storage.SEAPI.Exceptions import *
# Line 10 | Line 13 | class cmscp:
13      def __init__(self, args):
14          """
15          cmscp
16 <
14 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
16 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
17          including success checking  version also for CAF using rfcp command to copy the output to SE
18          input:
19             $1 middleware (CAF, LSF, LCG, OSG)
# Line 19 | Line 21 | class cmscp:
21             $3 if needed: file name (the output file name)
22             $5 remote SE (complete endpoint)
23             $6 srm version
24 +           --for_lfn $LFNBaseName
25          output:
26               return 0 if all ok
27               return 60307 if srmcp failed
28               return 60303 if file already exists in the SE
29          """
30  
31 +        #### FEDE added SE_NAME as parameters
32          #set default
33 <        self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
34 <                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
33 >        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
34 >                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'' }
35          self.debug = 0
36 <
36 >        #### for fallback copy
37 >        self.local_stage = 0
38          self.params.update( args )
39 +        ## timeout needed for subprocess command of SEAPI
40 +        ## they should be a bit higher then the corresponding passed by command line  
41 +        ## default values
42 +        self.subprocesstimeout = { \
43 +                                   'copy':   3600, \
44 +                                   'exists': 1200, \
45 +                                   'delete': 1200, \
46 +                                   'size':   1200 \
47 +                                 }
48  
49          return
50  
# Line 38 | Line 52 | class cmscp:
52          """
53          check command line parameter
54          """
41
55          if 'help' in self.params.keys(): HelpOptions()
56          if 'debug' in self.params.keys(): self.debug = 1
57 +        if 'local_stage' in self.params.keys(): self.local_stage = 1
58  
59          # source and dest cannot be undefined at same time
60          if not self.params['source']  and not self.params['destination'] :
61              HelpOptions()
48
62          # if middleware is not defined --> protocol cannot be empty
63          if not self.params['middleware'] and not self.params['protocol'] :
64              HelpOptions()
65  
66          # input file must be defined
67 <        if not self.params['inputFileList'] : HelpOptions()
67 >        if not self.params['inputFileList'] :
68 >            HelpOptions()
69          else:
70              file_to_copy=[]
71              if self.params['inputFileList'].find(','):
# Line 60 | Line 74 | class cmscp:
74                  file_to_copy.append(self.params['inputFileList'])
75              self.params['inputFileList'] = file_to_copy
76  
77 +        #if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
78 +        if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
79 +        
80          ## TO DO:
81          #### add check for outFiles
82          #### add map {'inFileNAME':'outFileNAME'} to change out name
83  
67        return
84  
85      def run( self ):
86          """
87          Check if running on UI (no $middleware) or
88          on WN (on the Grid), and take different action
89          """
74
90          self.processOptions()
91 +        if self.debug: print 'calling run() : \n'
92          # stage out from WN
93          if self.params['middleware'] :
94 <           results = self.stager(self.params['middleware'],self.params['inputFileList'])
95 <           self.finalReport(results)
94 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
95 >            self.writeJsonFile(results)
96 >            self.finalReport(results)
97          # Local interaction with SE
98          else:
99 <           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'] )
100 <           return results
99 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
100 >            self.writeJsonFile(results)
101 >            return results
102 >            
103 >    def writeJsonFile( self, results ):
104 >        """
105 >        write a json file containing copy results for each file
106 >        """
107 >        if self.debug:
108 >            print 'in writeJsonFile() : \n'
109 >            print "---->>>> in writeJsonFile results =  ", results
110 >        fp = open('resultCopyFile', 'w')
111 >        json.dump(results, fp)
112 >        fp.close()
113 >        if self.debug:
114 >            print '    reading resultCopyFile : \n'
115 >            lp = open('resultCopyFile', "r")
116 >            inputDict = json.load(lp)
117 >            lp.close()
118 >            print "    inputDict = ", inputDict
119 >        return
120 >
121 >    def checkLcgUtils( self ):
122 >        """
123 >        _checkLcgUtils_
124 >        check the lcg-utils version and report
125 >        """
126 >        import commands
127 >        cmd = "lcg-cp --version | grep lcg_util"
128 >        status, output = commands.getstatusoutput( cmd )
129 >        num_ver = -1
130 >        if output.find("not found") == -1 or status == 0:
131 >            temp = output.split("-")
132 >            version = ""
133 >            if len(temp) >= 2:
134 >                version = output.split("-")[1]
135 >                temp = version.split(".")
136 >                if len(temp) >= 1:
137 >                    num_ver = int(temp[0])*10
138 >                    num_ver += int(temp[1])
139 >        return num_ver
140  
141      def setProtocol( self, middleware ):
142          """
# Line 88 | Line 144 | class cmscp:
144          which depend on scheduler
145          """
146          # default To be used with "middleware"
147 +        if self.debug:
148 +            print 'setProtocol() :\n'
149 +            print '\tmiddleware =  %s utils \n'%middleware
150 +
151          lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
152                  'srmv2':'-b -D srmv2  -t 2400 --verbose'}
153 +        if self.checkLcgUtils() >= 17:
154 +            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
155 +                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
156 +
157          srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
158 <                'srmv2':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
158 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
159          rfioOpt=''
160  
161          supported_protocol = None
162 <        if middleware.lower() in ['osg','lcg','condor']:
162 >        if middleware.lower() in ['osg','lcg','condor','sge']:
163              supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
164 <                                  (self.params['srm_version'],srmOpt[self.params['srm_version']])]
164 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
165          elif middleware.lower() in ['lsf','caf']:
166              supported_protocol = [('rfio',rfioOpt)]
167 +        elif middleware.lower() in ['pbs']:
168 +            supported_protocol = [('rfio',rfioOpt),('local','')]
169 +        elif middleware.lower() in ['arc']:
170 +            supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
171          else:
172              ## here we can add support for any kind of protocol,
173              ## maybe some local schedulers need something dedicated
174              pass
175          return supported_protocol
176  
177 +
178 +    #def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
179 +    def checkCopy (self, copy_results, len_list_files, prot):
180 +        """
181 +        Checks the status of copy and update result dictionary
182 +        """
183 +        
184 +        list_retry = []
185 +        list_not_existing = []
186 +        list_already_existing = []
187 +        list_fallback = []
188 +        list_ok = []
189 +        
190 +        if self.debug:
191 +            print 'in checkCopy() :\n'
192 +        for file, dict in copy_results.iteritems():
193 +            er_code = dict['erCode']
194 +            if er_code == '0':
195 +                list_ok.append(file)
196 +                reason = 'Copy succedeed with %s utils'%prot
197 +                dict['reason'] = reason
198 +            elif er_code == '60308':
199 +                list_fallback.append( file )
200 +                reason = 'Copy succedeed with %s utils'%prot
201 +                dict['reason'] = reason
202 +            elif er_code == '60302':
203 +                list_not_existing.append( file )
204 +            elif er_code == '60303':
205 +                list_already_existing.append( file )
206 +            else :    
207 +                list_retry.append( file )
208 +                
209 +            if self.debug:
210 +                print "\t file %s \n"%file
211 +                print "\t dict['erCode'] %s \n"%dict['erCode']
212 +                print "\t dict['reason'] %s \n"%dict['reason']
213 +                
214 +            #if (lfn != '') and (se != ''):
215 +            #    upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
216 +            #else:
217 +            #    upDict = self.updateReport(file, er_code, dict['reason'])
218 +            
219 +            upDict = self.updateReport(file, er_code, dict['reason'])
220 +
221 +            copy_results.update(upDict)
222 +        
223 +        msg = ''
224 +        if len(list_ok) != 0:
225 +            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
226 +        if len(list_ok) != len_list_files :
227 +            if len(list_fallback)!=0:
228 +                msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
229 +            if len(list_retry)!=0:
230 +                msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
231 +            if len(list_not_existing)!=0:
232 +                msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
233 +            if len(list_already_existing)!=0:
234 +                msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
235 +        if self.debug : print msg
236 +        
237 +        return copy_results, list_ok, list_retry
238 +        
239 +    def check_for_retry_localSE (self, copy_results):
240 +        """
241 +        Checks the status of copy and create the list of file to copy to CloseSE
242 +        """
243 +        list_retry_localSE = []
244 +
245 +        if self.debug:
246 +            print 'in check_for_retry_localSE() :\n'
247 +            print "\t results in check local = ", copy_results
248 +        for file, dict in copy_results.iteritems():
249 +            er_code = dict['erCode']
250 +            if er_code != '0' and  er_code != '60302' and er_code != '60308':
251 +                list_retry_localSE.append( file )
252 +                
253 +            if self.debug:
254 +                print "\t file %s \n"%file
255 +                print "\t dict['erCode'] %s \n"%dict['erCode']
256 +                print "\t dict['reason'] %s \n"%dict['reason']
257 +                
258 +        return list_retry_localSE
259 +
260 +        
261 +    def LocalCopy(self, list_retry, results):
262 +        """
263 +        Tries the stage out to the CloseSE
264 +        """
265 +        if self.debug:
266 +            print 'in LocalCopy() :\n'
267 +            print '\t list_retry %s utils \n'%list_retry
268 +            print '\t len(list_retry) %s \n'%len(list_retry)
269 +                
270 +        list_files = list_retry  
271 +        self.params['inputFilesList']=list_files
272 +        
273 +        ### copy backup
274 +        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
275 +        siteCfg = loadSiteLocalConfig()
276 +        seName = siteCfg.localStageOut.get("se-name", None)
277 +        catalog = siteCfg.localStageOut.get("catalog", None)
278 +        implName = siteCfg.localStageOut.get("command", None)
279 +        if (implName == 'srm'):
280 +           implName='srmv1'
281 +           self.params['srm_version']=implName
282 +        ##### to be improved ###############
283 +        if (implName == 'rfcp'):
284 +            self.params['middleware']='lsf'
285 +        ####################################    
286 +                  
287 +        self.params['protocol']=implName
288 +        tfc = siteCfg.trivialFileCatalog()
289 +            
290 +        if self.debug:
291 +            print '\t siteCFG %s \n'%siteCfg
292 +            print '\t seName %s \n'%seName
293 +            print '\t catalog %s \n'%catalog
294 +            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
295 +            print '\t tfc %s '%tfc
296 +            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
297 +                
298 +        #if (str(self.params['lfn']).find("/store/") != -1):
299 +        #    temp = str(self.params['lfn']).split("/store/")
300 +        #    self.params['lfn']= "/store/temp/" + temp[1]
301 +        if (str(self.params['for_lfn']).find("/store/") == 0):
302 +            temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
303 +            self.params['for_lfn']= temp
304 +        
305 +        if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
306 +            
307 +        file_backup=[]
308 +        for input in self.params['inputFilesList']:
309 +            #file = self.params['lfn'] + os.path.basename(input)
310 +            file = self.params['for_lfn'] + os.path.basename(input)
311 +            surl = tfc.matchLFN(tfc.preferredProtocol, file)
312 +            file_backup.append(surl)
313 +            if self.debug:
314 +                #print '\t lfn %s \n'%self.params['lfn']
315 +                print '\t for_lfn %s \n'%self.params['for_lfn']
316 +                print '\t file %s \n'%file
317 +                print '\t surl %s \n'%surl
318 +                    
319 +        destination=os.path.dirname(file_backup[0])
320 +        if ( destination[-1] != '/' ) : destination = destination + '/'
321 +        self.params['destination']=destination
322 +        ###########
323 +        
324 +        self.params['se_name']=seName
325 +        #print "######################################################"
326 +        #print "in local copy self.params['se_name'] = ", self.params['se_name']
327 +        ###print "in local copy self.params['lfn'] = ", self.params['lfn']
328 +        #print "in local copy self.params['for_lfn'] = ", self.params['for_lfn']
329 +        #print "######################################################"
330 +            
331 +        if self.debug:
332 +            print "\t self.params['destination']%s \n"%self.params['destination']
333 +            print "\t self.params['protocol'] %s \n"%self.params['protocol']
334 +            print "\t self.params['option']%s \n"%self.params['option']
335 +              
336 +        for prot, opt in self.setProtocol( self.params['middleware'] ):
337 +            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
338 +            localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
339 +            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
340 +                results.update(localCopy_results)
341 +            else:
342 +                localCopy_results, list_ok, list_retry = self.checkCopy(localCopy_results, len(list_files), prot)
343 +                results.update(localCopy_results)
344 +                if len(list_ok) == len(list_files) :
345 +                    break
346 +                if len(list_retry):
347 +                    list_files = list_retry
348 +                else: break
349 +            if self.debug:
350 +                print "\t localCopy_results = %s \n"%localCopy_results
351 +        
352 +        return results        
353 +
354      def stager( self, middleware, list_files ):
355          """
356          Implement the logic for remote stage out
357          """
358 +
359 +        if self.debug:
360 +            print 'stager() :\n'
361 +            print '\tmiddleware %s\n'%middleware
362 +            print '\tlist_files %s\n'%list_files
363 +        
364          results={}
365          for prot, opt in self.setProtocol( middleware ):
366 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
366 >            if self.debug:
367 >                print '\tTrying the stage out with %s utils \n'%prot
368 >                print '\tand options %s\n'%opt
369 >                
370              copy_results = self.copy( list_files, prot, opt )
371 <            list_retry = []
372 <            list_existing = []
119 <            list_ok = []
120 <            if copy_results.keys() == '':
371 >            #print "in stager copy_results = ", copy_results
372 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
373                  results.update(copy_results)
374              else:
375 <                for file, dict in copy_results.iteritems():
124 <                    er_code = dict['erCode']
125 <                    if er_code == '0':
126 <                        list_ok.append(file)
127 <                        reason = 'Copy succedeed with %s utils'%prot
128 <                        upDict = self.updateReport(file, er_code, reason)
129 <                        copy_results.update(upDict)
130 <                    elif er_code == '60303': list_existing.append( file )
131 <                    else: list_retry.append( file )
375 >                copy_results, list_ok, list_retry = self.checkCopy(copy_results, len(list_files), prot)
376                  results.update(copy_results)
133                if len(list_ok) != 0:
134                    msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
135                    if self.debug : print msg
377                  if len(list_ok) == len(list_files) :
378                      break
379 <                else:
380 <                    if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
381 <                    if len(list_retry): list_files = list_retry
382 <                    else: break
383 <
384 <        #### TODO Daniele
385 <        #check is something fails and created related dict
386 <  #      backup = self.analyzeResults(results)
387 <
388 <  #      if backup :
389 <  #          msg = 'WARNING: backup logic is under implementation\n'
390 <  #          #backupDict = self.backup()
391 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
392 <  #          results.update(backupDict)
393 <  #          print msg
379 >                if len(list_retry):
380 >                    list_files = list_retry
381 >                else: break
382 >
383 >        if self.local_stage:
384 >            #print "results before check local = ", results
385 >            list_retry_localSE = self.check_for_retry_localSE(results)
386 >            #print "results after check local = ", results
387 >            if len(list_retry_localSE):
388 >                if self.debug:
389 >                    print "\t list_retry_localSE %s \n"%list_retry_localSE
390 >                results = self.LocalCopy(list_retry_localSE, results)
391 >
392 >        if self.debug:
393 >            print "\t results %s \n"%results
394          return results
395  
396      def initializeApi(self, protocol ):
397          """
398          Instantiate storage interface
399          """
400 +        if self.debug : print 'initializeApi() :\n'  
401          self.source_prot = protocol
402          self.dest_prot = protocol
403          if not self.params['source'] : self.source_prot = 'local'
# Line 164 | Line 406 | class cmscp:
406          Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
407  
408          if self.debug :
409 <            print '(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
410 <            print '(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
409 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
410 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
411 >            print msg
412  
413          return Source_SE, Destination_SE
414  
415 <    def copy( self, list_file, protocol, options ):
415 >    def copy( self, list_file, protocol, options, backup='no' ):
416          """
417          Make the real file copy using SE API
418          """
419 +        msg = ""
420 +        results = {}
421          if self.debug :
422 <            print 'copy(): using %s protocol'%protocol
422 >            msg  = 'copy() :\n'
423 >            msg += '\tusing %s protocol\n'%protocol
424 >            print msg
425          try:
426              Source_SE, Destination_SE = self.initializeApi( protocol )
427          except Exception, ex:
428 <            return self.updateReport('', '-1', str(ex))
428 >            for filetocopy in list_file:
429 >                results.update( self.updateReport(filetocopy, '-1', str(ex)))
430 >            return results
431 >            
432 >        prot = Destination_SE.protocol
433 >        self.hostname=Destination_SE.hostname
434 >        #print "in copy hostname = ", self.hostname
435 >        #print "prot = ", prot
436  
437          # create remote dir
438 <        if protocol in ['gridftp','rfio']:
438 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
439              try:
440 <                self.createDir( Destination_SE, protocol )
440 >                self.createDir( Destination_SE, Destination_SE.protocol )
441 >            except OperationException, ex:
442 >                for filetocopy in list_file:
443 >                    results.update( self.updateReport(filetocopy, '60316', str(ex)))
444 >                return results
445 >            ## when the client commands are not found (wrong env or really missing)
446 >            except MissingCommand, ex:
447 >                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
448 >                for filetocopy in list_file:
449 >                    results.update( self.updateReport(filetocopy, '10041', msg))
450 >                return results
451              except Exception, ex:
452 <                return self.updateReport('', '60316', str(ex))
452 >                msg = "ERROR %s" %(str(ex))
453 >                for filetocopy in list_file:
454 >                    results.update( self.updateReport(filetocopy, '-1', msg))
455 >                return results
456  
457          ## prepare for real copy  ##
458          try :
# Line 193 | Line 460 | class cmscp:
460              sbi_dest = SBinterface(Destination_SE)
461              sbi_source = SBinterface(Source_SE)
462          except ProtocolMismatch, ex:
463 <            msg = str(ex)+'\n'
464 <            msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
465 <            return self.updateReport('', '-1', str(ex))
463 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
464 >            msg += str(ex)
465 >            for filetocopy in list_file:
466 >                results.update( self.updateReport(filetocopy, '-1', msg))
467 >            return results
468  
469 <        results = {}
469 >        self.hostname = Destination_SE.hostname
470 >        
471          ## loop over the complete list of files
472          for filetocopy in list_file:
473 <            if self.debug : print 'start real copy for %s'%filetocopy
473 >            if self.debug : print '\tStart real copy for %s'%filetocopy
474              try :
475 <                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy )
475 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
476              except Exception, ex:
477 <                ErCode = -1
477 >                ErCode = '60307'
478                  msg = str(ex)  
479              if ErCode == '0':
480                  ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
481 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
481 >                if (ErCode == '0') and (backup == 'yes'):
482 >                    ErCode = '60308'
483 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
484              results.update( self.updateReport(filetocopy, ErCode, msg))
485          return results
486  
# Line 217 | Line 489 | class cmscp:
489          """
490          Create the storage interface.
491          """
492 +        if self.debug : print 'storageInterface():\n'
493          try:
494              interface = SElement( FullPath(endpoint), protocol )
495          except ProtocolUnknown, ex:
496 <            msg = ''
497 <            if self.debug : msg = str(ex)+'\n'
225 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
496 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
497 >            msg += str(ex)
498              raise Exception(msg)
499  
500          return interface
# Line 232 | Line 504 | class cmscp:
504          Create remote dir for gsiftp REALLY TEMPORARY
505          this should be transparent at SE API level.
506          """
507 +        if self.debug : print 'createDir():\n'
508          msg = ''
509          try:
510              action = SBinterface( Destination_SE )
511              action.createDir()
512 <            if self.debug: msg+= "The directory has been created using protocol %s\n"%protocol
512 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
513          except TransferException, ex:
514 <            msg = str(ex)
514 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
515 >            msg += str(ex)
516              if self.debug :
517 <                msg += str(ex.detail)+'\n'
518 <                msg += str(ex.output)+'\n'
519 <            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
520 <            raise Exceptions(msg)
517 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
518 >                dbgmsg += '\t'+str(ex.output)+'\n'
519 >                print dbgmsg
520 >            raise Exception(msg)
521          except OperationException, ex:
522 <            msg = str(ex)
523 <            if self.debug : msg += str(ex.detail)+'\n'
524 <            msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
525 <
522 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
523 >            msg += str(ex)
524 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
525 >            raise Exception(msg)
526 >        except MissingDestination, ex:
527 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
528 >            msg += str(ex)
529 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
530 >            raise Exception(msg)
531 >        except AlreadyExistsException, ex:
532 >            if self.debug: print "\tThe directory already exist"
533 >            pass
534 >        except Exception, ex:    
535 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
536 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
537 >            raise Exception(msg)
538          return msg
539  
540 <    def checkFileExist( self, sbi_source, sbi_dest, filetocopy ):
540 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
541          """
542          Check both if source file exist AND
543          if destination file ALREADY exist.
544          """
545 +        if self.debug : print 'checkFileExist():\n'
546          ErCode = '0'
547          msg = ''
548          f_tocopy=filetocopy
549          if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
550          try:
551 <            checkSource = sbi_source.checkExists( f_tocopy )
551 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
552 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
553          except OperationException, ex:
554 <            msg = str(ex)
554 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
555 >            msg += str(ex)
556              if self.debug :
557 <                msg += str(ex.detail)+'\n'
558 <                msg += str(ex.output)+'\n'
559 <            msg +='ERROR: problems checkig if source file %s exist'%filetocopy
557 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
558 >                dbgmsg += '\t'+str(ex.output)+'\n'
559 >                print dbgmsg
560              raise Exception(msg)
561          except WrongOption, ex:
562 <            msg = str(ex)
562 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
563 >            msg += str(ex)
564              if self.debug :
565 <                msg += str(ex.detail)+'\n'
566 <                msg += str(ex.output)+'\n'
567 <            msg +='ERROR problems checkig if source file % exist'%filetocopy
565 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
566 >                dbgmsg += '\t'+str(ex.output)+'\n'
567 >                print dbgmsg
568              raise Exception(msg)
569 +        except MissingDestination, ex:
570 +            msg  ='ERROR problems checkig if source file % exist'%filetocopy
571 +            msg += str(ex)
572 +            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
573 +            raise Exception(msg)
574 +        ## when the client commands are not found (wrong env or really missing)
575 +        except MissingCommand, ex:
576 +            ErCode = '10041'
577 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
578 +            return ErCode, msg
579          if not checkSource :
580              ErCode = '60302'
581              msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
582              return ErCode, msg
283
583          f_tocopy=filetocopy
584          if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
585          try:
586 <            check = sbi_dest.checkExists( f_tocopy )
586 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
587 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
588          except OperationException, ex:
589 <            msg = str(ex)
589 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
590 >            msg += str(ex)
591              if self.debug :
592 <                msg += str(ex.detail)+'\n'
593 <                msg += str(ex.output)+'\n'
594 <            msg +='ERROR: problems checkig if file %s already exist'%filetocopy
592 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
593 >                dbgmsg += '\t'+str(ex.output)+'\n'
594 >                print dbgmsg
595              raise Exception(msg)
596          except WrongOption, ex:
597 <            msg = str(ex)
597 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
598 >            msg += str(ex)
599              if self.debug :
600 <                msg += str(ex.detail)+'\n'
601 <                msg += str(ex.output)+'\n'
602 <            msg +='ERROR problems checkig if file % already exist'%filetocopy
600 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
601 >                msg += '\t'+str(ex.output)+'\n'
602 >            raise Exception(msg)
603 >        except MissingDestination, ex:
604 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
605 >            msg += str(ex)
606 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
607              raise Exception(msg)
608 +        ## when the client commands are not found (wrong env or really missing)
609 +        except MissingCommand, ex:
610 +            ErCode = '10041'
611 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
612 +            return ErCode, msg
613          if check :
614              ErCode = '60303'
615              msg = "file %s already exist"%os.path.basename(filetocopy)
# Line 309 | Line 620 | class cmscp:
620          """
621          call the copy API.
622          """
623 +        if self.debug : print 'makeCopy():\n'
624          path = os.path.dirname(filetocopy)
625          file_name =  os.path.basename(filetocopy)
626          source_file = filetocopy
# Line 316 | Line 628 | class cmscp:
628          if self.params['source'] == '' and path == '':
629              source_file = os.path.abspath(filetocopy)
630          elif self.params['destination'] =='':
631 <            dest_file = os.path.join(os.getcwd(),file_name)
631 >            destDir = self.params.get('destinationDir',os.getcwd())
632 >            dest_file = os.path.join(destDir,file_name)
633          elif self.params['source'] != '' and self.params['destination'] != '' :
634              source_file = file_name
635  
636          ErCode = '0'
637          msg = ''
638  
639 +        if  self.params['option'].find('space_token')>=0:
640 +            space_token=self.params['option'].split('=')[1]
641 +            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
642 +            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
643          try:
644 <            sbi.copy( source_file , dest_file , opt = option)
644 >            sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
645          except TransferException, ex:
646 <            msg = str(ex)
646 >            msg  = "Problem copying %s file" % filetocopy
647 >            msg += str(ex)
648              if self.debug :
649 <                msg += str(ex.detail)+'\n'
650 <                msg += str(ex.output)+'\n'
651 <            msg += "Problem copying %s file" % filetocopy
649 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
650 >                dbgmsg += '\t'+str(ex.output)+'\n'
651 >                print dbgmsg
652              ErCode = '60307'
653          except WrongOption, ex:
654 <            msg = str(ex)
654 >            msg  = "Problem copying %s file" % filetocopy
655 >            msg += str(ex)
656 >            if self.debug :
657 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
658 >                dbgmsg += '\t'+str(ex.output)+'\n'
659 >                print dbgmsg
660 >        except SizeZeroException, ex:
661 >            msg  = "Problem copying %s file" % filetocopy
662 >            msg += str(ex)
663 >            if self.debug :
664 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
665 >                dbgmsg += '\t'+str(ex.output)+'\n'
666 >                print dbgmsg
667 >            ErCode = '60307'
668 >        ## when the client commands are not found (wrong env or really missing)
669 >        except MissingCommand, ex:
670 >            ErCode = '10041'
671 >            msg  = "Problem copying %s file" % filetocopy
672 >            msg += str(ex)
673              if self.debug :
674 <                msg += str(ex.detail)+'\n'
675 <                msg += str(ex.output)+'\n'
676 <            msg += "Problem copying %s file" % filetocopy
674 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
675 >                dbgmsg += '\t'+str(ex.output)+'\n'
676 >                print dbgmsg
677 >        except AuthorizationException, ex:
678              ErCode = '60307'
679 <        if ErCode == '0' and protocol.find('srm') == 0:
679 >            msg  = "Problem copying %s file" % filetocopy
680 >            msg += str(ex)
681 >            if self.debug :
682 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
683 >                dbgmsg += '\t'+str(ex.output)+'\n'
684 >                print dbgmsg
685 >        except SEAPITimeout, ex:
686 >            ErCode = '60317'
687 >            msg  = "Problem copying %s file" % filetocopy
688 >            msg += str(ex)
689 >            if self.debug :
690 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
691 >                dbgmsg += '\t'+str(ex.output)+'\n'
692 >                print dbgmsg
693 >
694 >        if ErCode == '0' and protocol.find('srmv') == 0:
695              remote_file_size = -1
696              local_file_size = os.path.getsize( source_file )
697              try:
698 <                remote_file_size = sbi_dest.getSize( dest_file )
698 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
699 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
700              except TransferException, ex:
701 <                msg = str(ex)
701 >                msg  = "Problem checking the size of %s file" % filetocopy
702 >                msg += str(ex)
703                  if self.debug :
704 <                    msg += str(ex.detail)+'\n'
705 <                    msg += str(ex.output)+'\n'
706 <                msg += "Problem checking the size of %s file" % filetocopy
704 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
705 >                    dbgmsg += '\t'+str(ex.output)+'\n'
706 >                    print dbgmsg
707                  ErCode = '60307'
708              except WrongOption, ex:
709 <                msg = str(ex)
709 >                msg  = "Problem checking the size of %s file" % filetocopy
710 >                msg += str(ex)
711                  if self.debug :
712 <                    msg += str(ex.detail)+'\n'
713 <                    msg += str(ex.output)+'\n'
714 <                msg += "Problem checking the size of %s file" % filetocopy
712 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
713 >                    dbgmsg += '\t'+str(ex.output)+'\n'
714 >                    print dbgmsg
715                  ErCode = '60307'
716              if local_file_size != remote_file_size:
717                  msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
718                  ErCode = '60307'
719  
720 +        if ErCode != '0':
721 +            try :
722 +                self.removeFile( sbi_dest, dest_file, option )
723 +            except Exception, ex:
724 +                msg += '\n'+str(ex)  
725          return ErCode, msg
726  
727 <    def backup(self):
728 <        """
729 <        Check infos from TFC using existing api obtaining:
730 <        1)destination
731 <        2)protocol
732 <        """
733 <        return
727 >    def removeFile( self, sbi_dest, filetocopy, option ):
728 >        """  
729 >        """  
730 >        if self.debug : print 'removeFile():\n'
731 >        f_tocopy=filetocopy
732 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
733 >        try:
734 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
735 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
736 >        except OperationException, ex:
737 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
738 >            msg += str(ex)
739 >            if self.debug :
740 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
741 >                dbgmsg += '\t'+str(ex.output)+'\n'
742 >                print dbgmsg
743 >            raise Exception(msg)
744 >
745 >        return
746  
747 <    def updateReport(self, file, erCode, reason, lfn='', se='' ):
747 >    #def updateReport(self, file, erCode, reason, lfn='', se='' ):
748 >    def updateReport(self, file, erCode, reason):
749          """
750          Update the final stage out infos
751          """
752          jobStageInfo={}
753          jobStageInfo['erCode']=erCode
754          jobStageInfo['reason']=reason
755 <        jobStageInfo['lfn']=lfn
756 <        jobStageInfo['se']=se
755 >        #if not self.params['lfn']: self.params['lfn']=''
756 >        if not self.params['for_lfn']: self.params['for_lfn']=''
757 >        if not self.params['se_name']: self.params['se_name']=''
758 >        if not self.hostname: self.hostname=''
759 >        if (erCode != '0') and (erCode != '60308'):
760 >           #jobStageInfo['lfn']='/copy_problem/'
761 >           jobStageInfo['for_lfn']='/copy_problem/'
762 >        else:  
763 >            jobStageInfo['for_lfn']=self.params['for_lfn']
764 >        jobStageInfo['se_name']=self.params['se_name']
765 >        jobStageInfo['endpoint']=self.hostname
766  
767          report = { file : jobStageInfo}
768          return report
# Line 394 | Line 776 | class cmscp:
776          cmscp_exit_status = 0
777          txt = ''
778          for file, dict in results.iteritems():
779 +            reason = str(dict['reason'])
780 +            if str(reason).find("'") > -1:
781 +                reason = " ".join(reason.split("'"))
782 +            reason="'%s'"%reason
783              if file:
784 <                if dict['lfn']=='':
785 <                    lfn = '$LFNBaseName/'+os.path.basename(file)
784 >                #if dict['lfn']=='':
785 >                if dict['for_lfn']=='':
786 >                    lfn = '${LFNBaseName}'+os.path.basename(file)
787                      se  = '$SE'
788 +                    LFNBaseName = '$LFNBaseName'
789                  else:
790 <                    lfn = dict['lfn']+os.path.basename(file)
791 <                    se = dict['se']
790 >                    #lfn = dict['lfn']+os.path.basename(file)
791 >                    lfn = dict['for_lfn']+os.path.basename(file)
792 >                    #se = dict['se']
793 >                    se = dict['se_name']
794 >                    LFNBaseName = os.path.dirname(lfn)
795 >                    if (LFNBaseName[-1] != '/'):
796 >                        LFNBaseName = LFNBaseName + '/'
797 >
798 >                
799                  #dict['lfn'] # to be implemented
800 <                txt +=  'echo "Report for File: '+file+'"\n'
801 <                txt +=  'echo "LFN: '+lfn+'"\n'
802 <                txt +=  'echo "StorageElement: '+se+'"\n'
803 <                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
800 >                txt += 'echo "Report for File: '+file+'"\n'
801 >                txt += 'echo "LFN: '+lfn+'"\n'
802 >                txt += 'echo "StorageElement: '+se+'"\n'
803 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
804                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
805 +
806 +                
807                  if dict['erCode'] != '0':
808                      cmscp_exit_status = dict['erCode']
412                    cmscp_exit_status = dict['erCode']
809              else:
810 <                cmscp_exit_status = dict['erCode']
810 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
811                  cmscp_exit_status = dict['erCode']
812          txt += '\n'
813          txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
814 <        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
814 >        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
815          outFile.write(str(txt))
816          outFile.close()
817          return
# Line 424 | Line 820 | class cmscp:
820   def usage():
821  
822      msg="""
823 <    required parameters:
824 <    --source        :: REMOTE           :
825 <    --destination   :: REMOTE           :
430 <    --debug             :
431 <    --inFile :: absPath : or name NOT RELATIVE PATH
432 <    --outFIle :: onlyNAME : NOT YET SUPPORTED
823 >    cmscp:
824 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
825 >        including success checking  version also for CAF using rfcp command to copy the output to SE
826  
827 <    optional parameters
827 >    accepted parameters:
828 >       source           =
829 >       destination      =
830 >       inputFileList    =
831 >       outputFileList   =
832 >       protocol         =
833 >       option           =
834 >       middleware       =  
835 >       srm_version      =
836 >       destinationDir   =
837 >       lfn=             =
838 >       local_stage      =  activate stage fall back  
839 >       debug            =  activate verbose print out
840 >       help             =  print on line man and exit  
841 >    
842 >    mandatory:
843 >       * "source" and/or "destination" must always be defined
844 >       * either "middleware" or "protocol" must always be defined
845 >       * "inputFileList" must always be defined
846 >       * if "local_stage" = 1 also  "lfn" must be defined
847      """
848      print msg
849  
# Line 459 | Line 871 | if __name__ == '__main__' :
871      import getopt
872  
873      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
874 <                  "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
874 >                  "protocol=","option=", "middleware=", "srm_version=", \
875 >                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
876      try:
877          opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
878      except getopt.GetoptError, err:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines