ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.5
Committed: Sat Sep 27 11:00:08 2008 UTC (16 years, 7 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +1 -1 lines
Log Message:
Fix on passing option to copy command

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3     import sys, getopt, string
4     import os, popen2
5     from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6     from ProdCommon.Storage.SEAPI.SBinterface import *
7    
8    
9    
10     class cmscp:
11     def __init__(self, argv):
12     """
13     cmscp
14    
15     safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
16     including success checking version also for CAF using rfcp command to copy the output to SE
17     input:
18     $1 middleware (CAF, LSF, LCG, OSG)
19 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
20     $3 if needed: file name (the output file name)
21     $5 remote SE (complete endpoint)
22     $6 srm version
23 spiga 1.1 output:
24     return 0 if all ok
25     return 60307 if srmcp failed
26     return 60303 if file already exists in the SE
27     """
28     #set default
29     self.debug = 0
30     self.source = ''
31     self.destination = ''
32     self.file_to_copy = []
33     self.remote_file_name = []
34     self.protocol = ''
35     self.middleware = ''
36     self.srmv = ''
37 spiga 1.4 self.lcgOpt='-b -D srmv2 --vo cms -t 2400 --verbose'
38     self.opt=''
39 spiga 1.1 try:
40     opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
41     "protocol=", "middleware=", "srm_version=", "debug", "help"])
42     except getopt.GetoptError:
43     print self.usage()
44     sys.exit(2)
45    
46     self.setAndCheck(opts)
47    
48     return
49    
50     def setAndCheck( self, opts ):
51     """
52     Set and check command line parameter
53     """
54     if not opts :
55     print self.usage()
56     sys.exit()
57     for opt, arg in opts :
58     if opt == "--help" :
59     print self.usage()
60     sys.exit()
61     elif opt == "--debug" :
62     self.debug = 1
63     elif opt == "--source" :
64     self.source = arg
65     elif opt == "--destination":
66     self.destination = arg
67     elif opt == "--inputFileList":
68     infile = arg
69     elif opt == "--outputFileList":
70     out_file
71     elif opt == "--protocol":
72     self.protocol = arg
73     elif opt == "--middleware":
74     self.middleware = arg
75     elif opt == "--srm_version":
76     self.srmv = arg
77    
78     # source and dest cannot be undefined at same time
79     if self.source == '' and self.destination == '':
80     print self.usage()
81     sys.exit()
82     # if middleware is not defined --> protocol cannot be empty
83     if self.middleware == '' and self.protocol == '':
84     print self.usage()
85     sys.exit()
86     # input file must be defined
87     if infile == '':
88     print self.usage()
89     sys.exit()
90     else:
91     if infile.find(','):
92     [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
93     else:
94     self.file_to_copy.append(infile)
95    
96     ## TO DO:
97     #### add check for outFiles
98     #### add map {'inFileNAME':'outFileNAME'} to change out name
99    
100     return
101    
102     def run( self ):
103     """
104     Check if running on UI (no $middleware) or
105     on WN (on the Grid), and take different action
106     """
107     if self.middleware :
108     results = self.stager()
109     else:
110 spiga 1.4 results = self.copy( self.file_to_copy, self.protocol , self.opt)
111 spiga 1.1
112     self.finalReport(results,self.middleware)
113    
114     return
115    
116     def setProtocol( self ):
117     """
118     define the allowed potocols based on $middlware
119     which depend on scheduler
120     """
121     if self.middleware.lower() in ['osg','lcg']:
122 spiga 1.4 supported_protocol = {'srm-lcg': self.lcgOpt }#,
123     # 'srmv2' : '' }
124 spiga 1.1 elif self.middleware.lower() in ['lsf','caf']:
125 spiga 1.4 supported_protocol = {'rfio': '' }
126 spiga 1.1 else:
127     ## here we can add support for any kind of protocol,
128     ## maybe some local schedulers need something dedicated
129     pass
130     return supported_protocol
131    
132     def stager( self ):
133     """
134     Implement the logic for remote stage out
135     """
136     protocols = self.setProtocol()
137     count=0
138     list_files = self.file_to_copy
139     results={}
140 spiga 1.4 for prot in protocols.keys():
141 spiga 1.1 if self.debug: print 'Trying stage out with %s utils \n'%prot
142 spiga 1.4 copy_results = self.copy( list_files, prot, protocols[prot] )
143 spiga 1.1 list_retry = []
144     list_existing = []
145     list_ok = []
146     for file, dict in copy_results.iteritems():
147     er_code = dict['erCode']
148     if er_code == '60307': list_retry.append( file )
149     elif er_code == '60303': list_existing.append( file )
150     else:
151     list_ok.append(file)
152     reason = 'Copy succedeed with %s utils'%prot
153     upDict = self.updateReport(file, er_code, reason)
154     copy_results.update(upDict)
155     results.update(copy_results)
156     if len(list_ok) != 0:
157     msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
158     # print msg
159     if len(list_ok) == len(list_files) :
160     break
161     else:
162     # print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
163     if len(list_retry): list_files = list_retry
164     else: break
165     count =+1
166    
167     #### TODO Daniele
168     #check is something fails and created related dict
169     # backup = self.analyzeResults(results)
170    
171     # if backup :
172     # msg = 'WARNING: backup logic is under implementation\n'
173     # #backupDict = self.backup()
174 spiga 1.2 # ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
175 spiga 1.1 # results.update(backupDict)
176     # print msg
177     return results
178    
179     def initializeApi(self, protocol ):
180     """
181     Instantiate storage interface
182     """
183     source_prot = protocol
184     dest_prot = protocol
185     if self.source == '' : source_prot = 'local'
186     Source_SE = self.storageInterface( self.source, source_prot )
187     if self.destination == '' : dest_prot = 'local'
188     Destination_SE = self.storageInterface( self.destination, dest_prot )
189    
190     if self.debug :
191     print '(source=%s, protocol=%s)'%(self.source, source_prot)
192     print '(destination=%s, protocol=%s)'%(self.destination, dest_prot)
193    
194     return Source_SE, Destination_SE
195    
196 spiga 1.4 def copy( self, list_file, protocol, opt):
197 spiga 1.1 """
198     Make the real file copy using SE API
199     """
200     if self.debug :
201     print 'copy(): using %s protocol'%protocol
202     Source_SE, Destination_SE = self.initializeApi( protocol )
203    
204     # create remote dir
205     if protocol in ['gridftp','rfio']:
206     self.createDir( Destination_SE, protocol )
207    
208     ## prepare for real copy ##
209     sbi = SBinterface( Source_SE, Destination_SE )
210     sbi_dest = SBinterface(Destination_SE)
211    
212     results = {}
213     ## loop over the complete list of files
214     for filetocopy in list_file:
215     if self.debug : print 'start real copy for %s'%filetocopy
216     ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
217     if ErCode == '0':
218 spiga 1.4 ErCode, msg = self.makeCopy( sbi, filetocopy , opt)
219 spiga 1.1 if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
220     results.update( self.updateReport(filetocopy, ErCode, msg))
221     return results
222    
223     def updateReport(self, file, erCode, reason, lfn='', se='' ):
224     """
225     Update the final stage out infos
226     """
227     jobStageInfo={}
228     jobStageInfo['erCode']=erCode
229     jobStageInfo['reason']=reason
230     jobStageInfo['lfn']=lfn
231     jobStageInfo['se']=se
232    
233     report = { file : jobStageInfo}
234     return report
235    
236     def finalReport( self , results, middleware ):
237     """
238     It should return a clear list of LFNs for each SE where data are stored.
239     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
240     """
241     if middleware:
242     outFile = open('cmscpReport.sh',"a")
243     cmscp_exit_status = 0
244     txt = ''
245     for file, dict in results.iteritems():
246     if dict['lfn']=='':
247     lfn = '$LFNBaseName/'+os.path.basename(file)
248     se = '$SE'
249     else:
250     lfn = dict['lfn']+os.pat.basename(file)
251     se = dict['se']
252     #dict['lfn'] # to be implemented
253     txt += 'echo "Report for File: '+file+'"\n'
254     txt += 'echo "LFN: '+lfn+'"\n'
255     txt += 'echo "StorageElement: '+se+'"\n'
256     txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
257 spiga 1.2 txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
258 spiga 1.1 if dict['erCode'] != '0':
259     cmscp_exit_status = dict['erCode']
260     txt += '\n'
261     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
262     txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
263     outFile.write(str(txt))
264     outFile.close()
265     else:
266     for file, code in results.iteritems():
267     print 'error code = %s for file %s'%(code,file)
268     return
269    
270     def storageInterface( self, endpoint, protocol ):
271     """
272     Create the storage interface.
273     """
274     try:
275     interface = SElement( FullPath(endpoint), protocol )
276     except Exception, ex:
277     msg = ''
278     if self.debug : msg = str(ex)+'\n'
279     msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
280     print msg
281    
282     return interface
283    
284     def checkDir(self, Destination_SE, protocol):
285     '''
286     ToBeImplemented NEEDED for castor
287     '''
288     return
289    
290     def createDir(self, Destination_SE, protocol):
291     """
292     Create remote dir for gsiftp/rfio REALLY TEMPORARY
293     this should be transparent at SE API level.
294     """
295     ErCode = '0'
296     msg_1 = ''
297     try:
298     action = SBinterface( Destination_SE )
299     action.createDir()
300     if self.debug: print "The directory has been created using protocol %s\n"%protocol
301     except Exception, ex:
302     msg = ''
303     if self.debug : msg = str(ex)+'\n'
304     msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
305     msg += msg_1
306     ErCode = '60316'
307     #print msg
308    
309     return ErCode, msg_1
310    
311     def checkFileExist(self, sbi, filetocopy):
312     """
313     Check if file to copy already exist
314     """
315     try:
316     check = sbi.checkExists(filetocopy)
317     except Exception, ex:
318     msg = ''
319     if self.debug : msg = str(ex)+'\n'
320     msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
321     # print msg
322     ErCode = '0'
323     msg = ''
324     if check :
325     ErCode = '60303'
326     msg = "file %s already exist"%filetocopy
327     print msg
328    
329     return ErCode,msg
330    
331 spiga 1.4 def makeCopy(self, sbi, filetocopy, opt ):
332 spiga 1.1 """
333     call the copy API.
334     """
335     path = os.path.dirname(filetocopy)
336     file_name = os.path.basename(filetocopy)
337     source_file = filetocopy
338     dest_file = file_name ## to be improved supporting changing file name TODO
339     if self.source == '' and path == '':
340     source_file = os.path.abspath(filetocopy)
341     elif self.destination =='':
342     dest_file = os.path.join(os.getcwd(),file_name)
343     elif self.source != '' and self.destination != '' :
344     source_file = file_name
345     ErCode = '0'
346     msg = ''
347 spiga 1.4
348 spiga 1.1 try:
349 mcinquil 1.5 pippo = sbi.copy( source_file , dest_file , opt = opt)
350 spiga 1.1 if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
351     except Exception, ex:
352     msg = ''
353     if self.debug : msg = str(ex)+'\n'
354     msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
355     ErCode = '60307'
356     #print msg
357    
358     return ErCode, msg
359    
360     '''
361     def checkSize()
362     """
363     Using srm needed a check of the ouptut file size.
364     """
365    
366     echo "--> remoteSize = $remoteSize"
367     ## for local file
368     localSize=$(stat -c%s "$path_out_file")
369     echo "--> localSize = $localSize"
370     if [ $localSize != $remoteSize ]; then
371     echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
372     echo "Copy failed: removing remote file $destination"
373     srmrm $destination
374     cmscp_exit_status=60307
375    
376    
377     echo "Problem copying $path_out_file to $destination with srmcp command"
378     StageOutExitStatusReason='remote and local file dimension not match'
379     echo "StageOutReport = `cat ./srmcp.report`"
380     '''
381     def backup(self):
382     """
383     Check infos from TFC using existing api obtaining:
384     1)destination
385     2)protocol
386     """
387     return
388    
389     def usage(self):
390    
391     msg="""
392     required parameters:
393 spiga 1.3 --source :: REMOTE :
394     --destination :: REMOTE :
395 spiga 1.1 --debug :
396     --inFile :: absPath : or name NOT RELATIVE PATH
397     --outFIle :: onlyNAME : NOT YET SUPPORTED
398    
399     optional parameters
400     """
401     return msg
402    
403     if __name__ == '__main__' :
404     try:
405     cmscp_ = cmscp(sys.argv[1:])
406     cmscp_.run()
407     except:
408     pass
409