ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.11 by spiga, Thu Oct 9 11:17:31 2008 UTC vs.
Revision 1.80 by fanzago, Mon Jul 26 16:50:46 2010 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2 <
3 < import sys, string
4 < import os, popen2
2 > import sys, os
3 > try:
4 >    import json
5 > except:    
6 >    import simplejson as json
7   from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
8   from ProdCommon.Storage.SEAPI.SBinterface import *
9   from ProdCommon.Storage.SEAPI.Exceptions import *
# Line 11 | Line 13 | class cmscp:
13      def __init__(self, args):
14          """
15          cmscp
16 <
15 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
16 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
17          including success checking  version also for CAF using rfcp command to copy the output to SE
18          input:
19             $1 middleware (CAF, LSF, LCG, OSG)
# Line 20 | Line 21 | class cmscp:
21             $3 if needed: file name (the output file name)
22             $5 remote SE (complete endpoint)
23             $6 srm version
24 +           --for_lfn $LFNBaseName
25          output:
26               return 0 if all ok
27               return 60307 if srmcp failed
28               return 60303 if file already exists in the SE
29          """
30 <
31 <        #set default
32 <        self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
33 <                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
34 <        self.debug = 0  
33 <
30 >        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
31 >                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'' }
32 >        self.debug = 0
33 >        #### for fallback copy
34 >        self.local_stage = 0
35          self.params.update( args )
36 +        ## timeout needed for subprocess command of SEAPI
37 +        ## they should be a bit higher then the corresponding passed by command line  
38 +        ## default values
39 +        self.subprocesstimeout = { \
40 +                                   'copy':   3600, \
41 +                                   'exists': 1200, \
42 +                                   'delete': 1200, \
43 +                                   'size':   1200 \
44 +                                 }
45  
46          return
47  
# Line 39 | Line 49 | class cmscp:
49          """
50          check command line parameter
51          """
52 <
53 <        if 'help' in self.params.keys(): HelpOptions()        
54 <        if 'debug' in self.params.keys(): self.debug = 1        
52 >        if 'help' in self.params.keys(): HelpOptions()
53 >        if 'debug' in self.params.keys(): self.debug = 1
54 >        if 'local_stage' in self.params.keys(): self.local_stage = 1
55  
56          # source and dest cannot be undefined at same time
57          if not self.params['source']  and not self.params['destination'] :
58              HelpOptions()
49
59          # if middleware is not defined --> protocol cannot be empty
60          if not self.params['middleware'] and not self.params['protocol'] :
61              HelpOptions()
62  
63          # input file must be defined
64 <        if not self.params['inputFileList'] : HelpOptions()
64 >        if not self.params['inputFileList'] :
65 >            HelpOptions()
66          else:
67              file_to_copy=[]
68 <            if self.params['inputFileList'].find(','):
68 >            if self.params['inputFileList'].find(','):
69                  [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
70              else:
71                  file_to_copy.append(self.params['inputFileList'])
72              self.params['inputFileList'] = file_to_copy
73  
74 +        #if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
75 +        if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
76 +        
77          ## TO DO:
78          #### add check for outFiles
79          #### add map {'inFileNAME':'outFileNAME'} to change out name
80  
68        return
81  
82      def run( self ):
83          """
84          Check if running on UI (no $middleware) or
85          on WN (on the Grid), and take different action
86          """
75
87          self.processOptions()
88 +        if self.debug: print 'calling run() : \n'
89          # stage out from WN
90          if self.params['middleware'] :
91 <           results = self.stager(self.params['middleware'],self.params['inputFileList'])
92 <           self.finalReport(results)
91 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
92 >            self.writeJsonFile(results)
93 >            self.finalReport(results)
94          # Local interaction with SE
95          else:
96 <           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.protocols['option'] )
97 <           return results
96 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
97 >            self.writeJsonFile(results)
98 >            return results
99 >            
100 >    def writeJsonFile( self, results ):
101 >        """
102 >        write a json file containing copy results for each file
103 >        """
104 >        if self.debug:
105 >            print 'in writeJsonFile() : \n'
106 >            print "---->>>> in writeJsonFile results =  ", results
107 >        fp = open('resultCopyFile', 'w')
108 >        json.dump(results, fp)
109 >        fp.close()
110 >        if self.debug:
111 >            print '    reading resultCopyFile : \n'
112 >            lp = open('resultCopyFile', "r")
113 >            inputDict = json.load(lp)
114 >            lp.close()
115 >            print "    inputDict = ", inputDict
116 >        return
117 >
118 >    def checkLcgUtils( self ):
119 >        """
120 >        _checkLcgUtils_
121 >        check the lcg-utils version and report
122 >        """
123 >        import commands
124 >        cmd = "lcg-cp --version | grep lcg_util"
125 >        status, output = commands.getstatusoutput( cmd )
126 >        num_ver = -1
127 >        if output.find("not found") == -1 or status == 0:
128 >            temp = output.split("-")
129 >            version = ""
130 >            if len(temp) >= 2:
131 >                version = output.split("-")[1]
132 >                temp = version.split(".")
133 >                if len(temp) >= 1:
134 >                    num_ver = int(temp[0])*10
135 >                    num_ver += int(temp[1])
136 >        return num_ver
137  
138      def setProtocol( self, middleware ):
139          """
# Line 89 | Line 141 | class cmscp:
141          which depend on scheduler
142          """
143          # default To be used with "middleware"
144 <        lcgOpt={'srmv1':'-b -D srmv1 --vo cms -t 2400 --verbose',
145 <                'srmv2':'-b -D srmv2 --vo cms -t 2400 --verbose'}
146 <        srmOpt={'srmv1':'-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
147 <                'srmv2':'-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -srm_protocol_version 2 '}
144 >        if self.debug:
145 >            print 'setProtocol() :\n'
146 >            print '\tmiddleware =  %s utils \n'%middleware
147 >
148 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
149 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
150 >        if self.checkLcgUtils() >= 17:
151 >            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
152 >                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
153 >
154 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
155 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
156          rfioOpt=''
157  
158 <        if middleware.lower() in ['osg','lcg']:
159 <            supported_protocol = [('srm-lcg',lcgOpt[params['srm_version']]),\
160 <                                  (params['srm_version'],srmOpt[params['srm_version']])]
158 >        ## FEDE for hadoop ###
159 >        hadoopOpt = ''
160 >        ####
161 >
162 >        supported_protocol = None
163 >        if middleware.lower() in ['osg','lcg','condor','sge']:
164 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
165 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
166          elif middleware.lower() in ['lsf','caf']:
167 <            supported_protocol = [('rfio',rfioOpt)]
167 >            supported_protocol = [('rfio',rfioOpt)]
168 >        elif middleware.lower() in ['pbs']:
169 >            supported_protocol = [('rfio',rfioOpt),('local','')]
170 >        elif middleware.lower() in ['arc']:
171 >            supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
172 >        ## FEDE for hadoop ###
173 >        elif middleware.lower() in ['hadoop']:
174 >            supported_protocol = [('hadoop', hadoopOpt)]
175 >        ####    
176          else:
177              ## here we can add support for any kind of protocol,
178              ## maybe some local schedulers need something dedicated
179              pass
180          return supported_protocol
181  
182 +
183 +    def checkCopy (self, copy_results, len_list_files, prot):
184 +        """
185 +        Checks the status of copy and update result dictionary
186 +        """
187 +        
188 +        list_retry = []
189 +        list_not_existing = []
190 +        list_already_existing = []
191 +        list_fallback = []
192 +        list_ok = []
193 +        
194 +        if self.debug:
195 +            print 'in checkCopy() :\n'
196 +        for file, dict in copy_results.iteritems():
197 +            er_code = dict['erCode']
198 +            if er_code == '0':
199 +                list_ok.append(file)
200 +                reason = 'Copy succedeed with %s utils'%prot
201 +                dict['reason'] = reason
202 +            elif er_code == '60308':
203 +                list_fallback.append( file )
204 +                reason = 'Copy succedeed with %s utils'%prot
205 +                dict['reason'] = reason
206 +            elif er_code == '60302':
207 +                list_not_existing.append( file )
208 +            elif er_code == '60303':
209 +                list_already_existing.append( file )
210 +            else :    
211 +                list_retry.append( file )
212 +                
213 +            if self.debug:
214 +                print "\t file %s \n"%file
215 +                print "\t dict['erCode'] %s \n"%dict['erCode']
216 +                print "\t dict['reason'] %s \n"%dict['reason']
217 +                
218 +            upDict = self.updateReport(file, er_code, dict['reason'])
219 +
220 +            copy_results.update(upDict)
221 +        
222 +        msg = ''
223 +        if len(list_ok) != 0:
224 +            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
225 +        if len(list_ok) != len_list_files :
226 +            if len(list_fallback)!=0:
227 +                msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
228 +            if len(list_retry)!=0:
229 +                msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
230 +            if len(list_not_existing)!=0:
231 +                msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
232 +            if len(list_already_existing)!=0:
233 +                msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
234 +        if self.debug : print msg
235 +        
236 +        return copy_results, list_ok, list_retry, list_fallback
237 +        
238 +    def check_for_retry_localSE (self, copy_results):
239 +        """
240 +        Checks the status of copy and create the list of file to copy to CloseSE
241 +        """
242 +        list_retry_localSE = []
243 +
244 +        if self.debug:
245 +            print 'in check_for_retry_localSE() :\n'
246 +            print "\t results in check local = ", copy_results
247 +        for file, dict in copy_results.iteritems():
248 +            er_code = dict['erCode']
249 +            if er_code != '0' and  er_code != '60302' and er_code != '60308':
250 +                list_retry_localSE.append( file )
251 +                
252 +            if self.debug:
253 +                print "\t file %s \n"%file
254 +                print "\t dict['erCode'] %s \n"%dict['erCode']
255 +                print "\t dict['reason'] %s \n"%dict['reason']
256 +                
257 +        return list_retry_localSE
258 +
259 +        
260 +    def LocalCopy(self, list_retry, results):
261 +        """
262 +        Tries the stage out to the CloseSE
263 +        """
264 +        if self.debug:
265 +            print 'in LocalCopy() :\n'
266 +            print '\t list_retry %s utils \n'%list_retry
267 +            print '\t len(list_retry) %s \n'%len(list_retry)
268 +                
269 +        list_files = list_retry
270 +        self.params['inputFileList']=list_files
271 +
272 +        ### copy backup
273 +        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
274 +        siteCfg = loadSiteLocalConfig()
275 +        seName = siteCfg.localStageOut.get("se-name", None)
276 +        catalog = siteCfg.localStageOut.get("catalog", None)
277 +        implName = siteCfg.localStageOut.get("command", None)
278 +        option = siteCfg.localStageOut.get("option", None)
279 +
280 +        if (implName == 'srm'):
281 +           protocol = 'srmv2'
282 +        elif (implName == 'srmv2-lcg'):
283 +           protocol = 'srm-lcg'
284 +        elif (implName == 'rfcp-CERN'):
285 +           protocol = 'rfio'
286 +        elif (implName == 'rfcp'):
287 +           protocol = 'rfio'
288 +        else: protocol = implName
289 +                  
290 +        tfc = siteCfg.trivialFileCatalog()
291 +            
292 +        if self.debug:
293 +            print '\t siteCFG %s \n'%siteCfg
294 +            print '\t seName %s \n'%seName
295 +            print '\t catalog %s \n'%catalog
296 +            print "\t fallback protocol %s \n"%protocol            
297 +            print "\t fallback option %s \n"%option
298 +            print '\t tfc %s '%tfc
299 +            print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
300 +                
301 +        if (str(self.params['for_lfn']).find("/store/") == 0):
302 +            temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
303 +            self.params['for_lfn']= temp
304 +        
305 +        if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
306 +            
307 +        file_backup=[]
308 +        for input in self.params['inputFileList']:
309 +            file = self.params['for_lfn'] + os.path.basename(input)
310 +            surl = tfc.matchLFN(tfc.preferredProtocol, file)
311 +            file_backup.append(surl)
312 +            if self.debug:
313 +                print '\t for_lfn %s \n'%self.params['for_lfn']
314 +                print '\t file %s \n'%file
315 +                print '\t surl %s \n'%surl
316 +                    
317 +        destination=os.path.dirname(file_backup[0])
318 +        if ( destination[-1] != '/' ) : destination = destination + '/'
319 +        self.params['destination']=destination
320 +        self.params['se_name']=seName
321 +            
322 +        if self.debug:
323 +            print "\t self.params['destination']%s \n"%self.params['destination']
324 +            print "\t protocol %s \n"%protocol
325 +            print "\t optionn%s \n"%option
326 +
327 +        if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%protocol
328 +        if self.debug: print '\tUsing as option %s \n'%option
329 +
330 +        localCopy_results = self.copy( self.params['inputFileList'], protocol, option, backup='yes' )
331 +          
332 +        if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
333 +            results.update(localCopy_results)
334 +        else:
335 +            localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), protocol)
336 +                
337 +            results.update(localCopy_results)
338 +        if self.debug:
339 +            print "\t localCopy_results = %s \n"%localCopy_results
340 +        return results        
341 +
342      def stager( self, middleware, list_files ):
343          """
344          Implement the logic for remote stage out
345          """
346 <        count=0
346 >
347 >        if self.debug:
348 >            print 'stager() :\n'
349 >            print '\tmiddleware %s\n'%middleware
350 >            print '\tlist_files %s\n'%list_files
351 >        
352          results={}
353          for prot, opt in self.setProtocol( middleware ):
354 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
354 >            if self.debug:
355 >                print '\tTrying the stage out with %s utils \n'%prot
356 >                print '\tand options %s\n'%opt
357 >                
358              copy_results = self.copy( list_files, prot, opt )
359 <            list_retry = []
360 <            list_existing = []
120 <            list_ok = []
121 <            for file, dict in copy_results.iteritems():
122 <                er_code = dict['erCode']
123 <                if er_code == '60307': list_retry.append( file )
124 <                elif er_code == '60303': list_existing.append( file )
125 <                else:
126 <                    list_ok.append(file)
127 <                    reason = 'Copy succedeed with %s utils'%prot
128 <                    upDict = self.updateReport(file, er_code, reason)
129 <                    copy_results.update(upDict)
130 <            results.update(copy_results)
131 <            if len(list_ok) != 0:
132 <                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
133 <                if self.debug : print msg
134 <            if len(list_ok) == len(list_files) :
135 <                break
359 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
360 >                results.update(copy_results)
361              else:
362 <                if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
363 <                if len(list_retry): list_files = list_retry
362 >                copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
363 >                results.update(copy_results)
364 >                if len(list_ok) == len(list_files) :
365 >                    break
366 >                if len(list_retry):
367 >                    list_files = list_retry
368                  else: break
140            count =+1
369  
370 <        #### TODO Daniele
371 <        #check is something fails and created related dict
372 <  #      backup = self.analyzeResults(results)
373 <
374 <  #      if backup :
375 <  #          msg = 'WARNING: backup logic is under implementation\n'
376 <  #          #backupDict = self.backup()
377 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
378 <  #          results.update(backupDict)
151 <  #          print msg
370 >        if self.local_stage:
371 >            list_retry_localSE = self.check_for_retry_localSE(results)
372 >            if len(list_retry_localSE):
373 >                if self.debug:
374 >                    print "\t list_retry_localSE %s \n"%list_retry_localSE
375 >                results = self.LocalCopy(list_retry_localSE, results)
376 >
377 >        if self.debug:
378 >            print "\t results %s \n"%results
379          return results
380  
381      def initializeApi(self, protocol ):
382          """
383          Instantiate storage interface
384          """
385 <        source_prot = protocol
386 <        dest_prot = protocol
387 <        if not self.params['source'] : source_prot = 'local'
388 <        Source_SE  = self.storageInterface( self.params['source'], source_prot )
389 <        if not self.params['destination'] : dest_prot = 'local'
390 <        Destination_SE = self.storageInterface( self.params['destination'], dest_prot )
385 >        if self.debug : print 'initializeApi() :\n'  
386 >        self.source_prot = protocol
387 >        self.dest_prot = protocol
388 >        if not self.params['source'] : self.source_prot = 'local'
389 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
390 >        if not self.params['destination'] :
391 >            self.dest_prot = 'local'
392 >            Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
393 >        else:    
394 >            Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
395  
396          if self.debug :
397 <            print '(source=%s,  protocol=%s)'%(self.params['source'], source_prot)
398 <            print '(destination=%s,  protocol=%s)'%(self.params['destination'], dest_prot)
397 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
398 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
399 >            msg += '\t(destinationDir=%s,  protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
400 >            print msg
401  
402          return Source_SE, Destination_SE
403  
404 <    def copy( self, list_file, protocol, options ):
404 >    def copy( self, list_file, protocol, options, backup='no' ):
405          """
406          Make the real file copy using SE API
407          """
408 +        msg = ""
409 +        results = {}
410          if self.debug :
411 <            print 'copy(): using %s protocol'%protocol
412 <        Source_SE, Destination_SE = self.initializeApi( protocol )
411 >            msg  = 'copy() :\n'
412 >            msg += '\tusing %s protocol\n'%protocol
413 >            print msg
414 >        try:
415 >            Source_SE, Destination_SE = self.initializeApi( protocol )
416 >        except Exception, ex:
417 >            for filetocopy in list_file:
418 >                results.update( self.updateReport(filetocopy, '-1', str(ex)))
419 >            return results
420 >            
421 >        prot = Destination_SE.protocol
422 >        self.hostname=Destination_SE.hostname
423  
424          # create remote dir
425 <        if protocol in ['gridftp','rfio']:
426 <            self.createDir( Destination_SE, protocol )
425 >        ### FEDE for hadoop
426 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop']:
427 >            try:
428 >                self.createDir( Destination_SE, Destination_SE.protocol )
429 >            except OperationException, ex:
430 >                for filetocopy in list_file:
431 >                    results.update( self.updateReport(filetocopy, '60316', str(ex)))
432 >                return results
433 >            ## when the client commands are not found (wrong env or really missing)
434 >            except MissingCommand, ex:
435 >                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
436 >                for filetocopy in list_file:
437 >                    results.update( self.updateReport(filetocopy, '10041', msg))
438 >                return results
439 >            except Exception, ex:
440 >                msg = "ERROR %s" %(str(ex))
441 >                for filetocopy in list_file:
442 >                    results.update( self.updateReport(filetocopy, '-1', msg))
443 >                return results
444  
445          ## prepare for real copy  ##
446          try :
447              sbi = SBinterface( Source_SE, Destination_SE )
448              sbi_dest = SBinterface(Destination_SE)
449 +            sbi_source = SBinterface(Source_SE)
450          except ProtocolMismatch, ex:
451 <            msg = str(ex)+'\n'
452 <            msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
453 <            raise msg
451 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
452 >            msg += str(ex)
453 >            for filetocopy in list_file:
454 >                results.update( self.updateReport(filetocopy, '-1', msg))
455 >            return results
456  
457 <        results = {}
457 >        self.hostname = Destination_SE.hostname
458 >        
459          ## loop over the complete list of files
460          for filetocopy in list_file:
461 <            if self.debug : print 'start real copy for %s'%filetocopy
462 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
461 >            if self.debug : print '\tStart real copy for %s'%filetocopy
462 >            try :
463 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
464 >            except Exception, ex:
465 >                ErCode = '60307'
466 >                msg = str(ex)  
467              if ErCode == '0':
468 <                ErCode, msg = self.makeCopy( sbi, filetocopy , options )
469 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
468 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
469 >                if (ErCode == '0') and (backup == 'yes'):
470 >                    ErCode = '60308'
471 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
472              results.update( self.updateReport(filetocopy, ErCode, msg))
473          return results
474  
# Line 205 | Line 477 | class cmscp:
477          """
478          Create the storage interface.
479          """
480 +        if self.debug : print 'storageInterface():\n'
481          try:
482              interface = SElement( FullPath(endpoint), protocol )
483          except ProtocolUnknown, ex:
484 <            msg = ''
485 <            if self.debug : msg = str(ex)+'\n'
486 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
214 <            raise msg
484 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
485 >            msg += str(ex)
486 >            raise Exception(msg)
487  
488          return interface
489  
# Line 220 | Line 492 | class cmscp:
492          Create remote dir for gsiftp REALLY TEMPORARY
493          this should be transparent at SE API level.
494          """
495 <        ErCode = '0'
496 <        msg = ''
495 >        if self.debug : print 'createDir():\n'
496 >        msg = ''
497          try:
498              action = SBinterface( Destination_SE )
499              action.createDir()
500 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
500 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
501          except TransferException, ex:
502 <            msg = str(ex)
502 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
503 >            msg += str(ex)
504              if self.debug :
505 <                msg += str(ex.detail)+'\n'
506 <                msg += str(ex.out)+'\n'
507 <            msg_rep = "ERROR: problem with the directory creation using %s protocol \n"%protocol
508 <            ErCode = '60316'
236 <            res = self.updateReport('', ErCode, msg_rep )
237 <            self.finalReport( res )
238 <            raise msg
505 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
506 >                dbgmsg += '\t'+str(ex.output)+'\n'
507 >                print dbgmsg
508 >            raise Exception(msg)
509          except OperationException, ex:
510 <            msg = str(ex)
511 <            if self.debug : msg += str(ex.detail)+'\n'
512 <            print msg
513 <
514 <        return ErCode, msg
510 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
511 >            msg += str(ex)
512 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
513 >            raise Exception(msg)
514 >        except MissingDestination, ex:
515 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
516 >            msg += str(ex)
517 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
518 >            raise Exception(msg)
519 >        except AlreadyExistsException, ex:
520 >            if self.debug: print "\tThe directory already exist"
521 >            pass
522 >        except Exception, ex:    
523 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
524 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
525 >            raise Exception(msg)
526 >        return msg
527  
528 <    def checkFileExist( self, sbi, filetocopy ):
528 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
529          """
530 <        Check if file to copy already exist
530 >        Check both if source file exist AND
531 >        if destination file ALREADY exist.
532          """
533 +        if self.debug : print 'checkFileExist():\n'
534          ErCode = '0'
535          msg = ''
536 +        f_tocopy=filetocopy
537 +        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
538 +        try:
539 +            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
540 +            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
541 +        except OperationException, ex:
542 +            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
543 +            msg += str(ex)
544 +            if self.debug :
545 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
546 +                dbgmsg += '\t'+str(ex.output)+'\n'
547 +                print dbgmsg
548 +            raise Exception(msg)
549 +        except WrongOption, ex:
550 +            msg  ='ERROR problems checkig if source file % exist'%filetocopy
551 +            msg += str(ex)
552 +            if self.debug :
553 +                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
554 +                dbgmsg += '\t'+str(ex.output)+'\n'
555 +                print dbgmsg
556 +            raise Exception(msg)
557 +        except MissingDestination, ex:
558 +            msg  ='ERROR problems checkig if source file % exist'%filetocopy
559 +            msg += str(ex)
560 +            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
561 +            raise Exception(msg)
562 +        ## when the client commands are not found (wrong env or really missing)
563 +        except MissingCommand, ex:
564 +            ErCode = '10041'
565 +            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
566 +            return ErCode, msg
567 +        if not checkSource :
568 +            ErCode = '60302'
569 +            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
570 +            return ErCode, msg
571 +        f_tocopy=filetocopy
572 +        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
573          try:
574 <            check = sbi.checkExists(filetocopy)
574 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
575 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
576          except OperationException, ex:
577 <            msg = str(ex)
577 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
578 >            msg += str(ex)
579              if self.debug :
580 <                msg += str(ex.detail)+'\n'
581 <                msg += str(ex.out)+'\n'
582 <            msg +='problems checkig if file already exist'
583 <            raise msg
580 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
581 >                dbgmsg += '\t'+str(ex.output)+'\n'
582 >                print dbgmsg
583 >            raise Exception(msg)
584          except WrongOption, ex:
585 <            msg = str(ex)
585 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
586 >            msg += str(ex)
587              if self.debug :
588 <                msg += str(ex.detail)+'\n'
589 <                msg += str(ex.out)+'\n'
590 <            raise msg
591 <        if check :    
588 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
589 >                msg += '\t'+str(ex.output)+'\n'
590 >            raise Exception(msg)
591 >        except MissingDestination, ex:
592 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
593 >            msg += str(ex)
594 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
595 >            raise Exception(msg)
596 >        ## when the client commands are not found (wrong env or really missing)
597 >        except MissingCommand, ex:
598 >            ErCode = '10041'
599 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
600 >            return ErCode, msg
601 >        if check :
602              ErCode = '60303'
603 <            msg = "file %s already exist"%filetocopy
603 >            msg = "file %s already exist"%os.path.basename(filetocopy)
604  
605 <        return ErCode,msg
605 >        return ErCode, msg
606  
607 <    def makeCopy(self, sbi, filetocopy, option ):
607 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
608          """
609          call the copy API.
610          """
611 +        if self.debug : print 'makeCopy():\n'
612          path = os.path.dirname(filetocopy)
613          file_name =  os.path.basename(filetocopy)
614          source_file = filetocopy
# Line 281 | Line 616 | class cmscp:
616          if self.params['source'] == '' and path == '':
617              source_file = os.path.abspath(filetocopy)
618          elif self.params['destination'] =='':
619 <            dest_file = os.path.join(os.getcwd(),file_name)
619 >            destDir = self.params.get('destinationDir',os.getcwd())
620 >            dest_file = os.path.join(destDir,file_name)
621          elif self.params['source'] != '' and self.params['destination'] != '' :
622              source_file = file_name
623  
624          ErCode = '0'
625          msg = ''
626  
627 +        if  self.params['option'].find('space_token')>=0:
628 +            space_token=self.params['option'].split('=')[1]
629 +            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
630 +            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
631          try:
632 <            sbi.copy( source_file , dest_file , opt = option)
632 >            sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
633          except TransferException, ex:
634 <            msg = str(ex)
634 >            msg  = "Problem copying %s file" % filetocopy
635 >            msg += str(ex)
636              if self.debug :
637 <                msg += str(ex.detail)+'\n'
638 <                msg += str(ex.out)+'\n'
639 <            msg += "Problem copying %s file" % filetocopy
637 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
638 >                dbgmsg += '\t'+str(ex.output)+'\n'
639 >                print dbgmsg
640              ErCode = '60307'
641          except WrongOption, ex:
642 <            msg = str(ex)
642 >            msg  = "Problem copying %s file" % filetocopy
643 >            msg += str(ex)
644 >            if self.debug :
645 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
646 >                dbgmsg += '\t'+str(ex.output)+'\n'
647 >                print dbgmsg
648 >        except SizeZeroException, ex:
649 >            msg  = "Problem copying %s file" % filetocopy
650 >            msg += str(ex)
651 >            if self.debug :
652 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
653 >                dbgmsg += '\t'+str(ex.output)+'\n'
654 >                print dbgmsg
655 >            ErCode = '60307'
656 >        ## when the client commands are not found (wrong env or really missing)
657 >        except MissingCommand, ex:
658 >            ErCode = '10041'
659 >            msg  = "Problem copying %s file" % filetocopy
660 >            msg += str(ex)
661              if self.debug :
662 <                msg += str(ex.detail)+'\n'
663 <                msg += str(ex.out)+'\n'
664 <            raise msg
662 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
663 >                dbgmsg += '\t'+str(ex.output)+'\n'
664 >                print dbgmsg
665 >        except AuthorizationException, ex:
666 >            ErCode = '60307'
667 >            msg  = "Problem copying %s file" % filetocopy
668 >            msg += str(ex)
669 >            if self.debug :
670 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
671 >                dbgmsg += '\t'+str(ex.output)+'\n'
672 >                print dbgmsg
673 >        except SEAPITimeout, ex:
674 >            ErCode = '60317'
675 >            msg  = "Problem copying %s file" % filetocopy
676 >            msg += str(ex)
677 >            if self.debug :
678 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
679 >                dbgmsg += '\t'+str(ex.output)+'\n'
680 >                print dbgmsg
681  
682 <         ## TO BE IMPLEMENTED if NEEDED
683 <         ## NOTE: SE API Already available
684 <    #    if self.protocol.find('srm')  : self.checkSize( sbi, filetocopy )
685 <          
682 >        if ErCode == '0' and protocol.find('srmv') == 0:
683 >            remote_file_size = -1
684 >            local_file_size = os.path.getsize( source_file )
685 >            try:
686 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
687 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
688 >            except TransferException, ex:
689 >                msg  = "Problem checking the size of %s file" % filetocopy
690 >                msg += str(ex)
691 >                if self.debug :
692 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
693 >                    dbgmsg += '\t'+str(ex.output)+'\n'
694 >                    print dbgmsg
695 >                ErCode = '60307'
696 >            except WrongOption, ex:
697 >                msg  = "Problem checking the size of %s file" % filetocopy
698 >                msg += str(ex)
699 >                if self.debug :
700 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
701 >                    dbgmsg += '\t'+str(ex.output)+'\n'
702 >                    print dbgmsg
703 >                ErCode = '60307'
704 >            if local_file_size != remote_file_size:
705 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
706 >                ErCode = '60307'
707 >
708 >        if ErCode != '0':
709 >            try :
710 >                self.removeFile( sbi_dest, dest_file, option )
711 >            except Exception, ex:
712 >                msg += '\n'+str(ex)  
713          return ErCode, msg
714  
715 <    def backup(self):
716 <        """
717 <        Check infos from TFC using existing api obtaining:
718 <        1)destination
719 <        2)protocol
720 <        """
721 <        return
715 >    def removeFile( self, sbi_dest, filetocopy, option ):
716 >        """  
717 >        """  
718 >        if self.debug : print 'removeFile():\n'
719 >        f_tocopy=filetocopy
720 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
721 >        try:
722 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
723 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
724 >        except OperationException, ex:
725 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
726 >            msg += str(ex)
727 >            if self.debug :
728 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
729 >                dbgmsg += '\t'+str(ex.output)+'\n'
730 >                print dbgmsg
731 >            raise Exception(msg)
732 >
733 >        return
734  
735 <    def updateReport(self, file, erCode, reason, lfn='', se='' ):
735 >    #def updateReport(self, file, erCode, reason, lfn='', se='' ):
736 >    def updateReport(self, file, erCode, reason):
737          """
738          Update the final stage out infos
739          """
740          jobStageInfo={}
741          jobStageInfo['erCode']=erCode
742          jobStageInfo['reason']=reason
743 <        jobStageInfo['lfn']=lfn
744 <        jobStageInfo['se']=se
743 >        if not self.params['for_lfn']: self.params['for_lfn']=''
744 >        if not self.params['se_name']: self.params['se_name']=''
745 >        if not self.hostname: self.hostname=''
746 >        if (erCode != '0') and (erCode != '60308'):
747 >           jobStageInfo['for_lfn']='/copy_problem/'
748 >        else:  
749 >            jobStageInfo['for_lfn']=self.params['for_lfn']
750 >        jobStageInfo['se_name']=self.params['se_name']
751 >        jobStageInfo['endpoint']=self.hostname
752  
753          report = { file : jobStageInfo}
754          return report
# Line 339 | Line 761 | class cmscp:
761          outFile = open('cmscpReport.sh',"a")
762          cmscp_exit_status = 0
763          txt = ''
764 <        for file, dict in results.iteritems():
765 <            if file:
766 <                if dict['lfn']=='':
767 <                    lfn = '$LFNBaseName/'+os.path.basename(file)
764 >        for file, dict in results.iteritems():
765 >            reason = str(dict['reason'])
766 >            if str(reason).find("'") > -1:
767 >                reason = " ".join(reason.split("'"))
768 >            reason="'%s'"%reason
769 >            if file:
770 >                if dict['for_lfn']=='':
771 >                    lfn = '${LFNBaseName}'+os.path.basename(file)
772                      se  = '$SE'
773 +                    LFNBaseName = '$LFNBaseName'
774                  else:
775 <                    lfn = dict['lfn']+os.pat.basename(file)
776 <                    se = dict['se']
777 <                #dict['lfn'] # to be implemented
778 <                txt +=  'echo "Report for File: '+file+'"\n'
779 <                txt +=  'echo "LFN: '+lfn+'"\n'
780 <                txt +=  'echo "StorageElement: '+se+'"\n'
781 <                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
775 >                    lfn = dict['for_lfn']+os.path.basename(file)
776 >                    se = dict['se_name']
777 >                    LFNBaseName = os.path.dirname(lfn)
778 >                    if (LFNBaseName[-1] != '/'):
779 >                        LFNBaseName = LFNBaseName + '/'
780 >
781 >                
782 >                txt += 'echo "Report for File: '+file+'"\n'
783 >                txt += 'echo "LFN: '+lfn+'"\n'
784 >                txt += 'echo "StorageElement: '+se+'"\n'
785 >                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
786                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
787 +
788 +                
789                  if dict['erCode'] != '0':
357            else:
358                if dict['erCode'] != '0':
359                    cmscp_exit_status = dict['erCode']
790                      cmscp_exit_status = dict['erCode']
791 +            else:
792 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
793 +                cmscp_exit_status = dict['erCode']
794          txt += '\n'
795          txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
796 <        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
796 >        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
797          outFile.write(str(txt))
798          outFile.close()
799          return
# Line 369 | Line 802 | class cmscp:
802   def usage():
803  
804      msg="""
805 <    required parameters:
806 <    --source        :: REMOTE           :
807 <    --destination   :: REMOTE           :
375 <    --debug             :
376 <    --inFile :: absPath : or name NOT RELATIVE PATH
377 <    --outFIle :: onlyNAME : NOT YET SUPPORTED
805 >    cmscp:
806 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
807 >        including success checking  version also for CAF using rfcp command to copy the output to SE
808  
809 <    optional parameters
809 >    accepted parameters:
810 >       source           =
811 >       destination      =
812 >       inputFileList    =
813 >       outputFileList   =
814 >       protocol         =
815 >       option           =
816 >       middleware       =  
817 >       srm_version      =
818 >       destinationDir   =
819 >       lfn=             =
820 >       local_stage      =  activate stage fall back  
821 >       debug            =  activate verbose print out
822 >       help             =  print on line man and exit  
823 >    
824 >    mandatory:
825 >       * "source" and/or "destination" must always be defined
826 >       * either "middleware" or "protocol" must always be defined
827 >       * "inputFileList" must always be defined
828 >       * if "local_stage" = 1 also  "lfn" must be defined
829      """
830 <    print msg
830 >    print msg
831  
832 <    return
832 >    return
833  
834   def HelpOptions(opts=[]):
835      """
836      Check otps, print help if needed
837 <    prepare dict = { opt : value }  
837 >    prepare dict = { opt : value }
838      """
839      dict_args = {}
840      if len(opts):
841          for opt, arg in opts:
842 <            dict_args[opt.split('--')[1]] = arg
842 >            dict_args[opt.split('--')[1]] = arg
843              if opt in ('-h','-help','--help') :
844                  usage()
845                  sys.exit(0)
# Line 401 | Line 850 | def HelpOptions(opts=[]):
850  
851   if __name__ == '__main__' :
852  
853 <    import getopt
853 >    import getopt
854  
855      allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
856 <                  "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
857 <    try:    
858 <        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
856 >                  "protocol=","option=", "middleware=", "srm_version=", \
857 >                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
858 >    try:
859 >        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
860      except getopt.GetoptError, err:
861          print err
862          HelpOptions()
863          sys.exit(2)
864 <
864 >
865      dictArgs = HelpOptions(opts)
866      try:
867          cmscp_ = cmscp(dictArgs)

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines