ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.2 by spiga, Mon Sep 22 09:03:17 2008 UTC vs.
Revision 1.30 by spiga, Mon Nov 24 15:02:22 2008 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2  
3 < import sys, getopt, string
4 < import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
3 > import sys, os
4 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5   from ProdCommon.Storage.SEAPI.SBinterface import *
6 <
6 > from ProdCommon.Storage.SEAPI.Exceptions import *
7  
8  
9   class cmscp:
10 <    def __init__(self, argv):
10 >    def __init__(self, args):
11          """
12          cmscp
13  
14 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
14 >        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
15          including success checking  version also for CAF using rfcp command to copy the output to SE
16          input:
17             $1 middleware (CAF, LSF, LCG, OSG)
18             $2 local file (the absolute path of output file or just the name if it's in top dir)
19             $3 if needed: file name (the output file name)
20             $5 remote SE (complete endpoint)
21 <           $6 srm version
21 >           $6 srm version
22          output:
23               return 0 if all ok
24               return 60307 if srmcp failed
25               return 60303 if file already exists in the SE
26          """
27 +
28          #set default
29 +        self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
30 +                           "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
31          self.debug = 0
32 <        self.source = ''
33 <        self.destination = ''
34 <        self.file_to_copy = []
33 <        self.remote_file_name = []
34 <        self.protocol = ''
35 <        self.middleware = ''
36 <        self.srmv = ''
37 <  
38 <        try:
39 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
40 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
41 <        except getopt.GetoptError:
42 <            print self.usage()
43 <            sys.exit(2)
44 <
45 <        self.setAndCheck(opts)  
46 <        
32 >
33 >        self.params.update( args )
34 >
35          return
36  
37 <    def setAndCheck( self, opts ):
37 >    def processOptions( self ):
38          """
39 <        Set and check command line parameter
39 >        check command line parameter
40          """
41 <        if not opts :
42 <            print self.usage()
43 <            sys.exit()
44 <        for opt, arg in opts :
45 <            if opt  == "--help" :
46 <                print self.usage()
47 <                sys.exit()
48 <            elif opt == "--debug" :
49 <                self.debug = 1
50 <            elif opt == "--source" :
51 <                self.source = arg
52 <            elif opt == "--destination":
53 <                self.destination = arg
54 <            elif opt == "--inputFileList":
67 <                infile = arg
68 <            elif opt == "--outputFileList":
69 <                out_file
70 <            elif opt == "--protocol":
71 <                self.protocol = arg
72 <            elif opt == "--middleware":
73 <                self.middleware = arg
74 <            elif opt == "--srm_version":
75 <                self.srmv = arg
76 <
77 <        # source and dest cannot be undefined at same time
78 <        if self.source == '' and self.destination == '':
79 <            print self.usage()
80 <            sys.exit()
81 <        # if middleware is not defined --> protocol cannot be empty  
82 <        if self.middleware == '' and self.protocol == '':
83 <            print self.usage()
84 <            sys.exit()
85 <        # input file must be defined  
86 <        if infile == '':
87 <            print self.usage()
88 <            sys.exit()
41 >
42 >        if 'help' in self.params.keys(): HelpOptions()
43 >        if 'debug' in self.params.keys(): self.debug = 1
44 >
45 >        # source and dest cannot be undefined at same time
46 >        if not self.params['source']  and not self.params['destination'] :
47 >            HelpOptions()
48 >
49 >        # if middleware is not defined --> protocol cannot be empty
50 >        if not self.params['middleware'] and not self.params['protocol'] :
51 >            HelpOptions()
52 >
53 >        # input file must be defined
54 >        if not self.params['inputFileList'] : HelpOptions()
55          else:
56 <            if infile.find(','):
57 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
58 <            else:
59 <                self.file_to_copy.append(infile)
60 <        
56 >            file_to_copy=[]
57 >            if self.params['inputFileList'].find(','):
58 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59 >            else:
60 >                file_to_copy.append(self.params['inputFileList'])
61 >            self.params['inputFileList'] = file_to_copy
62 >
63          ## TO DO:
64          #### add check for outFiles
65          #### add map {'inFileNAME':'outFileNAME'} to change out name
66  
67          return
68  
69 <    def run( self ):  
69 >    def run( self ):
70          """
71 <        Check if running on UI (no $middleware) or
72 <        on WN (on the Grid), and take different action  
71 >        Check if running on UI (no $middleware) or
72 >        on WN (on the Grid), and take different action
73          """
74 <        if self.middleware :  
75 <           results = self.stager()
74 >        self.processOptions()
75 >        if self.debug: print 'calling run() : \n'
76 >        # stage out from WN
77 >        if self.params['middleware'] :
78 >           results = self.stager(self.params['middleware'],self.params['inputFileList'])
79 >           self.finalReport(results)
80 >        # Local interaction with SE
81          else:
82 <           results = self.copy( self.file_to_copy, self.protocol )
83 <
111 <        self.finalReport(results,self.middleware)
82 >           results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
83 >           return results
84  
85 <        return
114 <    
115 <    def setProtocol( self ):    
85 >    def setProtocol( self, middleware ):
86          """
87          define the allowed potocols based on $middlware
88 <        which depend on scheduler
88 >        which depend on scheduler
89          """
90 <        if self.middleware.lower() in ['osg','lcg']:
91 <            supported_protocol = ['srm-lcg','srmv2']
92 <        elif self.middleware.lower() in ['lsf','caf']:
93 <            supported_protocol = ['rfio']
90 >        # default To be used with "middleware"
91 >        lcgOpt={'srmv1':'-b -D srmv1  -t 2400 --verbose',
92 >                'srmv2':'-b -D srmv2  -t 2400 --verbose'}
93 >        srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
94 >                'srmv2':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
95 >        rfioOpt=''
96 >
97 >        supported_protocol = None
98 >        if middleware.lower() in ['osg','lcg','condor']:
99 >            supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
100 >                                 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
101 >        elif middleware.lower() in ['lsf','caf']:
102 >            supported_protocol = [('rfio',rfioOpt)]
103          else:
104 <            ## here we can add support for any kind of protocol,
104 >            ## here we can add support for any kind of protocol,
105              ## maybe some local schedulers need something dedicated
106              pass
107          return supported_protocol
108  
109 <    def stager( self ):              
131 <        """
132 <        Implement the logic for remote stage out
109 > #   def checkCopy(self, copy_results, list_files):
110          """
111 <        protocols = self.setProtocol()  
112 <        count=0
113 <        list_files = self.file_to_copy
114 <        results={}  
115 <        for prot in protocols:
116 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
117 <            copy_results = self.copy( list_files, prot )
141 <            list_retry = []
142 <            list_existing = []
143 <            list_ok = []
111 >        #results={}
112 >        list_retry = []
113 >        list_existing = []
114 >        list_ok = []
115 >        if copy_results.keys() == '':
116 >            self.results.update(copy_results)
117 >        else:
118              for file, dict in copy_results.iteritems():
119                  er_code = dict['erCode']
120 <                if er_code == '60307': list_retry.append( file )
147 <                elif er_code == '60303': list_existing.append( file )
148 <                else:
120 >                if er_code == '0':
121                      list_ok.append(file)
122                      reason = 'Copy succedeed with %s utils'%prot
123                      upDict = self.updateReport(file, er_code, reason)
124 <                    copy_results.update(upDict)
124 >                    copy_results.update(upDict)
125 >                elif er_code == '60303': list_existing.append( file )
126 >                else: list_retry.append( file )
127              results.update(copy_results)
128 <            if len(list_ok) != 0:  
128 >            if len(list_ok) != 0:
129                  msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
130 <               # print msg
131 <            if len(list_ok) == len(list_files) :
132 <                break
130 >                if self.debug : print msg
131 >            if len(list_ok) == len(list_files) :
132 >                msg = 'Copy of  all files succedeed\n'
133 >                #break
134              else:
135 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
136 <                if len(list_retry): list_files = list_retry
137 <                else: break
138 <            count =+1  
135 >                if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
136 >                #if len(list_retry): list_files = list_retry
137 >        return list_retry, results        
138 >        
139 >        """
140 >    def stager( self, middleware, list_files ):
141 >        """
142 >        Implement the logic for remote stage out
143 >        """
144  
145 <        #### TODO Daniele
145 >        if self.debug: print 'stager() :\n'
146 >        results={}
147 >        for prot, opt in self.setProtocol( middleware ):
148 >            if self.debug: print '\tTrying the stage out with %s utils \n'%prot
149 >            copy_results = self.copy( list_files, prot, opt )
150 >            ######## to define a new function checkCopy ################
151 >            #list_retry, self.results = self.checkCopy(copy_results, list_files)
152 >        #def checkCopy (self, copy_results):
153 >        #    """
154 >        #    """
155 >        #    results={}
156 >            list_retry = []
157 >            list_existing = []
158 >            list_ok = []
159 >            if copy_results.keys() == [''] or copy_results.keys() == '' :
160 >                results.update(copy_results)
161 >            else:
162 >                for file, dict in copy_results.iteritems():
163 >                    er_code = dict['erCode']
164 >                    if er_code == '0':
165 >                        list_ok.append(file)
166 >                        reason = 'Copy succedeed with %s utils'%prot
167 >                        upDict = self.updateReport(file, er_code, reason)
168 >                        copy_results.update(upDict)
169 >                    elif er_code in ['60303','60302']: list_existing.append( file )
170 >                    else: list_retry.append( file )
171 >                results.update(copy_results)
172 >                msg = ''
173 >                if len(list_ok) != 0:
174 >                    msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
175 >                if len(list_ok) == len(list_files) :
176 >                    break
177 >                else:
178 >                    if self.debug: msg += '\tCopy of %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
179 >                    if len(list_retry): list_files = list_retry
180 >                    else: break
181 >                if self.debug : print msg
182 >            """
183 >            if len(list_retry):
184 >               list_files = list_retry
185 >            #def backupCopy(list_retry)
186 >               print "in backup"
187 >               self.params['inputFilesList']=list_files
188 >               ### copy backup
189 >               from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
190 >               siteCfg = loadSiteLocalConfig()
191 >               #print siteCfg
192 >               seName = siteCfg.localStageOut.get("se-name", None)
193 >               #print  "seName = ", seName
194 >               self.params['destination']=seName
195 >               #catalog = siteCfg.localStageOut.get("catalog", None)
196 >               #print "catalog = ", catalog
197 >               implName = siteCfg.localStageOut.get("command", None)
198 >               print "implName = ", implName
199 >               if (implName == 'srm'):
200 >                  implName='srmv2'
201 >               self.params['protocol']=implName
202 >               tfc = siteCfg.trivialFileCatalog()
203 >               #print "tfc = ", tfc
204 >               print " self.params['inputFilesList'] = ", self.params['inputFilesList']
205 >               file_backup=[]
206 >               for input in self.params['inputFilesList']:
207 >                   ### to add the correct lfn, passed as argument of cmscp function (--lfn xxxx)
208 >                   file = '/store/'+input
209 >                   pfn = tfc.matchLFN(tfc.preferredProtocol, file)
210 >                   print "pfn = ", pfn
211 >                   file_backup.append(pfn)
212 >               self.params['inputFilesList'] = file_backup
213 >               print "#########################################"
214 >               print "self.params['inputFilesList'] = ", self.params['inputFilesList']
215 >               print "self.params['protocol'] = ", self.params['protocol']
216 >               print "self.params['option'] = ", self.params['option']
217 >               self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'])
218 >               print "#########################################"
219 >               ###list_retry, self.results = checkCopy(copy_results)
220 >                   #check is something fails and created related dict
221 >                   #        backup = self.analyzeResults(results)
222 >                   #        if backup :
223 >                   #            msg = 'WARNING: backup logic is under implementation\n'
224 >                   #            #backupDict = self.backup()
225 >                   #            ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
226 >                   #            results.update(backupDict)
227 >                   #            print msg
228 >            """
229 >        #### TODO Daniele
230          #check is something fails and created related dict
231 <  #      backup = self.analyzeResults(results)
232 <  
233 <  #      if backup :  
231 >  #      backup = self.analyzeResults(results)
232 >
233 >  #      if backup :
234    #          msg = 'WARNING: backup logic is under implementation\n'
235    #          #backupDict = self.backup()
236 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name  
236 >  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
237    #          results.update(backupDict)
238    #          print msg
239          return results
240  
241      def initializeApi(self, protocol ):
242          """
243 <        Instantiate storage interface  
243 >        Instantiate storage interface
244          """
245 <        source_prot = protocol
246 <        dest_prot = protocol
247 <        if self.source == '' : source_prot = 'local'
248 <        Source_SE  = self.storageInterface( self.source, source_prot )
249 <        if self.destination == '' : dest_prot = 'local'
250 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
245 >        if self.debug : print 'initializeApi() :\n'  
246 >        self.source_prot = protocol
247 >        self.dest_prot = protocol
248 >        if not self.params['source'] : self.source_prot = 'local'
249 >        Source_SE  = self.storageInterface( self.params['source'], self.source_prot )
250 >        if not self.params['destination'] : self.dest_prot = 'local'
251 >        Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
252  
253          if self.debug :
254 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
255 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
254 >            msg  = '\t(source=%s,  protocol=%s)'%(self.params['source'], self.source_prot)
255 >            msg += '\t(destination=%s,  protocol=%s)'%(self.params['destination'], self.dest_prot)
256  
257          return Source_SE, Destination_SE
258  
259 <    def copy( self, list_file, protocol ):
259 >    def copy( self, list_file, protocol, options ):
260          """
261 <        Make the real file copy using SE API
261 >        Make the real file copy using SE API
262          """
263          if self.debug :
264 <            print 'copy(): using %s protocol'%protocol
265 <        Source_SE, Destination_SE = self.initializeApi( protocol )
264 >            msg  = 'copy() :\n'
265 >            msg += '\tusing %s protocol\n'%protocol
266 >            print msg
267 >        try:
268 >            Source_SE, Destination_SE = self.initializeApi( protocol )
269 >        except Exception, ex:
270 >            return self.updateReport('', '-1', str(ex))
271  
272 <        # create remote dir
273 <        if protocol in ['gridftp','rfio']:
274 <            self.createDir( Destination_SE, protocol )
272 >        # create remote dir
273 >        if protocol in ['gridftp','rfio','srmv2']:
274 >            try:
275 >                self.createDir( Destination_SE, protocol )
276 >            except Exception, ex:
277 >                return self.updateReport('', '60316', str(ex))
278  
279          ## prepare for real copy  ##
280 <        sbi = SBinterface( Source_SE, Destination_SE )
281 <        sbi_dest = SBinterface(Destination_SE)
280 >        try :
281 >            sbi = SBinterface( Source_SE, Destination_SE )
282 >            sbi_dest = SBinterface(Destination_SE)
283 >            sbi_source = SBinterface(Source_SE)
284 >        except ProtocolMismatch, ex:
285 >            msg  = "ERROR : Unable to create SBinterface with %s protocol"%protocol
286 >            msg += str(ex)
287 >            return self.updateReport('', '-1', msg)
288  
289          results = {}
290 <        ## loop over the complete list of files
291 <        for filetocopy in list_file:
292 <            if self.debug : print 'start real copy for %s'%filetocopy
293 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
294 <            if ErCode == '0':
295 <                ErCode, msg = self.makeCopy( sbi, filetocopy )
296 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
290 >        ## loop over the complete list of files
291 >        for filetocopy in list_file:
292 >            if self.debug : print '\tStart real copy for %s'%filetocopy
293 >            try :
294 >                ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy )
295 >            except Exception, ex:
296 >                ErCode = -1
297 >                msg = str(ex)  
298 >            if ErCode == '0':
299 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
300 >            if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
301              results.update( self.updateReport(filetocopy, ErCode, msg))
302          return results
220    
221    def updateReport(self, file, erCode, reason, lfn='', se='' ):
222        """
223        Update the final stage out infos
224        """
225        jobStageInfo={}
226        jobStageInfo['erCode']=erCode
227        jobStageInfo['reason']=reason
228        jobStageInfo['lfn']=lfn
229        jobStageInfo['se']=se
303  
231        report = { file : jobStageInfo}
232        return report
233
234    def finalReport( self , results, middleware ):
235        """
236        It should return a clear list of LFNs for each SE where data are stored.
237        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
238        """
239        if middleware:
240            outFile = open('cmscpReport.sh',"a")
241            cmscp_exit_status = 0
242            txt = ''
243            for file, dict in results.iteritems():
244                if dict['lfn']=='':
245                    lfn = '$LFNBaseName/'+os.path.basename(file)
246                    se  = '$SE'
247                else:
248                    lfn = dict['lfn']+os.pat.basename(file)
249                    se = dict['se']      
250                #dict['lfn'] # to be implemented
251                txt +=  'echo "Report for File: '+file+'"\n'
252                txt +=  'echo "LFN: '+lfn+'"\n'  
253                txt +=  'echo "StorageElement: '+se+'"\n'  
254                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
255                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
256                if dict['erCode'] != '0':
257                    cmscp_exit_status = dict['erCode']
258            txt += '\n'
259            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
260            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
261            outFile.write(str(txt))
262            outFile.close()
263        else:
264            for file, code in results.iteritems():
265                print 'error code = %s for file %s'%(code,file)
266        return
304  
305      def storageInterface( self, endpoint, protocol ):
306          """
307 <        Create the storage interface.
307 >        Create the storage interface.
308          """
309 +        if self.debug : print 'storageInterface():\n'
310          try:
311              interface = SElement( FullPath(endpoint), protocol )
312 <        except Exception, ex:
313 <            msg = ''
314 <            if self.debug : msg = str(ex)+'\n'
315 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
278 <            print msg
312 >        except ProtocolUnknown, ex:
313 >            msg  = "ERROR : Unable to create interface with %s protocol"%protocol
314 >            msg += str(ex)
315 >            raise Exception(msg)
316  
317          return interface
318  
282    def checkDir(self, Destination_SE, protocol):
283        '''
284        ToBeImplemented NEEDED for castor
285        '''
286        return
287
319      def createDir(self, Destination_SE, protocol):
320          """
321 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
322 <        this should be transparent at SE API level.
321 >        Create remote dir for gsiftp REALLY TEMPORARY
322 >        this should be transparent at SE API level.
323          """
324 <        ErCode = '0'
325 <        msg_1 = ''
324 >        if self.debug : print 'createDir():\n'
325 >        msg = ''
326          try:
327              action = SBinterface( Destination_SE )
328              action.createDir()
329 <            if self.debug: print "The directory has been created using protocol %s\n"%protocol
330 <        except Exception, ex:
331 <            msg = ''
332 <            if self.debug : msg = str(ex)+'\n'
333 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
334 <            msg += msg_1
335 <            ErCode = '60316'  
336 <            #print msg
329 >            if self.debug: print "\tThe directory has been created using protocol %s"%protocol
330 >        except TransferException, ex:
331 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
332 >            msg += str(ex)
333 >            if self.debug :
334 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
335 >                dbgmsg += '\t'+str(ex.output)+'\n'
336 >                print dbgmsg
337 >            raise Exception(msg)
338 >        except OperationException, ex:
339 >            msg  = "ERROR: problem with the directory creation using %s protocol "%protocol
340 >            msg += str(ex)
341 >            if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
342 >            raise Exception(msg)
343 >        return msg
344  
345 <        return ErCode, msg_1
308 <
309 <    def checkFileExist(self, sbi, filetocopy):
345 >    def checkFileExist( self, sbi_source, sbi_dest, filetocopy ):
346          """
347 <        Check if file to copy already exist  
348 <        """
349 <        try:
350 <            check = sbi.checkExists(filetocopy)
315 <        except Exception, ex:
316 <            msg = ''
317 <            if self.debug : msg = str(ex)+'\n'
318 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
319 <           # print msg
347 >        Check both if source file exist AND
348 >        if destination file ALREADY exist.
349 >        """
350 >        if self.debug : print 'checkFileExist():\n'
351          ErCode = '0'
352          msg = ''
353 <        if check :
354 <            ErCode = '60303'
355 <            msg = "file %s already exist"%filetocopy
356 <            print msg
353 >        f_tocopy=filetocopy
354 >        if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
355 >        try:
356 >            checkSource = sbi_source.checkExists( f_tocopy )
357 >            if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy  
358 >        except OperationException, ex:
359 >            msg  ='ERROR: problems checkig if source file %s exist'%filetocopy
360 >            msg += str(ex)
361 >            if self.debug :
362 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
363 >                dbgmsg += '\t'+str(ex.output)+'\n'
364 >                print dbgmsg
365 >            raise Exception(msg)
366 >        except WrongOption, ex:
367 >            msg  ='ERROR problems checkig if source file % exist'%filetocopy
368 >            msg += str(ex)
369 >            if self.debug :
370 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
371 >                dbgmsg += '\t'+str(ex.output)+'\n'
372 >                print dbgmsg
373 >            raise Exception(msg)
374 >        if not checkSource :
375 >            ErCode = '60302'
376 >            msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
377 >            return ErCode, msg
378 >        f_tocopy=filetocopy
379 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
380 >        try:
381 >            check = sbi_dest.checkExists( f_tocopy )
382 >            if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy  
383 >        except OperationException, ex:
384 >            msg  = 'ERROR: problems checkig if file %s already exist'%filetocopy
385 >            msg += str(ex)
386 >            if self.debug :
387 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
388 >                dbgmsg += '\t'+str(ex.output)+'\n'
389 >                print dbgmsg
390 >            raise Exception(msg)
391 >        except WrongOption, ex:
392 >            msg  = 'ERROR problems checkig if file % already exist'%filetocopy
393 >            msg += str(ex)
394 >            if self.debug :
395 >                msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
396 >                msg += '\t'+str(ex.output)+'\n'
397 >            raise Exception(msg)
398 >        if check :
399 >            ErCode = '60303'
400 >            msg = "file %s already exist"%os.path.basename(filetocopy)
401  
402 <        return ErCode,msg  
402 >        return ErCode, msg
403  
404 <    def makeCopy(self, sbi, filetocopy ):  
404 >    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
405          """
406 <        call the copy API.  
406 >        call the copy API.
407          """
408 <        path = os.path.dirname(filetocopy)  
408 >        if self.debug : print 'makeCopy():\n'
409 >        path = os.path.dirname(filetocopy)
410          file_name =  os.path.basename(filetocopy)
411          source_file = filetocopy
412          dest_file = file_name ## to be improved supporting changing file name  TODO
413 <        if self.source == '' and path == '':
413 >        if self.params['source'] == '' and path == '':
414              source_file = os.path.abspath(filetocopy)
415 <        elif self.destination =='':
416 <            dest_file = os.path.join(os.getcwd(),file_name)
417 <        elif self.source != '' and self.destination != '' :
418 <            source_file = file_name  
415 >        elif self.params['destination'] =='':
416 >            destDir = self.params.get('destinationDir',os.getcwd())
417 >            dest_file = os.path.join(destDir,file_name)
418 >        elif self.params['source'] != '' and self.params['destination'] != '' :
419 >            source_file = file_name
420 >
421          ErCode = '0'
422          msg = ''
423 +
424          try:
425 <            pippo = sbi.copy( source_file , dest_file )
426 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
427 <        except Exception, ex:
428 <            msg = ''
429 <            if self.debug : msg = str(ex)+'\n'
430 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
425 >            sbi.copy( source_file , dest_file , opt = option)
426 >        except TransferException, ex:
427 >            msg  = "Problem copying %s file" % filetocopy
428 >            msg += str(ex)
429 >            if self.debug :
430 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
431 >                dbgmsg += '\t'+str(ex.output)+'\n'
432 >                print dbsmsg
433              ErCode = '60307'
434 <            #print msg
435 <
434 >        except WrongOption, ex:
435 >            msg  = "Problem copying %s file" % filetocopy
436 >            msg += str(ex)
437 >            if self.debug :
438 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
439 >                dbgmsg += '\t'+str(ex.output)+'\n'
440 >                print dbsmsg
441 >            ErCode = '60307'
442 >        if ErCode == '0' and protocol.find('srmv') == 0:
443 >            remote_file_size = -1
444 >            local_file_size = os.path.getsize( source_file )
445 >            try:
446 >                remote_file_size = sbi_dest.getSize( dest_file )
447 >                if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
448 >            except TransferException, ex:
449 >                msg  = "Problem checking the size of %s file" % filetocopy
450 >                msg += str(ex)
451 >                if self.debug :
452 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
453 >                    dbgmsg += '\t'+str(ex.output)+'\n'
454 >                    print dbgmsg
455 >                ErCode = '60307'
456 >            except WrongOption, ex:
457 >                msg  = "Problem checking the size of %s file" % filetocopy
458 >                msg += str(ex)
459 >                if self.debug :
460 >                    dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
461 >                    dbgmsg += '\t'+str(ex.output)+'\n'
462 >                    print dbgmsg
463 >                ErCode = '60307'
464 >            if local_file_size != remote_file_size:
465 >                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
466 >                ErCode = '60307'
467 >
468 >        if ErCode != '0':
469 >            try :
470 >                self.removeFile( sbi_dest, dest_file )
471 >            except Exception, ex:
472 >                msg += '\n'+str(ex)  
473          return ErCode, msg
474 <  
475 <    '''
476 <    def checkSize()
477 <        """
478 <        Using srm needed a check of the ouptut file size.  
479 <        """
480 <    
481 <        echo "--> remoteSize = $remoteSize"
482 <        ## for local file
483 <        localSize=$(stat -c%s "$path_out_file")
484 <        echo "-->  localSize = $localSize"
485 <        if [ $localSize != $remoteSize ]; then
486 <            echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
487 <            echo "Copy failed: removing remote file $destination"
488 <                srmrm $destination
489 <                cmscp_exit_status=60307
490 <      
491 <      
492 <                echo "Problem copying $path_out_file to $destination with srmcp command"
493 <                StageOutExitStatusReason='remote and local file dimension not match'
494 <                echo "StageOutReport = `cat ./srmcp.report`"
495 <    '''
378 <    def backup(self):
474 >
475 >    def removeFile( self, sbi_dest, filetocopy ):
476 >        """  
477 >        """  
478 >        if self.debug : print 'removeFile():\n'
479 >        f_tocopy=filetocopy
480 >        if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
481 >        try:
482 >            sbi_dest.delete( f_tocopy )
483 >            if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
484 >        except OperationException, ex:
485 >            msg  ='ERROR: problems removing partially staged file %s'%filetocopy
486 >            msg += str(ex)
487 >            if self.debug :
488 >                dbgmsg  = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
489 >                dbgmsg += '\t'+str(ex.output)+'\n'
490 >                print dbgmsg
491 >            raise Exception(msg)
492 >
493 >        return
494 >
495 >    def backup(self):
496          """
497          Check infos from TFC using existing api obtaining:
498          1)destination
# Line 383 | Line 500 | class cmscp:
500          """
501          return
502  
503 <    def usage(self):
503 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
504 >        """
505 >        Update the final stage out infos
506 >        """
507 >        jobStageInfo={}
508 >        jobStageInfo['erCode']=erCode
509 >        jobStageInfo['reason']=reason
510 >        jobStageInfo['lfn']=lfn
511 >        jobStageInfo['se']=se
512  
513 <        msg="""
514 <        required parameters:
515 <        --source :: REMOTE           :      
516 <        --dest   :: REMOTE           :  
517 <        --debug             :
518 <        --inFile :: absPath : or name NOT RELATIVE PATH
519 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
395 <
396 <        optional parameters      
513 >        report = { file : jobStageInfo}
514 >        return report
515 >
516 >    def finalReport( self , results ):
517 >        """
518 >        It a list of LFNs for each SE where data are stored.
519 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
520          """
521 <        return msg
521 >        outFile = open('cmscpReport.sh',"a")
522 >        cmscp_exit_status = 0
523 >        txt = ''
524 >        for file, dict in results.iteritems():
525 >            if file:
526 >                if dict['lfn']=='':
527 >                    lfn = '$LFNBaseName/'+os.path.basename(file)
528 >                    se  = '$SE'
529 >                else:
530 >                    lfn = dict['lfn']+os.path.basename(file)
531 >                    se = dict['se']
532 >                #dict['lfn'] # to be implemented
533 >                txt +=  'echo "Report for File: '+file+'"\n'
534 >                txt +=  'echo "LFN: '+lfn+'"\n'
535 >                txt +=  'echo "StorageElement: '+se+'"\n'
536 >                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
537 >                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
538 >                if dict['erCode'] != '0':
539 >                    cmscp_exit_status = dict['erCode']
540 >                    cmscp_exit_status = dict['erCode']
541 >            else:
542 >                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
543 >                cmscp_exit_status = dict['erCode']
544 >                cmscp_exit_status = dict['erCode']
545 >        txt += '\n'
546 >        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
547 >        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
548 >        outFile.write(str(txt))
549 >        outFile.close()
550 >        return
551 >
552 >
553 > def usage():
554 >
555 >    msg="""
556 >    required parameters:
557 >    --source        :: REMOTE           :
558 >    --destination   :: REMOTE           :
559 >    --debug             :
560 >    --inFile :: absPath : or name NOT RELATIVE PATH
561 >    --outFIle :: onlyNAME : NOT YET SUPPORTED
562 >
563 >    optional parameters
564 >    """
565 >    print msg
566 >
567 >    return
568 >
569 > def HelpOptions(opts=[]):
570 >    """
571 >    Check otps, print help if needed
572 >    prepare dict = { opt : value }
573 >    """
574 >    dict_args = {}
575 >    if len(opts):
576 >        for opt, arg in opts:
577 >            dict_args[opt.split('--')[1]] = arg
578 >            if opt in ('-h','-help','--help') :
579 >                usage()
580 >                sys.exit(0)
581 >        return dict_args
582 >    else:
583 >        usage()
584 >        sys.exit(0)
585  
586   if __name__ == '__main__' :
587 +
588 +    import getopt
589 +
590 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
591 +                  "protocol=","option=", "middleware=", "srm_version=", \
592 +                  "destinationDir=","debug", "help"]
593 +    try:
594 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
595 +    except getopt.GetoptError, err:
596 +        print err
597 +        HelpOptions()
598 +        sys.exit(2)
599 +
600 +    dictArgs = HelpOptions(opts)
601      try:
602 <        cmscp_ = cmscp(sys.argv[1:])
602 >        cmscp_ = cmscp(dictArgs)
603          cmscp_.run()
604 <    except:
605 <        pass
604 >    except Exception, ex :
605 >        print str(ex)
606  

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines