ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
(Generate patch)

Comparing COMP/CRAB/python/cmscp.py (file contents):
Revision 1.6 by spiga, Sun Sep 28 21:17:05 2008 UTC vs.
Revision 1.7 by ewv, Thu Oct 2 16:30:07 2008 UTC

# Line 2 | Line 2
2  
3   import sys, getopt, string
4   import os, popen2
5 < from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5 > from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6   from ProdCommon.Storage.SEAPI.SBinterface import *
7  
8  
# Line 12 | Line 12 | class cmscp:
12          """
13          cmscp
14  
15 <        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
15 >        safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
16          including success checking  version also for CAF using rfcp command to copy the output to SE
17          input:
18             $1 middleware (CAF, LSF, LCG, OSG)
19             $2 local file (the absolute path of output file or just the name if it's in top dir)
20             $3 if needed: file name (the output file name)
21             $5 remote SE (complete endpoint)
22 <           $6 srm version
22 >           $6 srm version
23          output:
24               return 0 if all ok
25               return 60307 if srmcp failed
# Line 28 | Line 28 | class cmscp:
28          #set default
29          self.debug = 0
30          self.source = ''
31 <        self.destination = ''
31 >        self.destination = ''
32          self.file_to_copy = []
33          self.remote_file_name = []
34          self.protocol = ''
# Line 36 | Line 36 | class cmscp:
36          self.srmv = ''
37  
38          # default for stage out approach To be used with "middleware"
39 <        self.lcgOpt='-b -D srmv2 --vo cms -t 2400 --verbose'  
39 >        self.lcgOpt='-b -D srmv2 --vo cms -t 2400 --verbose'
40          self.srmOpt='-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3'
41          self.rfioOpt=''
42          # default to be used with protocol
# Line 48 | Line 48 | class cmscp:
48          except getopt.GetoptError:
49              print self.usage()
50              sys.exit(2)
51 <
52 <        self.setAndCheck(opts)  
53 <        
51 >
52 >        self.setAndCheck(opts)
53 >
54          return
55  
56 <    def setAndCheck( self, opts ):
56 >    def setAndCheck( self, opts ):
57          """
58          Set and check command line parameter
59          """
# Line 69 | Line 69 | class cmscp:
69              elif opt == "--source" :
70                  self.source = arg
71              elif opt == "--destination":
72 <                self.destination = arg
72 >                self.destination = arg
73              elif opt == "--inputFileList":
74                  infile = arg
75              elif opt == "--outputFileList":
# Line 80 | Line 80 | class cmscp:
80                  self.middleware = arg
81              elif opt == "--srm_version":
82                  self.srmv = arg
83 <
84 <        # source and dest cannot be undefined at same time
83 >
84 >        # source and dest cannot be undefined at same time
85          if self.source == '' and self.destination == '':
86              print self.usage()
87              sys.exit()
88 <        # if middleware is not defined --> protocol cannot be empty  
88 >        # if middleware is not defined --> protocol cannot be empty
89          if self.middleware == '' and self.protocol == '':
90              print self.usage()
91              sys.exit()
92 <        # input file must be defined  
92 >        # input file must be defined
93          if infile == '':
94              print self.usage()
95              sys.exit()
96          else:
97              if infile.find(','):
98                  [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
99 <            else:
99 >            else:
100                  self.file_to_copy.append(infile)
101 <        
101 >
102          ## TO DO:
103          #### add check for outFiles
104          #### add map {'inFileNAME':'outFileNAME'} to change out name
105  
106          return
107  
108 <    def run( self ):  
108 >    def run( self ):
109          """
110 <        Check if running on UI (no $middleware) or
111 <        on WN (on the Grid), and take different action  
110 >        Check if running on UI (no $middleware) or
111 >        on WN (on the Grid), and take different action
112          """
113 <        if self.middleware :  
114 <           results = self.stager()
113 >        if self.middleware :
114 >           results = self.stager()
115          else:
116             results = self.copy( self.file_to_copy, self.protocol , self.opt)
117  
118 <        self.finalReport(results,self.middleware)
118 >        self.finalReport(results,self.middleware)
119 >
120 >        return
121  
122 <        return
121 <    
122 <    def setProtocol( self ):    
122 >    def setProtocol( self ):
123          """
124          define the allowed potocols based on $middlware
125 <        which depend on scheduler
125 >        which depend on scheduler
126          """
127          if self.middleware.lower() in ['osg','lcg']:
128              supported_protocol = ['srm-lcg','srmv2']
129 <            self.OptMap = {'srm-lcg': self.lcgOpt,
129 >            self.OptMap = {'srm-lcg': self.lcgOpt,
130                             'srmv2': self.srmOpt }
131          elif self.middleware.lower() in ['lsf','caf']:
132              supported_protocol = ['rfio']
133              self.OptMap = {'rfio': self.rfioOpt }
134          else:
135 <            ## here we can add support for any kind of protocol,
135 >            ## here we can add support for any kind of protocol,
136              ## maybe some local schedulers need something dedicated
137              pass
138          return supported_protocol
139  
140 <    def stager( self ):              
140 >    def stager( self ):
141          """
142          Implement the logic for remote stage out
143          """
144 <        protocols = self.setProtocol()  
145 <        count=0
144 >        protocols = self.setProtocol()
145 >        count=0
146          list_files = self.file_to_copy
147          results={}
148          for prot in protocols:
149 <            if self.debug: print 'Trying stage out with %s utils \n'%prot
149 >            if self.debug: print 'Trying stage out with %s utils \n'%prot
150              copy_results = self.copy( list_files, prot, self.OptMap[prot] )
151 <            list_retry = []
152 <            list_existing = []
153 <            list_ok = []
151 >            list_retry = []
152 >            list_existing = []
153 >            list_ok = []
154              for file, dict in copy_results.iteritems():
155                  er_code = dict['erCode']
156                  if er_code == '60307': list_retry.append( file )
# Line 159 | Line 159 | class cmscp:
159                      list_ok.append(file)
160                      reason = 'Copy succedeed with %s utils'%prot
161                      upDict = self.updateReport(file, er_code, reason)
162 <                    copy_results.update(upDict)
162 >                    copy_results.update(upDict)
163              results.update(copy_results)
164 <            if len(list_ok) != 0:  
164 >            if len(list_ok) != 0:
165                  msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
166                 # print msg
167 <            if len(list_ok) == len(list_files) :
167 >            if len(list_ok) == len(list_files) :
168                  break
169              else:
170 <         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
170 >         #       print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
171                  if len(list_retry): list_files = list_retry
172 <                else: break
173 <            count =+1  
174 <
175 <        #### TODO Daniele
172 >                else: break
173 >            count =+1
174 >
175 >        #### TODO Daniele
176          #check is something fails and created related dict
177 <  #      backup = self.analyzeResults(results)
178 <  
179 <  #      if backup :  
177 >  #      backup = self.analyzeResults(results)
178 >
179 >  #      if backup :
180    #          msg = 'WARNING: backup logic is under implementation\n'
181    #          #backupDict = self.backup()
182 <  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name  
182 >  #          ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
183    #          results.update(backupDict)
184    #          print msg
185          return results
186  
187      def initializeApi(self, protocol ):
188          """
189 <        Instantiate storage interface  
189 >        Instantiate storage interface
190          """
191 <        source_prot = protocol
192 <        dest_prot = protocol
191 >        source_prot = protocol
192 >        dest_prot = protocol
193          if self.source == '' : source_prot = 'local'
194          Source_SE  = self.storageInterface( self.source, source_prot )
195          if self.destination == '' : dest_prot = 'local'
196          Destination_SE = self.storageInterface( self.destination, dest_prot )
197  
198          if self.debug :
199 <            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
200 <            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
199 >            print '(source=%s,  protocol=%s)'%(self.source, source_prot)
200 >            print '(destination=%s,  protocol=%s)'%(self.destination, dest_prot)
201  
202          return Source_SE, Destination_SE
203  
204      def copy( self, list_file, protocol, opt):
205          """
206 <        Make the real file copy using SE API
206 >        Make the real file copy using SE API
207          """
208          if self.debug :
209 <            print 'copy(): using %s protocol'%protocol
209 >            print 'copy(): using %s protocol'%protocol
210          Source_SE, Destination_SE = self.initializeApi( protocol )
211  
212 <        # create remote dir
212 >        # create remote dir
213          if protocol in ['gridftp','rfio']:
214              self.createDir( Destination_SE, protocol )
215  
216          ## prepare for real copy  ##
217          sbi = SBinterface( Source_SE, Destination_SE )
218 <        sbi_dest = SBinterface(Destination_SE)
218 >        sbi_dest = SBinterface(Destination_SE)
219  
220          results = {}
221 <        ## loop over the complete list of files
222 <        for filetocopy in list_file:
221 >        ## loop over the complete list of files
222 >        for filetocopy in list_file:
223              if self.debug : print 'start real copy for %s'%filetocopy
224 <            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
225 <            if ErCode == '0':
224 >            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
225 >            if ErCode == '0':
226                  ErCode, msg = self.makeCopy( sbi, filetocopy , opt)
227              if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
228              results.update( self.updateReport(filetocopy, ErCode, msg))
229          return results
230 <    
230 >
231      def updateReport(self, file, erCode, reason, lfn='', se='' ):
232 <        """
233 <        Update the final stage out infos
234 <        """
235 <        jobStageInfo={}
232 >        """
233 >        Update the final stage out infos
234 >        """
235 >        jobStageInfo={}
236          jobStageInfo['erCode']=erCode
237          jobStageInfo['reason']=reason
238 <        jobStageInfo['lfn']=lfn
239 <        jobStageInfo['se']=se
238 >        jobStageInfo['lfn']=lfn
239 >        jobStageInfo['se']=se
240  
241          report = { file : jobStageInfo}
242          return report
# Line 244 | Line 244 | class cmscp:
244      def finalReport( self , results, middleware ):
245          """
246          It should return a clear list of LFNs for each SE where data are stored.
247 <        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.  
247 >        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
248          """
249          if middleware:
250              outFile = open('cmscpReport.sh',"a")
251              cmscp_exit_status = 0
252 <            txt = ''
252 >            txt = ''
253              for file, dict in results.iteritems():
254                  if dict['lfn']=='':
255                      lfn = '$LFNBaseName/'+os.path.basename(file)
256                      se  = '$SE'
257                  else:
258                      lfn = dict['lfn']+os.pat.basename(file)
259 <                    se = dict['se']      
259 >                    se = dict['se']
260                  #dict['lfn'] # to be implemented
261 <                txt +=  'echo "Report for File: '+file+'"\n'
262 <                txt +=  'echo "LFN: '+lfn+'"\n'  
263 <                txt +=  'echo "StorageElement: '+se+'"\n'  
261 >                txt +=  'echo "Report for File: '+file+'"\n'
262 >                txt +=  'echo "LFN: '+lfn+'"\n'
263 >                txt +=  'echo "StorageElement: '+se+'"\n'
264                  txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
265                  txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
266                  if dict['erCode'] != '0':
# Line 270 | Line 270 | class cmscp:
270              txt +=  'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
271              outFile.write(str(txt))
272              outFile.close()
273 <        else:
273 >        else:
274              for file, code in results.iteritems():
275 <                print 'error code = %s for file %s'%(code,file)
275 >                print 'error code = %s for file %s'%(code,file)
276          return
277  
278      def storageInterface( self, endpoint, protocol ):
279          """
280 <        Create the storage interface.
280 >        Create the storage interface.
281          """
282          try:
283              interface = SElement( FullPath(endpoint), protocol )
284          except Exception, ex:
285 <            msg = ''
285 >            msg = ''
286              if self.debug : msg = str(ex)+'\n'
287 <            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol  
287 >            msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
288              print msg
289  
290          return interface
# Line 292 | Line 292 | class cmscp:
292      def checkDir(self, Destination_SE, protocol):
293          '''
294          ToBeImplemented NEEDED for castor
295 <        '''
295 >        '''
296          return
297 <
297 >
298      def createDir(self, Destination_SE, protocol):
299          """
300 <        Create remote dir for gsiftp/rfio REALLY TEMPORARY
301 <        this should be transparent at SE API level.
300 >        Create remote dir for gsiftp/rfio REALLY TEMPORARY
301 >        this should be transparent at SE API level.
302          """
303          ErCode = '0'
304          msg_1 = ''
# Line 307 | Line 307 | class cmscp:
307              action.createDir()
308              if self.debug: print "The directory has been created using protocol %s\n"%protocol
309          except Exception, ex:
310 <            msg = ''
310 >            msg = ''
311              if self.debug : msg = str(ex)+'\n'
312 <            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
312 >            msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
313              msg += msg_1
314 <            ErCode = '60316'  
314 >            ErCode = '60316'
315              #print msg
316  
317 <        return ErCode, msg_1
317 >        return ErCode, msg_1
318  
319      def checkFileExist(self, sbi, filetocopy):
320          """
321 <        Check if file to copy already exist  
322 <        """
321 >        Check if file to copy already exist
322 >        """
323          try:
324              check = sbi.checkExists(filetocopy)
325          except Exception, ex:
326 <            msg = ''
326 >            msg = ''
327              if self.debug : msg = str(ex)+'\n'
328 <            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
328 >            msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
329             # print msg
330          ErCode = '0'
331          msg = ''
332 <        if check :
333 <            ErCode = '60303'
332 >        if check :
333 >            ErCode = '60303'
334              msg = "file %s already exist"%filetocopy
335              print msg
336  
337 <        return ErCode,msg  
337 >        return ErCode,msg
338  
339 <    def makeCopy(self, sbi, filetocopy, opt ):  
339 >    def makeCopy(self, sbi, filetocopy, opt ):
340          """
341 <        call the copy API.  
341 >        call the copy API.
342          """
343 <        path = os.path.dirname(filetocopy)  
343 >        path = os.path.dirname(filetocopy)
344          file_name =  os.path.basename(filetocopy)
345          source_file = filetocopy
346          dest_file = file_name ## to be improved supporting changing file name  TODO
347          if self.source == '' and path == '':
348              source_file = os.path.abspath(filetocopy)
349 <        elif self.destination =='':
349 >        elif self.destination =='':
350              dest_file = os.path.join(os.getcwd(),file_name)
351          elif self.source != '' and self.destination != '' :
352 <            source_file = file_name  
352 >            source_file = file_name
353          ErCode = '0'
354          msg = ''
355 <
355 >
356          try:
357              pippo = sbi.copy( source_file , dest_file , opt = opt)
358 <            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
358 >            if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
359          except Exception, ex:
360 <            msg = ''
360 >            msg = ''
361              if self.debug : msg = str(ex)+'\n'
362 <            msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
362 >            msg = "Problem copying %s file" % filetocopy
363              ErCode = '60307'
364              #print msg
365  
366          return ErCode, msg
367 <  
367 >
368      '''
369      def checkSize()
370          """
371 <        Using srm needed a check of the ouptut file size.  
371 >        Using srm needed a check of the ouptut file size.
372          """
373 <    
373 >
374          echo "--> remoteSize = $remoteSize"
375          ## for local file
376          localSize=$(stat -c%s "$path_out_file")
# Line 380 | Line 380 | class cmscp:
380              echo "Copy failed: removing remote file $destination"
381                  srmrm $destination
382                  cmscp_exit_status=60307
383 <      
384 <      
383 >
384 >
385                  echo "Problem copying $path_out_file to $destination with srmcp command"
386                  StageOutExitStatusReason='remote and local file dimension not match'
387                  echo "StageOutReport = `cat ./srmcp.report`"
388 <    '''
389 <    def backup(self):
388 >    '''
389 >    def backup(self):
390          """
391          Check infos from TFC using existing api obtaining:
392          1)destination
# Line 396 | Line 396 | class cmscp:
396  
397      def usage(self):
398  
399 <        msg="""
399 >        msg="""
400          required parameters:
401 <        --source        :: REMOTE           :      
402 <        --destination   :: REMOTE           :  
401 >        --source        :: REMOTE           :
402 >        --destination   :: REMOTE           :
403          --debug             :
404          --inFile :: absPath : or name NOT RELATIVE PATH
405          --outFIle :: onlyNAME : NOT YET SUPPORTED
406 <
407 <        optional parameters      
406 >
407 >        optional parameters
408          """
409 <        return msg
409 >        return msg
410  
411   if __name__ == '__main__' :
412      try:

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines