ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.79
Committed: Wed Jul 21 10:12:52 2010 UTC (14 years, 9 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_4_pre3
Changes since 1.78: +13 -1 lines
Log Message:
changes to manage hadoop protocol

File Contents

# User Rev Content
1 edelmann 1.56 #!/usr/bin/env python
2 mcinquil 1.16 import sys, os
3 fanzago 1.74 try:
4     import json
5     except:
6     import simplejson as json
7 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
8 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
9 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
10 spiga 1.1
11    
12     class cmscp:
13 spiga 1.8 def __init__(self, args):
14 spiga 1.1 """
15     cmscp
16 spiga 1.46 safe copy of local file to/from remote SE via lcg_cp/srmcp,
17 spiga 1.1 including success checking version also for CAF using rfcp command to copy the output to SE
18     input:
19     $1 middleware (CAF, LSF, LCG, OSG)
20 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
21     $3 if needed: file name (the output file name)
22     $5 remote SE (complete endpoint)
23 ewv 1.7 $6 srm version
24 fanzago 1.74 --for_lfn $LFNBaseName
25 spiga 1.1 output:
26     return 0 if all ok
27     return 60307 if srmcp failed
28     return 60303 if file already exists in the SE
29     """
30 spiga 1.24 self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
31 fanzago 1.74 "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'' }
32 ewv 1.14 self.debug = 0
33 fanzago 1.74 #### for fallback copy
34 spiga 1.46 self.local_stage = 0
35 spiga 1.8 self.params.update( args )
36 mcinquil 1.76 ## timeout needed for subprocess command of SEAPI
37     ## they should be a bit higher then the corresponding passed by command line
38     ## default values
39     self.subprocesstimeout = { \
40     'copy': 3600, \
41     'exists': 1200, \
42     'delete': 1200, \
43     'size': 1200 \
44     }
45    
46 spiga 1.1 return
47    
48 spiga 1.8 def processOptions( self ):
49 spiga 1.1 """
50 spiga 1.8 check command line parameter
51 spiga 1.1 """
52 ewv 1.14 if 'help' in self.params.keys(): HelpOptions()
53     if 'debug' in self.params.keys(): self.debug = 1
54 spiga 1.45 if 'local_stage' in self.params.keys(): self.local_stage = 1
55 ewv 1.7
56     # source and dest cannot be undefined at same time
57 spiga 1.8 if not self.params['source'] and not self.params['destination'] :
58 spiga 1.36 HelpOptions()
59 ewv 1.7 # if middleware is not defined --> protocol cannot be empty
60 spiga 1.8 if not self.params['middleware'] and not self.params['protocol'] :
61     HelpOptions()
62    
63 ewv 1.7 # input file must be defined
64 spiga 1.35 if not self.params['inputFileList'] :
65 spiga 1.36 HelpOptions()
66 spiga 1.1 else:
67 spiga 1.8 file_to_copy=[]
68 ewv 1.14 if self.params['inputFileList'].find(','):
69 spiga 1.8 [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
70 ewv 1.7 else:
71 spiga 1.8 file_to_copy.append(self.params['inputFileList'])
72     self.params['inputFileList'] = file_to_copy
73 ewv 1.7
74 fanzago 1.74 #if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
75     if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
76 fanzago 1.38
77 spiga 1.1 ## TO DO:
78     #### add check for outFiles
79     #### add map {'inFileNAME':'outFileNAME'} to change out name
80    
81    
82 ewv 1.7 def run( self ):
83 spiga 1.1 """
84 ewv 1.7 Check if running on UI (no $middleware) or
85     on WN (on the Grid), and take different action
86 spiga 1.1 """
87 mcinquil 1.37 self.processOptions()
88 spiga 1.30 if self.debug: print 'calling run() : \n'
89 spiga 1.8 # stage out from WN
90     if self.params['middleware'] :
91 spiga 1.36 results = self.stager(self.params['middleware'],self.params['inputFileList'])
92 fanzago 1.74 self.writeJsonFile(results)
93 spiga 1.35 self.finalReport(results)
94 spiga 1.8 # Local interaction with SE
95 spiga 1.1 else:
96 spiga 1.36 results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
97 fanzago 1.74 self.writeJsonFile(results)
98 spiga 1.35 return results
99 fanzago 1.74
100     def writeJsonFile( self, results ):
101     """
102     write a json file containing copy results for each file
103     """
104     if self.debug:
105     print 'in writeJsonFile() : \n'
106     print "---->>>> in writeJsonFile results = ", results
107     fp = open('resultCopyFile', 'w')
108     json.dump(results, fp)
109     fp.close()
110     if self.debug:
111     print ' reading resultCopyFile : \n'
112     lp = open('resultCopyFile', "r")
113     inputDict = json.load(lp)
114     lp.close()
115     print " inputDict = ", inputDict
116     return
117 spiga 1.1
118 mcinquil 1.57 def checkLcgUtils( self ):
119     """
120     _checkLcgUtils_
121     check the lcg-utils version and report
122     """
123     import commands
124     cmd = "lcg-cp --version | grep lcg_util"
125     status, output = commands.getstatusoutput( cmd )
126     num_ver = -1
127     if output.find("not found") == -1 or status == 0:
128     temp = output.split("-")
129     version = ""
130     if len(temp) >= 2:
131     version = output.split("-")[1]
132     temp = version.split(".")
133     if len(temp) >= 1:
134     num_ver = int(temp[0])*10
135     num_ver += int(temp[1])
136     return num_ver
137    
138 spiga 1.8 def setProtocol( self, middleware ):
139 spiga 1.1 """
140     define the allowed potocols based on $middlware
141 ewv 1.7 which depend on scheduler
142 spiga 1.1 """
143 spiga 1.8 # default To be used with "middleware"
144 fanzago 1.38 if self.debug:
145     print 'setProtocol() :\n'
146     print '\tmiddleware = %s utils \n'%middleware
147 mcinquil 1.72
148 spiga 1.13 lcgOpt={'srmv1':'-b -D srmv1 -t 2400 --verbose',
149     'srmv2':'-b -D srmv2 -t 2400 --verbose'}
150 mcinquil 1.57 if self.checkLcgUtils() >= 17:
151 mcinquil 1.72 lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
152     'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
153 mcinquil 1.57
154 spiga 1.13 srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
155 spiga 1.71 'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
156 spiga 1.8 rfioOpt=''
157    
158 fanzago 1.79 ## FEDE for hadoop ###
159     hadoopOpt = ''
160     ####
161    
162 ewv 1.14 supported_protocol = None
163 spiga 1.40 if middleware.lower() in ['osg','lcg','condor','sge']:
164 spiga 1.12 supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
165 spiga 1.30 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
166 spiga 1.8 elif middleware.lower() in ['lsf','caf']:
167 ewv 1.14 supported_protocol = [('rfio',rfioOpt)]
168 mcinquil 1.63 elif middleware.lower() in ['pbs']:
169     supported_protocol = [('rfio',rfioOpt),('local','')]
170 edelmann 1.53 elif middleware.lower() in ['arc']:
171     supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
172 fanzago 1.79 ## FEDE for hadoop ###
173     elif middleware.lower() in ['hadoop']:
174     supported_protocol = [('hadoop', hadoopOpt)]
175     ####
176 spiga 1.1 else:
177 ewv 1.7 ## here we can add support for any kind of protocol,
178 spiga 1.1 ## maybe some local schedulers need something dedicated
179     pass
180     return supported_protocol
181 spiga 1.28
182 fanzago 1.41
183 fanzago 1.74 def checkCopy (self, copy_results, len_list_files, prot):
184 fanzago 1.41 """
185     Checks the status of copy and update result dictionary
186 spiga 1.28 """
187 fanzago 1.74
188 spiga 1.28 list_retry = []
189 fanzago 1.41 list_not_existing = []
190 fanzago 1.74 list_already_existing = []
191 fanzago 1.75 list_fallback = []
192 spiga 1.28 list_ok = []
193 fanzago 1.41
194     if self.debug:
195     print 'in checkCopy() :\n'
196     for file, dict in copy_results.iteritems():
197     er_code = dict['erCode']
198     if er_code == '0':
199     list_ok.append(file)
200     reason = 'Copy succedeed with %s utils'%prot
201     dict['reason'] = reason
202 fanzago 1.75 elif er_code == '60308':
203     list_fallback.append( file )
204     reason = 'Copy succedeed with %s utils'%prot
205     dict['reason'] = reason
206 fanzago 1.41 elif er_code == '60302':
207     list_not_existing.append( file )
208 fanzago 1.74 elif er_code == '60303':
209     list_already_existing.append( file )
210     else :
211 fanzago 1.41 list_retry.append( file )
212    
213     if self.debug:
214     print "\t file %s \n"%file
215     print "\t dict['erCode'] %s \n"%dict['erCode']
216     print "\t dict['reason'] %s \n"%dict['reason']
217    
218 fanzago 1.74 upDict = self.updateReport(file, er_code, dict['reason'])
219 fanzago 1.41
220     copy_results.update(upDict)
221    
222     msg = ''
223     if len(list_ok) != 0:
224     msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
225     if len(list_ok) != len_list_files :
226 fanzago 1.75 if len(list_fallback)!=0:
227     msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
228 fanzago 1.74 if len(list_retry)!=0:
229     msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
230     if len(list_not_existing)!=0:
231     msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
232     if len(list_already_existing)!=0:
233     msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
234 fanzago 1.41 if self.debug : print msg
235    
236 fanzago 1.78 return copy_results, list_ok, list_retry, list_fallback
237 fanzago 1.74
238     def check_for_retry_localSE (self, copy_results):
239     """
240     Checks the status of copy and create the list of file to copy to CloseSE
241     """
242     list_retry_localSE = []
243    
244     if self.debug:
245     print 'in check_for_retry_localSE() :\n'
246     print "\t results in check local = ", copy_results
247     for file, dict in copy_results.iteritems():
248     er_code = dict['erCode']
249     if er_code != '0' and er_code != '60302' and er_code != '60308':
250     list_retry_localSE.append( file )
251    
252     if self.debug:
253     print "\t file %s \n"%file
254     print "\t dict['erCode'] %s \n"%dict['erCode']
255     print "\t dict['reason'] %s \n"%dict['reason']
256    
257     return list_retry_localSE
258    
259 fanzago 1.41
260 spiga 1.45 def LocalCopy(self, list_retry, results):
261 fanzago 1.41 """
262 spiga 1.45 Tries the stage out to the CloseSE
263 fanzago 1.38 """
264 fanzago 1.41 if self.debug:
265 spiga 1.45 print 'in LocalCopy() :\n'
266 fanzago 1.41 print '\t list_retry %s utils \n'%list_retry
267     print '\t len(list_retry) %s \n'%len(list_retry)
268    
269 fanzago 1.78 list_files = list_retry
270     self.params['inputFileList']=list_files
271    
272 fanzago 1.41 ### copy backup
273     from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
274     siteCfg = loadSiteLocalConfig()
275     seName = siteCfg.localStageOut.get("se-name", None)
276     catalog = siteCfg.localStageOut.get("catalog", None)
277     implName = siteCfg.localStageOut.get("command", None)
278     if (implName == 'srm'):
279     implName='srmv1'
280     self.params['srm_version']=implName
281     ##### to be improved ###############
282     if (implName == 'rfcp'):
283     self.params['middleware']='lsf'
284 fanzago 1.79 #### FEDE for hadoop
285     if implName == 'hadoop':
286     self.params['middleware'] = 'hadoop'
287 fanzago 1.41 ####################################
288    
289     self.params['protocol']=implName
290     tfc = siteCfg.trivialFileCatalog()
291    
292     if self.debug:
293     print '\t siteCFG %s \n'%siteCfg
294     print '\t seName %s \n'%seName
295     print '\t catalog %s \n'%catalog
296     print "\t self.params['protocol'] %s \n"%self.params['protocol']
297     print '\t tfc %s '%tfc
298 fanzago 1.78 print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
299 fanzago 1.41
300 fanzago 1.74 if (str(self.params['for_lfn']).find("/store/") == 0):
301     temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
302     self.params['for_lfn']= temp
303 fanzago 1.69
304 fanzago 1.74 if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
305 fanzago 1.65
306 fanzago 1.41 file_backup=[]
307 fanzago 1.78 for input in self.params['inputFileList']:
308 fanzago 1.74 file = self.params['for_lfn'] + os.path.basename(input)
309 fanzago 1.41 surl = tfc.matchLFN(tfc.preferredProtocol, file)
310     file_backup.append(surl)
311     if self.debug:
312 fanzago 1.74 print '\t for_lfn %s \n'%self.params['for_lfn']
313 fanzago 1.41 print '\t file %s \n'%file
314     print '\t surl %s \n'%surl
315    
316     destination=os.path.dirname(file_backup[0])
317 fanzago 1.59 if ( destination[-1] != '/' ) : destination = destination + '/'
318 fanzago 1.77
319 fanzago 1.41 self.params['destination']=destination
320 fanzago 1.74
321     self.params['se_name']=seName
322 fanzago 1.41
323     if self.debug:
324     print "\t self.params['destination']%s \n"%self.params['destination']
325     print "\t self.params['protocol'] %s \n"%self.params['protocol']
326     print "\t self.params['option']%s \n"%self.params['option']
327    
328     for prot, opt in self.setProtocol( self.params['middleware'] ):
329 spiga 1.45 if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
330 fanzago 1.65 localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
331 spiga 1.45 if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
332     results.update(localCopy_results)
333 fanzago 1.41 else:
334 fanzago 1.78 localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), prot)
335 spiga 1.45 results.update(localCopy_results)
336 fanzago 1.78 if len(list_fallback) == len(list_files) :
337 fanzago 1.41 break
338     if len(list_retry):
339     list_files = list_retry
340     else: break
341     if self.debug:
342 spiga 1.45 print "\t localCopy_results = %s \n"%localCopy_results
343 spiga 1.28
344 fanzago 1.41 return results
345    
346 spiga 1.8 def stager( self, middleware, list_files ):
347 spiga 1.1 """
348     Implement the logic for remote stage out
349     """
350 spiga 1.30
351 fanzago 1.38 if self.debug:
352     print 'stager() :\n'
353     print '\tmiddleware %s\n'%middleware
354 spiga 1.45 print '\tlist_files %s\n'%list_files
355 fanzago 1.38
356 spiga 1.6 results={}
357 spiga 1.8 for prot, opt in self.setProtocol( middleware ):
358 fanzago 1.74 if self.debug:
359     print '\tTrying the stage out with %s utils \n'%prot
360     print '\tand options %s\n'%opt
361    
362 spiga 1.8 copy_results = self.copy( list_files, prot, opt )
363 spiga 1.30 if copy_results.keys() == [''] or copy_results.keys() == '' :
364 spiga 1.13 results.update(copy_results)
365     else:
366 fanzago 1.78 copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
367 spiga 1.13 results.update(copy_results)
368     if len(list_ok) == len(list_files) :
369     break
370 fanzago 1.74 if len(list_retry):
371 fanzago 1.41 list_files = list_retry
372     else: break
373 fanzago 1.74
374 spiga 1.45 if self.local_stage:
375 fanzago 1.74 list_retry_localSE = self.check_for_retry_localSE(results)
376 fanzago 1.55 if len(list_retry_localSE):
377 fanzago 1.74 if self.debug:
378     print "\t list_retry_localSE %s \n"%list_retry_localSE
379 fanzago 1.55 results = self.LocalCopy(list_retry_localSE, results)
380 fanzago 1.74
381 fanzago 1.38 if self.debug:
382     print "\t results %s \n"%results
383 spiga 1.1 return results
384    
385     def initializeApi(self, protocol ):
386     """
387 ewv 1.7 Instantiate storage interface
388 spiga 1.1 """
389 spiga 1.30 if self.debug : print 'initializeApi() :\n'
390 spiga 1.20 self.source_prot = protocol
391     self.dest_prot = protocol
392     if not self.params['source'] : self.source_prot = 'local'
393     Source_SE = self.storageInterface( self.params['source'], self.source_prot )
394 fanzago 1.77 if not self.params['destination'] :
395     self.dest_prot = 'local'
396     Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
397     else:
398     Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
399 spiga 1.1
400     if self.debug :
401 spiga 1.30 msg = '\t(source=%s, protocol=%s)'%(self.params['source'], self.source_prot)
402     msg += '\t(destination=%s, protocol=%s)'%(self.params['destination'], self.dest_prot)
403 fanzago 1.77 msg += '\t(destinationDir=%s, protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
404 mcinquil 1.32 print msg
405 spiga 1.1
406     return Source_SE, Destination_SE
407    
408 fanzago 1.65 def copy( self, list_file, protocol, options, backup='no' ):
409 spiga 1.1 """
410 ewv 1.7 Make the real file copy using SE API
411 spiga 1.1 """
412 mcinquil 1.37 msg = ""
413 fanzago 1.74 results = {}
414 spiga 1.1 if self.debug :
415 spiga 1.30 msg = 'copy() :\n'
416     msg += '\tusing %s protocol\n'%protocol
417     print msg
418 spiga 1.13 try:
419     Source_SE, Destination_SE = self.initializeApi( protocol )
420     except Exception, ex:
421 fanzago 1.69 for filetocopy in list_file:
422     results.update( self.updateReport(filetocopy, '-1', str(ex)))
423     return results
424 fanzago 1.74
425     prot = Destination_SE.protocol
426     self.hostname=Destination_SE.hostname
427 ewv 1.14
428 ewv 1.7 # create remote dir
429 fanzago 1.79 ### FEDE for hadoop
430     if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop']:
431 spiga 1.13 try:
432 mcinquil 1.32 self.createDir( Destination_SE, Destination_SE.protocol )
433 mcinquil 1.47 except OperationException, ex:
434 fanzago 1.69 for filetocopy in list_file:
435 fanzago 1.70 results.update( self.updateReport(filetocopy, '60316', str(ex)))
436 fanzago 1.69 return results
437 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
438     except MissingCommand, ex:
439     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
440 fanzago 1.69 for filetocopy in list_file:
441     results.update( self.updateReport(filetocopy, '10041', msg))
442     return results
443     except Exception, ex:
444     msg = "ERROR %s" %(str(ex))
445     for filetocopy in list_file:
446     results.update( self.updateReport(filetocopy, '-1', msg))
447     return results
448    
449 spiga 1.1 ## prepare for real copy ##
450 spiga 1.8 try :
451     sbi = SBinterface( Source_SE, Destination_SE )
452     sbi_dest = SBinterface(Destination_SE)
453 spiga 1.20 sbi_source = SBinterface(Source_SE)
454 spiga 1.11 except ProtocolMismatch, ex:
455 spiga 1.30 msg = "ERROR : Unable to create SBinterface with %s protocol"%protocol
456     msg += str(ex)
457 fanzago 1.69 for filetocopy in list_file:
458     results.update( self.updateReport(filetocopy, '-1', msg))
459     return results
460 spiga 1.1
461 fanzago 1.74 self.hostname = Destination_SE.hostname
462    
463 ewv 1.7 ## loop over the complete list of files
464     for filetocopy in list_file:
465 spiga 1.30 if self.debug : print '\tStart real copy for %s'%filetocopy
466 spiga 1.13 try :
467 spiga 1.34 ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
468 spiga 1.13 except Exception, ex:
469 spiga 1.58 ErCode = '60307'
470 spiga 1.18 msg = str(ex)
471 ewv 1.7 if ErCode == '0':
472 spiga 1.19 ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
473 fanzago 1.65 if (ErCode == '0') and (backup == 'yes'):
474     ErCode = '60308'
475 spiga 1.30 if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
476 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
477     return results
478 ewv 1.7
479 spiga 1.1
480     def storageInterface( self, endpoint, protocol ):
481     """
482 ewv 1.7 Create the storage interface.
483 spiga 1.1 """
484 spiga 1.30 if self.debug : print 'storageInterface():\n'
485 spiga 1.1 try:
486     interface = SElement( FullPath(endpoint), protocol )
487 spiga 1.11 except ProtocolUnknown, ex:
488 spiga 1.30 msg = "ERROR : Unable to create interface with %s protocol"%protocol
489     msg += str(ex)
490 spiga 1.13 raise Exception(msg)
491 spiga 1.1
492     return interface
493    
494     def createDir(self, Destination_SE, protocol):
495     """
496 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
497 ewv 1.7 this should be transparent at SE API level.
498 spiga 1.1 """
499 spiga 1.30 if self.debug : print 'createDir():\n'
500 ewv 1.14 msg = ''
501 spiga 1.1 try:
502     action = SBinterface( Destination_SE )
503     action.createDir()
504 spiga 1.30 if self.debug: print "\tThe directory has been created using protocol %s"%protocol
505 spiga 1.11 except TransferException, ex:
506 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
507     msg += str(ex)
508 spiga 1.11 if self.debug :
509 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
510     dbgmsg += '\t'+str(ex.output)+'\n'
511     print dbgmsg
512 spiga 1.29 raise Exception(msg)
513 spiga 1.11 except OperationException, ex:
514 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
515     msg += str(ex)
516     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
517 spiga 1.29 raise Exception(msg)
518 spiga 1.31 except MissingDestination, ex:
519     msg = "ERROR: problem with the directory creation using %s protocol "%protocol
520     msg += str(ex)
521     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
522     raise Exception(msg)
523     except AlreadyExistsException, ex:
524     if self.debug: print "\tThe directory already exist"
525 fanzago 1.74 pass
526     except Exception, ex:
527     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
528     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
529     raise Exception(msg)
530 spiga 1.13 return msg
531 spiga 1.1
532 spiga 1.34 def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
533 spiga 1.1 """
534 spiga 1.20 Check both if source file exist AND
535     if destination file ALREADY exist.
536 ewv 1.7 """
537 spiga 1.30 if self.debug : print 'checkFileExist():\n'
538 spiga 1.8 ErCode = '0'
539     msg = ''
540 spiga 1.20 f_tocopy=filetocopy
541     if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
542 spiga 1.1 try:
543 mcinquil 1.72 checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
544 spiga 1.30 if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy
545 spiga 1.20 except OperationException, ex:
546 spiga 1.30 msg ='ERROR: problems checkig if source file %s exist'%filetocopy
547     msg += str(ex)
548 spiga 1.20 if self.debug :
549 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
550     dbgmsg += '\t'+str(ex.output)+'\n'
551     print dbgmsg
552 spiga 1.20 raise Exception(msg)
553     except WrongOption, ex:
554 spiga 1.30 msg ='ERROR problems checkig if source file % exist'%filetocopy
555     msg += str(ex)
556 spiga 1.20 if self.debug :
557 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
558     dbgmsg += '\t'+str(ex.output)+'\n'
559     print dbgmsg
560 spiga 1.20 raise Exception(msg)
561 spiga 1.31 except MissingDestination, ex:
562     msg ='ERROR problems checkig if source file % exist'%filetocopy
563     msg += str(ex)
564     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
565     raise Exception(msg)
566 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
567     except MissingCommand, ex:
568     ErCode = '10041'
569     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
570     return ErCode, msg
571 spiga 1.20 if not checkSource :
572     ErCode = '60302'
573     msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
574     return ErCode, msg
575     f_tocopy=filetocopy
576     if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
577     try:
578 mcinquil 1.72 check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
579 spiga 1.30 if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy
580 spiga 1.11 except OperationException, ex:
581 spiga 1.30 msg = 'ERROR: problems checkig if file %s already exist'%filetocopy
582     msg += str(ex)
583 spiga 1.11 if self.debug :
584 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
585     dbgmsg += '\t'+str(ex.output)+'\n'
586     print dbgmsg
587 spiga 1.13 raise Exception(msg)
588 spiga 1.11 except WrongOption, ex:
589 spiga 1.30 msg = 'ERROR problems checkig if file % already exist'%filetocopy
590     msg += str(ex)
591 spiga 1.11 if self.debug :
592 spiga 1.30 msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
593     msg += '\t'+str(ex.output)+'\n'
594 spiga 1.13 raise Exception(msg)
595 spiga 1.31 except MissingDestination, ex:
596     msg ='ERROR problems checkig if source file % exist'%filetocopy
597     msg += str(ex)
598     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
599     raise Exception(msg)
600 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
601     except MissingCommand, ex:
602     ErCode = '10041'
603     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
604     return ErCode, msg
605 ewv 1.14 if check :
606 ewv 1.7 ErCode = '60303'
607 spiga 1.20 msg = "file %s already exist"%os.path.basename(filetocopy)
608 ewv 1.14
609 spiga 1.13 return ErCode, msg
610 spiga 1.1
611 spiga 1.19 def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
612 spiga 1.1 """
613 ewv 1.7 call the copy API.
614 spiga 1.1 """
615 spiga 1.30 if self.debug : print 'makeCopy():\n'
616 ewv 1.7 path = os.path.dirname(filetocopy)
617 spiga 1.1 file_name = os.path.basename(filetocopy)
618     source_file = filetocopy
619     dest_file = file_name ## to be improved supporting changing file name TODO
620 spiga 1.8 if self.params['source'] == '' and path == '':
621 spiga 1.1 source_file = os.path.abspath(filetocopy)
622 spiga 1.8 elif self.params['destination'] =='':
623 spiga 1.24 destDir = self.params.get('destinationDir',os.getcwd())
624     dest_file = os.path.join(destDir,file_name)
625 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
626 ewv 1.7 source_file = file_name
627 spiga 1.8
628 spiga 1.1 ErCode = '0'
629     msg = ''
630 ewv 1.7
631 spiga 1.71 if self.params['option'].find('space_token')>=0:
632 slacapra 1.64 space_token=self.params['option'].split('=')[1]
633     if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
634     if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
635 spiga 1.1 try:
636 mcinquil 1.72 sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
637 spiga 1.11 except TransferException, ex:
638 spiga 1.30 msg = "Problem copying %s file" % filetocopy
639     msg += str(ex)
640 spiga 1.11 if self.debug :
641 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
642     dbgmsg += '\t'+str(ex.output)+'\n'
643 mcinquil 1.32 print dbgmsg
644 spiga 1.1 ErCode = '60307'
645 spiga 1.11 except WrongOption, ex:
646 spiga 1.30 msg = "Problem copying %s file" % filetocopy
647     msg += str(ex)
648 spiga 1.11 if self.debug :
649 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
650     dbgmsg += '\t'+str(ex.output)+'\n'
651 fanzago 1.48 print dbgmsg
652 spiga 1.44 except SizeZeroException, ex:
653     msg = "Problem copying %s file" % filetocopy
654     msg += str(ex)
655     if self.debug :
656     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
657     dbgmsg += '\t'+str(ex.output)+'\n'
658 fanzago 1.48 print dbgmsg
659 spiga 1.13 ErCode = '60307'
660 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
661     except MissingCommand, ex:
662     ErCode = '10041'
663     msg = "Problem copying %s file" % filetocopy
664     msg += str(ex)
665     if self.debug :
666     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
667     dbgmsg += '\t'+str(ex.output)+'\n'
668     print dbgmsg
669 mcinquil 1.54 except AuthorizationException, ex:
670     ErCode = '60307'
671     msg = "Problem copying %s file" % filetocopy
672     msg += str(ex)
673     if self.debug :
674     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
675     dbgmsg += '\t'+str(ex.output)+'\n'
676     print dbgmsg
677 mcinquil 1.72 except SEAPITimeout, ex:
678     ErCode = '60317'
679     msg = "Problem copying %s file" % filetocopy
680     msg += str(ex)
681     if self.debug :
682     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
683     dbgmsg += '\t'+str(ex.output)+'\n'
684     print dbgmsg
685    
686 spiga 1.24 if ErCode == '0' and protocol.find('srmv') == 0:
687 spiga 1.19 remote_file_size = -1
688     local_file_size = os.path.getsize( source_file )
689     try:
690 mcinquil 1.72 remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
691 spiga 1.30 if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
692 spiga 1.19 except TransferException, ex:
693 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
694     msg += str(ex)
695 spiga 1.19 if self.debug :
696 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
697     dbgmsg += '\t'+str(ex.output)+'\n'
698     print dbgmsg
699 spiga 1.19 ErCode = '60307'
700     except WrongOption, ex:
701 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
702     msg += str(ex)
703 spiga 1.19 if self.debug :
704 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
705     dbgmsg += '\t'+str(ex.output)+'\n'
706     print dbgmsg
707 spiga 1.19 ErCode = '60307'
708     if local_file_size != remote_file_size:
709     msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
710     ErCode = '60307'
711 ewv 1.14
712 spiga 1.27 if ErCode != '0':
713     try :
714 spiga 1.34 self.removeFile( sbi_dest, dest_file, option )
715 spiga 1.27 except Exception, ex:
716     msg += '\n'+str(ex)
717 spiga 1.1 return ErCode, msg
718 ewv 1.7
719 spiga 1.34 def removeFile( self, sbi_dest, filetocopy, option ):
720 spiga 1.30 """
721     """
722     if self.debug : print 'removeFile():\n'
723 spiga 1.21 f_tocopy=filetocopy
724     if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
725     try:
726 mcinquil 1.72 sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
727 spiga 1.30 if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
728 spiga 1.21 except OperationException, ex:
729 spiga 1.30 msg ='ERROR: problems removing partially staged file %s'%filetocopy
730     msg += str(ex)
731 spiga 1.21 if self.debug :
732 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
733     dbgmsg += '\t'+str(ex.output)+'\n'
734     print dbgmsg
735 spiga 1.21 raise Exception(msg)
736    
737     return
738    
739 fanzago 1.74 #def updateReport(self, file, erCode, reason, lfn='', se='' ):
740     def updateReport(self, file, erCode, reason):
741 spiga 1.8 """
742     Update the final stage out infos
743     """
744     jobStageInfo={}
745     jobStageInfo['erCode']=erCode
746     jobStageInfo['reason']=reason
747 fanzago 1.74 if not self.params['for_lfn']: self.params['for_lfn']=''
748     if not self.params['se_name']: self.params['se_name']=''
749     if not self.hostname: self.hostname=''
750     if (erCode != '0') and (erCode != '60308'):
751     jobStageInfo['for_lfn']='/copy_problem/'
752     else:
753     jobStageInfo['for_lfn']=self.params['for_lfn']
754     jobStageInfo['se_name']=self.params['se_name']
755     jobStageInfo['endpoint']=self.hostname
756 spiga 1.1
757 spiga 1.8 report = { file : jobStageInfo}
758     return report
759 ewv 1.7
760 spiga 1.8 def finalReport( self , results ):
761     """
762     It a list of LFNs for each SE where data are stored.
763     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
764 spiga 1.1 """
765 spiga 1.8 outFile = open('cmscpReport.sh',"a")
766     cmscp_exit_status = 0
767     txt = ''
768 ewv 1.14 for file, dict in results.iteritems():
769 spiga 1.52 reason = str(dict['reason'])
770     if str(reason).find("'") > -1:
771     reason = " ".join(reason.split("'"))
772     reason="'%s'"%reason
773 ewv 1.14 if file:
774 fanzago 1.74 if dict['for_lfn']=='':
775 fanzago 1.67 lfn = '${LFNBaseName}'+os.path.basename(file)
776 spiga 1.11 se = '$SE'
777 fanzago 1.66 LFNBaseName = '$LFNBaseName'
778 spiga 1.11 else:
779 fanzago 1.74 lfn = dict['for_lfn']+os.path.basename(file)
780     se = dict['se_name']
781 fanzago 1.66 LFNBaseName = os.path.dirname(lfn)
782     if (LFNBaseName[-1] != '/'):
783     LFNBaseName = LFNBaseName + '/'
784 fanzago 1.74
785    
786 fanzago 1.39 txt += 'echo "Report for File: '+file+'"\n'
787     txt += 'echo "LFN: '+lfn+'"\n'
788     txt += 'echo "StorageElement: '+se+'"\n'
789 spiga 1.49 txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
790 spiga 1.11 txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
791 fanzago 1.59
792 spiga 1.71
793     if dict['erCode'] != '0':
794     cmscp_exit_status = dict['erCode']
795 spiga 1.12 else:
796 spiga 1.49 txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
797 spiga 1.12 cmscp_exit_status = dict['erCode']
798 spiga 1.8 txt += '\n'
799     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
800 fanzago 1.65 txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
801 spiga 1.8 outFile.write(str(txt))
802     outFile.close()
803     return
804    
805    
806     def usage():
807    
808     msg="""
809 spiga 1.46 cmscp:
810     safe copy of local file to/from remote SE via lcg_cp/srmcp,
811     including success checking version also for CAF using rfcp command to copy the output to SE
812 spiga 1.8
813 spiga 1.46 accepted parameters:
814     source =
815     destination =
816     inputFileList =
817     outputFileList =
818     protocol =
819     option =
820     middleware =
821     srm_version =
822     destinationDir =
823     lfn= =
824     local_stage = activate stage fall back
825     debug = activate verbose print out
826     help = print on line man and exit
827    
828     mandatory:
829     * "source" and/or "destination" must always be defined
830     * either "middleware" or "protocol" must always be defined
831     * "inputFileList" must always be defined
832     * if "local_stage" = 1 also "lfn" must be defined
833 spiga 1.8 """
834 ewv 1.14 print msg
835 spiga 1.8
836 ewv 1.14 return
837 spiga 1.8
838     def HelpOptions(opts=[]):
839     """
840     Check otps, print help if needed
841 ewv 1.14 prepare dict = { opt : value }
842 spiga 1.8 """
843     dict_args = {}
844     if len(opts):
845     for opt, arg in opts:
846 ewv 1.14 dict_args[opt.split('--')[1]] = arg
847 spiga 1.8 if opt in ('-h','-help','--help') :
848     usage()
849     sys.exit(0)
850     return dict_args
851     else:
852     usage()
853     sys.exit(0)
854 spiga 1.1
855     if __name__ == '__main__' :
856 spiga 1.8
857 ewv 1.14 import getopt
858 spiga 1.8
859     allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
860 spiga 1.25 "protocol=","option=", "middleware=", "srm_version=", \
861 fanzago 1.74 "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
862 ewv 1.14 try:
863     opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
864 spiga 1.8 except getopt.GetoptError, err:
865     print err
866     HelpOptions()
867     sys.exit(2)
868 ewv 1.14
869 spiga 1.8 dictArgs = HelpOptions(opts)
870 spiga 1.1 try:
871 spiga 1.8 cmscp_ = cmscp(dictArgs)
872 spiga 1.1 cmscp_.run()
873 spiga 1.11 except Exception, ex :
874     print str(ex)
875 spiga 1.1