ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.19
Committed: Thu Oct 23 17:29:36 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_2_pre1, CRAB_2_4_1, CRAB_2_4_1_pre4, CRAB_2_4_1_pre3
Changes since 1.18: +24 -6 lines
Log Message:
fixed multiple files copy... it works correctly also when first protocol fails. CheckSum coming with srm is skipped and performed by hand.

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 mcinquil 1.16 import sys, os
4 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
6 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
7 spiga 1.1
8    
9     class cmscp:
10 spiga 1.8 def __init__(self, args):
11 spiga 1.1 """
12     cmscp
13    
14 ewv 1.7 safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
15 spiga 1.1 including success checking version also for CAF using rfcp command to copy the output to SE
16     input:
17     $1 middleware (CAF, LSF, LCG, OSG)
18 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
19     $3 if needed: file name (the output file name)
20     $5 remote SE (complete endpoint)
21 ewv 1.7 $6 srm version
22 spiga 1.1 output:
23     return 0 if all ok
24     return 60307 if srmcp failed
25     return 60303 if file already exists in the SE
26     """
27 spiga 1.8
28 spiga 1.1 #set default
29 spiga 1.8 self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
30 spiga 1.11 "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
31 ewv 1.14 self.debug = 0
32    
33 spiga 1.8 self.params.update( args )
34 ewv 1.7
35 spiga 1.1 return
36    
37 spiga 1.8 def processOptions( self ):
38 spiga 1.1 """
39 spiga 1.8 check command line parameter
40 spiga 1.1 """
41 spiga 1.8
42 ewv 1.14 if 'help' in self.params.keys(): HelpOptions()
43     if 'debug' in self.params.keys(): self.debug = 1
44 ewv 1.7
45     # source and dest cannot be undefined at same time
46 spiga 1.8 if not self.params['source'] and not self.params['destination'] :
47     HelpOptions()
48    
49 ewv 1.7 # if middleware is not defined --> protocol cannot be empty
50 spiga 1.8 if not self.params['middleware'] and not self.params['protocol'] :
51     HelpOptions()
52    
53 ewv 1.7 # input file must be defined
54 spiga 1.8 if not self.params['inputFileList'] : HelpOptions()
55 spiga 1.1 else:
56 spiga 1.8 file_to_copy=[]
57 ewv 1.14 if self.params['inputFileList'].find(','):
58 spiga 1.8 [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59 ewv 1.7 else:
60 spiga 1.8 file_to_copy.append(self.params['inputFileList'])
61     self.params['inputFileList'] = file_to_copy
62 ewv 1.7
63 spiga 1.1 ## TO DO:
64     #### add check for outFiles
65     #### add map {'inFileNAME':'outFileNAME'} to change out name
66    
67     return
68    
69 ewv 1.7 def run( self ):
70 spiga 1.1 """
71 ewv 1.7 Check if running on UI (no $middleware) or
72     on WN (on the Grid), and take different action
73 spiga 1.1 """
74 spiga 1.8
75     self.processOptions()
76     # stage out from WN
77     if self.params['middleware'] :
78     results = self.stager(self.params['middleware'],self.params['inputFileList'])
79     self.finalReport(results)
80     # Local interaction with SE
81 spiga 1.1 else:
82 mcinquil 1.16 results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.params['option'] )
83 spiga 1.8 return results
84 spiga 1.1
85 spiga 1.8 def setProtocol( self, middleware ):
86 spiga 1.1 """
87     define the allowed potocols based on $middlware
88 ewv 1.7 which depend on scheduler
89 spiga 1.1 """
90 spiga 1.8 # default To be used with "middleware"
91 spiga 1.13 lcgOpt={'srmv1':'-b -D srmv1 -t 2400 --verbose',
92     'srmv2':'-b -D srmv2 -t 2400 --verbose'}
93     srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
94     'srmv2':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
95 spiga 1.8 rfioOpt=''
96    
97 ewv 1.14 supported_protocol = None
98     if middleware.lower() in ['osg','lcg','condor']:
99 spiga 1.12 supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
100     (self.params['srm_version'],srmOpt[self.params['srm_version']])]
101 spiga 1.8 elif middleware.lower() in ['lsf','caf']:
102 ewv 1.14 supported_protocol = [('rfio',rfioOpt)]
103 spiga 1.1 else:
104 ewv 1.7 ## here we can add support for any kind of protocol,
105 spiga 1.1 ## maybe some local schedulers need something dedicated
106     pass
107     return supported_protocol
108    
109 spiga 1.8 def stager( self, middleware, list_files ):
110 spiga 1.1 """
111     Implement the logic for remote stage out
112     """
113 spiga 1.6 results={}
114 spiga 1.8 for prot, opt in self.setProtocol( middleware ):
115 ewv 1.7 if self.debug: print 'Trying stage out with %s utils \n'%prot
116 spiga 1.8 copy_results = self.copy( list_files, prot, opt )
117 ewv 1.7 list_retry = []
118     list_existing = []
119     list_ok = []
120 spiga 1.13 if copy_results.keys() == '':
121     results.update(copy_results)
122     else:
123     for file, dict in copy_results.iteritems():
124     er_code = dict['erCode']
125     if er_code == '0':
126     list_ok.append(file)
127     reason = 'Copy succedeed with %s utils'%prot
128     upDict = self.updateReport(file, er_code, reason)
129     copy_results.update(upDict)
130     elif er_code == '60303': list_existing.append( file )
131     else: list_retry.append( file )
132     results.update(copy_results)
133     if len(list_ok) != 0:
134     msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
135     if self.debug : print msg
136     if len(list_ok) == len(list_files) :
137     break
138 spiga 1.1 else:
139 spiga 1.13 if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
140     if len(list_retry): list_files = list_retry
141     else: break
142 ewv 1.7
143     #### TODO Daniele
144 spiga 1.1 #check is something fails and created related dict
145 ewv 1.7 # backup = self.analyzeResults(results)
146    
147     # if backup :
148 spiga 1.1 # msg = 'WARNING: backup logic is under implementation\n'
149     # #backupDict = self.backup()
150 ewv 1.7 # ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
151 spiga 1.1 # results.update(backupDict)
152     # print msg
153     return results
154    
155     def initializeApi(self, protocol ):
156     """
157 ewv 1.7 Instantiate storage interface
158 spiga 1.1 """
159 ewv 1.7 source_prot = protocol
160     dest_prot = protocol
161 spiga 1.8 if not self.params['source'] : source_prot = 'local'
162     Source_SE = self.storageInterface( self.params['source'], source_prot )
163     if not self.params['destination'] : dest_prot = 'local'
164     Destination_SE = self.storageInterface( self.params['destination'], dest_prot )
165 spiga 1.1
166     if self.debug :
167 spiga 1.8 print '(source=%s, protocol=%s)'%(self.params['source'], source_prot)
168     print '(destination=%s, protocol=%s)'%(self.params['destination'], dest_prot)
169 spiga 1.1
170     return Source_SE, Destination_SE
171    
172 spiga 1.8 def copy( self, list_file, protocol, options ):
173 spiga 1.1 """
174 ewv 1.7 Make the real file copy using SE API
175 spiga 1.1 """
176     if self.debug :
177 ewv 1.7 print 'copy(): using %s protocol'%protocol
178 spiga 1.13 try:
179     Source_SE, Destination_SE = self.initializeApi( protocol )
180     except Exception, ex:
181     return self.updateReport('', '-1', str(ex))
182 ewv 1.14
183 ewv 1.7 # create remote dir
184 spiga 1.10 if protocol in ['gridftp','rfio']:
185 spiga 1.13 try:
186     self.createDir( Destination_SE, protocol )
187     except Exception, ex:
188     return self.updateReport('', '60316', str(ex))
189 spiga 1.1
190     ## prepare for real copy ##
191 spiga 1.8 try :
192     sbi = SBinterface( Source_SE, Destination_SE )
193     sbi_dest = SBinterface(Destination_SE)
194 spiga 1.11 except ProtocolMismatch, ex:
195     msg = str(ex)+'\n'
196 spiga 1.8 msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
197 spiga 1.13 return self.updateReport('', '-1', str(ex))
198 spiga 1.1
199     results = {}
200 ewv 1.7 ## loop over the complete list of files
201     for filetocopy in list_file:
202 spiga 1.1 if self.debug : print 'start real copy for %s'%filetocopy
203 spiga 1.13 try :
204     ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
205     except Exception, ex:
206 spiga 1.18 ErCode = -1
207     msg = str(ex)
208 ewv 1.7 if ErCode == '0':
209 spiga 1.19 ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
210 spiga 1.8 if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
211 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
212     return results
213 ewv 1.7
214 spiga 1.1
215     def storageInterface( self, endpoint, protocol ):
216     """
217 ewv 1.7 Create the storage interface.
218 spiga 1.1 """
219     try:
220     interface = SElement( FullPath(endpoint), protocol )
221 spiga 1.11 except ProtocolUnknown, ex:
222 ewv 1.7 msg = ''
223 spiga 1.1 if self.debug : msg = str(ex)+'\n'
224 ewv 1.7 msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
225 spiga 1.13 raise Exception(msg)
226 spiga 1.1
227     return interface
228    
229     def createDir(self, Destination_SE, protocol):
230     """
231 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
232 ewv 1.7 this should be transparent at SE API level.
233 spiga 1.1 """
234 ewv 1.14 msg = ''
235 spiga 1.1 try:
236     action = SBinterface( Destination_SE )
237     action.createDir()
238 fanzago 1.15 if self.debug: msg+= "The directory has been created using protocol %s\n"%protocol
239 spiga 1.11 except TransferException, ex:
240     msg = str(ex)
241     if self.debug :
242     msg += str(ex.detail)+'\n'
243 spiga 1.12 msg += str(ex.output)+'\n'
244 spiga 1.13 msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
245 fanzago 1.15 raise Exceptions(msg)
246 spiga 1.11 except OperationException, ex:
247     msg = str(ex)
248     if self.debug : msg += str(ex.detail)+'\n'
249 mcinquil 1.16 msg += "ERROR: problem with the directory creation using %s protocol \n"%protocol
250 spiga 1.1
251 spiga 1.13 return msg
252 spiga 1.1
253 spiga 1.8 def checkFileExist( self, sbi, filetocopy ):
254 spiga 1.1 """
255 ewv 1.7 Check if file to copy already exist
256     """
257 spiga 1.8 ErCode = '0'
258     msg = ''
259 spiga 1.1 try:
260     check = sbi.checkExists(filetocopy)
261 spiga 1.11 except OperationException, ex:
262     msg = str(ex)
263     if self.debug :
264     msg += str(ex.detail)+'\n'
265 spiga 1.12 msg += str(ex.output)+'\n'
266 spiga 1.13 msg +='ERROR: problems checkig if file %s already exist'%filetocopy
267     raise Exception(msg)
268 spiga 1.11 except WrongOption, ex:
269     msg = str(ex)
270     if self.debug :
271     msg += str(ex.detail)+'\n'
272 spiga 1.12 msg += str(ex.output)+'\n'
273 spiga 1.13 msg +='ERROR problems checkig if file % already exist'%filetocopy
274     raise Exception(msg)
275 ewv 1.14 if check :
276 ewv 1.7 ErCode = '60303'
277 spiga 1.1 msg = "file %s already exist"%filetocopy
278 ewv 1.14
279 spiga 1.13 return ErCode, msg
280 spiga 1.1
281 spiga 1.19 def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
282 spiga 1.1 """
283 ewv 1.7 call the copy API.
284 spiga 1.1 """
285 ewv 1.7 path = os.path.dirname(filetocopy)
286 spiga 1.1 file_name = os.path.basename(filetocopy)
287     source_file = filetocopy
288     dest_file = file_name ## to be improved supporting changing file name TODO
289 spiga 1.8 if self.params['source'] == '' and path == '':
290 spiga 1.1 source_file = os.path.abspath(filetocopy)
291 spiga 1.8 elif self.params['destination'] =='':
292 spiga 1.1 dest_file = os.path.join(os.getcwd(),file_name)
293 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
294 ewv 1.7 source_file = file_name
295 spiga 1.8
296 spiga 1.1 ErCode = '0'
297     msg = ''
298 ewv 1.7
299 spiga 1.1 try:
300 spiga 1.8 sbi.copy( source_file , dest_file , opt = option)
301 spiga 1.11 except TransferException, ex:
302     msg = str(ex)
303     if self.debug :
304     msg += str(ex.detail)+'\n'
305 spiga 1.12 msg += str(ex.output)+'\n'
306 spiga 1.11 msg += "Problem copying %s file" % filetocopy
307 spiga 1.1 ErCode = '60307'
308 spiga 1.11 except WrongOption, ex:
309     msg = str(ex)
310     if self.debug :
311     msg += str(ex.detail)+'\n'
312 spiga 1.12 msg += str(ex.output)+'\n'
313 spiga 1.13 msg += "Problem copying %s file" % filetocopy
314     ErCode = '60307'
315 spiga 1.19 if ErCode == '0' and protocol.find('srm') == 0:
316     remote_file_size = -1
317     local_file_size = os.path.getsize( source_file )
318     try:
319     remote_file_size = sbi_dest.getSize( dest_file )
320     except TransferException, ex:
321     msg = str(ex)
322     if self.debug :
323     msg += str(ex.detail)+'\n'
324     msg += str(ex.output)+'\n'
325     msg += "Problem checking the size of %s file" % filetocopy
326     ErCode = '60307'
327     except WrongOption, ex:
328     msg = str(ex)
329     if self.debug :
330     msg += str(ex.detail)+'\n'
331     msg += str(ex.output)+'\n'
332     msg += "Problem checking the size of %s file" % filetocopy
333     ErCode = '60307'
334     if local_file_size != remote_file_size:
335     msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
336     ErCode = '60307'
337 ewv 1.14
338 spiga 1.1 return ErCode, msg
339 ewv 1.7
340     def backup(self):
341 spiga 1.1 """
342     Check infos from TFC using existing api obtaining:
343     1)destination
344     2)protocol
345     """
346     return
347    
348 spiga 1.8 def updateReport(self, file, erCode, reason, lfn='', se='' ):
349     """
350     Update the final stage out infos
351     """
352     jobStageInfo={}
353     jobStageInfo['erCode']=erCode
354     jobStageInfo['reason']=reason
355     jobStageInfo['lfn']=lfn
356     jobStageInfo['se']=se
357 spiga 1.1
358 spiga 1.8 report = { file : jobStageInfo}
359     return report
360 ewv 1.7
361 spiga 1.8 def finalReport( self , results ):
362     """
363     It a list of LFNs for each SE where data are stored.
364     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
365 spiga 1.1 """
366 spiga 1.8 outFile = open('cmscpReport.sh',"a")
367     cmscp_exit_status = 0
368     txt = ''
369 ewv 1.14 for file, dict in results.iteritems():
370     if file:
371 spiga 1.11 if dict['lfn']=='':
372     lfn = '$LFNBaseName/'+os.path.basename(file)
373     se = '$SE'
374     else:
375 mcinquil 1.16 lfn = dict['lfn']+os.path.basename(file)
376 spiga 1.11 se = dict['se']
377     #dict['lfn'] # to be implemented
378     txt += 'echo "Report for File: '+file+'"\n'
379     txt += 'echo "LFN: '+lfn+'"\n'
380     txt += 'echo "StorageElement: '+se+'"\n'
381     txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
382     txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
383     if dict['erCode'] != '0':
384     cmscp_exit_status = dict['erCode']
385     cmscp_exit_status = dict['erCode']
386 spiga 1.12 else:
387     cmscp_exit_status = dict['erCode']
388     cmscp_exit_status = dict['erCode']
389 spiga 1.8 txt += '\n'
390     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
391     txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
392     outFile.write(str(txt))
393     outFile.close()
394     return
395    
396    
397     def usage():
398    
399     msg="""
400     required parameters:
401     --source :: REMOTE :
402     --destination :: REMOTE :
403     --debug :
404     --inFile :: absPath : or name NOT RELATIVE PATH
405     --outFIle :: onlyNAME : NOT YET SUPPORTED
406    
407     optional parameters
408     """
409 ewv 1.14 print msg
410 spiga 1.8
411 ewv 1.14 return
412 spiga 1.8
413     def HelpOptions(opts=[]):
414     """
415     Check otps, print help if needed
416 ewv 1.14 prepare dict = { opt : value }
417 spiga 1.8 """
418     dict_args = {}
419     if len(opts):
420     for opt, arg in opts:
421 ewv 1.14 dict_args[opt.split('--')[1]] = arg
422 spiga 1.8 if opt in ('-h','-help','--help') :
423     usage()
424     sys.exit(0)
425     return dict_args
426     else:
427     usage()
428     sys.exit(0)
429 spiga 1.1
430     if __name__ == '__main__' :
431 spiga 1.8
432 ewv 1.14 import getopt
433 spiga 1.8
434     allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
435     "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
436 ewv 1.14 try:
437     opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
438 spiga 1.8 except getopt.GetoptError, err:
439     print err
440     HelpOptions()
441     sys.exit(2)
442 ewv 1.14
443 spiga 1.8 dictArgs = HelpOptions(opts)
444 spiga 1.1 try:
445 spiga 1.8 cmscp_ = cmscp(dictArgs)
446 spiga 1.1 cmscp_.run()
447 spiga 1.11 except Exception, ex :
448     print str(ex)
449 spiga 1.1