ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.72
Committed: Fri May 7 18:13:10 2010 UTC (14 years, 11 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
Changes since 1.71: +25 -15 lines
Log Message:
Adding timeout usage for all storage commands

File Contents

# User Rev Content
1 edelmann 1.56 #!/usr/bin/env python
2 mcinquil 1.16 import sys, os
3 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
4 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
5 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
6 spiga 1.1
7    
class cmscp:
    """
    Safe copy of files to/from a remote Storage Element (SE) through the
    ProdCommon SEAPI, with per-file success checking and a shell report.

    Error codes used in the per-file report (as strings):
        '0'      copy succeeded
        '-1'     generic failure while setting up the storage interface
        '10041'  a client command is missing (wrong environment)
        '60302'  the source file does not exist
        '60303'  the file already exists on the destination
        '60307'  the copy (or the checks around it) failed
        '60308'  the copy succeeded, but on the local fall-back SE
        '60316'  the remote directory could not be created
        '60317'  the copy timed out
    """

    def __init__(self, args):
        """
        cmscp
        safe copy of local file to/from remote SE via lcg_cp/srmcp,
        including success checking; version also for CAF using rfcp
        command to copy the output to SE

        input:
            args : dictionary {option name: value} overriding the defaults
                   below (source, destination, inputFileList, protocol,
                   option, middleware, srm_version, destinationDir, lfn,
                   plus the flags local_stage / debug / help)
        """
        # set default
        self.params = {"source": '', "destination": '', 'destinationDir': '',
                       "inputFileList": '', "outputFileList": '',
                       "protocol": '', "option": '', "middleware": '',
                       "srm_version": 'srmv2', "lfn": ''}
        self.debug = 0
        self.local_stage = 0
        self.params.update(args)
        # timeouts (seconds) handed to the SEAPI calls; filled by setProtocol()
        self.subprocesstimeout = {'copy': None, 'exists': None,
                                  'delete': None, 'size': None}
        return

    def processOptions(self):
        """
        Check the command line parameters and normalise 'inputFileList'
        from a comma separated string into a list of stripped names.

        HelpOptions() (which prints the usage and exits) is called when a
        mandatory parameter is missing.
        """
        if 'help' in self.params:
            HelpOptions()
        if 'debug' in self.params:
            self.debug = 1
        if 'local_stage' in self.params:
            self.local_stage = 1

        # source and dest cannot be undefined at same time
        if not self.params['source'] and not self.params['destination']:
            HelpOptions()
        # if middleware is not defined --> protocol cannot be empty
        if not self.params['middleware'] and not self.params['protocol']:
            HelpOptions()

        # input file must be defined
        raw_list = self.params['inputFileList']
        if not raw_list:
            HelpOptions()
        elif hasattr(raw_list, 'split'):
            # split() on a string without commas yields a one-element list,
            # so no separate single-file branch is needed
            self.params['inputFileList'] = [name.strip()
                                            for name in raw_list.split(',')]
        else:
            # already a sequence of names (e.g. processOptions called twice)
            self.params['inputFileList'] = list(raw_list)

        if not self.params['lfn'] and self.local_stage == 1:
            HelpOptions()

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

    def run(self):
        """
        Check if running on UI (no $middleware) or
        on WN (on the Grid), and take different action.

        Returns the {file: report} dictionary built by the copy.
        """
        self.processOptions()
        if self.debug:
            print('calling run() : \n')
        if self.params['middleware']:
            # stage out from WN
            results = self.stager(self.params['middleware'],
                                  self.params['inputFileList'])
            self.finalReport(results)
        else:
            # Local interaction with SE
            results = self.copy(self.params['inputFileList'],
                                self.params['protocol'],
                                self.params['option'])
        return results

    def _parseLcgVersion(self, output):
        """
        Turn the output of "lcg-cp --version | grep lcg_util"
        (e.g. "lcg_util-1.7.6") into major*10 + minor (e.g. 17).
        Returns -1 when no version number can be extracted.
        """
        fields = output.split("-")
        if len(fields) < 2:
            return -1
        numbers = fields[1].split(".")
        try:
            num_ver = int(numbers[0]) * 10
            if len(numbers) > 1:
                num_ver += int(numbers[1])
        except ValueError:
            return -1
        return num_ver

    def checkLcgUtils(self):
        """
        _checkLcgUtils_
        check the lcg-utils version and report it as major*10 + minor,
        or -1 when the command is missing or its output is not understood
        """
        try:
            from commands import getstatusoutput      # Python 2
        except ImportError:
            from subprocess import getstatusoutput    # Python 3
        cmd = "lcg-cp --version | grep lcg_util"
        status, output = getstatusoutput(cmd)
        if output.find("not found") == -1 or status == 0:
            return self._parseLcgVersion(output)
        return -1

    def setProtocol(self, middleware):
        """
        define the allowed protocols based on $middleware
        which depend on scheduler

        Returns a list of (protocol, options) tuples to try in order; the
        list is empty when the middleware is not known. Also sets the
        SEAPI subprocess timeouts as a side effect.
        """
        # default To be used with "middleware"
        if self.debug:
            print('setProtocol() :\n')
            print('\tmiddleware = %s utils \n' % middleware)

        ## timeout needed for subprocess command of SEAPI
        ## they should be a bit higher then the corresponding passed by command line
        self.subprocesstimeout['copy'] = 2500
        self.subprocesstimeout['exists'] = 600
        self.subprocesstimeout['delete'] = 600
        self.subprocesstimeout['size'] = 600

        srmOpt = {'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                  'srmv2': ' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
        rfioOpt = ''

        scheduler = middleware.lower()
        supported_protocol = []
        if scheduler in ['osg', 'lcg', 'condor', 'sge']:
            # the lcg-cp timeout flags changed with lcg_util 1.7; the version
            # is probed only here because only this branch uses lcg-cp
            if self.checkLcgUtils() >= 17:
                lcgOpt = {'srmv1': '-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
                          'srmv2': '-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
            else:
                lcgOpt = {'srmv1': '-b -D srmv1 -t 2400 --verbose',
                          'srmv2': '-b -D srmv2 -t 2400 --verbose'}
            srm_version = self.params['srm_version']
            supported_protocol = [('srm-lcg', lcgOpt[srm_version]),
                                  (srm_version, srmOpt[srm_version])]
        elif scheduler in ['lsf', 'caf']:
            supported_protocol = [('rfio', rfioOpt)]
        elif scheduler in ['pbs']:
            supported_protocol = [('rfio', rfioOpt), ('local', '')]
        elif scheduler in ['arc']:
            supported_protocol = [('srmv2', '-debug'), ('srmv1', '-debug')]
        ## else: here we can add support for any kind of protocol,
        ## maybe some local schedulers need something dedicated
        return supported_protocol

    def checkCopy(self, copy_results, len_list_files, prot, lfn='', se=''):
        """
        Checks the status of copy and update result dictionary.

        Returns (copy_results, list_ok, list_retry, list_retry_localSE):
            list_ok            files copied successfully
            list_retry         files to retry with the next protocol
                               (client command missing, code 10041)
            list_retry_localSE files that failed for any other reason and
                               are candidates for the local SE fall-back
        Files whose source is missing (60302) go in none of the lists.
        """
        list_retry = []
        list_retry_localSE = []
        list_not_existing = []
        list_ok = []

        if self.debug:
            print('in checkCopy() :\n')

        # iterate over a snapshot: the dictionary is updated inside the loop
        for name, info in list(copy_results.items()):
            er_code = info['erCode']
            if er_code == '0':
                list_ok.append(name)
                info['reason'] = 'Copy succedeed with %s utils' % prot
            elif er_code == '60302':
                list_not_existing.append(name)
            elif er_code == '10041':
                list_retry.append(name)
            ## WHAT TO DO IN GENERAL FAILURE CONDITION
            else:
                list_retry_localSE.append(name)

            if self.debug:
                print("\t file %s \n" % name)
                print("\t dict['erCode'] %s \n" % info['erCode'])
                print("\t dict['reason'] %s \n" % info['reason'])

            if (lfn != '') and (se != ''):
                upDict = self.updateReport(name, er_code, info['reason'], lfn, se)
            else:
                upDict = self.updateReport(name, er_code, info['reason'])
            copy_results.update(upDict)

        msg = ''
        if len(list_ok) != 0:
            msg += '\tCopy of %s succedeed with %s utils\n' % (str(list_ok), prot)
        if len(list_ok) != len_list_files:
            # report every failed file, not only the ones queued for a retry
            msg += '\tCopy of %s failed using %s for files \n' % (str(list_retry + list_retry_localSE), prot)
            msg += '\tCopy of %s failed using %s : files not found \n' % (str(list_not_existing), prot)
        if self.debug:
            print(msg)

        return copy_results, list_ok, list_retry, list_retry_localSE

    def LocalCopy(self, list_retry, results):
        """
        Tries the stage out to the CloseSE for the files in list_retry
        and merges the outcome into results, which is returned.

        Side effects: rewrites self.params 'protocol', 'lfn', 'destination'
        (and possibly 'srm_version' / 'middleware') from the site local
        configuration.
        """
        if self.debug:
            print('in LocalCopy() :\n')
            print('\t list_retry %s utils \n' % list_retry)
            print('\t len(list_retry) %s \n' % len(list_retry))

        list_files = list_retry
        if not list_files:
            return results

        ### copy backup
        from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
        siteCfg = loadSiteLocalConfig()
        seName = siteCfg.localStageOut.get("se-name", None)
        catalog = siteCfg.localStageOut.get("catalog", None)
        implName = siteCfg.localStageOut.get("command", None)
        if implName == 'srm':
            implName = 'srmv1'
            self.params['srm_version'] = implName
        ##### to be improved ###############
        if implName == 'rfcp':
            self.params['middleware'] = 'lsf'
        ####################################

        self.params['protocol'] = implName
        tfc = siteCfg.trivialFileCatalog()

        if self.debug:
            print('\t siteCFG %s \n' % siteCfg)
            print('\t seName %s \n' % seName)
            print('\t catalog %s \n' % catalog)
            print("\t self.params['protocol'] %s \n" % self.params['protocol'])
            print('\t tfc %s ' % tfc)
            print("\t self.params['inputFilesList'] %s \n" % list_files)

        # fall-back copies are stored under /store/temp/ instead of /store/
        lfn_base = str(self.params['lfn'])
        if lfn_base.startswith("/store/"):
            lfn_base = lfn_base.replace("/store/", "/store/temp/", 1)
        if not lfn_base.endswith('/'):
            lfn_base = lfn_base + '/'
        self.params['lfn'] = lfn_base

        file_backup = []
        for input_file in list_files:
            lfn_file = lfn_base + os.path.basename(input_file)
            surl = tfc.matchLFN(tfc.preferredProtocol, lfn_file)
            file_backup.append(surl)
            if self.debug:
                print('\t lfn %s \n' % self.params['lfn'])
                print('\t file %s \n' % lfn_file)
                print('\t surl %s \n' % surl)

        destination = os.path.dirname(file_backup[0])
        if not destination.endswith('/'):
            destination = destination + '/'
        self.params['destination'] = destination

        if self.debug:
            print("\t self.params['destination']%s \n" % self.params['destination'])
            print("\t self.params['protocol'] %s \n" % self.params['protocol'])
            print("\t self.params['option']%s \n" % self.params['option'])

        localCopy_results = {}
        for prot, opt in self.setProtocol(self.params['middleware']):
            if self.debug:
                print('\tIn LocalCopy trying the stage out with %s utils \n' % prot)
            # copy only the files that still need it, not the whole input list
            localCopy_results = self.copy(list_files, prot, opt, backup='yes')
            if list(localCopy_results.keys()) == ['']:
                results.update(localCopy_results)
            else:
                localCopy_results, list_ok, still_retry, list_retry_localSE = \
                    self.checkCopy(localCopy_results, len(list_files), prot,
                                   self.params['lfn'], seName)
                results.update(localCopy_results)
                if len(list_ok) == len(list_files):
                    break
                if len(still_retry):
                    list_files = still_retry
                else:
                    break
        if self.debug:
            print("\t localCopy_results = %s \n" % localCopy_results)

        return results

    def stager(self, middleware, list_files):
        """
        Implement the logic for remote stage out: try each protocol allowed
        for the middleware in turn, and, when local_stage is active, fall
        back to the local SE for the files that failed.

        Returns the {file: report} dictionary.
        """
        if self.debug:
            print('stager() :\n')
            print('\tmiddleware %s\n' % middleware)
            print('\tlist_files %s\n' % list_files)

        results = {}
        # failed files collected over ALL the protocols tried, so that a
        # failure with the first protocol is not forgotten when the loop
        # moves on (also keeps the name bound when nothing was tried)
        fallback_files = []
        for prot, opt in self.setProtocol(middleware):
            if self.debug:
                print('\tTrying the stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, opt)
            if list(copy_results.keys()) == ['']:
                results.update(copy_results)
            else:
                copy_results, list_ok, list_retry, list_retry_localSE = \
                    self.checkCopy(copy_results, len(list_files), prot)
                results.update(copy_results)
                fallback_files.extend(list_retry_localSE)
                if len(list_ok) == len(list_files):
                    break
                if len(list_retry):
                    list_files = list_retry
                else:
                    break

        if self.local_stage and fallback_files:
            results = self.LocalCopy(fallback_files, results)

        if self.debug:
            print("\t results %s \n" % results)
        return results

    def initializeApi(self, protocol):
        """
        Instantiate storage interface for source and destination.
        An empty 'source' / 'destination' parameter means the local file
        system. Returns (Source_SE, Destination_SE).
        """
        if self.debug:
            print('initializeApi() :\n')
        self.source_prot = protocol
        self.dest_prot = protocol
        if not self.params['source']:
            self.source_prot = 'local'
        Source_SE = self.storageInterface(self.params['source'], self.source_prot)
        if not self.params['destination']:
            self.dest_prot = 'local'
        Destination_SE = self.storageInterface(self.params['destination'], self.dest_prot)

        if self.debug:
            msg = '\t(source=%s, protocol=%s)' % (self.params['source'], self.source_prot)
            msg += '\t(destination=%s, protocol=%s)' % (self.params['destination'], self.dest_prot)
            print(msg)

        return Source_SE, Destination_SE

    def _failAll(self, list_file, erCode, reason):
        """Build a report marking every file of list_file as failed with erCode/reason."""
        report = {}
        for filetocopy in list_file:
            report.update(self.updateReport(filetocopy, erCode, reason))
        return report

    def copy(self, list_file, protocol, options, backup='no'):
        """
        Make the real file copy using SE API.

        list_file : names of the files to copy
        protocol  : SEAPI protocol name
        options   : option string passed to the protocol client
        backup    : 'yes' when copying to the fall-back SE; a successful
                    copy is then reported with code '60308' instead of '0'

        Returns a {file: report} dictionary; set-up failures are reported
        for every file instead of being raised.
        """
        if self.debug:
            msg = 'copy() :\n'
            msg += '\tusing %s protocol\n' % protocol
            print(msg)
        try:
            Source_SE, Destination_SE = self.initializeApi(protocol)
        except Exception as ex:
            return self._failAll(list_file, '-1', str(ex))

        # create remote dir
        if Destination_SE.protocol in ['gridftp', 'rfio', 'srmv2']:
            try:
                self.createDir(Destination_SE, Destination_SE.protocol)
            except OperationException as ex:
                return self._failAll(list_file, '60316', str(ex))
            ## when the client commands are not found (wrong env or really missing)
            except MissingCommand as ex:
                msg = "ERROR %s %s" % (str(ex), str(ex.detail))
                return self._failAll(list_file, '10041', msg)
            except Exception as ex:
                msg = "ERROR %s" % (str(ex))
                return self._failAll(list_file, '-1', msg)

        ## prepare for real copy  ##
        try:
            sbi = SBinterface(Source_SE, Destination_SE)
            sbi_dest = SBinterface(Destination_SE)
            sbi_source = SBinterface(Source_SE)
        except ProtocolMismatch as ex:
            msg = "ERROR : Unable to create SBinterface with %s protocol" % protocol
            msg += str(ex)
            return self._failAll(list_file, '-1', msg)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug:
                print('\tStart real copy for %s' % filetocopy)
            try:
                ErCode, msg = self.checkFileExist(sbi_source, sbi_dest, filetocopy, options)
            except Exception as ex:
                ErCode = '60307'
                msg = str(ex)
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, options, protocol, sbi_dest)
                if (ErCode == '0') and (backup == 'yes'):
                    ErCode = '60308'
            if self.debug:
                print('\tCopy results for %s is %s' % (os.path.basename(filetocopy), ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def storageInterface(self, endpoint, protocol):
        """
        Create the storage interface.
        Raises Exception when the protocol is not known to the SEAPI.
        """
        if self.debug:
            print('storageInterface():\n')
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except ProtocolUnknown as ex:
            msg = "ERROR : Unable to create interface with %s protocol" % protocol
            msg += str(ex)
            raise Exception(msg)

        return interface

    def _printErrorDetail(self, msg, ex, with_output=True):
        """Debug helper: print msg followed by the detail (and output) carried by a SEAPI exception."""
        dbgmsg = '\t' + msg + '\n\t' + str(getattr(ex, 'detail', '')) + '\n'
        if with_output:
            dbgmsg += '\t' + str(getattr(ex, 'output', '')) + '\n'
        print(dbgmsg)

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp REALLY TEMPORARY
        this should be transparent at SE API level.

        An already existing directory is not an error. Any other SEAPI
        failure caught here is re-raised as a plain Exception carrying a
        descriptive message.
        """
        if self.debug:
            print('createDir():\n')
        msg = ''
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug:
                print("\tThe directory has been created using protocol %s" % protocol)
        except TransferException as ex:
            msg = "ERROR: problem with the directory creation using %s protocol " % protocol
            msg += str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex)
            raise Exception(msg)
        except OperationException as ex:
            msg = "ERROR: problem with the directory creation using %s protocol " % protocol
            msg += str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex, with_output=False)
            raise Exception(msg)
        except MissingDestination as ex:
            msg = "ERROR: problem with the directory creation using %s protocol " % protocol
            msg += str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex, with_output=False)
            raise Exception(msg)
        except AlreadyExistsException as ex:
            if self.debug:
                print("\tThe directory already exist")
        return msg

    def _probeExists(self, sbi, name, option, operationMsg, otherMsg):
        """
        Ask the storage interface sbi whether name exists.

        Returns (exists, ErCode, msg): ErCode is '0' when the check could be
        done, '10041' when the client command is missing (exists is then
        None). Any other SEAPI failure is raised as a plain Exception whose
        text starts with operationMsg (OperationException) or otherMsg.
        """
        try:
            exists = sbi.checkExists(name, opt=option,
                                     tout=self.subprocesstimeout['exists'])
        except OperationException as ex:
            msg = operationMsg + str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex)
            raise Exception(msg)
        except WrongOption as ex:
            msg = otherMsg + str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex)
            raise Exception(msg)
        except MissingDestination as ex:
            msg = otherMsg + str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex, with_output=False)
            raise Exception(msg)
        ## when the client commands are not found (wrong env or really missing)
        except MissingCommand as ex:
            return None, '10041', "ERROR %s %s" % (str(ex), str(ex.detail))
        return exists, '0', ''

    def checkFileExist(self, sbi_source, sbi_dest, filetocopy, option):
        """
        Check both if source file exist AND
        if destination file ALREADY exist.

        Returns (ErCode, msg):
            '0'      source present and destination free
            '60302'  source file missing
            '60303'  destination file already there
            '10041'  client command missing
        Raises Exception when one of the checks itself fails.
        """
        if self.debug:
            print('checkFileExist():\n')

        f_tocopy = filetocopy
        if self.source_prot != 'local':
            f_tocopy = os.path.basename(filetocopy)
        checkSource, ErCode, msg = self._probeExists(
            sbi_source, f_tocopy, option,
            'ERROR: problems checkig if source file %s exist' % filetocopy,
            'ERROR problems checkig if source file %s exist' % filetocopy)
        if ErCode != '0':
            return ErCode, msg
        if self.debug:
            print('\tCheck for local file %s exist succeded \n' % f_tocopy)
        if not checkSource:
            ErCode = '60302'
            msg = "ERROR file %s do not exist" % os.path.basename(filetocopy)
            return ErCode, msg

        f_tocopy = filetocopy
        if self.dest_prot != 'local':
            f_tocopy = os.path.basename(filetocopy)
        check, ErCode, msg = self._probeExists(
            sbi_dest, f_tocopy, option,
            'ERROR: problems checkig if file %s already exist' % filetocopy,
            'ERROR problems checkig if file %s already exist' % filetocopy)
        if ErCode != '0':
            return ErCode, msg
        if self.debug:
            print('\tCheck for remote file %s exist succeded \n' % f_tocopy)
        if check:
            ErCode = '60303'
            msg = "file %s already exist" % os.path.basename(filetocopy)

        return ErCode, msg

    def _copyProblem(self, filetocopy, ex):
        """Build (and, in debug mode, print) the message for a failed copy of filetocopy."""
        msg = "Problem copying %s file" % filetocopy
        msg += str(ex)
        if self.debug:
            self._printErrorDetail(msg, ex)
        return msg

    def _sizeProblem(self, filetocopy, ex):
        """Build (and, in debug mode, print) the message for a failed size check of filetocopy."""
        msg = "Problem checking the size of %s file" % filetocopy
        msg += str(ex)
        if self.debug:
            self._printErrorDetail(msg, ex)
        return msg

    def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest):
        """
        call the copy API.

        For srmv* protocols the size of the copied file is compared with
        the source; on any failure the (possibly partial) destination file
        is removed. Returns (ErCode, msg).
        """
        if self.debug:
            print('makeCopy():\n')
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name  ## to be improved supporting changing file name  TODO
        if self.params['source'] == '' and path == '':
            source_file = os.path.abspath(filetocopy)
        elif self.params['destination'] == '':
            # NOTE: 'destinationDir' always exists (default ''), so the
            # getcwd() default is only used if the key has been removed
            destDir = self.params.get('destinationDir', os.getcwd())
            dest_file = os.path.join(destDir, file_name)
        elif self.params['source'] != '' and self.params['destination'] != '':
            source_file = file_name

        ErCode = '0'
        msg = ''

        if self.params['option'].find('space_token') >= 0:
            token_fields = self.params['option'].split('=')
            # ignore a space_token option given without a value
            if len(token_fields) > 1:
                space_token = token_fields[1]
                if protocol == 'srmv2':
                    option = '%s -space_token=%s' % (option, space_token)
                if protocol == 'srm-lcg':
                    option = '%s -S %s' % (option, space_token)
        try:
            sbi.copy(source_file, dest_file, opt=option,
                     tout=self.subprocesstimeout['copy'])
        except (TransferException, WrongOption, SizeZeroException) as ex:
            # a WrongOption failure is a failed copy too: it must not be
            # reported with code '0'
            ErCode = '60307'
            msg = self._copyProblem(filetocopy, ex)
        ## when the client commands are not found (wrong env or really missing)
        except MissingCommand as ex:
            ErCode = '10041'
            msg = self._copyProblem(filetocopy, ex)
        except AuthorizationException as ex:
            ErCode = '60307'
            msg = self._copyProblem(filetocopy, ex)
        except SEAPITimeout as ex:
            ErCode = '60317'
            msg = self._copyProblem(filetocopy, ex)

        if ErCode == '0' and protocol.find('srmv') == 0:
            remote_file_size = -1
            local_file_size = os.path.getsize(source_file)
            try:
                remote_file_size = sbi_dest.getSize(dest_file, opt=option,
                                                    tout=self.subprocesstimeout['size'])
                if self.debug:
                    print('\t Check of remote size succeded for file %s\n' % dest_file)
            except (TransferException, WrongOption) as ex:
                ErCode = '60307'
                msg = self._sizeProblem(filetocopy, ex)
            except SEAPITimeout as ex:
                ErCode = '60317'
                msg = self._sizeProblem(filetocopy, ex)
            # keep the size-check error message when the check itself failed
            if ErCode == '0' and local_file_size != remote_file_size:
                msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
                ErCode = '60307'

        if ErCode != '0':
            # best effort clean up of a partially staged file
            try:
                self.removeFile(sbi_dest, dest_file, option)
            except Exception as ex:
                msg += '\n' + str(ex)
        return ErCode, msg

    def removeFile(self, sbi_dest, filetocopy, option):
        """
        Delete filetocopy from the destination (used to clean up partially
        staged files). Raises Exception when the deletion fails.
        """
        if self.debug:
            print('removeFile():\n')
        f_tocopy = filetocopy
        if self.dest_prot != 'local':
            f_tocopy = os.path.basename(filetocopy)
        try:
            sbi_dest.delete(f_tocopy, opt=option,
                            tout=self.subprocesstimeout['delete'])
            if self.debug:
                print('\t deletion of file %s succeeded\n' % str(filetocopy))
        except OperationException as ex:
            msg = 'ERROR: problems removing partially staged file %s' % filetocopy
            msg += str(ex)
            if self.debug:
                self._printErrorDetail(msg, ex)
            raise Exception(msg)

        return

    def updateReport(self, file, erCode, reason, lfn='', se=''):
        """
        Update the final stage out infos.
        Returns {file: {'erCode', 'reason', 'lfn', 'se'}}.
        """
        jobStageInfo = {}
        jobStageInfo['erCode'] = erCode
        jobStageInfo['reason'] = reason
        jobStageInfo['lfn'] = lfn
        jobStageInfo['se'] = se

        report = {file: jobStageInfo}
        return report

    def _shellSafeReason(self, reason):
        """
        Prepare a failure reason for 'echo "... = <reason>"' in the report
        script: single quotes become blanks (as before) and the characters
        the shell interprets inside double quotes are backslash-escaped.
        """
        text = " ".join(str(reason).split("'"))
        for char in ('\\', '"', '$', '`'):
            text = text.replace(char, '\\' + char)
        return "'%s'" % text

    def finalReport(self, results):
        """
        Append to cmscpReport.sh the shell commands reporting, for each
        file, LFN, storage element and stage out status.

        It a list of LFNs for each SE where data are stored.
        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
        """
        cmscp_exit_status = 0
        txt = ''
        for name, info in results.items():
            reason = self._shellSafeReason(info['reason'])
            if name:
                if info['lfn'] == '':
                    # left to the shell: both variables are set by the job wrapper
                    lfn = '${LFNBaseName}' + os.path.basename(name)
                    se = '$SE'
                    LFNBaseName = '$LFNBaseName'
                else:
                    lfn = info['lfn'] + os.path.basename(name)
                    se = str(info['se'])
                    LFNBaseName = os.path.dirname(lfn)
                    if not LFNBaseName.endswith('/'):
                        LFNBaseName = LFNBaseName + '/'
                txt += 'echo "Report for File: ' + name + '"\n'
                txt += 'echo "LFN: ' + lfn + '"\n'
                txt += 'echo "StorageElement: ' + se + '"\n'
                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n' % reason
                txt += 'echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n'
                txt += 'export LFNBaseName=' + LFNBaseName + '\n'
                txt += 'export SE=' + se + '\n'

                txt += 'export endpoint=' + self.params['destination'] + '\n'

                if info['erCode'] != '0':
                    cmscp_exit_status = info['erCode']
            else:
                txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n' % reason
                cmscp_exit_status = info['erCode']
        txt += '\n'
        txt += 'export StageOutExitStatus=' + str(cmscp_exit_status) + '\n'
        txt += 'echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n'

        # the text is built first so the file is opened only when there is
        # something to write, and it is closed even if the write fails
        outFile = open('cmscpReport.sh', "a")
        try:
            outFile.write(str(txt))
        finally:
            outFile.close()
        return
712    
713    
def usage():
    """Print the on-line help: accepted parameters and which ones are mandatory."""
    msg = """
    cmscp:
        safe copy of local file to/from remote SE via lcg_cp/srmcp,
        including success checking version also for CAF using rfcp command to copy the output to SE

    accepted parameters:
        source           =
        destination      =
        inputFileList    =
        outputFileList   =
        protocol         =
        option           =
        middleware       =
        srm_version      =
        destinationDir   =
        lfn              =
        local_stage      = activate stage fall back
        debug            = activate verbose print out
        help             = print on line man and exit

    mandatory:
        * "source" and/or "destination" must always be defined
        * either "middleware" or "protocol" must always be defined
        * "inputFileList" must always be defined
        * if "local_stage" = 1 also "lfn" must be defined
    """
    print(msg)

    return
745 spiga 1.8
def HelpOptions(opts=None):
    """
    Check opts, print help if needed
    prepare dict = { opt : value }

    opts : list of (option, value) pairs as returned by getopt.getopt.

    Returns the dictionary {option name without leading dashes: value}.
    When opts is empty or missing, or a help option is present, the usage
    is printed and the interpreter exits with status 0.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # help is checked first so that a single-dash form cannot fail
        # while its name is being extracted
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
762 spiga 1.1
if __name__ == '__main__':
    # Command line entry point: every long option becomes a key of the
    # dictionary handed to cmscp (see usage() for the list).

    import getopt

    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=",
                  "destinationDir=", "lfn=", "local_stage", "debug", "help"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError as err:
        print(err)
        # usage() is called directly: HelpOptions() would exit with status 0
        # and make the error exit below unreachable
        usage()
        sys.exit(2)

    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception as ex:
        # failures are only printed; the exit status is left untouched
        # NOTE(review): presumably the job wrapper takes the outcome from
        # cmscpReport.sh rather than from the exit status; confirm before
        # turning this into a non-zero exit
        print(str(ex))
783 spiga 1.1