ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.4 by spiga, Fri Sep 26 11:15:35 2008 UTC vs.
Revision 1.65 by fanzago, Fri Dec 4 12:08:21 2009 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2 <
3 < import sys, getopt, string
4 < import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
2 > import sys, os
3 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
4   from ProdCommon.Storage.SEAPI.SBinterface import *
5 <
5 > from ProdCommon.Storage.SEAPI.Exceptions import *
6  
7  
8   class cmscp:
9 <    def __init__(self, argv):
9 >    def __init__(self, args):
10          """
11          cmscp
12 <
15 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
12 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
13          including success checking  version also for CAF using rfcp command to copy the output to SE
14          input:
15             $1 middleware (CAF, LSF, LCG, OSG)
16             $2 local file (the absolute path of output file or just the name if it's in top dir)
17             $3 if needed: file name (the output file name)
18             $5 remote SE (complete endpoint)
19 <           $6 srm version
19 >           $6 srm version
20 >           --lfn $LFNBaseName
21          output:
22               return 0 if all ok
23               return 60307 if srmcp failed
24               return 60303 if file already exists in the SE
25          """
26 +
27          #set default
28 +        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
29 +                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "lfn":'' }
30          self.debug = 0
31 <        self.source = ''
32 <        self.destination = ''
33 <        self.file_to_copy = []
33 <        self.remote_file_name = []
34 <        self.protocol = ''
35 <        self.middleware = ''
36 <        self.srmv = ''
37 <        self.lcgOpt='-b -D srmv2 --vo cms -t 2400 --verbose'  
38 <        self.opt=''
39 <        try:
40 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
41 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
42 <        except getopt.GetoptError:
43 <            print self.usage()
44 <            sys.exit(2)
45 <
46 <        self.setAndCheck(opts)  
47 <        
31 >        self.local_stage = 0
32 >        self.params.update( args )
33 >
34          return
35  
36 <    def setAndCheck( self, opts ):
36 >    def processOptions( self ):
37          """
38 <        Set and check command line parameter
38 >        check command line parameter
39          """
40 <        if not opts :
41 <            print self.usage()
42 <            sys.exit()
43 <        for opt, arg in opts :
44 <            if opt  == "--help" :
45 <                print self.usage()
46 <                sys.exit()
47 <            elif opt == "--debug" :
48 <                self.debug = 1
49 <            elif opt == "--source" :
50 <                self.source = arg
51 <            elif opt == "--destination":
52 <                self.destination = arg
53 <            elif opt == "--inputFileList":
68 <                infile = arg
69 <            elif opt == "--outputFileList":
70 <                out_file
71 <            elif opt == "--protocol":
72 <                self.protocol = arg
73 <            elif opt == "--middleware":
74 <                self.middleware = arg
75 <            elif opt == "--srm_version":
76 <                self.srmv = arg
77 <
78 <        # source and dest cannot be undefined at same time
79 <        if self.source == '' and self.destination == '':
80 <            print self.usage()
81 <            sys.exit()
82 <        # if middleware is not defined --> protocol cannot be empty  
83 <        if self.middleware == '' and self.protocol == '':
84 <            print self.usage()
85 <            sys.exit()
86 <        # input file must be defined  
87 <        if infile == '':
88 <            print self.usage()
89 <            sys.exit()
40 >        if 'help' in self.params.keys(): HelpOptions()
41 >        if 'debug' in self.params.keys(): self.debug = 1
42 >        if 'local_stage' in self.params.keys(): self.local_stage = 1
43 >
44 >        # source and dest cannot be undefined at same time
45 >        if not self.params['source']  and not self.params['destination'] :
46 >            HelpOptions()
47 >        # if middleware is not defined --> protocol cannot be empty
48 >        if not self.params['middleware'] and not self.params['protocol'] :
49 >            HelpOptions()
50 >
51 >        # input file must be defined
52 >        if not self.params['inputFileList'] :
53 >            HelpOptions()
54          else:
55 <            if infile.find(','):
56 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
57 <            else:
58 <                self.file_to_copy.append(infile)
59 <        
55 >            file_to_copy=[]
56 >            if self.params['inputFileList'].find(','):
57 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
58 >            else:
59 >                file_to_copy.append(self.params['inputFileList'])
60 >            self.params['inputFileList'] = file_to_copy
61 >
62 >        if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
63 >        
64          ## TO DO:
65          #### add check for outFiles
66          #### add map {'inFileNAME':'outFileNAME'} to change out name
67  
100        return
68  
69 <    def run( self ):  
69 >    def run( self ):
70          """
71 <        Check if running on UI (no $middleware) or
72 <        on WN (on the Grid), and take different action  
71 >        Check if running on UI (no $middleware) or
72 >        on WN (on the Grid), and take different action
73          """
74 <        if self.middleware :  
75 <           results = self.stager()
74 >        self.processOptions()
75 >        if self.debug: print 'calling run() : \n'
76 >        # stage out from WN
77 >        if self.params['middleware'] :
78 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
79 >            self.finalReport(results)
80 >        # Local interaction with SE
81          else:
82 <           results = self.copy( self.file_to_copy, self.protocol , self.opt)
82 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
83 >            return results
84  
85 <        self.finalReport(results,self.middleware)
85 >    def checkLcgUtils( self ):
86 >        """
87 >        _checkLcgUtils_
88 >        check the lcg-utils version and report
89 >        """
90 >        import commands
91 >        cmd = "lcg-cp --version | grep lcg_util"
92 >        status, output = commands.getstatusoutput( cmd )
93 >        num_ver = -1
94 >        if output.find("not found") == -1 or status == 0:
95 >            temp = output.split("-")
96 >            version = ""
97 >            if len(temp) >= 2:
98 >                version = output.split("-")[1]
99 >                temp = version.split(".")
100 >                if len(temp) >= 1:
101 >                    num_ver = int(temp[0])*10
102 >                    num_ver += int(temp[1])
103 >        return num_ver
104  
105 <        return
115 <    
116 <    def setProtocol( self ):    
105 >    def setProtocol( self, middleware ):
106          """
107          define the allowed potocols based on $middlware
108 <        which depend on scheduler
108 >        which depend on scheduler
109          """
110 <        if self.middleware.lower() in ['osg','lcg']:
111 <            supported_protocol = {'srm-lcg': self.lcgOpt }#,
112 <                               #   'srmv2' : '' }
113 <        elif self.middleware.lower() in ['lsf','caf']:
114 <            supported_protocol = {'rfio': '' }
110 >        # default To be used with "middleware"
111 >        if self.debug:
112 >            print 'setProtocol() :\n'
113 >            print '\tmiddleware =  %s utils \n'%middleware
114 >        
115 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
116 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
117 >        if self.checkLcgUtils() >= 17:
118 >            lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose',
119 >                    'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 2400 --verbose'}
120 >
121 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
122 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 '}
123 >        rfioOpt=''
124 >
125 >        supported_protocol = None
126 >        if middleware.lower() in ['osg','lcg','condor','sge']:
127 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
128 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
129 >        elif middleware.lower() in ['lsf','caf']:
130 >            supported_protocol = [('rfio',rfioOpt)]
131 >        elif middleware.lower() in ['pbs']:
132 >            supported_protocol = [('rfio',rfioOpt),('local','')]
133 >        elif middleware.lower() in ['arc']:
134 >            supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
135          else:
136 <            ## here we can add support for any kind of protocol,
136 >            ## here we can add support for any kind of protocol,
137              ## maybe some local schedulers need something dedicated
138              pass
139          return supported_protocol
140  
141 <    def stager( self ):              
141 >
142 >    def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
143          """
144 <        Implement the logic for remote stage out
144 >        Checks the status of copy and update result dictionary
145          """
146 <        protocols = self.setProtocol()  
147 <        count=0
148 <        list_files = self.file_to_copy
149 <        results={}  
150 <        for prot in protocols.keys():
151 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
152 <            copy_results = self.copy( list_files, prot, protocols[prot] )
153 <            list_retry = []
154 <            list_existing = []
155 <            list_ok = []
156 <            for file, dict in copy_results.iteritems():
157 <                er_code = dict['erCode']
158 <                if er_code == '60307': list_retry.append( file )
159 <                elif er_code == '60303': list_existing.append( file )
160 <                else:
161 <                    list_ok.append(file)
162 <                    reason = 'Copy succedeed with %s utils'%prot
163 <                    upDict = self.updateReport(file, er_code, reason)
164 <                    copy_results.update(upDict)
165 <            results.update(copy_results)
166 <            if len(list_ok) != 0:  
167 <                msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
168 <               # print msg
169 <            if len(list_ok) == len(list_files) :
170 <                break
146 >        list_retry = []
147 >        list_retry_localSE = []
148 >        list_not_existing = []
149 >        list_ok = []
150 >        
151 >        if self.debug:
152 >            print 'in checkCopy() :\n'
153 >        for file, dict in copy_results.iteritems():
154 >            er_code = dict['erCode']
155 >            if er_code == '0':
156 >                list_ok.append(file)
157 >                reason = 'Copy succedeed with %s utils'%prot
158 >                dict['reason'] = reason
159 >            elif er_code == '60302':
160 >                list_not_existing.append( file )
161 >            elif er_code == '10041':
162 >                list_retry.append( file )
163 >            ## WHAT TO DO IN GENERAL FAILURE CONDITION
164 >            else:
165 >                list_retry_localSE.append( file )
166 >                
167 >            if self.debug:
168 >                print "\t file %s \n"%file
169 >                print "\t dict['erCode'] %s \n"%dict['erCode']
170 >                print "\t dict['reason'] %s \n"%dict['reason']
171 >                
172 >            if (lfn != '') and (se != ''):
173 >                upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
174              else:
175 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
176 <                if len(list_retry): list_files = list_retry
177 <                else: break
178 <            count =+1  
175 >                upDict = self.updateReport(file, er_code, dict['reason'])
176 >
177 >            copy_results.update(upDict)
178 >        
179 >        msg = ''
180 >        if len(list_ok) != 0:
181 >            msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
182 >        if len(list_ok) != len_list_files :
183 >            msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
184 >            msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
185 >        if self.debug : print msg
186 >        
187 >        return copy_results, list_ok, list_retry, list_retry_localSE
188 >        
189 >    def LocalCopy(self, list_retry, results):
190 >        """
191 >        Tries the stage out to the CloseSE
192 >        """
193 >        if self.debug:
194 >            print 'in LocalCopy() :\n'
195 >            print '\t list_retry %s utils \n'%list_retry
196 >            print '\t len(list_retry) %s \n'%len(list_retry)
197 >                
198 >        list_files = list_retry  
199 >        self.params['inputFilesList']=list_files
200 >        
201 >        ### copy backup
202 >        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
203 >        siteCfg = loadSiteLocalConfig()
204 >        seName = siteCfg.localStageOut.get("se-name", None)
205 >        catalog = siteCfg.localStageOut.get("catalog", None)
206 >        implName = siteCfg.localStageOut.get("command", None)
207 >        if (implName == 'srm'):
208 >           implName='srmv1'
209 >           self.params['srm_version']=implName
210 >        ##### to be improved ###############
211 >        if (implName == 'rfcp'):
212 >            self.params['middleware']='lsf'
213 >        ####################################    
214 >                  
215 >        self.params['protocol']=implName
216 >        tfc = siteCfg.trivialFileCatalog()
217 >            
218 >        if self.debug:
219 >            print '\t siteCFG %s \n'%siteCfg
220 >            print '\t seName %s \n'%seName
221 >            print '\t catalog %s \n'%catalog
222 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']            
223 >            print '\t tfc %s '%tfc
224 >            print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
225 >                
226 >        if (str(self.params['lfn']).find("/store/") != -1):
227 >            temp = str(self.params['lfn']).split("/store/")
228 >            self.params['lfn']= "/store/temp/" + temp[1]
229 >            
230 >        file_backup=[]
231 >        for input in self.params['inputFilesList']:
232 >            file = self.params['lfn'] + os.path.basename(input)
233 >            surl = tfc.matchLFN(tfc.preferredProtocol, file)
234 >            file_backup.append(surl)
235 >            if self.debug:
236 >                print '\t lfn %s \n'%self.params['lfn']
237 >                print '\t file %s \n'%file
238 >                print '\t surl %s \n'%surl
239 >                    
240 >        destination=os.path.dirname(file_backup[0])
241 >        if ( destination[-1] != '/' ) : destination = destination + '/'
242 >        self.params['destination']=destination
243 >            
244 >        if self.debug:
245 >            print "\t self.params['destination']%s \n"%self.params['destination']
246 >            print "\t self.params['protocol'] %s \n"%self.params['protocol']
247 >            print "\t self.params['option']%s \n"%self.params['option']
248 >              
249 >        for prot, opt in self.setProtocol( self.params['middleware'] ):
250 >            if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
251 >            localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
252 >            if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
253 >                results.update(localCopy_results)
254 >            else:
255 >                localCopy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(localCopy_results, len(list_files), prot, self.params['lfn'], seName)
256 >                results.update(localCopy_results)
257 >                if len(list_ok) == len(list_files) :
258 >                    break
259 >                if len(list_retry):
260 >                    list_files = list_retry
261 >                else: break
262 >            if self.debug:
263 >                print "\t localCopy_results = %s \n"%localCopy_results
264 >        
265 >        return results        
266 >
267 >    def stager( self, middleware, list_files ):
268 >        """
269 >        Implement the logic for remote stage out
270 >        """
271  
272 <        #### TODO Daniele
273 <        #check is something fails and created related dict
274 <  #      backup = self.analyzeResults(results)
275 <  
276 <  #      if backup :  
277 <  #          msg = 'WARNING: backup logic is under implementation\n'
278 <  #          #backupDict = self.backup()
279 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name  
280 <  #          results.update(backupDict)
281 <  #          print msg
272 >        if self.debug:
273 >            print 'stager() :\n'
274 >            print '\tmiddleware %s\n'%middleware
275 >            print '\tlist_files %s\n'%list_files
276 >        
277 >        results={}
278 >        for prot, opt in self.setProtocol( middleware ):
279 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
280 >            copy_results = self.copy( list_files, prot, opt )
281 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
282 >                results.update(copy_results)
283 >            else:
284 >                copy_results, list_ok, list_retry, list_retry_localSE = self.checkCopy(copy_results, len(list_files), prot)
285 >                results.update(copy_results)
286 >                if len(list_ok) == len(list_files) :
287 >                    break
288 >                if len(list_retry):
289 >                    list_files = list_retry
290 >                else: break
291 >                
292 >        if self.local_stage:
293 >            if len(list_retry_localSE):
294 >                results = self.LocalCopy(list_retry_localSE, results)
295 >            
296 >        if self.debug:
297 >            print "\t results %s \n"%results
298          return results
299  
300      def initializeApi(self, protocol ):
301          """
302 <        Instantiate storage interface  
302 >        Instantiate storage interface
303          """
304 <        source_prot = protocol
305 <        dest_prot = protocol
306 <        if self.source == '' : source_prot = 'local'
307 <        Source_SE  = self.storageInterface( self.source, source_prot )
308 <        if self.destination == '' : dest_prot = 'local'
309 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
304 >        if self.debug : print 'initializeApi() :\n'  
305 >        self.source_prot = protocol
306 >        self.dest_prot = protocol
307 >        if not self.params['source'] : self.source_prot = 'local'
308 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
309 >        if not self.params['destination'] : self.dest_prot = 'local'
310 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
311  
312          if self.debug :
313 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
314 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
313 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
314 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
315 >            print msg
316  
317          return Source_SE, Destination_SE
318  
319 <    def copy( self, list_file, protocol, opt):
319 >    def copy( self, list_file, protocol, options, backup='no' ):
320          """
321 <        Make the real file copy using SE API
321 >        Make the real file copy using SE API
322          """
323 +        msg = ""
324          if self.debug :
325 <            print 'copy(): using %s protocol'%protocol
326 <        Source_SE, Destination_SE = self.initializeApi( protocol )
325 >            msg  = 'copy() :\n'
326 >            msg += '\tusing %s protocol\n'%protocol
327 >            print msg
328 >        try:
329 >            Source_SE, Destination_SE = self.initializeApi( protocol )
330 >        except Exception, ex:
331 >            return self.updateReport('', '-1', str(ex))
332  
333 <        # create remote dir
334 <        if protocol in ['gridftp','rfio']:
335 <            self.createDir( Destination_SE, protocol )
333 >        # create remote dir
334 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
335 >            try:
336 >                self.createDir( Destination_SE, Destination_SE.protocol )
337 >            except OperationException, ex:
338 >                return self.updateReport('', '60316', str(ex))
339 >            ## when the client commands are not found (wrong env or really missing)
340 >            except MissingCommand, ex:
341 >                msg = "ERROR %s %s" %(str(ex), str(ex.detail))
342 >                return self.updateReport('', '10041', msg)
343  
344          ## prepare for real copy  ##
345 <        sbi = SBinterface( Source_SE, Destination_SE )
346 <        sbi_dest = SBinterface(Destination_SE)
345 >        try :
346 >            sbi = SBinterface( Source_SE, Destination_SE )
347 >            sbi_dest = SBinterface(Destination_SE)
348 >            sbi_source = SBinterface(Source_SE)
349 >        except ProtocolMismatch, ex:
350 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
351 >            msg += str(ex)
352 >            return self.updateReport('', '-1', msg)
353  
354          results = {}
355 <        ## loop over the complete list of files
356 <        for filetocopy in list_file:
357 <            if self.debug : print 'start real copy for %s'%filetocopy
358 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
359 <            if ErCode == '0':
360 <                ErCode, msg = self.makeCopy( sbi, filetocopy , opt)
361 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
355 >        ## loop over the complete list of files
356 >        for filetocopy in list_file:
357 >            if self.debug : print '\tStart real copy for %s'%filetocopy
358 >            try :
359 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
360 >            except Exception, ex:
361 >                ErCode = '60307'
362 >                msg = str(ex)  
363 >            if ErCode == '0':
364 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
365 >                if (ErCode == '0') and (backup == 'yes'):
366 >                    ErCode = '60308'
367 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
368              results.update( self.updateReport(filetocopy, ErCode, msg))
369          return results
222    
223    def updateReport(self, file, erCode, reason, lfn='', se='' ):
224        """
225        Update the final stage out infos
226        """
227        jobStageInfo={}
228        jobStageInfo['erCode']=erCode
229        jobStageInfo['reason']=reason
230        jobStageInfo['lfn']=lfn
231        jobStageInfo['se']=se
232
233        report = { file : jobStageInfo}
234        return report
370  
236    def finalReport( self , results, middleware ):
237        """
238        It should return a clear list of LFNs for each SE where data are stored.
239        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
240        """
241        if middleware:
242            outFile = open('cmscpReport.sh',"a")
243            cmscp_exit_status = 0
244            txt = ''
245            for file, dict in results.iteritems():
246                if dict['lfn']=='':
247                    lfn = '$LFNBaseName/'+os.path.basename(file)
248                    se  = '$SE'
249                else:
250                    lfn = dict['lfn']+os.pat.basename(file)
251                    se = dict['se']      
252                #dict['lfn'] # to be implemented
253                txt +=  'echo "Report for File: '+file+'"\n'
254                txt +=  'echo "LFN: '+lfn+'"\n'  
255                txt +=  'echo "StorageElement: '+se+'"\n'  
256                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
257                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
258                if dict['erCode'] != '0':
259                    cmscp_exit_status = dict['erCode']
260            txt += '\n'
261            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
262            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
263            outFile.write(str(txt))
264            outFile.close()
265        else:
266            for file, code in results.iteritems():
267                print 'error code = %s for file %s'%(code,file)
268        return
371  
372      def storageInterface( self, endpoint, protocol ):
373          """
374 <        Create the storage interface.
374 >        Create the storage interface.
375          """
376 +        if self.debug : print 'storageInterface():\n'
377          try:
378              interface = SElement( FullPath(endpoint), protocol )
379 <        except Exception, ex:
380 <            msg = ''
381 <            if self.debug : msg = str(ex)+'\n'
382 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
280 <            print msg
379 >        except ProtocolUnknown, ex:
380 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
381 >            msg += str(ex)
382 >            raise Exception(msg)
383  
384          return interface
385  
284    def checkDir(self, Destination_SE, protocol):
285        '''
286        ToBeImplemented NEEDED for castor
287        '''
288        return
289
386      def createDir(self, Destination_SE, protocol):
387          """
388 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
389 <        this should be transparent at SE API level.
388 >        Create remote dir for gsiftp REALLY TEMPORARY
389 >        this should be transparent at SE API level.
390          """
391 <        ErCode = '0'
392 <        msg_1 = ''
391 >        if self.debug : print 'createDir():\n'
392 >        msg = ''
393          try:
394              action = SBinterface( Destination_SE )
395              action.createDir()
396 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
397 <        except Exception, ex:
398 <            msg = ''
399 <            if self.debug : msg = str(ex)+'\n'
400 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
401 <            msg += msg_1
402 <            ErCode = '60316'  
403 <            #print msg
404 <
405 <        return ErCode, msg_1
396 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
397 >        except TransferException, ex:
398 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
399 >            msg += str(ex)
400 >            if self.debug :
401 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
402 >                dbgmsg += '\t'+str(ex.output)+'\n'
403 >                print dbgmsg
404 >            raise Exception(msg)
405 >        except OperationException, ex:
406 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
407 >            msg += str(ex)
408 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
409 >            raise Exception(msg)
410 >        except MissingDestination, ex:
411 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
412 >            msg += str(ex)
413 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
414 >            raise Exception(msg)
415 >        except AlreadyExistsException, ex:
416 >            if self.debug: print "\tThe directory already exist"
417 >            pass            
418 >        return msg
419  
420 <    def checkFileExist(self, sbi, filetocopy):
420 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
421          """
422 <        Check if file to copy already exist  
423 <        """
424 <        try:
425 <            check = sbi.checkExists(filetocopy)
317 <        except Exception, ex:
318 <            msg = ''
319 <            if self.debug : msg = str(ex)+'\n'
320 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
321 <           # print msg
422 >        Check both if source file exist AND
423 >        if destination file ALREADY exist.
424 >        """
425 >        if self.debug : print 'checkFileExist():\n'
426          ErCode = '0'
427          msg = ''
428 <        if check :
429 <            ErCode = '60303'
430 <            msg = "file %s already exist"%filetocopy
431 <            print msg
428 >        f_tocopy=filetocopy
429 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
430 >        try:
431 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
432 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
433 >        except OperationException, ex:
434 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
435 >            msg += str(ex)
436 >            if self.debug :
437 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
438 >                dbgmsg += '\t'+str(ex.output)+'\n'
439 >                print dbgmsg
440 >            raise Exception(msg)
441 >        except WrongOption, ex:
442 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
443 >            msg += str(ex)
444 >            if self.debug :
445 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
446 >                dbgmsg += '\t'+str(ex.output)+'\n'
447 >                print dbgmsg
448 >            raise Exception(msg)
449 >        except MissingDestination, ex:
450 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
451 >            msg += str(ex)
452 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
453 >            raise Exception(msg)
454 >        ## when the client commands are not found (wrong env or really missing)
455 >        except MissingCommand, ex:
456 >            ErCode = '10041'
457 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
458 >            return ErCode, msg
459 >        if not checkSource :
460 >            ErCode = '60302'
461 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
462 >            return ErCode, msg
463 >        f_tocopy=filetocopy
464 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
465 >        try:
466 >            check = sbi_dest.checkExists( f_tocopy, opt=option )
467 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
468 >        except OperationException, ex:
469 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
470 >            msg += str(ex)
471 >            if self.debug :
472 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
473 >                dbgmsg += '\t'+str(ex.output)+'\n'
474 >                print dbgmsg
475 >            raise Exception(msg)
476 >        except WrongOption, ex:
477 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
478 >            msg += str(ex)
479 >            if self.debug :
480 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
481 >                msg += '\t'+str(ex.output)+'\n'
482 >            raise Exception(msg)
483 >        except MissingDestination, ex:
484 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
485 >            msg += str(ex)
486 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
487 >            raise Exception(msg)
488 >        ## when the client commands are not found (wrong env or really missing)
489 >        except MissingCommand, ex:
490 >            ErCode = '10041'
491 >            msg = "ERROR %s %s" %(str(ex), str(ex.detail))
492 >            return ErCode, msg
493 >        if check :
494 >            ErCode = '60303'
495 >            msg = "file %s already exist"%os.path.basename(filetocopy)
496  
497 <        return ErCode,msg  
497 >        return ErCode, msg
498  
499 <    def makeCopy(self, sbi, filetocopy, opt ):  
499 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
500          """
501 <        call the copy API.  
501 >        call the copy API.
502          """
503 <        path = os.path.dirname(filetocopy)  
503 >        if self.debug : print 'makeCopy():\n'
504 >        path = os.path.dirname(filetocopy)
505          file_name =  os.path.basename(filetocopy)
506          source_file = filetocopy
507          dest_file = file_name ## to be improved supporting changing file name  TODO
508 <        if self.source == '' and path == '':
508 >        if self.params['source'] == '' and path == '':
509              source_file = os.path.abspath(filetocopy)
510 <        elif self.destination =='':
511 <            dest_file = os.path.join(os.getcwd(),file_name)
512 <        elif self.source != '' and self.destination != '' :
513 <            source_file = file_name  
510 >        elif self.params['destination'] =='':
511 >            destDir = self.params.get('destinationDir',os.getcwd())
512 >            dest_file = os.path.join(destDir,file_name)
513 >        elif self.params['source'] != '' and self.params['destination'] != '' :
514 >            source_file = file_name
515 >
516          ErCode = '0'
517          msg = ''
518 <
518 >
519 >        if  self.params['option'].find('space_token')>0:
520 >            space_token=self.params['option'].split('=')[1]
521 >            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
522 >            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
523          try:
524 <            pippo = sbi.copy( source_file , dest_file , opt)
525 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
526 <        except Exception, ex:
527 <            msg = ''
528 <            if self.debug : msg = str(ex)+'\n'
529 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
524 >            sbi.copy( source_file , dest_file , opt = option)
525 >        except TransferException, ex:
526 >            msg  = "Problem copying %s file" % filetocopy
527 >            msg += str(ex)
528 >            if self.debug :
529 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
530 >                dbgmsg += '\t'+str(ex.output)+'\n'
531 >                print dbgmsg
532              ErCode = '60307'
533 <            #print msg
534 <
533 >        except WrongOption, ex:
534 >            msg  = "Problem copying %s file" % filetocopy
535 >            msg += str(ex)
536 >            if self.debug :
537 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
538 >                dbgmsg += '\t'+str(ex.output)+'\n'
539 >                print dbgmsg
540 >        except SizeZeroException, ex:
541 >            msg  = "Problem copying %s file" % filetocopy
542 >            msg += str(ex)
543 >            if self.debug :
544 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
545 >                dbgmsg += '\t'+str(ex.output)+'\n'
546 >                print dbgmsg
547 >            ErCode = '60307'
548 >        ## when the client commands are not found (wrong env or really missing)
549 >        except MissingCommand, ex:
550 >            ErCode = '10041'
551 >            msg  = "Problem copying %s file" % filetocopy
552 >            msg += str(ex)
553 >            if self.debug :
554 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
555 >                dbgmsg += '\t'+str(ex.output)+'\n'
556 >                print dbgmsg
557 >        except AuthorizationException, ex:
558 >            ErCode = '60307'
559 >            msg  = "Problem copying %s file" % filetocopy
560 >            msg += str(ex)
561 >            if self.debug :
562 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
563 >                dbgmsg += '\t'+str(ex.output)+'\n'
564 >                print dbgmsg
565 >        if ErCode == '0' and protocol.find('srmv') == 0:
566 >            remote_file_size = -1
567 >            local_file_size = os.path.getsize( source_file )
568 >            try:
569 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
570 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
571 >            except TransferException, ex:
572 >                msg  = "Problem checking the size of %s file" % filetocopy
573 >                msg += str(ex)
574 >                if self.debug :
575 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
576 >                    dbgmsg += '\t'+str(ex.output)+'\n'
577 >                    print dbgmsg
578 >                ErCode = '60307'
579 >            except WrongOption, ex:
580 >                msg  = "Problem checking the size of %s file" % filetocopy
581 >                msg += str(ex)
582 >                if self.debug :
583 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
584 >                    dbgmsg += '\t'+str(ex.output)+'\n'
585 >                    print dbgmsg
586 >                ErCode = '60307'
587 >            if local_file_size != remote_file_size:
588 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
589 >                ErCode = '60307'
590 >
591 >        if ErCode != '0':
592 >            try :
593 >                self.removeFile( sbi_dest, dest_file, option )
594 >            except Exception, ex:
595 >                msg += '\n'+str(ex)  
596          return ErCode, msg
597 <  
598 <    '''
599 <    def checkSize()
597 >
598 >    def removeFile( self, sbi_dest, filetocopy, option ):
599 >        """  
600 >        """  
601 >        if self.debug : print 'removeFile():\n'
602 >        f_tocopy=filetocopy
603 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
604 >        try:
605 >            sbi_dest.delete( f_tocopy, opt=option )
606 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
607 >        except OperationException, ex:
608 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
609 >            msg += str(ex)
610 >            if self.debug :
611 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
612 >                dbgmsg += '\t'+str(ex.output)+'\n'
613 >                print dbgmsg
614 >            raise Exception(msg)
615 >
616 >        return
617 >
618 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
619          """
620 <        Using srm needed a check of the ouptut file size.  
620 >        Update the final stage out infos
621          """
622 <    
623 <        echo "--> remoteSize = $remoteSize"
624 <        ## for local file
625 <        localSize=$(stat -c%s "$path_out_file")
626 <        echo "-->  localSize = $localSize"
627 <        if [ $localSize != $remoteSize ]; then
628 <            echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
629 <            echo "Copy failed: removing remote file $destination"
630 <                srmrm $destination
631 <                cmscp_exit_status=60307
632 <      
633 <      
634 <                echo "Problem copying $path_out_file to $destination with srmcp command"
378 <                StageOutExitStatusReason='remote and local file dimension not match'
379 <                echo "StageOutReport = `cat ./srmcp.report`"
380 <    '''
381 <    def backup(self):
382 <        """
383 <        Check infos from TFC using existing api obtaining:
384 <        1)destination
385 <        2)protocol
622 >        jobStageInfo={}
623 >        jobStageInfo['erCode']=erCode
624 >        jobStageInfo['reason']=reason
625 >        jobStageInfo['lfn']=lfn
626 >        jobStageInfo['se']=se
627 >
628 >        report = { file : jobStageInfo}
629 >        return report
630 >
631 >    def finalReport( self , results ):
632 >        """
633 >        It a list of LFNs for each SE where data are stored.
634 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
635          """
636 +        outFile = open('cmscpReport.sh',"a")
637 +        cmscp_exit_status = 0
638 +        txt = ''
639 +        for file, dict in results.iteritems():
640 +            reason = str(dict['reason'])
641 +            if str(reason).find("'") > -1:
642 +                reason = " ".join(reason.split("'"))
643 +            reason="'%s'"%reason
644 +            if file:
645 +                if dict['lfn']=='':
646 +                    lfn = '$LFNBaseName'+os.path.basename(file)
647 +                    se  = '$SE'
648 +                else:
649 +                    lfn = dict['lfn']+os.path.basename(file)
650 +                    se = dict['se']
651 +                #dict['lfn'] # to be implemented
652 +                txt += 'echo "Report for File: '+file+'"\n'
653 +                txt += 'echo "LFN: '+lfn+'"\n'
654 +                txt += 'echo "StorageElement: '+se+'"\n'
655 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
656 +                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
657 +                LFNBaseName = os.path.dirname(lfn)
658 +                if (LFNBaseName[-1] != '/'):
659 +                   LFNBaseName = LFNBaseName + '/'
660 +                txt += 'export LFNBaseName='+LFNBaseName+'\n'
661 +                txt += 'export SE='+se+'\n'
662 +
663 +                txt += 'export endpoint='+self.params['destination']+'\n'
664 +                
665 +                if dict['erCode'] != '0':
666 +                    cmscp_exit_status = dict['erCode']
667 +            else:
668 +                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
669 +                cmscp_exit_status = dict['erCode']
670 +        txt += '\n'
671 +        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
672 +        txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
673 +        outFile.write(str(txt))
674 +        outFile.close()
675          return
676  
389    def usage(self):
677  
678 <        msg="""
679 <        required parameters:
680 <        --source        :: REMOTE           :      
681 <        --destination   :: REMOTE           :  
682 <        --debug             :
683 <        --inFile :: absPath : or name NOT RELATIVE PATH
684 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
685 <
686 <        optional parameters      
687 <        """
688 <        return msg
678 > def usage():
679 >
680 >    msg="""
681 >    cmscp:
682 >        safe copy of local file  to/from remote SE via lcg_cp/srmcp,
683 >        including success checking  version also for CAF using rfcp command to copy the output to SE
684 >
685 >    accepted parameters:
686 >       source           =
687 >       destination      =
688 >       inputFileList    =
689 >       outputFileList   =
690 >       protocol         =
691 >       option           =
692 >       middleware       =  
693 >       srm_version      =
694 >       destinationDir   =
695 >       lfn=             =
696 >       local_stage      =  activate stage fall back  
697 >       debug            =  activate verbose print out
698 >       help             =  print on line man and exit  
699 >    
700 >    mandatory:
701 >       * "source" and/or "destination" must always be defined
702 >       * either "middleware" or "protocol" must always be defined
703 >       * "inputFileList" must always be defined
704 >       * if "local_stage" = 1 also  "lfn" must be defined
705 >    """
706 >    print msg
707 >
708 >    return
709 >
710 > def HelpOptions(opts=[]):
711 >    """
712 >    Check otps, print help if needed
713 >    prepare dict = { opt : value }
714 >    """
715 >    dict_args = {}
716 >    if len(opts):
717 >        for opt, arg in opts:
718 >            dict_args[opt.split('--')[1]] = arg
719 >            if opt in ('-h','-help','--help') :
720 >                usage()
721 >                sys.exit(0)
722 >        return dict_args
723 >    else:
724 >        usage()
725 >        sys.exit(0)
726  
727   if __name__ == '__main__' :
728 +
729 +    import getopt
730 +
731 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
732 +                  "protocol=","option=", "middleware=", "srm_version=", \
733 +                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
734 +    try:
735 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
736 +    except getopt.GetoptError, err:
737 +        print err
738 +        HelpOptions()
739 +        sys.exit(2)
740 +
741 +    dictArgs = HelpOptions(opts)
742      try:
743 <        cmscp_ = cmscp(sys.argv[1:])
743 >        cmscp_ = cmscp(dictArgs)
744          cmscp_.run()
745 <    except:
746 <        pass
745 >    except Exception, ex :
746 >        print str(ex)
747  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines