ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.3 by spiga, Fri Sep 26 07:38:41 2008 UTC vs.
Revision 1.51 by spiga, Tue May 26 21:22:04 2009 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2 <
3 < import sys, getopt, string
4 < import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
2 > import sys, os
3 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
4   from ProdCommon.Storage.SEAPI.SBinterface import *
5 <
5 > from ProdCommon.Storage.SEAPI.Exceptions import *
6  
7  
8   class cmscp:
9 <    def __init__(self, argv):
9 >    def __init__(self, args):
10          """
11          cmscp
12 <
15 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
12 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
13          including success checking  version also for CAF using rfcp command to copy the output to SE
14          input:
15             $1 middleware (CAF, LSF, LCG, OSG)
16             $2 local file (the absolute path of output file or just the name if it's in top dir)
17             $3 if needed: file name (the output file name)
18             $5 remote SE (complete endpoint)
19 <           $6 srm version
19 >           $6 srm version
20 >           --lfn $LFNBaseName
21          output:
22               return 0 if all ok
23               return 60307 if srmcp failed
24               return 60303 if file already exists in the SE
25          """
26 +
27          #set default
28 +        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
29 +                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
30          self.debug = 0
31 <        self.source = ''
32 <        self.destination = ''
33 <        self.file_to_copy = []
33 <        self.remote_file_name = []
34 <        self.protocol = ''
35 <        self.middleware = ''
36 <        self.srmv = ''
37 <  
38 <        try:
39 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
40 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
41 <        except getopt.GetoptError:
42 <            print self.usage()
43 <            sys.exit(2)
44 <
45 <        self.setAndCheck(opts)  
46 <        
31 >        self.local_stage = 0
32 >        self.params.update( args )
33 >
34          return
35  
36 <    def setAndCheck( self, opts ):
36 >    def processOptions( self ):
37          """
38 <        Set and check command line parameter
38 >        check command line parameter
39          """
40 <        if not opts :
41 <            print self.usage()
42 <            sys.exit()
43 <        for opt, arg in opts :
44 <            if opt  == "--help" :
45 <                print self.usage()
46 <                sys.exit()
47 <            elif opt == "--debug" :
48 <                self.debug = 1
49 <            elif opt == "--source" :
50 <                self.source = arg
51 <            elif opt == "--destination":
52 <                self.destination = arg
53 <            elif opt == "--inputFileList":
67 <                infile = arg
68 <            elif opt == "--outputFileList":
69 <                out_file
70 <            elif opt == "--protocol":
71 <                self.protocol = arg
72 <            elif opt == "--middleware":
73 <                self.middleware = arg
74 <            elif opt == "--srm_version":
75 <                self.srmv = arg
76 <
77 <        # source and dest cannot be undefined at same time
78 <        if self.source == '' and self.destination == '':
79 <            print self.usage()
80 <            sys.exit()
81 <        # if middleware is not defined --> protocol cannot be empty  
82 <        if self.middleware == '' and self.protocol == '':
83 <            print self.usage()
84 <            sys.exit()
85 <        # input file must be defined  
86 <        if infile == '':
87 <            print self.usage()
88 <            sys.exit()
40 >        if 'help' in self.params.keys(): HelpOptions()
41 >        if 'debug' in self.params.keys(): self.debug = 1
42 >        if 'local_stage' in self.params.keys(): self.local_stage = 1
43 >
44 >        # source and dest cannot be undefined at same time
45 >        if not self.params['source']  and not self.params['destination'] :
46 >            HelpOptions()
47 >        # if middleware is not defined --> protocol cannot be empty
48 >        if not self.params['middleware'] and not self.params['protocol'] :
49 >            HelpOptions()
50 >
51 >        # input file must be defined
52 >        if not self.params['inputFileList'] :
53 >            HelpOptions()
54          else:
55 <            if infile.find(','):
56 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
57 <            else:
58 <                self.file_to_copy.append(infile)
59 <        
55 >            file_to_copy=[]
56 >            if self.params['inputFileList'].find(','):
57 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
58 >            else:
59 >                file_to_copy.append(self.params['inputFileList'])
60 >            self.params['inputFileList'] = file_to_copy
61 >
62 >        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
63 >        
64          ## TO DO:
65          #### add check for outFiles
66          #### add map {'inFileNAME':'outFileNAME'} to change out name
67  
99        return
68  
69 <    def run( self ):  
69 >    def run( self ):
70          """
71 <        Check if running on UI (no $middleware) or
72 <        on WN (on the Grid), and take different action  
71 >        Check if running on UI (no $middleware) or
72 >        on WN (on the Grid), and take different action
73          """
74 <        if self.middleware :  
75 <           results = self.stager()
74 >        self.processOptions()
75 >        if self.debug: print 'calling run() : \n'
76 >        # stage out from WN
77 >        if self.params['middleware'] :
78 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
79 >            self.finalReport(results)
80 >        # Local interaction with SE
81          else:
82 <           results = self.copy( self.file_to_copy, self.protocol )
82 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
83 >            return results
84  
85 <        self.finalReport(results,self.middleware)
112 <
113 <        return
114 <    
115 <    def setProtocol( self ):    
85 >    def setProtocol( self, middleware ):
86          """
87          define the allowed potocols based on $middlware
88 <        which depend on scheduler
88 >        which depend on scheduler
89          """
90 <        if self.middleware.lower() in ['osg','lcg']:
91 <            supported_protocol = ['srm-lcg','srmv2']
92 <        elif self.middleware.lower() in ['lsf','caf']:
93 <            supported_protocol = ['rfio']
90 >        # default To be used with "middleware"
91 >        if self.debug:
92 >            print 'setProtocol() :\n'
93 >            print '\tmiddleware =  %s utils \n'%middleware
94 >        
95 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
96 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
97 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
98 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 '}
99 >        rfioOpt=''
100 >
101 >        supported_protocol = None
102 >        if middleware.lower() in ['osg','lcg','condor','sge']:
103 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
104 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
105 >        elif middleware.lower() in ['lsf','caf']:
106 >            supported_protocol = [('rfio',rfioOpt)]
107          else:
108 <            ## here we can add support for any kind of protocol,
108 >            ## here we can add support for any kind of protocol,
109              ## maybe some local schedulers need something dedicated
110              pass
111          return supported_protocol
112  
113 <    def stager( self ):              
113 >
114 >    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
115          """
116 <        Implement the logic for remote stage out
116 >        Checks the status of copy and update result dictionary
117          """
118 <        protocols = self.setProtocol()  
119 <        count=0
120 <        list_files = self.file_to_copy
121 <        results={}  
122 <        for prot in protocols:
123 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
124 <            copy_results = self.copy( list_files, prot )
125 <            list_retry = []
126 <            list_existing = []
127 <            list_ok = []
128 <            for file, dict in copy_results.iteritems():
129 <                er_code = dict['erCode']
130 <                if er_code == '60307': list_retry.append( file )
131 <                elif er_code == '60303': list_existing.append( file )
132 <                else:
133 <                    list_ok.append(file)
134 <                    reason = 'Copy succedeed with %s utils'%prot
135 <                    upDict = self.updateReport(file, er_code, reason)
136 <                    copy_results.update(upDict)
137 <            results.update(copy_results)
138 <            if len(list_ok) != 0:  
139 <                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
140 <               # print msg
141 <            if len(list_ok) == len(list_files) :
142 <                break
118 >        list_retry = []
119 >        list_not_existing = []
120 >        list_ok = []
121 >        
122 >        if self.debug:
123 >            print 'in checkCopy() :\n'
124 >        for file, dict in copy_results.iteritems():
125 >            er_code = dict['erCode']
126 >            if er_code == '0':
127 >                list_ok.append(file)
128 >                reason = 'Copy succedeed with %s utils'%prot
129 >                dict['reason'] = reason
130 >            elif er_code == '60302':
131 >                list_not_existing.append( file )
132 >            elif er_code == '10041':
133 >                list_retry.append( file )
134 >            ## WHAT TO DO IN GENERAL FAILURE CONDITION
135 >            #else:
136 >            #    list_retry.append( file )
137 >                
138 >            if self.debug:
139 >                print "\t file %s \n"%file
140 >                print "\t dict['erCode'] %s \n"%dict['erCode']
141 >                print "\t dict['reason'] %s \n"%dict['reason']
142 >                
143 >            if (lfn != '') and (se != ''):
144 >                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
145              else:
146 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
147 <                if len(list_retry): list_files = list_retry
148 <                else: break
149 <            count =+1  
146 >                upDict = self.updateReport(file, er_code, dict['reason'])
147 >
148 >            copy_results.update(upDict)
149 >        
150 >        msg = ''
151 >        if len(list_ok) != 0:
152 >            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
153 >        if len(list_ok) != len_list_files :
154 >            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
155 >            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
156 >        if self.debug : print msg
157 >        
158 >        return copy_results, list_ok, list_retry
159 >        
160 >    def LocalCopy(self, list_retry, results):
161 >        """
162 >        Tries the stage out to the CloseSE
163 >        """
164 >        if self.debug:
165 >            print 'in LocalCopy() :\n'
166 >            print '\t list_retry %s utils \n'%list_retry
167 >            print '\t len(list_retry) %s \n'%len(list_retry)
168 >                
169 >        list_files = list_retry  
170 >        self.params['inputFilesList']=list_files
171 >        
172 >        ### copy backup
173 >        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
174 >        siteCfg = loadSiteLocalConfig()
175 >        seName = siteCfg.localStageOut.get("se-name", None)
176 >        catalog = siteCfg.localStageOut.get("catalog", None)
177 >        implName = siteCfg.localStageOut.get("command", None)
178 >        if (implName == 'srm'):
179 >           implName='srmv1'
180 >           self.params['srm_version']=implName
181 >        ##### to be improved ###############
182 >        if (implName == 'rfcp'):
183 >            self.params['middleware']='lsf'
184 >        ####################################    
185 >                  
186 >        self.params['protocol']=implName
187 >        tfc = siteCfg.trivialFileCatalog()
188 >            
189 >        if self.debug:
190 >            print '\t siteCFG %s \n'%siteCfg
191 >            print '\t seName %s \n'%seName
192 >            print '\t catalog %s \n'%catalog
193 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
194 >            print '\t tfc %s '%tfc
195 >            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
196 >                
197 >        file_backup=[]
198 >        for input in self.params['inputFilesList']:
199 >            file = self.params['lfn'] + os.path.basename(input)
200 >            surl = tfc.matchLFN(tfc.preferredProtocol, file)
201 >            file_backup.append(surl)
202 >            if self.debug:
203 >                print '\t lfn %s \n'%self.params['lfn']
204 >                print '\t file %s \n'%file
205 >                print '\t surl %s \n'%surl
206 >                    
207 >        destination=os.path.dirname(file_backup[0])
208 >        self.params['destination']=destination
209 >            
210 >        if self.debug:
211 >            print "\t self.params['destination']%s \n"%self.params['destination']
212 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']
213 >            print "\t self.params['option']%s \n"%self.params['option']
214 >              
215 >        for prot, opt in self.setProtocol( self.params['middleware'] ):
216 >            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
217 >            localCopy_results = self.copy( self.params['inputFileList'], prot, opt )
218 >            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
219 >                results.update(localCopy_results)
220 >            else:
221 >                localCopy_results, list_ok, list_retry = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
222 >                results.update(localCopy_results)
223 >                if len(list_ok) == len(list_files) :
224 >                    break
225 >                if len(list_retry):
226 >                    list_files = list_retry
227 >                else: break
228 >            if self.debug:
229 >                print "\t localCopy_results = %s \n"%localCopy_results
230 >        
231 >        return results        
232 >
233 >    def stager( self, middleware, list_files ):
234 >        """
235 >        Implement the logic for remote stage out
236 >        """
237  
238 <        #### TODO Daniele
239 <        #check is something fails and created related dict
240 <  #      backup = self.analyzeResults(results)
241 <  
242 <  #      if backup :  
243 <  #          msg = 'WARNING: backup logic is under implementation\n'
244 <  #          #backupDict = self.backup()
245 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name  
246 <  #          results.update(backupDict)
247 <  #          print msg
238 >        if self.debug:
239 >            print 'stager() :\n'
240 >            print '\tmiddleware %s\n'%middleware
241 >            print '\tlist_files %s\n'%list_files
242 >        
243 >        results={}
244 >        for prot, opt in self.setProtocol( middleware ):
245 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
246 >            copy_results = self.copy( list_files, prot, opt )
247 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
248 >                results.update(copy_results)
249 >            else:
250 >                copy_results, list_ok, list_retry = self.checkCopy(copy_results, len(list_files), prot)
251 >                results.update(copy_results)
252 >                if len(list_ok) == len(list_files) :
253 >                    break
254 >                if len(list_retry):
255 >                    list_files = list_retry
256 >                else: break
257 >                
258 >        if self.local_stage:
259 >            if len(list_retry):
260 >                results = self.LocalCopy(list_retry, results)
261 >            
262 >        if self.debug:
263 >            print "\t results %s \n"%results
264          return results
265  
266      def initializeApi(self, protocol ):
267          """
268 <        Instantiate storage interface  
268 >        Instantiate storage interface
269          """
270 <        source_prot = protocol
271 <        dest_prot = protocol
272 <        if self.source == '' : source_prot = 'local'
273 <        Source_SE  = self.storageInterface( self.source, source_prot )
274 <        if self.destination == '' : dest_prot = 'local'
275 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
270 >        if self.debug : print 'initializeApi() :\n'  
271 >        self.source_prot = protocol
272 >        self.dest_prot = protocol
273 >        if not self.params['source'] : self.source_prot = 'local'
274 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
275 >        if not self.params['destination'] : self.dest_prot = 'local'
276 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
277  
278          if self.debug :
279 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
280 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
279 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
280 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
281 >            print msg
282  
283          return Source_SE, Destination_SE
284  
285 <    def copy( self, list_file, protocol ):
285 >    def copy( self, list_file, protocol, options ):
286          """
287 <        Make the real file copy using SE API
287 >        Make the real file copy using SE API
288          """
289 +        msg = ""
290          if self.debug :
291 <            print 'copy(): using %s protocol'%protocol
292 <        Source_SE, Destination_SE = self.initializeApi( protocol )
291 >            msg  = 'copy() :\n'
292 >            msg += '\tusing %s protocol\n'%protocol
293 >            print msg
294 >        try:
295 >            Source_SE, Destination_SE = self.initializeApi( protocol )
296 >        except Exception, ex:
297 >            return self.updateReport('', '-1', str(ex))
298  
299 <        # create remote dir
300 <        if protocol in ['gridftp','rfio']:
301 <            self.createDir( Destination_SE, protocol )
299 >        # create remote dir
300 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
301 >            try:
302 >                self.createDir( Destination_SE, Destination_SE.protocol )
303 >            except OperationException, ex:
304 >                return self.updateReport('', '60316', str(ex))
305 >            ## when the client commands are not found (wrong env or really missing)
306 >            except MissingCommand, ex:
307 >                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
308 >                return self.updateReport('', '10041', msg)
309  
310          ## prepare for real copy  ##
311 <        sbi = SBinterface( Source_SE, Destination_SE )
312 <        sbi_dest = SBinterface(Destination_SE)
311 >        try :
312 >            sbi = SBinterface( Source_SE, Destination_SE )
313 >            sbi_dest = SBinterface(Destination_SE)
314 >            sbi_source = SBinterface(Source_SE)
315 >        except ProtocolMismatch, ex:
316 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
317 >            msg += str(ex)
318 >            return self.updateReport('', '-1', msg)
319  
320          results = {}
321 <        ## loop over the complete list of files
322 <        for filetocopy in list_file:
323 <            if self.debug : print 'start real copy for %s'%filetocopy
324 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
325 <            if ErCode == '0':
326 <                ErCode, msg = self.makeCopy( sbi, filetocopy )
327 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
321 >        ## loop over the complete list of files
322 >        for filetocopy in list_file:
323 >            if self.debug : print '\tStart real copy for %s'%filetocopy
324 >            try :
325 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
326 >            except Exception, ex:
327 >                ErCode = -1
328 >                msg = str(ex)  
329 >            if ErCode == '0':
330 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
331 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
332              results.update( self.updateReport(filetocopy, ErCode, msg))
333          return results
220    
221    def updateReport(self, file, erCode, reason, lfn='', se='' ):
222        """
223        Update the final stage out infos
224        """
225        jobStageInfo={}
226        jobStageInfo['erCode']=erCode
227        jobStageInfo['reason']=reason
228        jobStageInfo['lfn']=lfn
229        jobStageInfo['se']=se
230
231        report = { file : jobStageInfo}
232        return report
334  
234    def finalReport( self , results, middleware ):
235        """
236        It should return a clear list of LFNs for each SE where data are stored.
237        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
238        """
239        if middleware:
240            outFile = open('cmscpReport.sh',"a")
241            cmscp_exit_status = 0
242            txt = ''
243            for file, dict in results.iteritems():
244                if dict['lfn']=='':
245                    lfn = '$LFNBaseName/'+os.path.basename(file)
246                    se  = '$SE'
247                else:
248                    lfn = dict['lfn']+os.pat.basename(file)
249                    se = dict['se']      
250                #dict['lfn'] # to be implemented
251                txt +=  'echo "Report for File: '+file+'"\n'
252                txt +=  'echo "LFN: '+lfn+'"\n'  
253                txt +=  'echo "StorageElement: '+se+'"\n'  
254                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
255                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
256                if dict['erCode'] != '0':
257                    cmscp_exit_status = dict['erCode']
258            txt += '\n'
259            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
260            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
261            outFile.write(str(txt))
262            outFile.close()
263        else:
264            for file, code in results.iteritems():
265                print 'error code = %s for file %s'%(code,file)
266        return
335  
336      def storageInterface( self, endpoint, protocol ):
337          """
338 <        Create the storage interface.
338 >        Create the storage interface.
339          """
340 +        if self.debug : print 'storageInterface():\n'
341          try:
342              interface = SElement( FullPath(endpoint), protocol )
343 <        except Exception, ex:
344 <            msg = ''
345 <            if self.debug : msg = str(ex)+'\n'
346 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
278 <            print msg
343 >        except ProtocolUnknown, ex:
344 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
345 >            msg += str(ex)
346 >            raise Exception(msg)
347  
348          return interface
349  
282    def checkDir(self, Destination_SE, protocol):
283        '''
284        ToBeImplemented NEEDED for castor
285        '''
286        return
287
350      def createDir(self, Destination_SE, protocol):
351          """
352 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
353 <        this should be transparent at SE API level.
352 >        Create remote dir for gsiftp REALLY TEMPORARY
353 >        this should be transparent at SE API level.
354          """
355 <        ErCode = '0'
356 <        msg_1 = ''
355 >        if self.debug : print 'createDir():\n'
356 >        msg = ''
357          try:
358              action = SBinterface( Destination_SE )
359              action.createDir()
360 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
361 <        except Exception, ex:
362 <            msg = ''
363 <            if self.debug : msg = str(ex)+'\n'
364 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
365 <            msg += msg_1
366 <            ErCode = '60316'  
367 <            #print msg
368 <
369 <        return ErCode, msg_1
360 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
361 >        except TransferException, ex:
362 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
363 >            msg += str(ex)
364 >            if self.debug :
365 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
366 >                dbgmsg += '\t'+str(ex.output)+'\n'
367 >                print dbgmsg
368 >            raise Exception(msg)
369 >        except OperationException, ex:
370 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
371 >            msg += str(ex)
372 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
373 >            raise Exception(msg)
374 >        except MissingDestination, ex:
375 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
376 >            msg += str(ex)
377 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
378 >            raise Exception(msg)
379 >        except AlreadyExistsException, ex:
380 >            if self.debug: print "\tThe directory already exist"
381 >            pass            
382 >        return msg
383  
384 <    def checkFileExist(self, sbi, filetocopy):
384 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
385          """
386 <        Check if file to copy already exist  
387 <        """
388 <        try:
389 <            check = sbi.checkExists(filetocopy)
315 <        except Exception, ex:
316 <            msg = ''
317 <            if self.debug : msg = str(ex)+'\n'
318 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
319 <           # print msg
386 >        Check both if source file exist AND
387 >        if destination file ALREADY exist.
388 >        """
389 >        if self.debug : print 'checkFileExist():\n'
390          ErCode = '0'
391          msg = ''
392 <        if check :
393 <            ErCode = '60303'
394 <            msg = "file %s already exist"%filetocopy
395 <            print msg
392 >        f_tocopy=filetocopy
393 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
394 >        try:
395 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
396 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
397 >        except OperationException, ex:
398 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
399 >            msg += str(ex)
400 >            if self.debug :
401 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
402 >                dbgmsg += '\t'+str(ex.output)+'\n'
403 >                print dbgmsg
404 >            raise Exception(msg)
405 >        except WrongOption, ex:
406 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
407 >            msg += str(ex)
408 >            if self.debug :
409 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
410 >                dbgmsg += '\t'+str(ex.output)+'\n'
411 >                print dbgmsg
412 >            raise Exception(msg)
413 >        except MissingDestination, ex:
414 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
415 >            msg += str(ex)
416 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
417 >            raise Exception(msg)
418 >        ## when the client commands are not found (wrong env or really missing)
419 >        except MissingCommand, ex:
420 >            ErCode = '10041'
421 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
422 >            return ErCode, msg
423 >        if not checkSource :
424 >            ErCode = '60302'
425 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
426 >            return ErCode, msg
427 >        f_tocopy=filetocopy
428 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
429 >        try:
430 >            check = sbi_dest.checkExists( f_tocopy, opt=option )
431 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
432 >        except OperationException, ex:
433 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
434 >            msg += str(ex)
435 >            if self.debug :
436 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
437 >                dbgmsg += '\t'+str(ex.output)+'\n'
438 >                print dbgmsg
439 >            raise Exception(msg)
440 >        except WrongOption, ex:
441 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
442 >            msg += str(ex)
443 >            if self.debug :
444 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
445 >                msg += '\t'+str(ex.output)+'\n'
446 >            raise Exception(msg)
447 >        except MissingDestination, ex:
448 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
449 >            msg += str(ex)
450 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
451 >            raise Exception(msg)
452 >        ## when the client commands are not found (wrong env or really missing)
453 >        except MissingCommand, ex:
454 >            ErCode = '10041'
455 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
456 >            return ErCode, msg
457 >        if check :
458 >            ErCode = '60303'
459 >            msg = "file %s already exist"%os.path.basename(filetocopy)
460  
461 <        return ErCode,msg  
461 >        return ErCode, msg
462  
463 <    def makeCopy(self, sbi, filetocopy ):  
463 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
464          """
465 <        call the copy API.  
465 >        call the copy API.
466          """
467 <        path = os.path.dirname(filetocopy)  
467 >        if self.debug : print 'makeCopy():\n'
468 >        path = os.path.dirname(filetocopy)
469          file_name =  os.path.basename(filetocopy)
470          source_file = filetocopy
471          dest_file = file_name ## to be improved supporting changing file name  TODO
472 <        if self.source == '' and path == '':
472 >        if self.params['source'] == '' and path == '':
473              source_file = os.path.abspath(filetocopy)
474 <        elif self.destination =='':
475 <            dest_file = os.path.join(os.getcwd(),file_name)
476 <        elif self.source != '' and self.destination != '' :
477 <            source_file = file_name  
474 >        elif self.params['destination'] =='':
475 >            destDir = self.params.get('destinationDir',os.getcwd())
476 >            dest_file = os.path.join(destDir,file_name)
477 >        elif self.params['source'] != '' and self.params['destination'] != '' :
478 >            source_file = file_name
479 >
480          ErCode = '0'
481          msg = ''
482 +
483          try:
484 <            pippo = sbi.copy( source_file , dest_file )
485 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
486 <        except Exception, ex:
487 <            msg = ''
488 <            if self.debug : msg = str(ex)+'\n'
489 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
484 >            sbi.copy( source_file , dest_file , opt = option)
485 >        except TransferException, ex:
486 >            msg  = "Problem copying %s file" % filetocopy
487 >            msg += str(ex)
488 >            if self.debug :
489 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
490 >                dbgmsg += '\t'+str(ex.output)+'\n'
491 >                print dbgmsg
492              ErCode = '60307'
493 <            #print msg
494 <
493 >        except WrongOption, ex:
494 >            msg  = "Problem copying %s file" % filetocopy
495 >            msg += str(ex)
496 >            if self.debug :
497 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
498 >                dbgmsg += '\t'+str(ex.output)+'\n'
499 >                print dbgmsg
500 >        except SizeZeroException, ex:
501 >            msg  = "Problem copying %s file" % filetocopy
502 >            msg += str(ex)
503 >            if self.debug :
504 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
505 >                dbgmsg += '\t'+str(ex.output)+'\n'
506 >                print dbgmsg
507 >            ErCode = '60307'
508 >        ## when the client commands are not found (wrong env or really missing)
509 >        except MissingCommand, ex:
510 >            ErCode = '10041'
511 >            msg  = "Problem copying %s file" % filetocopy
512 >            msg += str(ex)
513 >            if self.debug :
514 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
515 >                dbgmsg += '\t'+str(ex.output)+'\n'
516 >                print dbgmsg
517 >        if ErCode == '0' and protocol.find('srmv') == 0:
518 >            remote_file_size = -1
519 >            local_file_size = os.path.getsize( source_file )
520 >            try:
521 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
522 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
523 >            except TransferException, ex:
524 >                msg  = "Problem checking the size of %s file" % filetocopy
525 >                msg += str(ex)
526 >                if self.debug :
527 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
528 >                    dbgmsg += '\t'+str(ex.output)+'\n'
529 >                    print dbgmsg
530 >                ErCode = '60307'
531 >            except WrongOption, ex:
532 >                msg  = "Problem checking the size of %s file" % filetocopy
533 >                msg += str(ex)
534 >                if self.debug :
535 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
536 >                    dbgmsg += '\t'+str(ex.output)+'\n'
537 >                    print dbgmsg
538 >                ErCode = '60307'
539 >            if local_file_size != remote_file_size:
540 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
541 >                ErCode = '60307'
542 >
543 >        if ErCode != '0':
544 >            try :
545 >                self.removeFile( sbi_dest, dest_file, option )
546 >            except Exception, ex:
547 >                msg += '\n'+str(ex)  
548          return ErCode, msg
549 <  
550 <    '''
551 <    def checkSize()
549 >
550 >    def removeFile( self, sbi_dest, filetocopy, option ):
551 >        """  
552 >        """  
553 >        if self.debug : print 'removeFile():\n'
554 >        f_tocopy=filetocopy
555 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
556 >        try:
557 >            sbi_dest.delete( f_tocopy, opt=option )
558 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
559 >        except OperationException, ex:
560 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
561 >            msg += str(ex)
562 >            if self.debug :
563 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
564 >                dbgmsg += '\t'+str(ex.output)+'\n'
565 >                print dbgmsg
566 >            raise Exception(msg)
567 >
568 >        return
569 >
570 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
571          """
572 <        Using srm needed a check of the ouptut file size.  
572 >        Update the final stage out infos
573          """
574 <    
575 <        echo "--> remoteSize = $remoteSize"
576 <        ## for local file
577 <        localSize=$(stat -c%s "$path_out_file")
578 <        echo "-->  localSize = $localSize"
579 <        if [ $localSize != $remoteSize ]; then
580 <            echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
581 <            echo "Copy failed: removing remote file $destination"
582 <                srmrm $destination
583 <                cmscp_exit_status=60307
584 <      
585 <      
586 <                echo "Problem copying $path_out_file to $destination with srmcp command"
375 <                StageOutExitStatusReason='remote and local file dimension not match'
376 <                echo "StageOutReport = `cat ./srmcp.report`"
377 <    '''
378 <    def backup(self):
379 <        """
380 <        Check infos from TFC using existing api obtaining:
381 <        1)destination
382 <        2)protocol
574 >        jobStageInfo={}
575 >        jobStageInfo['erCode']=erCode
576 >        jobStageInfo['reason']=reason
577 >        jobStageInfo['lfn']=lfn
578 >        jobStageInfo['se']=se
579 >
580 >        report = { file : jobStageInfo}
581 >        return report
582 >
583 >    def finalReport( self , results ):
584 >        """
585 >        It a list of LFNs for each SE where data are stored.
586 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
587          """
588 +        outFile = open('cmscpReport.sh',"a")
589 +        cmscp_exit_status = 0
590 +        txt = ''
591 +        for file, dict in results.iteritems():
592 +            reason="'%s'"%dict['reason']
593 +            if file:
594 +                if dict['lfn']=='':
595 +                    lfn = '$LFNBaseName/'+os.path.basename(file)
596 +                    se  = '$SE'
597 +                else:
598 +                    lfn = dict['lfn']+os.path.basename(file)
599 +                    se = dict['se']
600 +                    
601 +                #dict['lfn'] # to be implemented
602 +                txt += 'echo "Report for File: '+file+'"\n'
603 +                txt += 'echo "LFN: '+lfn+'"\n'
604 +                txt += 'echo "StorageElement: '+se+'"\n'
605 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
606 +                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
607 +                #txt += 'export LFNBaseName='+lfn+'\n'
608 +                txt += 'export SE='+se+'\n'
609 +                
610 +                if dict['erCode'] != '0':
611 +                    cmscp_exit_status = dict['erCode']
612 +            else:
613 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
614 +                cmscp_exit_status = dict['erCode']
615 +                cmscp_exit_status = dict['erCode']
616 +        txt += '\n'
617 +        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
618 +        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
619 +        outFile.write(str(txt))
620 +        outFile.close()
621          return
622  
386    def usage(self):
623  
624 <        msg="""
625 <        required parameters:
626 <        --source        :: REMOTE           :      
627 <        --destination   :: REMOTE           :  
628 <        --debug             :
629 <        --inFile :: absPath : or name NOT RELATIVE PATH
630 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
631 <
632 <        optional parameters      
633 <        """
634 <        return msg
624 > def usage():
625 >
626 >    msg="""
627 >    cmscp:
628 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
629 >        including success checking  version also for CAF using rfcp command to copy the output to SE
630 >
631 >    accepted parameters:
632 >       source           =
633 >       destination      =
634 >       inputFileList    =
635 >       outputFileList   =
636 >       protocol         =
637 >       option           =
638 >       middleware       =  
639 >       srm_version      =
640 >       destinationDir   =
641 >       lfn=             =
642 >       local_stage      =  activate stage fall back  
643 >       debug            =  activate verbose print out
644 >       help             =  print on line man and exit  
645 >    
646 >    mandatory:
647 >       * "source" and/or "destination" must always be defined
648 >       * either "middleware" or "protocol" must always be defined
649 >       * "inputFileList" must always be defined
650 >       * if "local_stage" = 1 also  "lfn" must be defined
651 >    """
652 >    print msg
653 >
654 >    return
655 >
656 > def HelpOptions(opts=[]):
657 >    """
658 >    Check otps, print help if needed
659 >    prepare dict = { opt : value }
660 >    """
661 >    dict_args = {}
662 >    if len(opts):
663 >        for opt, arg in opts:
664 >            dict_args[opt.split('--')[1]] = arg
665 >            if opt in ('-h','-help','--help') :
666 >                usage()
667 >                sys.exit(0)
668 >        return dict_args
669 >    else:
670 >        usage()
671 >        sys.exit(0)
672  
673   if __name__ == '__main__' :
674 +
675 +    import getopt
676 +
677 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
678 +                  "protocol=","option=", "middleware=", "srm_version=", \
679 +                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
680 +    try:
681 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
682 +    except getopt.GetoptError, err:
683 +        print err
684 +        HelpOptions()
685 +        sys.exit(2)
686 +
687 +    dictArgs = HelpOptions(opts)
688      try:
689 <        cmscp_ = cmscp(sys.argv[1:])
689 >        cmscp_ = cmscp(dictArgs)
690          cmscp_.run()
691 <    except:
692 <        pass
691 >    except Exception, ex :
692 >        print str(ex)
693  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines