ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.1 by spiga, Sun Sep 21 10:34:04 2008 UTC vs.
Revision 1.37 by mcinquil, Wed Jan 21 11:51:36 2009 UTC

# Line 1 | Line 1
1 < #!/usr/bin/env python
1 > ##!/usr/bin/env python
2  
3 < import sys, getopt, string
4 < import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
3 > import sys, os
4 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5   from ProdCommon.Storage.SEAPI.SBinterface import *
6 <
6 > from ProdCommon.Storage.SEAPI.Exceptions import *
7  
8  
9   class cmscp:
10 <    def __init__(self, argv):
10 >    def __init__(self, args):
11          """
12          cmscp
13  
14 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
14 >        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
15          including success checking  version also for CAF using rfcp command to copy the output to SE
16          input:
17             $1 middleware (CAF, LSF, LCG, OSG)
18 <           $2 local file (the physical path of output file respect to current working directory)
19 <           $3 file name (the output file name)
20 <           $4 remote SE_PATH (absolute)
21 <           $5 remote SE
23 <           $6 srm version (only in the case of LCG or OSG)
18 >           $2 local file (the absolute path of output file or just the name if it's in top dir)
19 >           $3 if needed: file name (the output file name)
20 >           $5 remote SE (complete endpoint)
21 >           $6 srm version
22          output:
23               return 0 if all ok
24               return 60307 if srmcp failed
25               return 60303 if file already exists in the SE
26          """
27 +
28          #set default
29 +        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
30 +                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
31          self.debug = 0
32 <        self.source = ''
33 <        self.destination = ''
34 <        self.file_to_copy = []
34 <        self.remote_file_name = []
35 <        self.protocol = ''
36 <        self.middleware = ''
37 <        self.srmv = ''
38 <  
39 <        try:
40 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
41 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
42 <        except getopt.GetoptError:
43 <            print self.usage()
44 <            sys.exit(2)
45 <
46 <        self.setAndCheck(opts)  
47 <        
32 >
33 >        self.params.update( args )
34 >
35          return
36  
37 <    def setAndCheck( self, opts ):
37 >    def processOptions( self ):
38          """
39 <        Set and check command line parameter
39 >        check command line parameter
40          """
41 <        if not opts :
42 <            print self.usage()
43 <            sys.exit()
44 <        for opt, arg in opts :
45 <            if opt  == "--help" :
46 <                print self.usage()
47 <                sys.exit()
48 <            elif opt == "--debug" :
49 <                self.debug = 1
50 <            elif opt == "--source" :
51 <                self.source = arg
52 <            elif opt == "--destination":
53 <                self.destination = arg
67 <            elif opt == "--inputFileList":
68 <                infile = arg
69 <            elif opt == "--outputFileList":
70 <                out_file
71 <            elif opt == "--protocol":
72 <                self.protocol = arg
73 <            elif opt == "--middleware":
74 <                self.middleware = arg
75 <            elif opt == "--srm_version":
76 <                self.srmv = arg
77 <
78 <        # source and dest cannot be undefined at same time
79 <        if self.source == '' and self.destination == '':
80 <            print self.usage()
81 <            sys.exit()
82 <        # if middleware is not defined --> protocol cannot be empty  
83 <        if self.middleware == '' and self.protocol == '':
84 <            print self.usage()
85 <            sys.exit()
86 <        # input file must be defined  
87 <        if infile == '':
88 <            print self.usage()
89 <            sys.exit()
41 >        if 'help' in self.params.keys(): HelpOptions()
42 >        if 'debug' in self.params.keys(): self.debug = 1
43 >
44 >        # source and dest cannot be undefined at same time
45 >        if not self.params['source']  and not self.params['destination'] :
46 >            HelpOptions()
47 >        # if middleware is not defined --> protocol cannot be empty
48 >        if not self.params['middleware'] and not self.params['protocol'] :
49 >            HelpOptions()
50 >
51 >        # input file must be defined
52 >        if not self.params['inputFileList'] :
53 >            HelpOptions()
54          else:
55 <            if infile.find(','):
56 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
57 <            else:
58 <                self.file_to_copy.append(infile)
59 <        
55 >            file_to_copy=[]
56 >            if self.params['inputFileList'].find(','):
57 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
58 >            else:
59 >                file_to_copy.append(self.params['inputFileList'])
60 >            self.params['inputFileList'] = file_to_copy
61 >
62          ## TO DO:
63          #### add check for outFiles
64          #### add map {'inFileNAME':'outFileNAME'} to change out name
65  
100        return
66  
67 <    def run( self ):  
67 >    def run( self ):
68          """
69 <        Check if running on UI (no $middleware) or
70 <        on WN (on the Grid), and take different action  
69 >        Check if running on UI (no $middleware) or
70 >        on WN (on the Grid), and take different action
71          """
72 <        if self.middleware :  
73 <           results = self.stager()
72 >        self.processOptions()
73 >        if self.debug: print 'calling run() : \n'
74 >        # stage out from WN
75 >        if self.params['middleware'] :
76 >            results = self.stager(self.params['middleware'],self.params['inputFileList'])
77 >            self.finalReport(results)
78 >        # Local interaction with SE
79          else:
80 <           results = self.copy( self.file_to_copy, self.protocol )
81 <
112 <        self.finalReport(results,self.middleware)
80 >            results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
81 >            return results
82  
83 <        return
115 <    
116 <    def setProtocol( self ):    
83 >    def setProtocol( self, middleware ):
84          """
85          define the allowed potocols based on $middlware
86 <        which depend on scheduler
86 >        which depend on scheduler
87          """
88 <        if self.middleware.lower() in ['osg','lcg']:
89 <            supported_protocol = ['srm-lcg','srmv2']
90 <        elif self.middleware.lower() in ['lsf','caf']:
91 <            supported_protocol = ['rfio']
88 >        # default To be used with "middleware"
89 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
90 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
91 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
92 >                'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 '}
93 >        rfioOpt=''
94 >
95 >        supported_protocol = None
96 >        if middleware.lower() in ['osg','lcg','condor']:
97 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
98 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
99 >        elif middleware.lower() in ['lsf','caf']:
100 >            supported_protocol = [('rfio',rfioOpt)]
101          else:
102 <            ## here we can add support for any kind of protocol,
102 >            ## here we can add support for any kind of protocol,
103              ## maybe some local schedulers need something dedicated
104              pass
105          return supported_protocol
106  
107 <    def stager( self ):              
107 > #   def checkCopy(self, copy_results, list_files):
108          """
109 <        Implement the logic for remote stage out
110 <        """
111 <        protocols = self.setProtocol()  
112 <        count=0
113 <        list_files = self.file_to_copy
114 <        results={}  
115 <        for prot in protocols:
140 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
141 <            copy_results = self.copy( list_files, prot )
142 <            list_retry = []
143 <            list_existing = []
144 <            list_ok = []
109 >        #results={}
110 >        list_retry = []
111 >        list_existing = []
112 >        list_ok = []
113 >        if copy_results.keys() == '':
114 >            self.results.update(copy_results)
115 >        else:
116              for file, dict in copy_results.iteritems():
117                  er_code = dict['erCode']
118 <                if er_code == '60307': list_retry.append( file )
148 <                elif er_code == '60303': list_existing.append( file )
149 <                else:
118 >                if er_code == '0':
119                      list_ok.append(file)
120                      reason = 'Copy succedeed with %s utils'%prot
121                      upDict = self.updateReport(file, er_code, reason)
122 <                    copy_results.update(upDict)
122 >                    copy_results.update(upDict)
123 >                elif er_code == '60303': list_existing.append( file )
124 >                else: list_retry.append( file )
125              results.update(copy_results)
126 <            if len(list_ok) != 0:  
126 >            if len(list_ok) != 0:
127                  msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
128 <               # print msg
129 <            if len(list_ok) == len(list_files) :
130 <                break
128 >                if self.debug : print msg
129 >            if len(list_ok) == len(list_files) :
130 >                msg = 'Copy of  all files succedeed\n'
131 >                #break
132              else:
133 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
134 <                if len(list_retry): list_files = list_retry
135 <                else: break
136 <            count =+1  
133 >                if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
134 >                #if len(list_retry): list_files = list_retry
135 >        return list_retry, results        
136 >        
137 >        """
138 >    def stager( self, middleware, list_files ):
139 >        """
140 >        Implement the logic for remote stage out
141 >        """
142  
143 <        #### TODO Daniele
143 >        if self.debug: print 'stager() :\n'
144 >        results={}
145 >        for prot, opt in self.setProtocol( middleware ):
146 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
147 >            copy_results = self.copy( list_files, prot, opt )
148 >            ######## to define a new function checkCopy ################
149 >            #list_retry, self.results = self.checkCopy(copy_results, list_files)
150 >        #def checkCopy (self, copy_results):
151 >        #    """
152 >        #    """
153 >        #    results={}
154 >            list_retry = []
155 >            list_existing = []
156 >            list_ok = []
157 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
158 >                results.update(copy_results)
159 >            else:
160 >                for file, dict in copy_results.iteritems():
161 >                    er_code = dict['erCode']
162 >                    if er_code == '0':
163 >                        list_ok.append(file)
164 >                        reason = 'Copy succedeed with %s utils'%prot
165 >                        upDict = self.updateReport(file, er_code, reason)
166 >                        copy_results.update(upDict)
167 >                    elif er_code in ['60303','60302']: list_existing.append( file )
168 >                    else: list_retry.append( file )
169 >                results.update(copy_results)
170 >                msg = ''
171 >                if len(list_ok) != 0:
172 >                    msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
173 >                if len(list_ok) == len(list_files) :
174 >                    break
175 >                else:
176 >                    if self.debug: msg += '\tCopy of %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
177 >                    if len(list_retry): list_files = list_retry
178 >                    else: break
179 >                if self.debug : print msg
180 >            """
181 >            if len(list_retry):
182 >               list_files = list_retry
183 >            #def backupCopy(list_retry)
184 >               print "in backup"
185 >               self.params['inputFilesList']=list_files
186 >               ### copy backup
187 >               from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
188 >               siteCfg = loadSiteLocalConfig()
189 >               #print siteCfg
190 >               seName = siteCfg.localStageOut.get("se-name", None)
191 >               #print  "seName = ", seName
192 >               self.params['destination']=seName
193 >               #catalog = siteCfg.localStageOut.get("catalog", None)
194 >               #print "catalog = ", catalog
195 >               implName = siteCfg.localStageOut.get("command", None)
196 >               print "implName = ", implName
197 >               if (implName == 'srm'):
198 >                  implName='srmv2'
199 >               self.params['protocol']=implName
200 >               tfc = siteCfg.trivialFileCatalog()
201 >               #print "tfc = ", tfc
202 >               print " self.params['inputFilesList'] = ", self.params['inputFilesList']
203 >               file_backup=[]
204 >               for input in self.params['inputFilesList']:
205 >                   ### to add the correct lfn, passed as argument of cmscp function (--lfn xxxx)
206 >                   file = '/store/'+input
207 >                   pfn = tfc.matchLFN(tfc.preferredProtocol, file)
208 >                   print "pfn = ", pfn
209 >                   file_backup.append(pfn)
210 >               self.params['inputFilesList'] = file_backup
211 >               print "#########################################"
212 >               print "self.params['inputFilesList'] = ", self.params['inputFilesList']
213 >               print "self.params['protocol'] = ", self.params['protocol']
214 >               print "self.params['option'] = ", self.params['option']
215 >               self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'])
216 >               print "#########################################"
217 >               ###list_retry, self.results = checkCopy(copy_results)
218 >                   #check is something fails and created related dict
219 >                   #        backup = self.analyzeResults(results)
220 >                   #        if backup :
221 >                   #            msg = 'WARNING: backup logic is under implementation\n'
222 >                   #            #backupDict = self.backup()
223 >                   #            ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
224 >                   #            results.update(backupDict)
225 >                   #            print msg
226 >            """
227 >        #### TODO Daniele
228          #check is something fails and created related dict
229 <  #      backup = self.analyzeResults(results)
230 <  
231 <  #      if backup :  
229 >  #      backup = self.analyzeResults(results)
230 >
231 >  #      if backup :
232    #          msg = 'WARNING: backup logic is under implementation\n'
233    #          #backupDict = self.backup()
234 <  #          ### NOTA DEVE RITORNARE UN DICT comprensivo di LFN SE Name  
234 >  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
235    #          results.update(backupDict)
236    #          print msg
237          return results
238  
239      def initializeApi(self, protocol ):
240          """
241 <        Instantiate storage interface  
241 >        Instantiate storage interface
242          """
243 <        source_prot = protocol
244 <        dest_prot = protocol
245 <        if self.source == '' : source_prot = 'local'
246 <        Source_SE  = self.storageInterface( self.source, source_prot )
247 <        if self.destination == '' : dest_prot = 'local'
248 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
243 >        if self.debug : print 'initializeApi() :\n'  
244 >        self.source_prot = protocol
245 >        self.dest_prot = protocol
246 >        if not self.params['source'] : self.source_prot = 'local'
247 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
248 >        if not self.params['destination'] : self.dest_prot = 'local'
249 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
250  
251          if self.debug :
252 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
253 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
252 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
253 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
254 >            print msg
255  
256          return Source_SE, Destination_SE
257  
258 <    def copy( self, list_file, protocol ):
258 >    def copy( self, list_file, protocol, options ):
259          """
260 <        Make the real file copy using SE API
260 >        Make the real file copy using SE API
261          """
262 +        msg = ""
263          if self.debug :
264 <            print 'copy(): using %s protocol'%protocol
265 <        Source_SE, Destination_SE = self.initializeApi( protocol )
264 >            msg  = 'copy() :\n'
265 >            msg += '\tusing %s protocol\n'%protocol
266 >            print msg
267 >        try:
268 >            Source_SE, Destination_SE = self.initializeApi( protocol )
269 >        except Exception, ex:
270 >            return self.updateReport('', '-1', str(ex))
271  
272 <        # create remote dir
273 <        if protocol in ['gridftp','rfio']:
274 <            self.createDir( Destination_SE, protocol )
272 >        # create remote dir
273 >        if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
274 >            try:
275 >                self.createDir( Destination_SE, Destination_SE.protocol )
276 >            except Exception, ex:
277 >                return self.updateReport('', '60316', str(ex))
278  
279          ## prepare for real copy  ##
280 <        sbi = SBinterface( Source_SE, Destination_SE )
281 <        sbi_dest = SBinterface(Destination_SE)
280 >        try :
281 >            sbi = SBinterface( Source_SE, Destination_SE )
282 >            sbi_dest = SBinterface(Destination_SE)
283 >            sbi_source = SBinterface(Source_SE)
284 >        except ProtocolMismatch, ex:
285 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
286 >            msg += str(ex)
287 >            return self.updateReport('', '-1', msg)
288  
289          results = {}
290 <        ## loop over the complete list of files
291 <        for filetocopy in list_file:
292 <            if self.debug : print 'start real copy for %s'%filetocopy
293 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
294 <            if ErCode == '0':
295 <                ErCode, msg = self.makeCopy( sbi, filetocopy )
296 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
290 >        ## loop over the complete list of files
291 >        for filetocopy in list_file:
292 >            if self.debug : print '\tStart real copy for %s'%filetocopy
293 >            try :
294 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
295 >            except Exception, ex:
296 >                ErCode = -1
297 >                msg = str(ex)  
298 >            if ErCode == '0':
299 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
300 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
301              results.update( self.updateReport(filetocopy, ErCode, msg))
302          return results
221    
222    def updateReport(self, file, erCode, reason, lfn='', se='' ):
223        """
224        Update the final stage out infos
225        """
226        jobStageInfo={}
227        jobStageInfo['erCode']=erCode
228        jobStageInfo['reason']=reason
229        jobStageInfo['lfn']=lfn
230        jobStageInfo['se']=se
231
232        report = { file : jobStageInfo}
233        return report
303  
235    def finalReport( self , results, middleware ):
236        """
237        It should return a clear list of LFNs for each SE where data are stored.
238        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
239        """
240        if middleware:
241            outFile = open('cmscpReport.sh',"a")
242            cmscp_exit_status = 0
243            txt = ''
244            for file, dict in results.iteritems():
245                if dict['lfn']=='':
246                    lfn = '$LFNBaseName/'+os.path.basename(file)
247                    se  = '$SE'
248                else:
249                    lfn = dict['lfn']+os.pat.basename(file)
250                    se = dict['se']      
251                #dict['lfn'] # to be implemented
252                txt +=  'echo "Report for File: '+file+'"\n'
253                txt +=  'echo "LFN: '+lfn+'"\n'  
254                txt +=  'echo "StorageElement: '+se+'"\n'  
255                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
256                txt += 'echo "StageOutSE = '+dict['se']+'" >> $RUNTIME_AREA/$repo\n'
257                if dict['erCode'] != '0':
258                    cmscp_exit_status = dict['erCode']
259            txt += '\n'
260            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
261            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
262            outFile.write(str(txt))
263            outFile.close()
264        else:
265            for file, code in results.iteritems():
266                print 'error code = %s for file %s'%(code,file)
267        return
304  
305      def storageInterface( self, endpoint, protocol ):
306          """
307 <        Create the storage interface.
307 >        Create the storage interface.
308          """
309 +        if self.debug : print 'storageInterface():\n'
310          try:
311              interface = SElement( FullPath(endpoint), protocol )
312 <        except Exception, ex:
313 <            msg = ''
314 <            if self.debug : msg = str(ex)+'\n'
315 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
279 <            print msg
312 >        except ProtocolUnknown, ex:
313 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
314 >            msg += str(ex)
315 >            raise Exception(msg)
316  
317          return interface
318  
283    def checkDir(self, Destination_SE, protocol):
284        '''
285        ToBeImplemented NEEDED for castor
286        '''
287        return
288
319      def createDir(self, Destination_SE, protocol):
320          """
321 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
322 <        this should be transparent at SE API level.
321 >        Create remote dir for gsiftp REALLY TEMPORARY
322 >        this should be transparent at SE API level.
323          """
324 <        ErCode = '0'
325 <        msg_1 = ''
324 >        if self.debug : print 'createDir():\n'
325 >        msg = ''
326          try:
327              action = SBinterface( Destination_SE )
328              action.createDir()
329 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
330 <        except Exception, ex:
331 <            msg = ''
332 <            if self.debug : msg = str(ex)+'\n'
333 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
334 <            msg += msg_1
335 <            ErCode = '60316'  
336 <            #print msg
329 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
330 >        except TransferException, ex:
331 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
332 >            msg += str(ex)
333 >            if self.debug :
334 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
335 >                dbgmsg += '\t'+str(ex.output)+'\n'
336 >                print dbgmsg
337 >            raise Exception(msg)
338 >        except OperationException, ex:
339 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
340 >            msg += str(ex)
341 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
342 >            raise Exception(msg)
343 >        except MissingDestination, ex:
344 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
345 >            msg += str(ex)
346 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
347 >            raise Exception(msg)
348 >        except AlreadyExistsException, ex:
349 >            if self.debug: print "\tThe directory already exist"
350 >            pass            
351 >        return msg
352  
353 <        return ErCode, msg_1
309 <
310 <    def checkFileExist(self, sbi, filetocopy):
353 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
354          """
355 <        Check if file to copy already exist  
356 <        """
357 <        try:
358 <            check = sbi.checkExists(filetocopy)
316 <        except Exception, ex:
317 <            msg = ''
318 <            if self.debug : msg = str(ex)+'\n'
319 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
320 <           # print msg
355 >        Check both if source file exist AND
356 >        if destination file ALREADY exist.
357 >        """
358 >        if self.debug : print 'checkFileExist():\n'
359          ErCode = '0'
360          msg = ''
361 <        if check :
362 <            ErCode = '60303'
363 <            msg = "file %s already exist"%filetocopy
364 <            print msg
361 >        f_tocopy=filetocopy
362 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
363 >        try:
364 >            checkSource = sbi_source.checkExists( f_tocopy , opt=option )
365 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
366 >        except OperationException, ex:
367 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
368 >            msg += str(ex)
369 >            if self.debug :
370 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
371 >                dbgmsg += '\t'+str(ex.output)+'\n'
372 >                print dbgmsg
373 >            raise Exception(msg)
374 >        except WrongOption, ex:
375 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
376 >            msg += str(ex)
377 >            if self.debug :
378 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
379 >                dbgmsg += '\t'+str(ex.output)+'\n'
380 >                print dbgmsg
381 >            raise Exception(msg)
382 >        except MissingDestination, ex:
383 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
384 >            msg += str(ex)
385 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
386 >            raise Exception(msg)
387 >        if not checkSource :
388 >            ErCode = '60302'
389 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
390 >            return ErCode, msg
391 >        f_tocopy=filetocopy
392 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
393 >        try:
394 >            check = sbi_dest.checkExists( f_tocopy, opt=option )
395 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
396 >        except OperationException, ex:
397 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
398 >            msg += str(ex)
399 >            if self.debug :
400 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
401 >                dbgmsg += '\t'+str(ex.output)+'\n'
402 >                print dbgmsg
403 >            raise Exception(msg)
404 >        except WrongOption, ex:
405 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
406 >            msg += str(ex)
407 >            if self.debug :
408 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
409 >                msg += '\t'+str(ex.output)+'\n'
410 >            raise Exception(msg)
411 >        except MissingDestination, ex:
412 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
413 >            msg += str(ex)
414 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
415 >            raise Exception(msg)
416 >        if check :
417 >            ErCode = '60303'
418 >            msg = "file %s already exist"%os.path.basename(filetocopy)
419  
420 <        return ErCode,msg  
420 >        return ErCode, msg
421  
422 <    def makeCopy(self, sbi, filetocopy ):  
422 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
423          """
424 <        call the copy API.  
424 >        call the copy API.
425          """
426 <        path = os.path.dirname(filetocopy)  
426 >        if self.debug : print 'makeCopy():\n'
427 >        path = os.path.dirname(filetocopy)
428          file_name =  os.path.basename(filetocopy)
429          source_file = filetocopy
430          dest_file = file_name ## to be improved supporting changing file name  TODO
431 <        if self.source == '' and path == '':
431 >        if self.params['source'] == '' and path == '':
432              source_file = os.path.abspath(filetocopy)
433 <        elif self.destination =='':
434 <            dest_file = os.path.join(os.getcwd(),file_name)
435 <        elif self.source != '' and self.destination != '' :
436 <            source_file = file_name  
433 >        elif self.params['destination'] =='':
434 >            destDir = self.params.get('destinationDir',os.getcwd())
435 >            dest_file = os.path.join(destDir,file_name)
436 >        elif self.params['source'] != '' and self.params['destination'] != '' :
437 >            source_file = file_name
438 >
439          ErCode = '0'
440          msg = ''
441 +
442          try:
443 <            pippo = sbi.copy( source_file , dest_file )
444 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
445 <        except Exception, ex:
446 <            msg = ''
447 <            if self.debug : msg = str(ex)+'\n'
448 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
443 >            sbi.copy( source_file , dest_file , opt = option)
444 >        except TransferException, ex:
445 >            msg  = "Problem copying %s file" % filetocopy
446 >            msg += str(ex)
447 >            if self.debug :
448 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
449 >                dbgmsg += '\t'+str(ex.output)+'\n'
450 >                print dbgmsg
451              ErCode = '60307'
452 <            #print msg
453 <
452 >        except WrongOption, ex:
453 >            msg  = "Problem copying %s file" % filetocopy
454 >            msg += str(ex)
455 >            if self.debug :
456 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
457 >                dbgmsg += '\t'+str(ex.output)+'\n'
458 >                print dbsmsg
459 >            ErCode = '60307'
460 >        if ErCode == '0' and protocol.find('srmv') == 0:
461 >            remote_file_size = -1
462 >            local_file_size = os.path.getsize( source_file )
463 >            try:
464 >                remote_file_size = sbi_dest.getSize( dest_file, opt=option )
465 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
466 >            except TransferException, ex:
467 >                msg  = "Problem checking the size of %s file" % filetocopy
468 >                msg += str(ex)
469 >                if self.debug :
470 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
471 >                    dbgmsg += '\t'+str(ex.output)+'\n'
472 >                    print dbgmsg
473 >                ErCode = '60307'
474 >            except WrongOption, ex:
475 >                msg  = "Problem checking the size of %s file" % filetocopy
476 >                msg += str(ex)
477 >                if self.debug :
478 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
479 >                    dbgmsg += '\t'+str(ex.output)+'\n'
480 >                    print dbgmsg
481 >                ErCode = '60307'
482 >            if local_file_size != remote_file_size:
483 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
484 >                ErCode = '60307'
485 >
486 >        if ErCode != '0':
487 >            try :
488 >                self.removeFile( sbi_dest, dest_file, option )
489 >            except Exception, ex:
490 >                msg += '\n'+str(ex)  
491          return ErCode, msg
492 <  
493 <    '''
494 <    def checkSize()
495 <        """
496 <        Using srm needed a check of the ouptut file size.  
497 <        """
498 <    
499 <        echo "--> remoteSize = $remoteSize"
500 <        ## for local file
501 <        localSize=$(stat -c%s "$path_out_file")
502 <        echo "-->  localSize = $localSize"
503 <        if [ $localSize != $remoteSize ]; then
504 <            echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
505 <            echo "Copy failed: removing remote file $destination"
506 <                srmrm $destination
507 <                cmscp_exit_status=60307
508 <      
509 <      
510 <                echo "Problem copying $path_out_file to $destination with srmcp command"
511 <                StageOutExitStatusReason='remote and local file dimension not match'
512 <                echo "StageOutReport = `cat ./srmcp.report`"
513 <    '''
379 <    def backup(self):
492 >
493 >    def removeFile( self, sbi_dest, filetocopy, option ):
494 >        """  
495 >        """  
496 >        if self.debug : print 'removeFile():\n'
497 >        f_tocopy=filetocopy
498 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
499 >        try:
500 >            sbi_dest.delete( f_tocopy, opt=option )
501 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
502 >        except OperationException, ex:
503 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
504 >            msg += str(ex)
505 >            if self.debug :
506 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
507 >                dbgmsg += '\t'+str(ex.output)+'\n'
508 >                print dbgmsg
509 >            raise Exception(msg)
510 >
511 >        return
512 >
513 >    def backup(self):
514          """
515          Check infos from TFC using existing api obtaining:
516          1)destination
# Line 384 | Line 518 | class cmscp:
518          """
519          return
520  
521 <    def usage(self):
521 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
522 >        """
523 >        Update the final stage out infos
524 >        """
525 >        jobStageInfo={}
526 >        jobStageInfo['erCode']=erCode
527 >        jobStageInfo['reason']=reason
528 >        jobStageInfo['lfn']=lfn
529 >        jobStageInfo['se']=se
530 >
531 >        report = { file : jobStageInfo}
532 >        return report
533  
534 <        msg="""
535 <        required parameters:
536 <        --source :: REMOTE           :      
537 <        --dest   :: REMOTE           :  
393 <        --debug             :
394 <        --inFile :: absPath : or name NOT RELATIVE PATH
395 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
396 <
397 <        optional parameters      
534 >    def finalReport( self , results ):
535 >        """
536 >        It a list of LFNs for each SE where data are stored.
537 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
538          """
539 <        return msg
539 >        outFile = open('cmscpReport.sh',"a")
540 >        cmscp_exit_status = 0
541 >        txt = ''
542 >        for file, dict in results.iteritems():
543 >            if file:
544 >                if dict['lfn']=='':
545 >                    lfn = '$LFNBaseName/'+os.path.basename(file)
546 >                    se  = '$SE'
547 >                else:
548 >                    lfn = dict['lfn']+os.path.basename(file)
549 >                    se = dict['se']
550 >                #dict['lfn'] # to be implemented
551 >                txt +=  'echo "Report for File: '+file+'"\n'
552 >                txt +=  'echo "LFN: '+lfn+'"\n'
553 >                txt +=  'echo "StorageElement: '+se+'"\n'
554 >                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
555 >                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
556 >                if dict['erCode'] != '0':
557 >                    cmscp_exit_status = dict['erCode']
558 >                    cmscp_exit_status = dict['erCode']
559 >            else:
560 >                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
561 >                cmscp_exit_status = dict['erCode']
562 >                cmscp_exit_status = dict['erCode']
563 >        txt += '\n'
564 >        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
565 >        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
566 >        outFile.write(str(txt))
567 >        outFile.close()
568 >        return
569 >
570 >
571 > def usage():
572 >
573 >    msg="""
574 >    required parameters:
575 >    --source        :: REMOTE           :
576 >    --destination   :: REMOTE           :
577 >    --debug             :
578 >    --inFile :: absPath : or name NOT RELATIVE PATH
579 >    --outFIle :: onlyNAME : NOT YET SUPPORTED
580 >
581 >    optional parameters
582 >    """
583 >    print msg
584 >
585 >    return
586 >
587 > def HelpOptions(opts=[]):
588 >    """
589 >    Check otps, print help if needed
590 >    prepare dict = { opt : value }
591 >    """
592 >    dict_args = {}
593 >    if len(opts):
594 >        for opt, arg in opts:
595 >            dict_args[opt.split('--')[1]] = arg
596 >            if opt in ('-h','-help','--help') :
597 >                usage()
598 >                sys.exit(0)
599 >        return dict_args
600 >    else:
601 >        usage()
602 >        sys.exit(0)
603  
604   if __name__ == '__main__' :
605 +
606 +    import getopt
607 +
608 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
609 +                  "protocol=","option=", "middleware=", "srm_version=", \
610 +                  "destinationDir=","debug", "help"]
611 +    try:
612 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
613 +    except getopt.GetoptError, err:
614 +        print err
615 +        HelpOptions()
616 +        sys.exit(2)
617 +
618 +    dictArgs = HelpOptions(opts)
619      try:
620 <        cmscp_ = cmscp(sys.argv[1:])
620 >        cmscp_ = cmscp(dictArgs)
621          cmscp_.run()
622 <    except:
623 <        pass
622 >    except Exception, ex :
623 >        print str(ex)
624  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines