ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.24
Committed: Mon Nov 10 10:44:16 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.23: +4 -3 lines
Log Message:
Added destinationDir parameter to be used while copying files locally. Minor fixes.

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 mcinquil 1.16 import sys, os
4 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
6 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
7 spiga 1.1
8    
class cmscp:
    """
    Copy files between the local file system and a storage element (SE)
    through the ProdCommon SEAPI, recording a per-file outcome.

    Every outcome is a dict built by updateReport():
        { file : {'erCode': ..., 'reason': ..., 'lfn': ..., 'se': ...} }
    An outcome stored under the empty file name '' describes a failure of
    a whole transfer attempt rather than of a single file.

    Error codes produced by this class (always strings):
        '0'      success
        '-1'     storage interface set-up or existence check failed,
                 or the middleware is not supported
        '60302'  source file does not exist
        '60303'  file already exists at the destination
        '60307'  copy (or the size check after it) failed
        '60316'  destination directory could not be created
    """

    def __init__(self, args):
        """
        Store the run parameters.

        args -- dict {option name: value}; it overrides the defaults set
                here. The mere presence of the keys 'help' and 'debug' is
                what processOptions() looks at, not their value.
        """
        #set default
        self.params = {"source": '', "destination": '', 'destinationDir': '',
                       "inputFileList": '', "outputFileList": '',
                       "protocol": '', "option": '', "middleware": '',
                       "srm_version": 'srmv2'}
        self.debug = 0

        self.params.update(args)

    def _errorText(self, ex):
        """
        Return str(ex); when debugging also append ex.detail and ex.output
        (attributes read from the SEAPI exceptions handled in this class).
        """
        text = str(ex)
        if self.debug:
            text += str(ex.detail) + '\n'
            text += str(ex.output) + '\n'
        return text

    def _shellEscape(self, text):
        """
        Escape text so it can sit inside a double-quoted shell string
        without ending the quotes or triggering an expansion.
        """
        text = str(text)
        # the backslash must be handled first, or it would re-escape
        # the backslashes added for the other characters
        for char in ('\\', '"', '$', '`'):
            text = text.replace(char, '\\' + char)
        return text

    def processOptions(self):
        """
        Validate the parameters and turn 'inputFileList' into a list.

        HelpOptions() (which prints the usage and exits) is called when
        help is requested, when neither source nor destination is given,
        when neither middleware nor protocol is given, or when no input
        file is given.
        """
        if 'help' in self.params: HelpOptions()
        if 'debug' in self.params: self.debug = 1

        # source and dest cannot be undefined at same time
        if not self.params['source'] and not self.params['destination']:
            HelpOptions()

        # if middleware is not defined --> protocol cannot be empty
        if not self.params['middleware'] and not self.params['protocol']:
            HelpOptions()

        # input file must be defined
        file_list = self.params['inputFileList']
        if not file_list:
            HelpOptions()
        else:
            if hasattr(file_list, 'split'):
                # comma separated string; split() also copes with a single
                # name, so no special case is needed
                names = [name.strip() for name in file_list.split(',')]
            else:
                # already a sequence of names (e.g. options processed twice)
                names = list(file_list)
            # drop the empty names left by leading/trailing/double commas
            names = [name for name in names if name]
            if not names:
                HelpOptions()
            self.params['inputFileList'] = names

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

    def run(self):
        """
        Check if running on UI (no middleware) or on WN (on the Grid),
        and take different action.

        With a middleware the files are staged out trying each supported
        protocol and the final report is written; otherwise a single copy
        with the given protocol and option is done.

        Returns the dict of per-file outcomes in both cases.
        """
        self.processOptions()
        if self.params['middleware']:
            # stage out from WN
            results = self.stager(self.params['middleware'],
                                  self.params['inputFileList'])
            self.finalReport(results)
        else:
            # Local interaction with SE
            results = self.copy(self.params['inputFileList'],
                                self.params['protocol'],
                                self.params['option'])
        return results

    def setProtocol(self, middleware):
        """
        Define the allowed protocols based on the middleware.

        Returns a list of (protocol, options) tuples in the order they
        have to be tried, or None when the middleware is not known.
        For osg/lcg/condor a KeyError is raised if 'srm_version' is
        neither 'srmv1' nor 'srmv2'.
        """
        # default To be used with "middleware"
        lcgOpt = {'srmv1': '-b -D srmv1 -t 2400 --verbose',
                  'srmv2': '-b -D srmv2 -t 2400 --verbose'}
        srmOpt = {'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                  'srmv2': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
        rfioOpt = ''

        supported_protocol = None
        scheduler = middleware.lower()
        if scheduler in ['osg', 'lcg', 'condor']:
            srm_version = self.params['srm_version']
            supported_protocol = [('srm-lcg', lcgOpt[srm_version]),
                                  (srm_version, srmOpt[srm_version])]
        elif scheduler in ['lsf', 'caf']:
            supported_protocol = [('rfio', rfioOpt)]
        ## here we can add support for any kind of protocol,
        ## maybe some local schedulers need something dedicated
        return supported_protocol

    def stager(self, middleware, list_files):
        """
        Implement the logic for remote stage out.

        Each protocol allowed for the middleware is tried in turn; only
        the files that failed (and do not already exist remotely) are
        retried with the next protocol.

        Returns the dict of outcomes. For an unsupported middleware it
        holds a single failure stored under the '' file name.
        """
        protocols = self.setProtocol(middleware)
        if not protocols:
            # setProtocol() gives None for an unknown middleware
            reason = 'ERROR: middleware %s is not supported\n' % middleware
            return self.updateReport('', '-1', reason)

        results = {}
        for prot, opt in protocols:
            if self.debug: print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, opt)

            if list(copy_results.keys()) == ['']:
                # the whole attempt failed before any file was handled:
                # keep the reason and retry the same files with the
                # next protocol
                results.update(copy_results)
                continue
            if copy_results:
                # per-file outcomes supersede the failure of an earlier
                # protocol, which would otherwise spoil the exit status
                results.pop('', None)

            list_retry = []
            list_existing = []
            list_ok = []
            for name, info in copy_results.items():
                er_code = info['erCode']
                if er_code == '0':
                    list_ok.append(name)
                    reason = 'Copy succedeed with %s utils' % prot
                    results.update(self.updateReport(name, er_code, reason))
                    continue
                results[name] = info
                if er_code == '60303':
                    list_existing.append(name)
                else:
                    list_retry.append(name)

            if list_ok and self.debug:
                print('Copy of %s succedeed with %s utils\n' % (str(list_ok), prot))
            if len(list_ok) == len(list_files):
                break
            if self.debug:
                print('Copy of files %s failed using %s...\n' % (str(list_retry) + str(list_existing), prot))
            if not list_retry:
                break
            list_files = list_retry

        #### TODO Daniele
        # check if something fails and create the related dict
        # (backup logic is under implementation; it must return a dict
        #  which contains also LFN and SE name)
        return results

    def initializeApi(self, protocol):
        """
        Instantiate the storage interfaces.

        An empty 'source' (or 'destination') parameter selects the 'local'
        protocol for that side; the chosen protocols are stored in
        self.source_prot and self.dest_prot.

        Returns (Source_SE, Destination_SE). Exceptions raised by
        storageInterface() are propagated.
        """
        self.source_prot = protocol
        self.dest_prot = protocol
        if not self.params['source']: self.source_prot = 'local'
        Source_SE = self.storageInterface(self.params['source'], self.source_prot)
        if not self.params['destination']: self.dest_prot = 'local'
        Destination_SE = self.storageInterface(self.params['destination'], self.dest_prot)

        if self.debug:
            print('(source=%s,  protocol=%s)' % (self.params['source'], self.source_prot))
            print('(destination=%s,  protocol=%s)' % (self.params['destination'], self.dest_prot))

        return Source_SE, Destination_SE

    def copy(self, list_file, protocol, options):
        """
        Make the real file copy using SE API.

        Returns the dict of per-file outcomes. When the storage interface
        or the remote directory cannot be set up, a single failure stored
        under the '' file name is returned instead.
        """
        if self.debug:
            print('copy(): using %s protocol' % protocol)
        try:
            Source_SE, Destination_SE = self.initializeApi(protocol)
        except Exception as ex:
            return self.updateReport('', '-1', str(ex))

        # create remote dir
        if protocol in ['gridftp', 'rfio', 'srmv2']:
            try:
                self.createDir(Destination_SE, protocol)
            except Exception as ex:
                return self.updateReport('', '60316', str(ex))

        ## prepare for real copy  ##
        try:
            sbi = SBinterface(Source_SE, Destination_SE)
            sbi_dest = SBinterface(Destination_SE)
            sbi_source = SBinterface(Source_SE)
        except ProtocolMismatch as ex:
            msg = str(ex) + '\n'
            msg += "ERROR : Unable to create SBinterface with %s protocol\n" % protocol
            return self.updateReport('', '-1', msg)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug: print('start real copy for %s' % filetocopy)
            try:
                ErCode, msg = self.checkFileExist(sbi_source, sbi_dest, filetocopy)
            except Exception as ex:
                # a string, like every other error code of this class
                ErCode = '-1'
                msg = str(ex)
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, options, protocol, sbi_dest)
                if ErCode != '0':
                    # do not leave a partially staged file behind
                    try:
                        self.removeFile(sbi_dest, filetocopy)
                    except Exception as ex:
                        msg += '\n' + str(ex)
            if self.debug:
                print('Copy results for %s is %s' % (os.path.basename(filetocopy), ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def storageInterface(self, endpoint, protocol):
        """
        Create the storage interface.

        Returns the SElement built for endpoint and protocol; raises
        Exception when the protocol is unknown to the SEAPI.
        """
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except ProtocolUnknown as ex:
            msg = ''
            if self.debug: msg = str(ex) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            raise Exception(msg)

        return interface

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp REALLY TEMPORARY
        this should be transparent at SE API level.

        Returns a message (empty unless debugging or an
        OperationException occurred); raises Exception when the creation
        fails with a TransferException.
        """
        msg = ''
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug: msg += "The directory has been created using protocol %s\n" % protocol
        except TransferException as ex:
            msg = self._errorText(ex)
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol
            raise Exception(msg)
        except OperationException as ex:
            # NOTE(review): this failure is only reported through the
            # returned message and never raised - confirm it is intended
            msg = str(ex)
            if self.debug: msg += str(ex.detail) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol

        return msg

    def checkFileExist(self, sbi_source, sbi_dest, filetocopy):
        """
        Check both if source file exist AND
        if destination file ALREADY exist.

        On a non local side only the base name of the file is checked.

        Returns (ErCode, msg): ('60302', ...) when the source is missing,
        ('60303', ...) when the destination is already there, ('0', '')
        otherwise. Raises Exception when a check itself fails.
        """
        ErCode = '0'
        msg = ''
        f_tocopy = filetocopy
        if self.source_prot != 'local': f_tocopy = os.path.basename(filetocopy)
        try:
            checkSource = sbi_source.checkExists(f_tocopy)
        except OperationException as ex:
            msg = self._errorText(ex)
            msg += 'ERROR: problems checkig if source file %s exist' % filetocopy
            raise Exception(msg)
        except WrongOption as ex:
            msg = self._errorText(ex)
            msg += 'ERROR problems checkig if source file %s exist' % filetocopy
            raise Exception(msg)
        if not checkSource:
            ErCode = '60302'
            msg = "ERROR file %s do not exist" % os.path.basename(filetocopy)
            return ErCode, msg

        f_tocopy = filetocopy
        if self.dest_prot != 'local': f_tocopy = os.path.basename(filetocopy)
        try:
            check = sbi_dest.checkExists(f_tocopy)
        except OperationException as ex:
            msg = self._errorText(ex)
            msg += 'ERROR: problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        except WrongOption as ex:
            msg = self._errorText(ex)
            msg += 'ERROR problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        if check:
            ErCode = '60303'
            msg = "file %s already exist" % os.path.basename(filetocopy)

        return ErCode, msg

    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest):
        """
        Call the copy API.

        For srmv* protocols with a local source, the size of the copied
        file is compared with the local one afterwards.

        Returns (ErCode, msg): ('0', '') on success, ('60307', reason)
        when the copy or the size check fails.
        """
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name ## to be improved supporting changing file name  TODO
        if self.params['source'] == '' and path == '':
            source_file = os.path.abspath(filetocopy)
        elif self.params['destination'] == '':
            # 'destinationDir' is always present (default ''), so the
            # fallback has to be applied on empty values too
            destDir = self.params.get('destinationDir') or os.getcwd()
            dest_file = os.path.join(destDir, file_name)
        elif self.params['source'] != '' and self.params['destination'] != '':
            source_file = file_name

        ErCode = '0'
        msg = ''

        try:
            sbi.copy(source_file, dest_file, opt=option)
        except (TransferException, WrongOption) as ex:
            msg = self._errorText(ex)
            msg += "Problem copying %s file" % filetocopy
            ErCode = '60307'

        # the local size can be read only when the source is a local file
        source_is_local = self.params['source'] == ''
        if ErCode == '0' and protocol.startswith('srmv') and source_is_local:
            local_file_size = os.path.getsize(source_file)
            try:
                remote_file_size = sbi_dest.getSize(dest_file)
            except (TransferException, WrongOption) as ex:
                msg = self._errorText(ex)
                msg += "Problem checking the size of %s file" % filetocopy
                ErCode = '60307'
            else:
                # NOTE(review): assumes getSize() gives a value comparable
                # with the int of os.path.getsize() - confirm in the SEAPI
                if local_file_size != remote_file_size:
                    msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
                    ErCode = '60307'

        return ErCode, msg

    def removeFile(self, sbi_dest, filetocopy):
        """
        Delete filetocopy at the destination (only its base name is used
        when the destination is not local).

        Raises Exception when the deletion fails with an
        OperationException.
        """
        f_tocopy = filetocopy
        if self.dest_prot != 'local': f_tocopy = os.path.basename(filetocopy)
        try:
            sbi_dest.delete(f_tocopy)
        except OperationException as ex:
            msg = self._errorText(ex)
            msg += 'ERROR: problems removing partially staged file %s' % filetocopy
            raise Exception(msg)

    def backup(self):
        """
        Check infos from TFC using existing api obtaining:
        1)destination
        2)protocol

        Not implemented yet: it does nothing and returns None.
        """
        return

    def updateReport(self, file, erCode, reason, lfn='', se=''):
        """
        Update the final stage out infos.

        Returns { file : {'erCode', 'reason', 'lfn', 'se'} }.
        """
        jobStageInfo = {}
        jobStageInfo['erCode'] = erCode
        jobStageInfo['reason'] = reason
        jobStageInfo['lfn'] = lfn
        jobStageInfo['se'] = se

        report = {file: jobStageInfo}
        return report

    def finalReport(self, results):
        """
        Append the stage out report to the shell script 'cmscpReport.sh'
        in the current directory.

        For every file the script echoes the file, its LFN, its storage
        element and the outcome reason; at the end it exports
        StageOutExitStatus, which is 0 unless an outcome carries an error
        code.

        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
        """
        cmscp_exit_status = 0
        txt = ''
        for name, info in results.items():
            if not name:
                # failure of a whole transfer attempt, no file to report
                cmscp_exit_status = info['erCode']
                continue
            base_name = self._shellEscape(os.path.basename(name))
            if info['lfn'] == '':
                lfn = '$LFNBaseName/' + base_name
                se = '$SE'
            else:
                lfn = info['lfn'] + base_name
                se = info['se']
            # lfn and se are written as they are since they may carry
            # shell variables to be expanded ($LFNBaseName, $SE); file
            # name and reason are plain text and must not break the quotes
            txt += 'echo "Report for File: ' + self._shellEscape(name) + '"\n'
            txt += 'echo "LFN: ' + lfn + '"\n'
            txt += 'echo "StorageElement: ' + se + '"\n'
            txt += 'echo "StageOutExitStatusReason =' + self._shellEscape(info['reason']) + '" | tee -a $RUNTIME_AREA/$repo\n'
            txt += 'echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n'
            if info['erCode'] != '0':
                cmscp_exit_status = info['erCode']
        txt += '\n'
        txt += 'export StageOutExitStatus=' + str(cmscp_exit_status) + '\n'
        txt += 'echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n'
        # the context manager closes the file even if the write fails
        with open('cmscpReport.sh', "a") as outFile:
            outFile.write(str(txt))
444    
445    
def usage():
    """
    Print the command line help.

    The option names listed are the ones accepted by the getopt call of
    the script; the constraints are the ones enforced by
    cmscp.processOptions().
    """
    msg = """
    usage: cmscp.py --inputFileList=<file>[,<file>...] [options]

    required parameters:
    --inputFileList  :: comma separated list of files : absPath or name, NOT RELATIVE PATH
    --source         :: REMOTE : source endpoint
    --destination    :: REMOTE : destination endpoint
                        (at least one of --source and --destination)
    --middleware     :: scheduler, selects the protocols to try
    --protocol       :: protocol to use
                        (at least one of --middleware and --protocol)

    optional parameters:
    --outputFileList :: onlyNAME : NOT YET SUPPORTED
    --option         :: options passed to the copy
    --srm_version    :: srmv1 or srmv2 (default srmv2)
    --debug          :: print debug information
    --help           :: print this help
    """
    print(msg)

    return
461 spiga 1.8
def HelpOptions(opts=None):
    """
    Check opts, print help if needed
    prepare dict = { opt : value }

    opts -- list of (option, value) pairs as returned by getopt.

    Returns the dict {option name without leading dashes: value}.
    Prints the usage and exits (status 0) when opts is empty or missing,
    or when one of the options asks for help.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # lstrip copes with single dash options too, where
        # opt.split('--')[1] would fail with an IndexError
        dict_args[opt.lstrip('-')] = arg
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
    return dict_args
478 spiga 1.1
if __name__ == '__main__':
    # Script entry point: parse the long options, build the parameter
    # dict and run the copy.

    import getopt

    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=", "debug", "help"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError as err:
        print(err)
        # usage() is called directly: HelpOptions() would exit with
        # status 0 before the error status below could be returned
        usage()
        sys.exit(2)

    # prints the usage and exits when no option (or --help) is given
    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception as ex:
        # any failure is only reported on the standard output
        print(str(ex))
498 spiga 1.1