ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.2 by spiga, Mon Sep 22 09:03:17 2008 UTC vs.
Revision 1.8 by spiga, Fri Oct 3 18:12:51 2008 UTC

# Line 1 | Line 1
1   #!/usr/bin/env python
2  
3 < import sys, getopt, string
3 > import sys, string
4   import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6   from ProdCommon.Storage.SEAPI.SBinterface import *
7  
8  
9
9   class cmscp:
10 <    def __init__(self, argv):
10 >    def __init__(self, args):
11          """
12          cmscp
13  
14 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
14 >        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
15          including success checking  version also for CAF using rfcp command to copy the output to SE
16          input:
17             $1 middleware (CAF, LSF, LCG, OSG)
18             $2 local file (the absolute path of output file or just the name if it's in top dir)
19             $3 if needed: file name (the output file name)
20             $5 remote SE (complete endpoint)
21 <           $6 srm version
21 >           $6 srm version
22          output:
23               return 0 if all ok
24               return 60307 if srmcp failed
25               return 60303 if file already exists in the SE
26          """
27 +
28          #set default
29 <        self.debug = 0
30 <        self.source = ''
31 <        self.destination = ''
32 <        self.file_to_copy = []
33 <        self.remote_file_name = []
34 <        self.protocol = ''
35 <        self.middleware = ''
36 <        self.srmv = ''
37 <  
38 <        try:
39 <            opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
40 <                                                  "protocol=", "middleware=", "srm_version=", "debug", "help"])
41 <        except getopt.GetoptError:
42 <            print self.usage()
43 <            sys.exit(2)
29 >        self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
30 >                           "protocol":'', "option":'', "middleware":'', "srm_version":''}
31 >        self.debug = 0  
32  
33 <        self.setAndCheck(opts)  
34 <        
33 >        self.params.update( args )
34 >
35          return
36  
37 <    def setAndCheck( self, opts ):
37 >    def processOptions( self ):
38          """
39 <        Set and check command line parameter
39 >        check command line parameter
40          """
41 <        if not opts :
42 <            print self.usage()
43 <            sys.exit()
44 <        for opt, arg in opts :
45 <            if opt  == "--help" :
46 <                print self.usage()
47 <                sys.exit()
48 <            elif opt == "--debug" :
49 <                self.debug = 1
50 <            elif opt == "--source" :
51 <                self.source = arg
52 <            elif opt == "--destination":
53 <                self.destination = arg
54 <            elif opt == "--inputFileList":
67 <                infile = arg
68 <            elif opt == "--outputFileList":
69 <                out_file
70 <            elif opt == "--protocol":
71 <                self.protocol = arg
72 <            elif opt == "--middleware":
73 <                self.middleware = arg
74 <            elif opt == "--srm_version":
75 <                self.srmv = arg
76 <
77 <        # source and dest cannot be undefined at same time
78 <        if self.source == '' and self.destination == '':
79 <            print self.usage()
80 <            sys.exit()
81 <        # if middleware is not defined --> protocol cannot be empty  
82 <        if self.middleware == '' and self.protocol == '':
83 <            print self.usage()
84 <            sys.exit()
85 <        # input file must be defined  
86 <        if infile == '':
87 <            print self.usage()
88 <            sys.exit()
41 >
42 >        if 'help' in self.params.keys(): HelpOptions()        
43 >        if 'debug' in self.params.keys(): self.debug = 1        
44 >
45 >        # source and dest cannot be undefined at same time
46 >        if not self.params['source']  and not self.params['destination'] :
47 >            HelpOptions()
48 >
49 >        # if middleware is not defined --> protocol cannot be empty
50 >        if not self.params['middleware'] and not self.params['protocol'] :
51 >            HelpOptions()
52 >
53 >        # input file must be defined
54 >        if not self.params['inputFileList'] : HelpOptions()
55          else:
56 <            if infile.find(','):
57 <                [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
58 <            else:
59 <                self.file_to_copy.append(infile)
60 <        
56 >            file_to_copy=[]
57 >            if self.params['inputFileList'].find(','):
58 >                [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59 >            else:
60 >                file_to_copy.append(self.params['inputFileList'])
61 >            self.params['inputFileList'] = file_to_copy
62 >
63          ## TO DO:
64          #### add check for outFiles
65          #### add map {'inFileNAME':'outFileNAME'} to change out name
66  
67          return
68  
69 <    def run( self ):  
69 >    def run( self ):
70          """
71 <        Check if running on UI (no $middleware) or
72 <        on WN (on the Grid), and take different action  
71 >        Check if running on UI (no $middleware) or
72 >        on WN (on the Grid), and take different action
73          """
106        if self.middleware :  
107           results = self.stager()
108        else:
109           results = self.copy( self.file_to_copy, self.protocol )
74  
75 <        self.finalReport(results,self.middleware)
75 >        self.processOptions()
76 >        # stage out from WN
77 >        if self.params['middleware'] :
78 >           results = self.stager(self.params['middleware'],self.params['inputFileList'])
79 >           self.finalReport(results)
80 >        # Local interaction with SE
81 >        else:
82 >           results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.protocols['option'] )
83 >           return results
84  
85 <        return
114 <    
115 <    def setProtocol( self ):    
85 >    def setProtocol( self, middleware ):
86          """
87          define the allowed potocols based on $middlware
88 <        which depend on scheduler
88 >        which depend on scheduler
89          """
90 <        if self.middleware.lower() in ['osg','lcg']:
91 <            supported_protocol = ['srm-lcg','srmv2']
92 <        elif self.middleware.lower() in ['lsf','caf']:
93 <            supported_protocol = ['rfio']
90 >        # default To be used with "middleware"
91 >        lcgOpt='-b -D srmv2 --vo cms -t 2400 --verbose'
92 >        srmOpt='-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3'
93 >        rfioOpt=''
94 >
95 >        if middleware.lower() in ['osg','lcg']:
96 >            supported_protocol = [('srm-lcg',lcgOpt),\
97 >                                  ('srmv2',srmOpt)]
98 >        elif middleware.lower() in ['lsf','caf']:
99 >            supported_protocol = [('rfio',rfioOpt)]
100          else:
101 <            ## here we can add support for any kind of protocol,
101 >            ## here we can add support for any kind of protocol,
102              ## maybe some local schedulers need something dedicated
103              pass
104          return supported_protocol
105  
106 <    def stager( self ):              
106 >    def stager( self, middleware, list_files ):
107          """
108          Implement the logic for remote stage out
109          """
110 <        protocols = self.setProtocol()  
111 <        count=0
112 <        list_files = self.file_to_copy
113 <        results={}  
114 <        for prot in protocols:
115 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
116 <            copy_results = self.copy( list_files, prot )
117 <            list_retry = []
142 <            list_existing = []
143 <            list_ok = []
110 >        count=0
111 >        results={}
112 >        for prot, opt in self.setProtocol( middleware ):
113 >            if self.debug: print 'Trying stage out with %s utils \n'%prot
114 >            copy_results = self.copy( list_files, prot, opt )
115 >            list_retry = []
116 >            list_existing = []
117 >            list_ok = []
118              for file, dict in copy_results.iteritems():
119                  er_code = dict['erCode']
120                  if er_code == '60307': list_retry.append( file )
# Line 149 | Line 123 | class cmscp:
123                      list_ok.append(file)
124                      reason = 'Copy succedeed with %s utils'%prot
125                      upDict = self.updateReport(file, er_code, reason)
126 <                    copy_results.update(upDict)
126 >                    copy_results.update(upDict)
127              results.update(copy_results)
128 <            if len(list_ok) != 0:  
128 >            if len(list_ok) != 0:
129                  msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
130 <               # print msg
131 <            if len(list_ok) == len(list_files) :
130 >                if self.debug : print msg
131 >            if len(list_ok) == len(list_files) :
132                  break
133              else:
134 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
134 >                if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
135                  if len(list_retry): list_files = list_retry
136 <                else: break
137 <            count =+1  
138 <
139 <        #### TODO Daniele
136 >                else: break
137 >            count =+1
138 >
139 >        #### TODO Daniele
140          #check is something fails and created related dict
141 <  #      backup = self.analyzeResults(results)
142 <  
143 <  #      if backup :  
141 >  #      backup = self.analyzeResults(results)
142 >
143 >  #      if backup :
144    #          msg = 'WARNING: backup logic is under implementation\n'
145    #          #backupDict = self.backup()
146 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name  
146 >  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
147    #          results.update(backupDict)
148    #          print msg
149          return results
150  
151      def initializeApi(self, protocol ):
152          """
153 <        Instantiate storage interface  
153 >        Instantiate storage interface
154          """
155 <        source_prot = protocol
156 <        dest_prot = protocol
157 <        if self.source == '' : source_prot = 'local'
158 <        Source_SE  = self.storageInterface( self.source, source_prot )
159 <        if self.destination == '' : dest_prot = 'local'
160 <        Destination_SE = self.storageInterface( self.destination, dest_prot )
155 >        source_prot = protocol
156 >        dest_prot = protocol
157 >        if not self.params['source'] : source_prot = 'local'
158 >        Source_SE  = self.storageInterface( self.params['source'], source_prot )
159 >        if not self.params['destination'] : dest_prot = 'local'
160 >        Destination_SE = self.storageInterface( self.params['destination'], dest_prot )
161  
162          if self.debug :
163 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
164 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
163 >            print '(source=%s,  protocol=%s)'%(self.params['source'], source_prot)
164 >            print '(destination=%s,  protocol=%s)'%(self.params['destination'], dest_prot)
165  
166          return Source_SE, Destination_SE
167  
168 <    def copy( self, list_file, protocol ):
168 >    def copy( self, list_file, protocol, options ):
169          """
170 <        Make the real file copy using SE API
170 >        Make the real file copy using SE API
171          """
172          if self.debug :
173 <            print 'copy(): using %s protocol'%protocol
173 >            print 'copy(): using %s protocol'%protocol
174          Source_SE, Destination_SE = self.initializeApi( protocol )
175  
176 <        # create remote dir
177 <        if protocol in ['gridftp','rfio']:
176 >        # create remote dir
177 >        if protocol == 'gridftp':
178              self.createDir( Destination_SE, protocol )
179  
180          ## prepare for real copy  ##
181 <        sbi = SBinterface( Source_SE, Destination_SE )
182 <        sbi_dest = SBinterface(Destination_SE)
181 >        try :
182 >            sbi = SBinterface( Source_SE, Destination_SE )
183 >            sbi_dest = SBinterface(Destination_SE)
184 >        except Exception, ex:
185 >            msg = ''
186 >            if self.debug : msg = str(ex)+'\n'
187 >            msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
188 >            raise msg
189  
190          results = {}
191 <        ## loop over the complete list of files
192 <        for filetocopy in list_file:
191 >        ## loop over the complete list of files
192 >        for filetocopy in list_file:
193              if self.debug : print 'start real copy for %s'%filetocopy
194 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
195 <            if ErCode == '0':
196 <                ErCode, msg = self.makeCopy( sbi, filetocopy )
197 <            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
194 >            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
195 >            if ErCode == '0':
196 >                ErCode, msg = self.makeCopy( sbi, filetocopy , options )
197 >            if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
198              results.update( self.updateReport(filetocopy, ErCode, msg))
199          return results
220    
221    def updateReport(self, file, erCode, reason, lfn='', se='' ):
222        """
223        Update the final stage out infos
224        """
225        jobStageInfo={}
226        jobStageInfo['erCode']=erCode
227        jobStageInfo['reason']=reason
228        jobStageInfo['lfn']=lfn
229        jobStageInfo['se']=se
200  
231        report = { file : jobStageInfo}
232        return report
233
234    def finalReport( self , results, middleware ):
235        """
236        It should return a clear list of LFNs for each SE where data are stored.
237        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
238        """
239        if middleware:
240            outFile = open('cmscpReport.sh',"a")
241            cmscp_exit_status = 0
242            txt = ''
243            for file, dict in results.iteritems():
244                if dict['lfn']=='':
245                    lfn = '$LFNBaseName/'+os.path.basename(file)
246                    se  = '$SE'
247                else:
248                    lfn = dict['lfn']+os.pat.basename(file)
249                    se = dict['se']      
250                #dict['lfn'] # to be implemented
251                txt +=  'echo "Report for File: '+file+'"\n'
252                txt +=  'echo "LFN: '+lfn+'"\n'  
253                txt +=  'echo "StorageElement: '+se+'"\n'  
254                txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
255                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
256                if dict['erCode'] != '0':
257                    cmscp_exit_status = dict['erCode']
258            txt += '\n'
259            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
260            txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
261            outFile.write(str(txt))
262            outFile.close()
263        else:
264            for file, code in results.iteritems():
265                print 'error code = %s for file %s'%(code,file)
266        return
201  
202      def storageInterface( self, endpoint, protocol ):
203          """
204 <        Create the storage interface.
204 >        Create the storage interface.
205          """
206          try:
207              interface = SElement( FullPath(endpoint), protocol )
208          except Exception, ex:
209 <            msg = ''
209 >            msg = ''
210              if self.debug : msg = str(ex)+'\n'
211 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
212 <            print msg
211 >            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
212 >            raise msg
213  
214          return interface
215  
216      def checkDir(self, Destination_SE, protocol):
217          '''
218          ToBeImplemented NEEDED for castor
219 <        '''
219 >        '''
220          return
221 <
221 >
222      def createDir(self, Destination_SE, protocol):
223          """
224 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
225 <        this should be transparent at SE API level.
224 >        Create remote dir for gsiftp REALLY TEMPORARY
225 >        this should be transparent at SE API level.
226          """
227          ErCode = '0'
294        msg_1 = ''
228          try:
229              action = SBinterface( Destination_SE )
230              action.createDir()
231              if self.debug: print "The directory has been created using protocol %s\n"%protocol
232          except Exception, ex:
233 <            msg = ''
233 >            msg = ''
234              if self.debug : msg = str(ex)+'\n'
235 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
236 <            msg += msg_1
304 <            ErCode = '60316'  
305 <            #print msg
235 >            msg = "ERROR: problem with the directory creation using %s protocol \n"%protocol
236 >            ErCode = '60316'
237  
238 <        return ErCode, msg_1
238 >        return ErCode, msg
239  
240 <    def checkFileExist(self, sbi, filetocopy):
240 >    def checkFileExist( self, sbi, filetocopy ):
241 >        """
242 >        Check if file to copy already exist
243          """
244 <        Check if file to copy already exist  
245 <        """
244 >        ErCode = '0'
245 >        msg = ''
246          try:
247              check = sbi.checkExists(filetocopy)
248          except Exception, ex:
249 <            msg = ''
249 >            msg = ''
250              if self.debug : msg = str(ex)+'\n'
251 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
252 <           # print msg
253 <        ErCode = '0'
254 <        msg = ''
322 <        if check :
323 <            ErCode = '60303'
251 >            msg +='problems checkig if file already exist'
252 >            raise msg
253 >        if check :    
254 >            ErCode = '60303'
255              msg = "file %s already exist"%filetocopy
325            print msg
256  
257 <        return ErCode,msg  
257 >        return ErCode,msg
258  
259 <    def makeCopy(self, sbi, filetocopy ):  
259 >    def makeCopy(self, sbi, filetocopy, option ):
260          """
261 <        call the copy API.  
261 >        call the copy API.
262          """
263 <        path = os.path.dirname(filetocopy)  
263 >        path = os.path.dirname(filetocopy)
264          file_name =  os.path.basename(filetocopy)
265          source_file = filetocopy
266          dest_file = file_name ## to be improved supporting changing file name  TODO
267 <        if self.source == '' and path == '':
267 >        if self.params['source'] == '' and path == '':
268              source_file = os.path.abspath(filetocopy)
269 <        elif self.destination =='':
269 >        elif self.params['destination'] =='':
270              dest_file = os.path.join(os.getcwd(),file_name)
271 <        elif self.source != '' and self.destination != '' :
272 <            source_file = file_name  
271 >        elif self.params['source'] != '' and self.params['destination'] != '' :
272 >            source_file = file_name
273 >
274          ErCode = '0'
275          msg = ''
276 +
277          try:
278 <            pippo = sbi.copy( source_file , dest_file )
279 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
278 >            sbi.copy( source_file , dest_file , opt = option)
279 >            #if self.protocol == 'srm' : self.checkSize( sbi, filetocopy ) ## TODO
280          except Exception, ex:
281 <            msg = ''
281 >            msg = ''
282              if self.debug : msg = str(ex)+'\n'
283 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
283 >            msg = "Problem copying %s file" % filetocopy
284              ErCode = '60307'
353            #print msg
285  
286          return ErCode, msg
287 <  
287 >
288      '''
289      def checkSize()
290          """
291 <        Using srm needed a check of the ouptut file size.  
291 >        Using srm needed a check of the ouptut file size.
292          """
293 <    
293 >
294          echo "--> remoteSize = $remoteSize"
295          ## for local file
296          localSize=$(stat -c%s "$path_out_file")
# Line 369 | Line 300 | class cmscp:
300              echo "Copy failed: removing remote file $destination"
301                  srmrm $destination
302                  cmscp_exit_status=60307
303 <      
304 <      
303 >
304 >
305                  echo "Problem copying $path_out_file to $destination with srmcp command"
306                  StageOutExitStatusReason='remote and local file dimension not match'
307                  echo "StageOutReport = `cat ./srmcp.report`"
308 <    '''
309 <    def backup(self):
308 >    '''
309 >    def backup(self):
310          """
311          Check infos from TFC using existing api obtaining:
312          1)destination
# Line 383 | Line 314 | class cmscp:
314          """
315          return
316  
317 <    def usage(self):
317 >    def updateReport(self, file, erCode, reason, lfn='', se='' ):
318 >        """
319 >        Update the final stage out infos
320 >        """
321 >        jobStageInfo={}
322 >        jobStageInfo['erCode']=erCode
323 >        jobStageInfo['reason']=reason
324 >        jobStageInfo['lfn']=lfn
325 >        jobStageInfo['se']=se
326  
327 <        msg="""
328 <        required parameters:
329 <        --source :: REMOTE           :      
330 <        --dest   :: REMOTE           :  
392 <        --debug             :
393 <        --inFile :: absPath : or name NOT RELATIVE PATH
394 <        --outFIle :: onlyNAME : NOT YET SUPPORTED
395 <
396 <        optional parameters      
327 >        report = { file : jobStageInfo}
328 >        return report
329 >
330 >    def finalReport( self , results ):
331          """
332 <        return msg
332 >        It a list of LFNs for each SE where data are stored.
333 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
334 >        """
335 >        outFile = open('cmscpReport.sh',"a")
336 >        cmscp_exit_status = 0
337 >        txt = ''
338 >        for file, dict in results.iteritems():
339 >            if dict['lfn']=='':
340 >                lfn = '$LFNBaseName/'+os.path.basename(file)
341 >                se  = '$SE'
342 >            else:
343 >                lfn = dict['lfn']+os.pat.basename(file)
344 >                se = dict['se']
345 >            #dict['lfn'] # to be implemented
346 >            txt +=  'echo "Report for File: '+file+'"\n'
347 >            txt +=  'echo "LFN: '+lfn+'"\n'
348 >            txt +=  'echo "StorageElement: '+se+'"\n'
349 >            txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
350 >            txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
351 >            if dict['erCode'] != '0':
352 >                cmscp_exit_status = dict['erCode']
353 >        txt += '\n'
354 >        txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
355 >        txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
356 >        outFile.write(str(txt))
357 >        outFile.close()
358 >        return
359 >
360 >
361 > def usage():
362 >
363 >    msg="""
364 >    required parameters:
365 >    --source        :: REMOTE           :
366 >    --destination   :: REMOTE           :
367 >    --debug             :
368 >    --inFile :: absPath : or name NOT RELATIVE PATH
369 >    --outFIle :: onlyNAME : NOT YET SUPPORTED
370 >
371 >    optional parameters
372 >    """
373 >    print msg
374 >
375 >    return
376 >
377 > def HelpOptions(opts=[]):
378 >    """
379 >    Check otps, print help if needed
380 >    prepare dict = { opt : value }  
381 >    """
382 >    dict_args = {}
383 >    if len(opts):
384 >        for opt, arg in opts:
385 >            dict_args[opt.split('--')[1]] = arg
386 >            if opt in ('-h','-help','--help') :
387 >                usage()
388 >                sys.exit(0)
389 >        return dict_args
390 >    else:
391 >        usage()
392 >        sys.exit(0)
393  
394   if __name__ == '__main__' :
395 +
396 +    import getopt
397 +
398 +    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
399 +                  "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
400 +    try:    
401 +        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
402 +    except getopt.GetoptError, err:
403 +        print err
404 +        HelpOptions()
405 +        sys.exit(2)
406 +
407 +    dictArgs = HelpOptions(opts)
408      try:
409 <        cmscp_ = cmscp(sys.argv[1:])
409 >        cmscp_ = cmscp(dictArgs)
410          cmscp_.run()
411      except:
412          pass

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines