ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.2 by spiga, Mon Sep 22 09:03:17 2008 UTC vs.
Revision 1.73 by mcinquil, Mon May 10 16:58:32 2010 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2 <
3 < import sys, getopt, string
4 < import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
2 > import sys, os
3 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
4   from ProdCommon.Storage.SEAPI.SBinterface import *
5 <
5 > from ProdCommon.Storage.SEAPI.Exceptions import *
6  
7  
8   class cmscp:
9 <    def __init__(self, argv):
9 >    def __init__(self, args):
10          """
11          cmscp
12 <
15 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
12 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
13          including success checking  version also for CAF using rfcp command to copy the output to SE
14          input:
15             $1 middleware (CAF, LSF, LCG, OSG)
16             $2 local file (the absolute path of output file or just the name if it's in top dir)
17             $3 if needed: file name (the output file name)
18             $5 remote SE (complete endpoint)
19 <           $6 srm version
19 >           $6 srm version
20 >           --lfn $LFNBaseName
21          output:
22               return 0 if all ok
23               return 60307 if srmcp failed
24               return 60303 if file already exists in the SE
25          """
26 +
27          #set default
28 +        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
29 +                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
30          self.debug = 0
31 <        self.source = ''
32 <        self.destination = ''
33 <        self.file_to_copy = []
34 <        self.remote_file_name = []
34 <        self.protocol = ''
35 <        self.middleware = ''
36 <        self.srmv = ''
37 <  
38 <        try:
39 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
40 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
41 <        except getopt.GetoptError:
42 <            print self.usage()
43 <            sys.exit(2)
44 <
45 <        self.setAndCheck(opts)  
46 <        
31 >        self.local_stage = 0
32 >        self.params.update( args )
33 >        self.subprocesstimeout = {'copy': None, 'exists': None, 'delete': None, 'size': None}
34 >
35          return
36  
37 <    def setAndCheck( self, opts ):
37 >    def processOptions( self ):
38          """
39 <        Set and check command line parameter
39 >        check command line parameter
40          """
41 <        if not opts :
42 <            print self.usage()
43 <            sys.exit()
44 <        for opt, arg in opts :
45 <            if opt  == "--help" :
46 <                print self.usage()
47 <                sys.exit()
48 <            elif opt == "--debug" :
49 <                self.debug = 1
50 <            elif opt == "--source" :
51 <                self.source = arg
52 <            elif opt == "--destination":
53 <                self.destination = arg
54 <            elif opt == "--inputFileList":
67 <                infile = arg
68 <            elif opt == "--outputFileList":
69 <                out_file
70 <            elif opt == "--protocol":
71 <                self.protocol = arg
72 <            elif opt == "--middleware":
73 <                self.middleware = arg
74 <            elif opt == "--srm_version":
75 <                self.srmv = arg
76 <
77 <        # source and dest cannot be undefined at same time
78 <        if self.source == '' and self.destination == '':
79 <            print self.usage()
80 <            sys.exit()
81 <        # if middleware is not defined --> protocol cannot be empty  
82 <        if self.middleware == '' and self.protocol == '':
83 <            print self.usage()
84 <            sys.exit()
85 <        # input file must be defined  
86 <        if infile == '':
87 <            print self.usage()
88 <            sys.exit()
41 >        if 'help' in self.params.keys(): HelpOptions()
42 >        if 'debug' in self.params.keys(): self.debug = 1
43 >        if 'local_stage' in self.params.keys(): self.local_stage = 1
44 >
45 >        # source and dest cannot be undefined at same time
46 >        if not self.params['source']  and not self.params['destination'] :
47 >            HelpOptions()
48 >        # if middleware is not defined --> protocol cannot be empty
49 >        if not self.params['middleware'] and not self.params['protocol'] :
50 >            HelpOptions()
51 >
52 >        # input file must be defined
53 >        if not self.params['inputFileList'] :
54 >            HelpOptions()
55          else:
56 <            if infile.find(','):
57 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
58 <            else:
59 <                self.file_to_copy.append(infile)
60 <        
56 >            file_to_copy=[]
57 >            if self.params['inputFileList'].find(','):
58 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59 >            else:
60 >                file_to_copy.append(self.params['inputFileList'])
61 >            self.params['inputFileList'] = file_to_copy
62 >
63 >        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
64 >        
65          ## TO DO:
66          #### add check for outFiles
67          #### add map {'inFileNAME':'outFileNAME'} to change out name
68  
99        return
69  
70 <    def run( self ):  
70 >    def run( self ):
71          """
72 <        Check if running on UI (no $middleware) or
73 <        on WN (on the Grid), and take different action  
72 >        Check if running on UI (no $middleware) or
73 >        on WN (on the Grid), and take different action
74          """
75 <        if self.middleware :  
76 <           results = self.stager()
75 >        self.processOptions()
76 >        if self.debug: print 'calling run() : \n'
77 >        # stage out from WN
78 >        if self.params['middleware'] :
79 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
80 >            self.finalReport(results)
81 >        # Local interaction with SE
82          else:
83 <           results = self.copy( self.file_to_copy, self.protocol )
83 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
84 >            return results
85  
86 <        self.finalReport(results,self.middleware)
86 >    def checkLcgUtils( self ):
87 >        """
88 >        _checkLcgUtils_
89 >        check the lcg-utils version and report
90 >        """
91 >        import commands
92 >        cmd = "lcg-cp --version | grep lcg_util"
93 >        status, output = commands.getstatusoutput( cmd )
94 >        num_ver = -1
95 >        if output.find("not found") == -1 or status == 0:
96 >            temp = output.split("-")
97 >            version = ""
98 >            if len(temp) >= 2:
99 >                version = output.split("-")[1]
100 >                temp = version.split(".")
101 >                if len(temp) >= 1:
102 >                    num_ver = int(temp[0])*10
103 >                    num_ver += int(temp[1])
104 >        return num_ver
105  
106 <        return
114 <    
115 <    def setProtocol( self ):    
106 >    def setProtocol( self, middleware ):
107          """
108          define the allowed potocols based on $middlware
109 <        which depend on scheduler
109 >        which depend on scheduler
110          """
111 <        if self.middleware.lower() in ['osg','lcg']:
112 <            supported_protocol = ['srm-lcg','srmv2']
113 <        elif self.middleware.lower() in ['lsf','caf']:
114 <            supported_protocol = ['rfio']
111 >        # default To be used with "middleware"
112 >        if self.debug:
113 >            print 'setProtocol() :\n'
114 >            print '\tmiddleware =  %s utils \n'%middleware
115 >
116 >        ## timeout needed for subprocess command of SEAPI
117 >        ## they should be a bit higher then the corresponding passed by command line  
118 >        self.subprocesstimeout['copy']   = 3600
119 >        self.subprocesstimeout['exists'] = 1200
120 >        self.subprocesstimeout['delete'] = 1200
121 >        self.subprocesstimeout['size']   = 1200
122 >
123 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
124 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
125 >        if self.checkLcgUtils() >= 17:
126 >            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
127 >                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
128 >
129 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
130 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
131 >        rfioOpt=''
132 >
133 >        supported_protocol = None
134 >        if middleware.lower() in ['osg','lcg','condor','sge']:
135 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
136 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
137 >        elif middleware.lower() in ['lsf','caf']:
138 >            supported_protocol = [('rfio',rfioOpt)]
139 >        elif middleware.lower() in ['pbs']:
140 >            supported_protocol = [('rfio',rfioOpt),('local','')]
141 >        elif middleware.lower() in ['arc']:
142 >            supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
143          else:
144 <            ## here we can add support for any kind of protocol,
144 >            ## here we can add support for any kind of protocol,
145              ## maybe some local schedulers need something dedicated
146              pass
147          return supported_protocol
148  
149 <    def stager( self ):              
149 >
150 >    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
151          """
152 <        Implement the logic for remote stage out
152 >        Checks the status of copy and update result dictionary
153          """
154 <        protocols = self.setProtocol()  
155 <        count=0
156 <        list_files = self.file_to_copy
157 <        results={}  
158 <        for prot in protocols:
159 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
160 <            copy_results = self.copy( list_files, prot )
161 <            list_retry = []
162 <            list_existing = []
163 <            list_ok = []
164 <            for file, dict in copy_results.iteritems():
165 <                er_code = dict['erCode']
166 <                if er_code == '60307': list_retry.append( file )
167 <                elif er_code == '60303': list_existing.append( file )
168 <                else:
169 <                    list_ok.append(file)
170 <                    reason = 'Copy succedeed with %s utils'%prot
171 <                    upDict = self.updateReport(file, er_code, reason)
172 <                    copy_results.update(upDict)
173 <            results.update(copy_results)
174 <            if len(list_ok) != 0:  
175 <                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
176 <               # print msg
177 <            if len(list_ok) == len(list_files) :
178 <                break
154 >        list_retry = []
155 >        list_retry_localSE = []
156 >        list_not_existing = []
157 >        list_ok = []
158 >        
159 >        if self.debug:
160 >            print 'in checkCopy() :\n'
161 >        for file, dict in copy_results.iteritems():
162 >            er_code = dict['erCode']
163 >            if er_code == '0':
164 >                list_ok.append(file)
165 >                reason = 'Copy succedeed with %s utils'%prot
166 >                dict['reason'] = reason
167 >            elif er_code == '60302':
168 >                list_not_existing.append( file )
169 >            elif er_code == '10041':
170 >                list_retry.append( file )
171 >            ## WHAT TO DO IN GENERAL FAILURE CONDITION
172 >            else:
173 >                list_retry_localSE.append( file )
174 >                
175 >            if self.debug:
176 >                print "\t file %s \n"%file
177 >                print "\t dict['erCode'] %s \n"%dict['erCode']
178 >                print "\t dict['reason'] %s \n"%dict['reason']
179 >                
180 >            if (lfn != '') and (se != ''):
181 >                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
182              else:
183 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
184 <                if len(list_retry): list_files = list_retry
185 <                else: break
186 <            count =+1  
183 >                upDict = self.updateReport(file, er_code, dict['reason'])
184 >
185 >            copy_results.update(upDict)
186 >        
187 >        msg = ''
188 >        if len(list_ok) != 0:
189 >            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
190 >        if len(list_ok) != len_list_files :
191 >            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
192 >            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
193 >        if self.debug : print msg
194 >        
195 >        return copy_results, list_ok, list_retry, list_retry_localSE
196 >        
197 >    def LocalCopy(self, list_retry, results):
198 >        """
199 >        Tries the stage out to the CloseSE
200 >        """
201 >        if self.debug:
202 >            print 'in LocalCopy() :\n'
203 >            print '\t list_retry %s utils \n'%list_retry
204 >            print '\t len(list_retry) %s \n'%len(list_retry)
205 >                
206 >        list_files = list_retry  
207 >        self.params['inputFilesList']=list_files
208 >        
209 >        ### copy backup
210 >        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
211 >        siteCfg = loadSiteLocalConfig()
212 >        seName = siteCfg.localStageOut.get("se-name", None)
213 >        catalog = siteCfg.localStageOut.get("catalog", None)
214 >        implName = siteCfg.localStageOut.get("command", None)
215 >        if (implName == 'srm'):
216 >           implName='srmv1'
217 >           self.params['srm_version']=implName
218 >        ##### to be improved ###############
219 >        if (implName == 'rfcp'):
220 >            self.params['middleware']='lsf'
221 >        ####################################    
222 >                  
223 >        self.params['protocol']=implName
224 >        tfc = siteCfg.trivialFileCatalog()
225 >            
226 >        if self.debug:
227 >            print '\t siteCFG %s \n'%siteCfg
228 >            print '\t seName %s \n'%seName
229 >            print '\t catalog %s \n'%catalog
230 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
231 >            print '\t tfc %s '%tfc
232 >            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
233 >                
234 >        #if (str(self.params['lfn']).find("/store/") != -1):
235 >        #    temp = str(self.params['lfn']).split("/store/")
236 >        #    self.params['lfn']= "/store/temp/" + temp[1]
237 >        if (str(self.params['lfn']).find("/store/") == 0):
238 >            temp = str(self.params['lfn']).replace("/store/","/store/temp/",1)
239 >            self.params['lfn']= temp
240 >        
241 >        if ( self.params['lfn'][-1] != '/' ) : self.params['lfn'] = self.params['lfn'] + '/'
242 >            
243 >        file_backup=[]
244 >        for input in self.params['inputFilesList']:
245 >            file = self.params['lfn'] + os.path.basename(input)
246 >            surl = tfc.matchLFN(tfc.preferredProtocol, file)
247 >            file_backup.append(surl)
248 >            if self.debug:
249 >                print '\t lfn %s \n'%self.params['lfn']
250 >                print '\t file %s \n'%file
251 >                print '\t surl %s \n'%surl
252 >                    
253 >        destination=os.path.dirname(file_backup[0])
254 >        if ( destination[-1] != '/' ) : destination = destination + '/'
255 >        self.params['destination']=destination
256 >            
257 >        if self.debug:
258 >            print "\t self.params['destination']%s \n"%self.params['destination']
259 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']
260 >            print "\t self.params['option']%s \n"%self.params['option']
261 >              
262 >        for prot, opt in self.setProtocol( self.params['middleware'] ):
263 >            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
264 >            localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
265 >            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
266 >                results.update(localCopy_results)
267 >            else:
268 >                localCopy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
269 >                results.update(localCopy_results)
270 >                if len(list_ok) == len(list_files) :
271 >                    break
272 >                if len(list_retry):
273 >                    list_files = list_retry
274 >                else: break
275 >            if self.debug:
276 >                print "\t localCopy_results = %s \n"%localCopy_results
277 >        
278 >        return results        
279 >
280 >    def stager( self, middleware, list_files ):
281 >        """
282 >        Implement the logic for remote stage out
283 >        """
284  
285 <        #### TODO Daniele
286 <        #check is something fails and created related dict
287 <  #      backup = self.analyzeResults(results)
288 <  
289 <  #      if backup :  
290 <  #          msg = 'WARNING: backup logic is under implementation\n'
291 <  #          #backupDict = self.backup()
292 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name  
293 <  #          results.update(backupDict)
294 <  #          print msg
285 >        if self.debug:
286 >            print 'stager() :\n'
287 >            print '\tmiddleware %s\n'%middleware
288 >            print '\tlist_files %s\n'%list_files
289 >        
290 >        results={}
291 >        for prot, opt in self.setProtocol( middleware ):
292 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
293 >            copy_results = self.copy( list_files, prot, opt )
294 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
295 >                results.update(copy_results)
296 >            else:
297 >                copy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(copy_results, len(list_files), prot)
298 >                results.update(copy_results)
299 >                if len(list_ok) == len(list_files) :
300 >                    break
301 >                if len(list_retry):
302 >                    list_files = list_retry
303 >                else: break
304 >                
305 >        if self.local_stage:
306 >            if len(list_retry_localSE):
307 >                results = self.LocalCopy(list_retry_localSE, results)
308 >            
309 >        if self.debug:
310 >            print "\t results %s \n"%results
311          return results
312  
313      def initializeApi(self, protocol ):
314          """
315 <        Instantiate storage interface  
315 >        Instantiate storage interface
316          """
317 <        source_prot = protocol
318 <        dest_prot = protocol
319 <        if self.source == '' : source_prot = 'local'
320 <        Source_SE  = self.storageInterface( self.source, source_prot )
321 <        if self.destination == '' : dest_prot = 'local'
322 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
317 >        if self.debug : print 'initializeApi() :\n'  
318 >        self.source_prot = protocol
319 >        self.dest_prot = protocol
320 >        if not self.params['source'] : self.source_prot = 'local'
321 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
322 >        if not self.params['destination'] : self.dest_prot = 'local'
323 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
324  
325          if self.debug :
326 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
327 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
326 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
327 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
328 >            print msg
329  
330          return Source_SE, Destination_SE
331  
332 <    def copy( self, list_file, protocol ):
332 >    def copy( self, list_file, protocol, options, backup='no' ):
333          """
334 <        Make the real file copy using SE API
334 >        Make the real file copy using SE API
335          """
336 +        msg = ""
337          if self.debug :
338 <            print 'copy(): using %s protocol'%protocol
339 <        Source_SE, Destination_SE = self.initializeApi( protocol )
340 <
341 <        # create remote dir
342 <        if protocol in ['gridftp','rfio']:
343 <            self.createDir( Destination_SE, protocol )
338 >            msg  = 'copy() :\n'
339 >            msg += '\tusing %s protocol\n'%protocol
340 >            print msg
341 >        try:
342 >            Source_SE, Destination_SE = self.initializeApi( protocol )
343 >        except Exception, ex:
344 >            for filetocopy in list_file:
345 >                results.update( self.updateReport(filetocopy, '-1', str(ex)))
346 >            return results
347 >
348 >        # create remote dir
349 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
350 >            try:
351 >                self.createDir( Destination_SE, Destination_SE.protocol )
352 >            except OperationException, ex:
353 >                for filetocopy in list_file:
354 >                    results.update( self.updateReport(filetocopy, '60316', str(ex)))
355 >                return results
356 >            ## when the client commands are not found (wrong env or really missing)
357 >            except MissingCommand, ex:
358 >                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
359 >                for filetocopy in list_file:
360 >                    results.update( self.updateReport(filetocopy, '10041', msg))
361 >                return results
362 >            except Exception, ex:
363 >                msg = "ERROR %s" %(str(ex))
364 >                for filetocopy in list_file:
365 >                    results.update( self.updateReport(filetocopy, '-1', msg))
366 >                return results
367  
368          ## prepare for real copy  ##
369 <        sbi = SBinterface( Source_SE, Destination_SE )
370 <        sbi_dest = SBinterface(Destination_SE)
369 >        try :
370 >            sbi = SBinterface( Source_SE, Destination_SE )
371 >            sbi_dest = SBinterface(Destination_SE)
372 >            sbi_source = SBinterface(Source_SE)
373 >        except ProtocolMismatch, ex:
374 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
375 >            msg += str(ex)
376 >            for filetocopy in list_file:
377 >                results.update( self.updateReport(filetocopy, '-1', msg))
378 >            return results
379  
380          results = {}
381 <        ## loop over the complete list of files
382 <        for filetocopy in list_file:
383 <            if self.debug : print 'start real copy for %s'%filetocopy
384 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
385 <            if ErCode == '0':
386 <                ErCode, msg = self.makeCopy( sbi, filetocopy )
387 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
381 >        ## loop over the complete list of files
382 >        for filetocopy in list_file:
383 >            if self.debug : print '\tStart real copy for %s'%filetocopy
384 >            try :
385 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
386 >            except Exception, ex:
387 >                ErCode = '60307'
388 >                msg = str(ex)  
389 >            if ErCode == '0':
390 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
391 >                if (ErCode == '0') and (backup == 'yes'):
392 >                    ErCode = '60308'
393 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
394              results.update( self.updateReport(filetocopy, ErCode, msg))
395          return results
220    
221    def updateReport(self, file, erCode, reason, lfn='', se='' ):
222        """
223        Update the final stage out infos
224        """
225        jobStageInfo={}
226        jobStageInfo['erCode']=erCode
227        jobStageInfo['reason']=reason
228        jobStageInfo['lfn']=lfn
229        jobStageInfo['se']=se
396  
231        report = { file : jobStageInfo}
232        return report
233
234    def finalReport( self , results, middleware ):
235        """
236        It should return a clear list of LFNs for each SE where data are stored.
237        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
238        """
239        if middleware:
240            outFile = open('cmscpReport.sh',"a")
241            cmscp_exit_status = 0
242            txt = ''
243            for file, dict in results.iteritems():
244                if dict['lfn']=='':
245                    lfn = '$LFNBaseName/'+os.path.basename(file)
246                    se  = '$SE'
247                else:
248                    lfn = dict['lfn']+os.pat.basename(file)
249                    se = dict['se']      
250                #dict['lfn'] # to be implemented
251                txt +=  'echo "Report for File: '+file+'"\n'
252                txt +=  'echo "LFN: '+lfn+'"\n'  
253                txt +=  'echo "StorageElement: '+se+'"\n'  
254                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
255                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
256                if dict['erCode'] != '0':
257                    cmscp_exit_status = dict['erCode']
258            txt += '\n'
259            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
260            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
261            outFile.write(str(txt))
262            outFile.close()
263        else:
264            for file, code in results.iteritems():
265                print 'error code = %s for file %s'%(code,file)
266        return
397  
398      def storageInterface( self, endpoint, protocol ):
399          """
400 <        Create the storage interface.
400 >        Create the storage interface.
401          """
402 +        if self.debug : print 'storageInterface():\n'
403          try:
404              interface = SElement( FullPath(endpoint), protocol )
405 <        except Exception, ex:
406 <            msg = ''
407 <            if self.debug : msg = str(ex)+'\n'
408 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
278 <            print msg
405 >        except ProtocolUnknown, ex:
406 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
407 >            msg += str(ex)
408 >            raise Exception(msg)
409  
410          return interface
411  
282    def checkDir(self, Destination_SE, protocol):
283        '''
284        ToBeImplemented NEEDED for castor
285        '''
286        return
287
412      def createDir(self, Destination_SE, protocol):
413          """
414 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
415 <        this should be transparent at SE API level.
414 >        Create remote dir for gsiftp REALLY TEMPORARY
415 >        this should be transparent at SE API level.
416          """
417 <        ErCode = '0'
418 <        msg_1 = ''
417 >        if self.debug : print 'createDir():\n'
418 >        msg = ''
419          try:
420              action = SBinterface( Destination_SE )
421              action.createDir()
422 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
423 <        except Exception, ex:
424 <            msg = ''
425 <            if self.debug : msg = str(ex)+'\n'
426 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
427 <            msg += msg_1
428 <            ErCode = '60316'  
429 <            #print msg
430 <
431 <        return ErCode, msg_1
422 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
423 >        except TransferException, ex:
424 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
425 >            msg += str(ex)
426 >            if self.debug :
427 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
428 >                dbgmsg += '\t'+str(ex.output)+'\n'
429 >                print dbgmsg
430 >            raise Exception(msg)
431 >        except OperationException, ex:
432 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
433 >            msg += str(ex)
434 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
435 >            raise Exception(msg)
436 >        except MissingDestination, ex:
437 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
438 >            msg += str(ex)
439 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
440 >            raise Exception(msg)
441 >        except AlreadyExistsException, ex:
442 >            if self.debug: print "\tThe directory already exist"
443 >            pass            
444 >        return msg
445  
446 <    def checkFileExist(self, sbi, filetocopy):
446 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
447          """
448 <        Check if file to copy already exist  
449 <        """
450 <        try:
451 <            check = sbi.checkExists(filetocopy)
315 <        except Exception, ex:
316 <            msg = ''
317 <            if self.debug : msg = str(ex)+'\n'
318 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
319 <           # print msg
448 >        Check both if source file exist AND
449 >        if destination file ALREADY exist.
450 >        """
451 >        if self.debug : print 'checkFileExist():\n'
452          ErCode = '0'
453          msg = ''
454 <        if check :
455 <            ErCode = '60303'
456 <            msg = "file %s already exist"%filetocopy
457 <            print msg
454 >        f_tocopy=filetocopy
455 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
456 >        try:
457 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
458 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
459 >        except OperationException, ex:
460 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
461 >            msg += str(ex)
462 >            if self.debug :
463 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
464 >                dbgmsg += '\t'+str(ex.output)+'\n'
465 >                print dbgmsg
466 >            raise Exception(msg)
467 >        except WrongOption, ex:
468 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
469 >            msg += str(ex)
470 >            if self.debug :
471 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
472 >                dbgmsg += '\t'+str(ex.output)+'\n'
473 >                print dbgmsg
474 >            raise Exception(msg)
475 >        except MissingDestination, ex:
476 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
477 >            msg += str(ex)
478 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
479 >            raise Exception(msg)
480 >        ## when the client commands are not found (wrong env or really missing)
481 >        except MissingCommand, ex:
482 >            ErCode = '10041'
483 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
484 >            return ErCode, msg
485 >        if not checkSource :
486 >            ErCode = '60302'
487 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
488 >            return ErCode, msg
489 >        f_tocopy=filetocopy
490 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
491 >        try:
492 >            check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
493 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
494 >        except OperationException, ex:
495 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
496 >            msg += str(ex)
497 >            if self.debug :
498 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
499 >                dbgmsg += '\t'+str(ex.output)+'\n'
500 >                print dbgmsg
501 >            raise Exception(msg)
502 >        except WrongOption, ex:
503 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
504 >            msg += str(ex)
505 >            if self.debug :
506 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
507 >                msg += '\t'+str(ex.output)+'\n'
508 >            raise Exception(msg)
509 >        except MissingDestination, ex:
510 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
511 >            msg += str(ex)
512 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
513 >            raise Exception(msg)
514 >        ## when the client commands are not found (wrong env or really missing)
515 >        except MissingCommand, ex:
516 >            ErCode = '10041'
517 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
518 >            return ErCode, msg
519 >        if check :
520 >            ErCode = '60303'
521 >            msg = "file %s already exist"%os.path.basename(filetocopy)
522  
523 <        return ErCode,msg  
523 >        return ErCode, msg
524  
525 <    def makeCopy(self, sbi, filetocopy ):  
525 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
526          """
527 <        call the copy API.  
527 >        call the copy API.
528          """
529 <        path = os.path.dirname(filetocopy)  
529 >        if self.debug : print 'makeCopy():\n'
530 >        path = os.path.dirname(filetocopy)
531          file_name =  os.path.basename(filetocopy)
532          source_file = filetocopy
533          dest_file = file_name ## to be improved supporting changing file name  TODO
534 <        if self.source == '' and path == '':
534 >        if self.params['source'] == '' and path == '':
535              source_file = os.path.abspath(filetocopy)
536 <        elif self.destination =='':
537 <            dest_file = os.path.join(os.getcwd(),file_name)
538 <        elif self.source != '' and self.destination != '' :
539 <            source_file = file_name  
536 >        elif self.params['destination'] =='':
537 >            destDir = self.params.get('destinationDir',os.getcwd())
538 >            dest_file = os.path.join(destDir,file_name)
539 >        elif self.params['source'] != '' and self.params['destination'] != '' :
540 >            source_file = file_name
541 >
542          ErCode = '0'
543          msg = ''
544 +
545 +        if  self.params['option'].find('space_token')>=0:
546 +            space_token=self.params['option'].split('=')[1]
547 +            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
548 +            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
549          try:
550 <            pippo = sbi.copy( source_file , dest_file )
551 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
552 <        except Exception, ex:
553 <            msg = ''
554 <            if self.debug : msg = str(ex)+'\n'
555 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
550 >            sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
551 >        except TransferException, ex:
552 >            msg  = "Problem copying %s file" % filetocopy
553 >            msg += str(ex)
554 >            if self.debug :
555 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
556 >                dbgmsg += '\t'+str(ex.output)+'\n'
557 >                print dbgmsg
558              ErCode = '60307'
559 <            #print msg
560 <
559 >        except WrongOption, ex:
560 >            msg  = "Problem copying %s file" % filetocopy
561 >            msg += str(ex)
562 >            if self.debug :
563 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
564 >                dbgmsg += '\t'+str(ex.output)+'\n'
565 >                print dbgmsg
566 >        except SizeZeroException, ex:
567 >            msg  = "Problem copying %s file" % filetocopy
568 >            msg += str(ex)
569 >            if self.debug :
570 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
571 >                dbgmsg += '\t'+str(ex.output)+'\n'
572 >                print dbgmsg
573 >            ErCode = '60307'
574 >        ## when the client commands are not found (wrong env or really missing)
575 >        except MissingCommand, ex:
576 >            ErCode = '10041'
577 >            msg  = "Problem copying %s file" % filetocopy
578 >            msg += str(ex)
579 >            if self.debug :
580 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
581 >                dbgmsg += '\t'+str(ex.output)+'\n'
582 >                print dbgmsg
583 >        except AuthorizationException, ex:
584 >            ErCode = '60307'
585 >            msg  = "Problem copying %s file" % filetocopy
586 >            msg += str(ex)
587 >            if self.debug :
588 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
589 >                dbgmsg += '\t'+str(ex.output)+'\n'
590 >                print dbgmsg
591 >        except SEAPITimeout, ex:
592 >            ErCode = '60317'
593 >            msg  = "Problem copying %s file" % filetocopy
594 >            msg += str(ex)
595 >            if self.debug :
596 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
597 >                dbgmsg += '\t'+str(ex.output)+'\n'
598 >                print dbgmsg
599 >
600 >        if ErCode == '0' and protocol.find('srmv') == 0:
601 >            remote_file_size = -1
602 >            local_file_size = os.path.getsize( source_file )
603 >            try:
604 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
605 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
606 >            except TransferException, ex:
607 >                msg  = "Problem checking the size of %s file" % filetocopy
608 >                msg += str(ex)
609 >                if self.debug :
610 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
611 >                    dbgmsg += '\t'+str(ex.output)+'\n'
612 >                    print dbgmsg
613 >                ErCode = '60307'
614 >            except WrongOption, ex:
615 >                msg  = "Problem checking the size of %s file" % filetocopy
616 >                msg += str(ex)
617 >                if self.debug :
618 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
619 >                    dbgmsg += '\t'+str(ex.output)+'\n'
620 >                    print dbgmsg
621 >                ErCode = '60307'
622 >            if local_file_size != remote_file_size:
623 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
624 >                ErCode = '60307'
625 >
626 >        if ErCode != '0':
627 >            try :
628 >                self.removeFile( sbi_dest, dest_file, option )
629 >            except Exception, ex:
630 >                msg += '\n'+str(ex)  
631          return ErCode, msg
632 <  
633 <    '''
634 <    def checkSize()
632 >
633 >    def removeFile( self, sbi_dest, filetocopy, option ):
634 >        """  
635 >        """  
636 >        if self.debug : print 'removeFile():\n'
637 >        f_tocopy=filetocopy
638 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
639 >        try:
640 >            sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
641 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
642 >        except OperationException, ex:
643 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
644 >            msg += str(ex)
645 >            if self.debug :
646 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
647 >                dbgmsg += '\t'+str(ex.output)+'\n'
648 >                print dbgmsg
649 >            raise Exception(msg)
650 >
651 >        return
652 >
653 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
654          """
655 <        Using srm needed a check of the ouptut file size.  
655 >        Update the final stage out infos
656          """
657 <    
658 <        echo "--> remoteSize = $remoteSize"
659 <        ## for local file
660 <        localSize=$(stat -c%s "$path_out_file")
661 <        echo "-->  localSize = $localSize"
662 <        if [ $localSize != $remoteSize ]; then
663 <            echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
664 <            echo "Copy failed: removing remote file $destination"
665 <                srmrm $destination
666 <                cmscp_exit_status=60307
667 <      
668 <      
669 <                echo "Problem copying $path_out_file to $destination with srmcp command"
375 <                StageOutExitStatusReason='remote and local file dimension not match'
376 <                echo "StageOutReport = `cat ./srmcp.report`"
377 <    '''
378 <    def backup(self):
379 <        """
380 <        Check infos from TFC using existing api obtaining:
381 <        1)destination
382 <        2)protocol
657 >        jobStageInfo={}
658 >        jobStageInfo['erCode']=erCode
659 >        jobStageInfo['reason']=reason
660 >        jobStageInfo['lfn']=lfn
661 >        jobStageInfo['se']=se
662 >
663 >        report = { file : jobStageInfo}
664 >        return report
665 >
666 >    def finalReport( self , results ):
667 >        """
668 >        It a list of LFNs for each SE where data are stored.
669 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
670          """
671 +        outFile = open('cmscpReport.sh',"a")
672 +        cmscp_exit_status = 0
673 +        txt = ''
674 +        for file, dict in results.iteritems():
675 +            reason = str(dict['reason'])
676 +            if str(reason).find("'") > -1:
677 +                reason = " ".join(reason.split("'"))
678 +            reason="'%s'"%reason
679 +            if file:
680 +                if dict['lfn']=='':
681 +                    lfn = '${LFNBaseName}'+os.path.basename(file)
682 +                    se  = '$SE'
683 +                    LFNBaseName = '$LFNBaseName'
684 +                else:
685 +                    lfn = dict['lfn']+os.path.basename(file)
686 +                    se = dict['se']
687 +                    LFNBaseName = os.path.dirname(lfn)
688 +                    if (LFNBaseName[-1] != '/'):
689 +                        LFNBaseName = LFNBaseName + '/'
690 +                #dict['lfn'] # to be implemented
691 +                txt += 'echo "Report for File: '+file+'"\n'
692 +                txt += 'echo "LFN: '+lfn+'"\n'
693 +                txt += 'echo "StorageElement: '+se+'"\n'
694 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
695 +                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
696 +                txt += 'export LFNBaseName='+LFNBaseName+'\n'
697 +                txt += 'export SE='+se+'\n'
698 +
699 +                txt += 'export endpoint='+self.params['destination']+'\n'
700 +                
701 +                if dict['erCode'] != '0':
702 +                    cmscp_exit_status = dict['erCode']
703 +            else:
704 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
705 +                cmscp_exit_status = dict['erCode']
706 +        txt += '\n'
707 +        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
708 +        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
709 +        outFile.write(str(txt))
710 +        outFile.close()
711          return
712  
386    def usage(self):
713  
714 <        msg="""
715 <        required parameters:
716 <        --source :: REMOTE           :      
717 <        --dest   :: REMOTE           :  
718 <        --debug             :
719 <        --inFile :: absPath : or name NOT RELATIVE PATH
720 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
721 <
722 <        optional parameters      
723 <        """
724 <        return msg
714 > def usage():
715 >
716 >    msg="""
717 >    cmscp:
718 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
719 >        including success checking  version also for CAF using rfcp command to copy the output to SE
720 >
721 >    accepted parameters:
722 >       source           =
723 >       destination      =
724 >       inputFileList    =
725 >       outputFileList   =
726 >       protocol         =
727 >       option           =
728 >       middleware       =  
729 >       srm_version      =
730 >       destinationDir   =
731 >       lfn=             =
732 >       local_stage      =  activate stage fall back  
733 >       debug            =  activate verbose print out
734 >       help             =  print on line man and exit  
735 >    
736 >    mandatory:
737 >       * "source" and/or "destination" must always be defined
738 >       * either "middleware" or "protocol" must always be defined
739 >       * "inputFileList" must always be defined
740 >       * if "local_stage" = 1 also  "lfn" must be defined
741 >    """
742 >    print msg
743 >
744 >    return
745 >
746 > def HelpOptions(opts=[]):
747 >    """
748 >    Check otps, print help if needed
749 >    prepare dict = { opt : value }
750 >    """
751 >    dict_args = {}
752 >    if len(opts):
753 >        for opt, arg in opts:
754 >            dict_args[opt.split('--')[1]] = arg
755 >            if opt in ('-h','-help','--help') :
756 >                usage()
757 >                sys.exit(0)
758 >        return dict_args
759 >    else:
760 >        usage()
761 >        sys.exit(0)
762  
763   if __name__ == '__main__' :
764 +
765 +    import getopt
766 +
767 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
768 +                  "protocol=","option=", "middleware=", "srm_version=", \
769 +                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
770 +    try:
771 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
772 +    except getopt.GetoptError, err:
773 +        print err
774 +        HelpOptions()
775 +        sys.exit(2)
776 +
777 +    dictArgs = HelpOptions(opts)
778      try:
779 <        cmscp_ = cmscp(sys.argv[1:])
779 >        cmscp_ = cmscp(dictArgs)
780          cmscp_.run()
781 <    except:
782 <        pass
781 >    except Exception, ex :
782 >        print str(ex)
783  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines