ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.1 by spiga, Sun Sep 21 10:34:04 2008 UTC vs.
Revision 1.49 by spiga, Mon Apr 20 17:51:22 2009 UTC

# Line 1 | Line 1
1 < #!/usr/bin/env python
1 > ##!/usr/bin/env python
2  
3 < import sys, getopt, string
4 < import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
3 > import sys, os
4 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5   from ProdCommon.Storage.SEAPI.SBinterface import *
6 <
6 > from ProdCommon.Storage.SEAPI.Exceptions import *
7  
8  
9   class cmscp:
10 <    def __init__(self, argv):
10 >    def __init__(self, args):
11          """
12          cmscp
13 <
15 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
13 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
14          including success checking  version also for CAF using rfcp command to copy the output to SE
15          input:
16             $1 middleware (CAF, LSF, LCG, OSG)
17 <           $2 local file (the physical path of output file respect to current working directory)
18 <           $3 file name (the output file name)
19 <           $4 remote SE_PATH (absolute)
20 <           $5 remote SE
21 <           $6 srm version (only in the case of LCG or OSG)
17 >           $2 local file (the absolute path of output file or just the name if it's in top dir)
18 >           $3 if needed: file name (the output file name)
19 >           $5 remote SE (complete endpoint)
20 >           $6 srm version
21 >           --lfn $LFNBaseName
22          output:
23               return 0 if all ok
24               return 60307 if srmcp failed
25               return 60303 if file already exists in the SE
26          """
27 +
28          #set default
29 +        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
30 +                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
31          self.debug = 0
32 <        self.source = ''
33 <        self.destination = ''
34 <        self.file_to_copy = []
34 <        self.remote_file_name = []
35 <        self.protocol = ''
36 <        self.middleware = ''
37 <        self.srmv = ''
38 <  
39 <        try:
40 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
41 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
42 <        except getopt.GetoptError:
43 <            print self.usage()
44 <            sys.exit(2)
45 <
46 <        self.setAndCheck(opts)  
47 <        
32 >        self.local_stage = 0
33 >        self.params.update( args )
34 >
35          return
36  
37 <    def setAndCheck( self, opts ):
37 >    def processOptions( self ):
38          """
39 <        Set and check command line parameter
39 >        check command line parameter
40          """
41 <        if not opts :
42 <            print self.usage()
43 <            sys.exit()
44 <        for opt, arg in opts :
45 <            if opt  == "--help" :
46 <                print self.usage()
47 <                sys.exit()
48 <            elif opt == "--debug" :
49 <                self.debug = 1
50 <            elif opt == "--source" :
51 <                self.source = arg
52 <            elif opt == "--destination":
53 <                self.destination = arg
54 <            elif opt == "--inputFileList":
68 <                infile = arg
69 <            elif opt == "--outputFileList":
70 <                out_file
71 <            elif opt == "--protocol":
72 <                self.protocol = arg
73 <            elif opt == "--middleware":
74 <                self.middleware = arg
75 <            elif opt == "--srm_version":
76 <                self.srmv = arg
77 <
78 <        # source and dest cannot be undefined at same time
79 <        if self.source == '' and self.destination == '':
80 <            print self.usage()
81 <            sys.exit()
82 <        # if middleware is not defined --> protocol cannot be empty  
83 <        if self.middleware == '' and self.protocol == '':
84 <            print self.usage()
85 <            sys.exit()
86 <        # input file must be defined  
87 <        if infile == '':
88 <            print self.usage()
89 <            sys.exit()
41 >        if 'help' in self.params.keys(): HelpOptions()
42 >        if 'debug' in self.params.keys(): self.debug = 1
43 >        if 'local_stage' in self.params.keys(): self.local_stage = 1
44 >
45 >        # source and dest cannot be undefined at same time
46 >        if not self.params['source']  and not self.params['destination'] :
47 >            HelpOptions()
48 >        # if middleware is not defined --> protocol cannot be empty
49 >        if not self.params['middleware'] and not self.params['protocol'] :
50 >            HelpOptions()
51 >
52 >        # input file must be defined
53 >        if not self.params['inputFileList'] :
54 >            HelpOptions()
55          else:
56 <            if infile.find(','):
57 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
58 <            else:
59 <                self.file_to_copy.append(infile)
60 <        
56 >            file_to_copy=[]
57 >            if self.params['inputFileList'].find(','):
58 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59 >            else:
60 >                file_to_copy.append(self.params['inputFileList'])
61 >            self.params['inputFileList'] = file_to_copy
62 >
63 >        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
64 >        
65          ## TO DO:
66          #### add check for outFiles
67          #### add map {'inFileNAME':'outFileNAME'} to change out name
68  
100        return
69  
70 <    def run( self ):  
70 >    def run( self ):
71          """
72 <        Check if running on UI (no $middleware) or
73 <        on WN (on the Grid), and take different action  
72 >        Check if running on UI (no $middleware) or
73 >        on WN (on the Grid), and take different action
74          """
75 <        if self.middleware :  
76 <           results = self.stager()
75 >        self.processOptions()
76 >        if self.debug: print 'calling run() : \n'
77 >        # stage out from WN
78 >        if self.params['middleware'] :
79 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
80 >            self.finalReport(results)
81 >        # Local interaction with SE
82          else:
83 <           results = self.copy( self.file_to_copy, self.protocol )
84 <
112 <        self.finalReport(results,self.middleware)
83 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
84 >            return results
85  
86 <        return
115 <    
116 <    def setProtocol( self ):    
86 >    def setProtocol( self, middleware ):
87          """
88          define the allowed potocols based on $middlware
89 <        which depend on scheduler
89 >        which depend on scheduler
90          """
91 <        if self.middleware.lower() in ['osg','lcg']:
92 <            supported_protocol = ['srm-lcg','srmv2']
93 <        elif self.middleware.lower() in ['lsf','caf']:
94 <            supported_protocol = ['rfio']
91 >        # default To be used with "middleware"
92 >        if self.debug:
93 >            print 'setProtocol() :\n'
94 >            print '\tmiddleware =  %s utils \n'%middleware
95 >        
96 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
97 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
98 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
99 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 '}
100 >        rfioOpt=''
101 >
102 >        supported_protocol = None
103 >        if middleware.lower() in ['osg','lcg','condor','sge']:
104 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
105 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
106 >        elif middleware.lower() in ['lsf','caf']:
107 >            supported_protocol = [('rfio',rfioOpt)]
108          else:
109 <            ## here we can add support for any kind of protocol,
109 >            ## here we can add support for any kind of protocol,
110              ## maybe some local schedulers need something dedicated
111              pass
112          return supported_protocol
113  
114 <    def stager( self ):              
114 >
115 >    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
116          """
117 <        Implement the logic for remote stage out
117 >        Checks the status of copy and update result dictionary
118          """
119 <        protocols = self.setProtocol()  
120 <        count=0
121 <        list_files = self.file_to_copy
122 <        results={}  
123 <        for prot in protocols:
124 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
125 <            copy_results = self.copy( list_files, prot )
126 <            list_retry = []
127 <            list_existing = []
128 <            list_ok = []
129 <            for file, dict in copy_results.iteritems():
130 <                er_code = dict['erCode']
131 <                if er_code == '60307': list_retry.append( file )
132 <                elif er_code == '60303': list_existing.append( file )
133 <                else:
134 <                    list_ok.append(file)
135 <                    reason = 'Copy succedeed with %s utils'%prot
136 <                    upDict = self.updateReport(file, er_code, reason)
137 <                    copy_results.update(upDict)
138 <            results.update(copy_results)
139 <            if len(list_ok) != 0:  
140 <                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
141 <               # print msg
142 <            if len(list_ok) == len(list_files) :
159 <                break
119 >        list_retry = []
120 >        list_not_existing = []
121 >        list_ok = []
122 >        
123 >        if self.debug:
124 >            print 'in checkCopy() :\n'
125 >        for file, dict in copy_results.iteritems():
126 >            er_code = dict['erCode']
127 >            if er_code == '0':
128 >                list_ok.append(file)
129 >                reason = 'Copy succedeed with %s utils'%prot
130 >                dict['reason'] = reason
131 >            elif er_code == '60302':
132 >                list_not_existing.append( file )
133 >            else:
134 >                list_retry.append( file )
135 >                
136 >            if self.debug:
137 >                print "\t file %s \n"%file
138 >                print "\t dict['erCode'] %s \n"%dict['erCode']
139 >                print "\t dict['reason'] %s \n"%dict['reason']
140 >                
141 >            if (lfn != '') and (se != ''):
142 >                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
143              else:
144 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
145 <                if len(list_retry): list_files = list_retry
146 <                else: break
147 <            count =+1  
144 >                upDict = self.updateReport(file, er_code, dict['reason'])
145 >
146 >            copy_results.update(upDict)
147 >        
148 >        msg = ''
149 >        if len(list_ok) != 0:
150 >            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
151 >        if len(list_ok) != len_list_files :
152 >            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
153 >            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
154 >        if self.debug : print msg
155 >        
156 >        return copy_results, list_ok, list_retry
157 >        
158 >    def LocalCopy(self, list_retry, results):
159 >        """
160 >        Tries the stage out to the CloseSE
161 >        """
162 >        if self.debug:
163 >            print 'in LocalCopy() :\n'
164 >            print '\t list_retry %s utils \n'%list_retry
165 >            print '\t len(list_retry) %s \n'%len(list_retry)
166 >                
167 >        list_files = list_retry  
168 >        self.params['inputFilesList']=list_files
169 >        
170 >        ### copy backup
171 >        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
172 >        siteCfg = loadSiteLocalConfig()
173 >        seName = siteCfg.localStageOut.get("se-name", None)
174 >        catalog = siteCfg.localStageOut.get("catalog", None)
175 >        implName = siteCfg.localStageOut.get("command", None)
176 >        if (implName == 'srm'):
177 >           implName='srmv1'
178 >           self.params['srm_version']=implName
179 >        ##### to be improved ###############
180 >        if (implName == 'rfcp'):
181 >            self.params['middleware']='lsf'
182 >        ####################################    
183 >                  
184 >        self.params['protocol']=implName
185 >        tfc = siteCfg.trivialFileCatalog()
186 >            
187 >        if self.debug:
188 >            print '\t siteCFG %s \n'%siteCfg
189 >            print '\t seName %s \n'%seName
190 >            print '\t catalog %s \n'%catalog
191 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
192 >            print '\t tfc %s '%tfc
193 >            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
194 >                
195 >        file_backup=[]
196 >        for input in self.params['inputFilesList']:
197 >            file = self.params['lfn'] + os.path.basename(input)
198 >            surl = tfc.matchLFN(tfc.preferredProtocol, file)
199 >            file_backup.append(surl)
200 >            if self.debug:
201 >                print '\t lfn %s \n'%self.params['lfn']
202 >                print '\t file %s \n'%file
203 >                print '\t surl %s \n'%surl
204 >                    
205 >        destination=os.path.dirname(file_backup[0])
206 >        self.params['destination']=destination
207 >            
208 >        if self.debug:
209 >            print "\t self.params['destination']%s \n"%self.params['destination']
210 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']
211 >            print "\t self.params['option']%s \n"%self.params['option']
212 >              
213 >        for prot, opt in self.setProtocol( self.params['middleware'] ):
214 >            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
215 >            localCopy_results = self.copy( self.params['inputFileList'], prot, opt )
216 >            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
217 >                results.update(localCopy_results)
218 >            else:
219 >                localCopy_results, list_ok, list_retry = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
220 >                results.update(localCopy_results)
221 >                if len(list_ok) == len(list_files) :
222 >                    break
223 >                if len(list_retry):
224 >                    list_files = list_retry
225 >                else: break
226 >            if self.debug:
227 >                print "\t localCopy_results = %s \n"%localCopy_results
228 >        
229 >        return results        
230 >
231 >    def stager( self, middleware, list_files ):
232 >        """
233 >        Implement the logic for remote stage out
234 >        """
235  
236 <        #### TODO Daniele
237 <        #check is something fails and created related dict
238 <  #      backup = self.analyzeResults(results)
239 <  
240 <  #      if backup :  
241 <  #          msg = 'WARNING: backup logic is under implementation\n'
242 <  #          #backupDict = self.backup()
243 <  #          ### NOTA DEVE RITORNARE UN DICT comprensivo di LFN SE Name  
244 <  #          results.update(backupDict)
245 <  #          print msg
236 >        if self.debug:
237 >            print 'stager() :\n'
238 >            print '\tmiddleware %s\n'%middleware
239 >            print '\tlist_files %s\n'%list_files
240 >        
241 >        results={}
242 >        for prot, opt in self.setProtocol( middleware ):
243 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
244 >            copy_results = self.copy( list_files, prot, opt )
245 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
246 >                results.update(copy_results)
247 >            else:
248 >                copy_results, list_ok, list_retry = self.checkCopy(copy_results, len(list_files), prot)
249 >                results.update(copy_results)
250 >                if len(list_ok) == len(list_files) :
251 >                    break
252 >                if len(list_retry):
253 >                    list_files = list_retry
254 >                else: break
255 >                
256 >        if self.local_stage:
257 >            if len(list_retry):
258 >                results = self.LocalCopy(list_retry, results)
259 >            
260 >        if self.debug:
261 >            print "\t results %s \n"%results
262          return results
263  
264      def initializeApi(self, protocol ):
265          """
266 <        Instantiate storage interface  
266 >        Instantiate storage interface
267          """
268 <        source_prot = protocol
269 <        dest_prot = protocol
270 <        if self.source == '' : source_prot = 'local'
271 <        Source_SE  = self.storageInterface( self.source, source_prot )
272 <        if self.destination == '' : dest_prot = 'local'
273 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
268 >        if self.debug : print 'initializeApi() :\n'  
269 >        self.source_prot = protocol
270 >        self.dest_prot = protocol
271 >        if not self.params['source'] : self.source_prot = 'local'
272 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
273 >        if not self.params['destination'] : self.dest_prot = 'local'
274 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
275  
276          if self.debug :
277 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
278 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
277 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
278 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
279 >            print msg
280  
281          return Source_SE, Destination_SE
282  
283 <    def copy( self, list_file, protocol ):
283 >    def copy( self, list_file, protocol, options ):
284          """
285 <        Make the real file copy using SE API
285 >        Make the real file copy using SE API
286          """
287 +        msg = ""
288          if self.debug :
289 <            print 'copy(): using %s protocol'%protocol
290 <        Source_SE, Destination_SE = self.initializeApi( protocol )
289 >            msg  = 'copy() :\n'
290 >            msg += '\tusing %s protocol\n'%protocol
291 >            print msg
292 >        try:
293 >            Source_SE, Destination_SE = self.initializeApi( protocol )
294 >        except Exception, ex:
295 >            return self.updateReport('', '-1', str(ex))
296  
297 <        # create remote dir
298 <        if protocol in ['gridftp','rfio']:
299 <            self.createDir( Destination_SE, protocol )
297 >        # create remote dir
298 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
299 >            try:
300 >                self.createDir( Destination_SE, Destination_SE.protocol )
301 >            except OperationException, ex:
302 >                return self.updateReport('', '60316', str(ex))
303  
304          ## prepare for real copy  ##
305 <        sbi = SBinterface( Source_SE, Destination_SE )
306 <        sbi_dest = SBinterface(Destination_SE)
305 >        try :
306 >            sbi = SBinterface( Source_SE, Destination_SE )
307 >            sbi_dest = SBinterface(Destination_SE)
308 >            sbi_source = SBinterface(Source_SE)
309 >        except ProtocolMismatch, ex:
310 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
311 >            msg += str(ex)
312 >            return self.updateReport('', '-1', msg)
313  
314          results = {}
315 <        ## loop over the complete list of files
316 <        for filetocopy in list_file:
317 <            if self.debug : print 'start real copy for %s'%filetocopy
318 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
319 <            if ErCode == '0':
320 <                ErCode, msg = self.makeCopy( sbi, filetocopy )
321 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
315 >        ## loop over the complete list of files
316 >        for filetocopy in list_file:
317 >            if self.debug : print '\tStart real copy for %s'%filetocopy
318 >            try :
319 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
320 >            except Exception, ex:
321 >                ErCode = -1
322 >                msg = str(ex)  
323 >            if ErCode == '0':
324 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
325 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
326              results.update( self.updateReport(filetocopy, ErCode, msg))
327          return results
221    
222    def updateReport(self, file, erCode, reason, lfn='', se='' ):
223        """
224        Update the final stage out infos
225        """
226        jobStageInfo={}
227        jobStageInfo['erCode']=erCode
228        jobStageInfo['reason']=reason
229        jobStageInfo['lfn']=lfn
230        jobStageInfo['se']=se
328  
232        report = { file : jobStageInfo}
233        return report
234
235    def finalReport( self , results, middleware ):
236        """
237        It should return a clear list of LFNs for each SE where data are stored.
238        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
239        """
240        if middleware:
241            outFile = open('cmscpReport.sh',"a")
242            cmscp_exit_status = 0
243            txt = ''
244            for file, dict in results.iteritems():
245                if dict['lfn']=='':
246                    lfn = '$LFNBaseName/'+os.path.basename(file)
247                    se  = '$SE'
248                else:
249                    lfn = dict['lfn']+os.pat.basename(file)
250                    se = dict['se']      
251                #dict['lfn'] # to be implemented
252                txt +=  'echo "Report for File: '+file+'"\n'
253                txt +=  'echo "LFN: '+lfn+'"\n'  
254                txt +=  'echo "StorageElement: '+se+'"\n'  
255                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
256                txt += 'echo "StageOutSE = '+dict['se']+'" >> $RUNTIME_AREA/$repo\n'
257                if dict['erCode'] != '0':
258                    cmscp_exit_status = dict['erCode']
259            txt += '\n'
260            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
261            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
262            outFile.write(str(txt))
263            outFile.close()
264        else:
265            for file, code in results.iteritems():
266                print 'error code = %s for file %s'%(code,file)
267        return
329  
330      def storageInterface( self, endpoint, protocol ):
331          """
332 <        Create the storage interface.
332 >        Create the storage interface.
333          """
334 +        if self.debug : print 'storageInterface():\n'
335          try:
336              interface = SElement( FullPath(endpoint), protocol )
337 <        except Exception, ex:
338 <            msg = ''
339 <            if self.debug : msg = str(ex)+'\n'
340 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
279 <            print msg
337 >        except ProtocolUnknown, ex:
338 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
339 >            msg += str(ex)
340 >            raise Exception(msg)
341  
342          return interface
343  
283    def checkDir(self, Destination_SE, protocol):
284        '''
285        ToBeImplemented NEEDED for castor
286        '''
287        return
288
344      def createDir(self, Destination_SE, protocol):
345          """
346 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
347 <        this should be transparent at SE API level.
346 >        Create remote dir for gsiftp REALLY TEMPORARY
347 >        this should be transparent at SE API level.
348          """
349 <        ErCode = '0'
350 <        msg_1 = ''
349 >        if self.debug : print 'createDir():\n'
350 >        msg = ''
351          try:
352              action = SBinterface( Destination_SE )
353              action.createDir()
354 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
355 <        except Exception, ex:
356 <            msg = ''
357 <            if self.debug : msg = str(ex)+'\n'
358 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
359 <            msg += msg_1
360 <            ErCode = '60316'  
361 <            #print msg
362 <
363 <        return ErCode, msg_1
354 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
355 >        except TransferException, ex:
356 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
357 >            msg += str(ex)
358 >            if self.debug :
359 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
360 >                dbgmsg += '\t'+str(ex.output)+'\n'
361 >                print dbgmsg
362 >            raise Exception(msg)
363 >        except OperationException, ex:
364 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
365 >            msg += str(ex)
366 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
367 >            raise Exception(msg)
368 >        except MissingDestination, ex:
369 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
370 >            msg += str(ex)
371 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
372 >            raise Exception(msg)
373 >        except AlreadyExistsException, ex:
374 >            if self.debug: print "\tThe directory already exist"
375 >            pass            
376 >        return msg
377  
378 <    def checkFileExist(self, sbi, filetocopy):
378 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
379          """
380 <        Check if file to copy already exist  
381 <        """
382 <        try:
383 <            check = sbi.checkExists(filetocopy)
316 <        except Exception, ex:
317 <            msg = ''
318 <            if self.debug : msg = str(ex)+'\n'
319 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
320 <           # print msg
380 >        Check both if source file exist AND
381 >        if destination file ALREADY exist.
382 >        """
383 >        if self.debug : print 'checkFileExist():\n'
384          ErCode = '0'
385          msg = ''
386 <        if check :
387 <            ErCode = '60303'
388 <            msg = "file %s already exist"%filetocopy
389 <            print msg
386 >        f_tocopy=filetocopy
387 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
388 >        try:
389 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
390 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
391 >        except OperationException, ex:
392 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
393 >            msg += str(ex)
394 >            if self.debug :
395 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
396 >                dbgmsg += '\t'+str(ex.output)+'\n'
397 >                print dbgmsg
398 >            raise Exception(msg)
399 >        except WrongOption, ex:
400 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
401 >            msg += str(ex)
402 >            if self.debug :
403 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
404 >                dbgmsg += '\t'+str(ex.output)+'\n'
405 >                print dbgmsg
406 >            raise Exception(msg)
407 >        except MissingDestination, ex:
408 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
409 >            msg += str(ex)
410 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
411 >            raise Exception(msg)
412 >        if not checkSource :
413 >            ErCode = '60302'
414 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
415 >            return ErCode, msg
416 >        f_tocopy=filetocopy
417 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
418 >        try:
419 >            check = sbi_dest.checkExists( f_tocopy, opt=option )
420 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
421 >        except OperationException, ex:
422 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
423 >            msg += str(ex)
424 >            if self.debug :
425 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
426 >                dbgmsg += '\t'+str(ex.output)+'\n'
427 >                print dbgmsg
428 >            raise Exception(msg)
429 >        except WrongOption, ex:
430 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
431 >            msg += str(ex)
432 >            if self.debug :
433 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
434 >                msg += '\t'+str(ex.output)+'\n'
435 >            raise Exception(msg)
436 >        except MissingDestination, ex:
437 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
438 >            msg += str(ex)
439 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
440 >            raise Exception(msg)
441 >        if check :
442 >            ErCode = '60303'
443 >            msg = "file %s already exist"%os.path.basename(filetocopy)
444  
445 <        return ErCode,msg  
445 >        return ErCode, msg
446  
447 <    def makeCopy(self, sbi, filetocopy ):  
447 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
448          """
449 <        call the copy API.  
449 >        call the copy API.
450          """
451 <        path = os.path.dirname(filetocopy)  
451 >        if self.debug : print 'makeCopy():\n'
452 >        path = os.path.dirname(filetocopy)
453          file_name =  os.path.basename(filetocopy)
454          source_file = filetocopy
455          dest_file = file_name ## to be improved supporting changing file name  TODO
456 <        if self.source == '' and path == '':
456 >        if self.params['source'] == '' and path == '':
457              source_file = os.path.abspath(filetocopy)
458 <        elif self.destination =='':
459 <            dest_file = os.path.join(os.getcwd(),file_name)
460 <        elif self.source != '' and self.destination != '' :
461 <            source_file = file_name  
458 >        elif self.params['destination'] =='':
459 >            destDir = self.params.get('destinationDir',os.getcwd())
460 >            dest_file = os.path.join(destDir,file_name)
461 >        elif self.params['source'] != '' and self.params['destination'] != '' :
462 >            source_file = file_name
463 >
464          ErCode = '0'
465          msg = ''
466 +
467          try:
468 <            pippo = sbi.copy( source_file , dest_file )
469 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
470 <        except Exception, ex:
471 <            msg = ''
472 <            if self.debug : msg = str(ex)+'\n'
473 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
468 >            sbi.copy( source_file , dest_file , opt = option)
469 >        except TransferException, ex:
470 >            msg  = "Problem copying %s file" % filetocopy
471 >            msg += str(ex)
472 >            if self.debug :
473 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
474 >                dbgmsg += '\t'+str(ex.output)+'\n'
475 >                print dbgmsg
476              ErCode = '60307'
477 <            #print msg
478 <
477 >        except WrongOption, ex:
478 >            msg  = "Problem copying %s file" % filetocopy
479 >            msg += str(ex)
480 >            if self.debug :
481 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
482 >                dbgmsg += '\t'+str(ex.output)+'\n'
483 >                print dbgmsg
484 >        except SizeZeroException, ex:
485 >            msg  = "Problem copying %s file" % filetocopy
486 >            msg += str(ex)
487 >            if self.debug :
488 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
489 >                dbgmsg += '\t'+str(ex.output)+'\n'
490 >                print dbgmsg
491 >            ErCode = '60307'
492 >        if ErCode == '0' and protocol.find('srmv') == 0:
493 >            remote_file_size = -1
494 >            local_file_size = os.path.getsize( source_file )
495 >            try:
496 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
497 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
498 >            except TransferException, ex:
499 >                msg  = "Problem checking the size of %s file" % filetocopy
500 >                msg += str(ex)
501 >                if self.debug :
502 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
503 >                    dbgmsg += '\t'+str(ex.output)+'\n'
504 >                    print dbgmsg
505 >                ErCode = '60307'
506 >            except WrongOption, ex:
507 >                msg  = "Problem checking the size of %s file" % filetocopy
508 >                msg += str(ex)
509 >                if self.debug :
510 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
511 >                    dbgmsg += '\t'+str(ex.output)+'\n'
512 >                    print dbgmsg
513 >                ErCode = '60307'
514 >            if local_file_size != remote_file_size:
515 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
516 >                ErCode = '60307'
517 >
518 >        if ErCode != '0':
519 >            try :
520 >                self.removeFile( sbi_dest, dest_file, option )
521 >            except Exception, ex:
522 >                msg += '\n'+str(ex)  
523          return ErCode, msg
524 <  
525 <    '''
526 <    def checkSize()
524 >
525 >    def removeFile( self, sbi_dest, filetocopy, option ):
526 >        """  
527 >        """  
528 >        if self.debug : print 'removeFile():\n'
529 >        f_tocopy=filetocopy
530 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
531 >        try:
532 >            sbi_dest.delete( f_tocopy, opt=option )
533 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
534 >        except OperationException, ex:
535 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
536 >            msg += str(ex)
537 >            if self.debug :
538 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
539 >                dbgmsg += '\t'+str(ex.output)+'\n'
540 >                print dbgmsg
541 >            raise Exception(msg)
542 >
543 >        return
544 >
545 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
546          """
547 <        Using srm needed a check of the ouptut file size.  
547 >        Update the final stage out infos
548          """
549 <    
550 <        echo "--> remoteSize = $remoteSize"
551 <        ## for local file
552 <        localSize=$(stat -c%s "$path_out_file")
553 <        echo "-->  localSize = $localSize"
554 <        if [ $localSize != $remoteSize ]; then
555 <            echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
556 <            echo "Copy failed: removing remote file $destination"
557 <                srmrm $destination
558 <                cmscp_exit_status=60307
559 <      
560 <      
561 <                echo "Problem copying $path_out_file to $destination with srmcp command"
376 <                StageOutExitStatusReason='remote and local file dimension not match'
377 <                echo "StageOutReport = `cat ./srmcp.report`"
378 <    '''
379 <    def backup(self):
380 <        """
381 <        Check infos from TFC using existing api obtaining:
382 <        1)destination
383 <        2)protocol
549 >        jobStageInfo={}
550 >        jobStageInfo['erCode']=erCode
551 >        jobStageInfo['reason']=reason
552 >        jobStageInfo['lfn']=lfn
553 >        jobStageInfo['se']=se
554 >
555 >        report = { file : jobStageInfo}
556 >        return report
557 >
558 >    def finalReport( self , results ):
559 >        """
560 >        It a list of LFNs for each SE where data are stored.
561 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
562          """
563 +        outFile = open('cmscpReport.sh',"a")
564 +        cmscp_exit_status = 0
565 +        txt = ''
566 +        for file, dict in results.iteritems():
567 +            reason="'%s'"%dict['reason']
568 +            if file:
569 +                if dict['lfn']=='':
570 +                    lfn = '$LFNBaseName/'+os.path.basename(file)
571 +                    se  = '$SE'
572 +                else:
573 +                    lfn = dict['lfn']+os.path.basename(file)
574 +                    se = dict['se']
575 +                    
576 +                #dict['lfn'] # to be implemented
577 +                txt += 'echo "Report for File: '+file+'"\n'
578 +                txt += 'echo "LFN: '+lfn+'"\n'
579 +                txt += 'echo "StorageElement: '+se+'"\n'
580 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
581 +                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
582 +                #txt += 'export LFNBaseName='+lfn+'\n'
583 +                txt += 'export SE='+se+'\n'
584 +                
585 +                if dict['erCode'] != '0':
586 +                    cmscp_exit_status = dict['erCode']
587 +            else:
588 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
589 +                cmscp_exit_status = dict['erCode']
590 +                cmscp_exit_status = dict['erCode']
591 +        txt += '\n'
592 +        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
593 +        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
594 +        outFile.write(str(txt))
595 +        outFile.close()
596          return
597  
387    def usage(self):
598  
599 <        msg="""
600 <        required parameters:
601 <        --source :: REMOTE           :      
602 <        --dest   :: REMOTE           :  
603 <        --debug             :
604 <        --inFile :: absPath : or name NOT RELATIVE PATH
605 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
606 <
607 <        optional parameters      
608 <        """
609 <        return msg
599 > def usage():
600 >
601 >    msg="""
602 >    cmscp:
603 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
604 >        including success checking  version also for CAF using rfcp command to copy the output to SE
605 >
606 >    accepted parameters:
607 >       source           =
608 >       destination      =
609 >       inputFileList    =
610 >       outputFileList   =
611 >       protocol         =
612 >       option           =
613 >       middleware       =  
614 >       srm_version      =
615 >       destinationDir   =
616 >       lfn=             =
617 >       local_stage      =  activate stage fall back  
618 >       debug            =  activate verbose print out
619 >       help             =  print on line man and exit  
620 >    
621 >    mandatory:
622 >       * "source" and/or "destination" must always be defined
623 >       * either "middleware" or "protocol" must always be defined
624 >       * "inputFileList" must always be defined
625 >       * if "local_stage" = 1 also  "lfn" must be defined
626 >    """
627 >    print msg
628 >
629 >    return
630 >
631 > def HelpOptions(opts=[]):
632 >    """
633 >    Check otps, print help if needed
634 >    prepare dict = { opt : value }
635 >    """
636 >    dict_args = {}
637 >    if len(opts):
638 >        for opt, arg in opts:
639 >            dict_args[opt.split('--')[1]] = arg
640 >            if opt in ('-h','-help','--help') :
641 >                usage()
642 >                sys.exit(0)
643 >        return dict_args
644 >    else:
645 >        usage()
646 >        sys.exit(0)
647  
648   if __name__ == '__main__' :
649 +
650 +    import getopt
651 +
652 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
653 +                  "protocol=","option=", "middleware=", "srm_version=", \
654 +                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
655 +    try:
656 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
657 +    except getopt.GetoptError, err:
658 +        print err
659 +        HelpOptions()
660 +        sys.exit(2)
661 +
662 +    dictArgs = HelpOptions(opts)
663      try:
664 <        cmscp_ = cmscp(sys.argv[1:])
664 >        cmscp_ = cmscp(dictArgs)
665          cmscp_.run()
666 <    except:
667 <        pass
666 >    except Exception, ex :
667 >        print str(ex)
668  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines