ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.90
Committed: Tue Apr 19 15:30:36 2011 UTC (14 years ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8
Changes since 1.89: +10 -10 lines
Log Message:
Daniele's fix for typo in last commit and improve text in some messages

File Contents

# User Rev Content
1 edelmann 1.56 #!/usr/bin/env python
2 mcinquil 1.16 import sys, os
3 fanzago 1.85 import time, random
4 fanzago 1.74 try:
5     import json
6     except:
7     import simplejson as json
8 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
9 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
10 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
11 spiga 1.1
12    
13     class cmscp:
def __init__(self, args):
    """
    cmscp
    Safe copy of local files to/from a remote SE via lcg_cp/srmcp,
    including success checking; also used for CAF, where the rfcp
    command copies the output to the SE.

    input (command line interface, as documented by the original authors):
        $1 middleware (CAF, LSF, LCG, OSG)
        $2 local file (the absolute path of output file or just the name if it's in top dir)
        $3 if needed: file name (the output file name)
        $5 remote SE (complete endpoint)
        $6 srm version
        --for_lfn $LFNBaseName
    output:
        return 0 if all ok
        return 60307 if srmcp failed
        return 60303 if file already exists in the SE

    args: dictionary of options; its entries override the defaults
          stored in self.params.
    """
    self.params = {
        "source": '',
        "destination": '',
        'destinationDir': '',
        "inputFileList": '',
        "outputFileList": '',
        "protocol": '',
        "option": '',
        "middleware": '',
        "srm_version": 'srmv2',
        "for_lfn": '',
        "se_name": '',
        "surl_for_grid": '',
    }
    self.debug = 0
    #### for fallback copy
    self.local_stage = 0
    # updateReport() reads self.hostname, but copy() assigns it only after
    # the storage interfaces have been created. Give it a default so that
    # reporting a failure of that very first step does not raise
    # AttributeError.
    self.hostname = ''
    self.params.update(args)
    ## timeout needed for subprocess command of SEAPI
    ## they should be a bit higher than the corresponding passed by command line
    ## default values
    self.subprocesstimeout = {
        'copy': 3600,
        'exists': 1200,
        'delete': 1200,
        'size': 1200,
    }

    return
48    
def processOptions(self):
    """
    Check the command line parameters stored in self.params.

    Sets self.debug / self.local_stage when the 'debug' / 'local_stage'
    keys are present, calls HelpOptions() when a mandatory parameter is
    missing, and replaces the comma separated 'inputFileList' string by
    a list of stripped file names.
    """
    if 'help' in self.params:
        HelpOptions()
    if 'debug' in self.params:
        self.debug = 1
    if 'local_stage' in self.params:
        self.local_stage = 1

    # source and dest cannot be undefined at same time
    if not self.params['source'] and not self.params['destination']:
        HelpOptions()
    # if middleware is not defined --> protocol cannot be empty
    if not self.params['middleware'] and not self.params['protocol']:
        HelpOptions()

    # input file must be defined
    if not self.params['inputFileList']:
        HelpOptions()
    else:
        # str.find() returns -1 (truthy) when there is no comma and 0
        # (falsy) when the string starts with one, so it cannot be used as
        # a test; splitting unconditionally handles both cases.
        file_to_copy = []
        for name in self.params['inputFileList'].split(','):
            name = name.strip()
            # skip empty entries left by leading/trailing/double commas
            if name:
                file_to_copy.append(name)
        if not file_to_copy:
            HelpOptions()
        self.params['inputFileList'] = file_to_copy

    if not self.params['for_lfn'] and self.local_stage == 1:
        HelpOptions()
76 fanzago 1.38
77 spiga 1.1
def run(self):
    """
    Entry point: validate the options, then either stage out from a WN
    (a middleware is defined) or interact with the SE from the UI (no
    middleware). In both cases the results are written to the json file.

    NOTE(review): the flattened listing does not show whether
    'return results' belongs to the local branch only; this version
    assumes it does - confirm against the repository copy.
    """
    self.processOptions()
    if self.debug:
        print('calling run() : \n')

    middleware = self.params['middleware']
    if not middleware:
        # Local interaction with SE
        results = self.copy(self.params['inputFileList'],
                            self.params['protocol'],
                            self.params['option'])
        self.writeJsonFile(results)
        return results

    # stage out from WN
    results = self.stager(middleware, self.params['inputFileList'])
    self.writeJsonFile(results)
    self.finalReport(results)
95 fanzago 1.74
def writeJsonFile(self, results):
    """
    Write a json file containing the copy results for each file.

    The file is called 'resultCopyFile'; it is created in the directory
    named by the RUNTIME_AREA environment variable when that is set,
    otherwise in the current directory. In debug mode the file is read
    back and its content printed.
    """
    if self.debug:
        print('in writeJsonFile() : \n')
        # single string so the output matches "print a, b" exactly
        print("---->>>> in writeJsonFile results = " + " " + str(results))
    jsonOut = "resultCopyFile"
    runtimeArea = os.getenv("RUNTIME_AREA")
    if runtimeArea:
        jsonOut = "%s/resultCopyFile" % runtimeArea
    fp = open(jsonOut, 'w')
    try:
        json.dump(results, fp)
    finally:
        # always release the handle, even when serialisation fails
        fp.close()
    if self.debug:
        print(' reading resultCopyFile : \n')
        lp = open(jsonOut, "r")
        try:
            inputDict = json.load(lp)
        finally:
            lp.close()
        print(" inputDict = " + " " + str(inputDict))
    return
116 spiga 1.1
def checkLcgUtils(self):
    """
    _checkLcgUtils_
    Check the lcg-utils version and report it.

    Runs "lcg-cp --version | grep lcg_util" and returns
    major*10 + minor of the version found after the first '-' in the
    output (e.g. 17 for "lcg_util-1.7.6"), or -1 when the version
    cannot be determined.
    """
    try:
        from commands import getstatusoutput
    except ImportError:
        # 'commands' does not exist on Python 3; subprocess provides the
        # same helper there.
        from subprocess import getstatusoutput
    cmd = "lcg-cp --version | grep lcg_util"
    status, output = getstatusoutput(cmd)
    num_ver = -1
    if output.find("not found") == -1 or status == 0:
        fields = output.split("-")
        if len(fields) >= 2:
            numbers = fields[1].split(".")
            # both major and minor are needed; the previous ">= 1" check
            # let numbers[1] raise IndexError
            if len(numbers) >= 2:
                try:
                    num_ver = int(numbers[0]) * 10 + int(numbers[1])
                except ValueError:
                    # unexpected, non numeric version string
                    num_ver = -1
    return num_ver
136    
def setProtocol(self, middleware):
    """
    Return the list of (protocol, options) pairs allowed for the given
    middleware (which depends on the scheduler), or None when the
    middleware is not one of the known ones.
    """
    if self.debug:
        print('setProtocol() :\n')
        print('\tmiddleware = %s utils \n' % middleware)

    # lcg-utils from version 1.7 on take separate timeout options
    if self.checkLcgUtils() >= 17:
        lcgOpt = {
            'srmv1': '-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
            'srmv2': '-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
        }
    else:
        lcgOpt = {
            'srmv1': '-b -D srmv1 -t 2400 --verbose',
            'srmv2': '-b -D srmv2 -t 2400 --verbose',
        }

    # kept from the original: options for a plain srmcp entry that is
    # currently disabled in the osg/lcg/condor/sge list below
    srmOpt = {
        'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
        'srmv2': ' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent ',
    }
    rfioOpt = ''

    scheduler = middleware.lower()
    if scheduler in ('osg', 'lcg', 'condor', 'sge'):
        return [('srm-lcg', lcgOpt[self.params['srm_version']])]
    if scheduler in ('lsf', 'caf'):
        return [('rfio', rfioOpt)]
    if scheduler == 'pbs':
        return [('rfio', rfioOpt), ('local', '')]
    if scheduler == 'arc':
        return [('srmv2', '-debug'), ('srmv1', '-debug')]
    ## here we can add support for any kind of protocol,
    ## maybe some local schedulers need something dedicated
    return None
172 spiga 1.28
173 fanzago 1.41
def checkCopy(self, copy_results, len_list_files, prot):
    """
    Check the status of each copy and update the result dictionary.

    copy_results:   {file name: {'erCode': ..., 'reason': ...}}, updated
                    in place with the entries built by updateReport()
    len_list_files: number of files that were requested
    prot:           protocol used, only quoted in the messages

    Returns (copy_results, list_ok, list_retry, list_fallback) where the
    lists hold the file names with erCode '0', with any code other than
    '0'/'60308'/'60302'/'60303', and with erCode '60308' respectively.
    """
    list_retry = []
    list_not_existing = []
    list_already_existing = []
    list_fallback = []
    list_ok = []

    if self.debug:
        print('in checkCopy() :\n')

    # Iterate over a snapshot: copy_results is updated inside the loop and
    # a dictionary must not be modified while it is being iterated.
    for name, info in list(copy_results.items()):
        er_code = info['erCode']
        if er_code == '0':
            list_ok.append(name)
            info['reason'] = 'Copy succedeed with %s utils' % prot
        elif er_code == '60308':
            list_fallback.append(name)
            info['reason'] = 'Copy succedeed with %s utils' % prot
        elif er_code == '60302':
            list_not_existing.append(name)
        elif er_code == '60303':
            list_already_existing.append(name)
        else:
            list_retry.append(name)

        if self.debug:
            print("\t file %s \n" % name)
            print("\t dict['erCode'] %s \n" % info['erCode'])
            print("\t dict['reason'] %s \n" % info['reason'])

        copy_results.update(self.updateReport(name, er_code, info['reason']))

    msg = ''
    if len(list_ok) != 0:
        msg += '\tCopy of %s succedeed with %s utils\n' % (str(list_ok), prot)
    if len(list_ok) != len_list_files:
        if len(list_fallback) != 0:
            msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n' % (str(list_fallback), prot)
        if len(list_retry) != 0:
            msg += '\tCopy of %s failed using %s for files \n' % (str(list_retry), prot)
        if len(list_not_existing) != 0:
            msg += '\tCopy of %s failed using %s : files not found \n' % (str(list_not_existing), prot)
        if len(list_already_existing) != 0:
            msg += '\tCopy of %s failed using %s : files already existing\n' % (str(list_already_existing), prot)
    if self.debug:
        print(msg)
    return copy_results, list_ok, list_retry, list_fallback
228 fanzago 1.74
def check_for_retry_localSE(self, copy_results):
    """
    Build the list of files that have to be copied to the CloseSE:
    every file whose erCode is neither '0', '60302' nor '60308'.
    """
    if self.debug:
        print('in check_for_retry_localSE() :\n')
        # one string, same output as the two-item print statement
        print("\t results in check local = " + " " + str(copy_results))

    settled_codes = ('0', '60302', '60308')
    list_retry_localSE = []
    for name in copy_results:
        info = copy_results[name]
        if info['erCode'] not in settled_codes:
            list_retry_localSE.append(name)

        if self.debug:
            print("\t file %s \n" % name)
            print("\t dict['erCode'] %s \n" % info['erCode'])
            print("\t dict['reason'] %s \n" % info['reason'])

    return list_retry_localSE
249    
250 fanzago 1.41
251 spiga 1.45 def LocalCopy(self, list_retry, results):
252 fanzago 1.41 """
253 spiga 1.45 Tries the stage out to the CloseSE
254 fanzago 1.38 """
255 fanzago 1.41 if self.debug:
256 spiga 1.45 print 'in LocalCopy() :\n'
257 fanzago 1.41 print '\t list_retry %s utils \n'%list_retry
258     print '\t len(list_retry) %s \n'%len(list_retry)
259    
260 fanzago 1.81 list_files = list_retry
261 fanzago 1.78 self.params['inputFileList']=list_files
262    
263 fanzago 1.41 ### copy backup
264     from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
265     siteCfg = loadSiteLocalConfig()
266     catalog = siteCfg.localStageOut.get("catalog", None)
267 fanzago 1.81 tfc = siteCfg.trivialFileCatalog()
268     seName = siteCfg.localStageOutSEName()
269     if seName is None:
270     seName = ""
271     option = siteCfg.localStageOutOption()
272     if option is None:
273     option = ""
274     implName = siteCfg.localStageOutCommand()
275     if implName is None:
276     implName = ""
277 fanzago 1.80
278 fanzago 1.41 if (implName == 'srm'):
279 fanzago 1.80 protocol = 'srmv2'
280     elif (implName == 'srmv2-lcg'):
281     protocol = 'srm-lcg'
282 fanzago 1.81 option = option + ' -b -D srmv2 '
283 fanzago 1.80 elif (implName == 'rfcp-CERN'):
284     protocol = 'rfio'
285     elif (implName == 'rfcp'):
286     protocol = 'rfio'
287 fanzago 1.81 elif (implName == 'cp'):
288     protocol = 'local'
289 fanzago 1.80 else: protocol = implName
290 fanzago 1.82
291     self.params['protocol']=protocol
292     self.params['option']=option
293    
294 fanzago 1.41 if self.debug:
295     print '\t siteCFG %s \n'%siteCfg
296     print '\t catalog %s \n'%catalog
297 fanzago 1.81 print '\t tfc %s '%tfc
298     print '\t fallback seName %s \n'%seName
299 fanzago 1.80 print "\t fallback protocol %s \n"%protocol
300     print "\t fallback option %s \n"%option
301 fanzago 1.78 print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
302 fanzago 1.41
303 fanzago 1.74 if (str(self.params['for_lfn']).find("/store/") == 0):
304     temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
305     self.params['for_lfn']= temp
306 fanzago 1.69
307 fanzago 1.74 if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
308 fanzago 1.65
309 fanzago 1.41 file_backup=[]
310 fanzago 1.83 file_backup_surlgrid=[]
311 fanzago 1.78 for input in self.params['inputFileList']:
312 fanzago 1.74 file = self.params['for_lfn'] + os.path.basename(input)
313 fanzago 1.41 surl = tfc.matchLFN(tfc.preferredProtocol, file)
314 fanzago 1.83
315     ###### FEDE TEST_FOR_SURL_GRID
316     surl_for_grid = tfc.matchLFN('srmv2', file)
317     if (surl_for_grid == None):
318     surl_for_grid = tfc.matchLFN('srmv2-lcg', file)
319     if (surl_for_grid == None):
320     surl_for_grid = tfc.matchLFN('srm', file)
321     if surl_for_grid:
322     file_backup_surlgrid.append(surl_for_grid)
323     ######
324    
325 fanzago 1.41 file_backup.append(surl)
326     if self.debug:
327 fanzago 1.74 print '\t for_lfn %s \n'%self.params['for_lfn']
328 fanzago 1.41 print '\t file %s \n'%file
329     print '\t surl %s \n'%surl
330    
331     destination=os.path.dirname(file_backup[0])
332 fanzago 1.59 if ( destination[-1] != '/' ) : destination = destination + '/'
333 fanzago 1.41 self.params['destination']=destination
334 fanzago 1.83
335 fanzago 1.74 self.params['se_name']=seName
336 fanzago 1.83
337     ######
338     if (len(file_backup_surlgrid)>0) :
339     surl_for_grid=os.path.dirname(file_backup_surlgrid[0])
340     if ( surl_for_grid[-1] != '/' ) : surl_for_grid = surl_for_grid + '/'
341     else:
342     surl_for_grid=''
343    
344     print "surl_for_grid = ", surl_for_grid
345     self.params['surl_for_grid']=surl_for_grid
346     #####
347    
348 fanzago 1.41 if self.debug:
349 fanzago 1.82 print '\tIn LocalCopy trying the stage out with: \n'
350     print "\tself.params['destination'] %s \n"%self.params['destination']
351     print "\tself.params['protocol'] %s \n"%self.params['protocol']
352     print "\tself.params['option'] %s \n"%self.params['option']
353 fanzago 1.80
354 fanzago 1.82 localCopy_results = self.copy( self.params['inputFileList'], self.params['protocol'], self.params['option'], backup='yes' )
355 fanzago 1.80
356     if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
357     results.update(localCopy_results)
358     else:
359 fanzago 1.82 localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), self.params['protocol'])
360 fanzago 1.80
361     results.update(localCopy_results)
362     if self.debug:
363     print "\t localCopy_results = %s \n"%localCopy_results
364 fanzago 1.41 return results
365    
def stager(self, middleware, list_files):
    """
    Implement the logic for remote stage out.

    Each (protocol, options) pair given by setProtocol() is tried in
    turn; files that still have to be retried are passed to the next
    protocol after a random pause of 60 to 300 seconds. When the local
    stage out is enabled, the files that failed are finally copied to
    the CloseSE through LocalCopy(). Returns the results dictionary.
    """
    if self.debug:
        print('stager() :\n')
        print('\tmiddleware %s\n' % middleware)
        print('\tlist_files %s\n' % list_files)

    results = {}
    for prot, opt in self.setProtocol(middleware):
        if self.debug:
            print('\tTrying the stage out with %s utils \n' % prot)
            print('\tand options %s\n' % opt)

        copy_results = self.copy(list_files, prot, opt)
        if copy_results.keys() == [''] or copy_results.keys() == '':
            results.update(copy_results)
            continue

        copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
        results.update(copy_results)
        if len(list_ok) == len(list_files):
            break
        if not len(list_retry):
            break
        list_files = list_retry
        #### FEDE added random time before the retry copy with other protocol
        sec = 240 * random.random() + 60
        time.sleep(sec)

    if self.local_stage:
        list_retry_localSE = self.check_for_retry_localSE(results)
        if len(list_retry_localSE):
            if self.debug:
                print("\t list_retry_localSE %s \n" % list_retry_localSE)
            results = self.LocalCopy(list_retry_localSE, results)

    if self.debug:
        print("\t results %s \n" % results)
    return results
408    
def initializeApi(self, protocol):
    """
    Instantiate the storage interfaces for source and destination.

    A missing 'source' (resp. 'destination') parameter selects the
    'local' protocol for that side; a missing destination uses
    'destinationDir' as endpoint. The protocols chosen are stored in
    self.source_prot and self.dest_prot.
    Returns (Source_SE, Destination_SE).
    """
    if self.debug:
        print('initializeApi() :\n')

    self.source_prot = protocol
    self.dest_prot = protocol
    if not self.params['source']:
        self.source_prot = 'local'
    Source_SE = self.storageInterface(self.params['source'], self.source_prot)

    if self.params['destination']:
        dest_endpoint = self.params['destination']
    else:
        self.dest_prot = 'local'
        dest_endpoint = self.params['destinationDir']
    Destination_SE = self.storageInterface(dest_endpoint, self.dest_prot)

    if self.debug:
        msg = '\t(source=%s, protocol=%s)' % (self.params['source'], self.source_prot)
        msg += '\t(destination=%s, protocol=%s)' % (self.params['destination'], self.dest_prot)
        msg += '\t(destinationDir=%s, protocol=%s)' % (self.params['destinationDir'], self.dest_prot)
        print(msg)

    return Source_SE, Destination_SE
431    
432 fanzago 1.65 def copy( self, list_file, protocol, options, backup='no' ):
433 spiga 1.1 """
434 ewv 1.7 Make the real file copy using SE API
435 spiga 1.1 """
436 mcinquil 1.37 msg = ""
437 fanzago 1.74 results = {}
438 spiga 1.1 if self.debug :
439 spiga 1.30 msg = 'copy() :\n'
440     msg += '\tusing %s protocol\n'%protocol
441 fanzago 1.81 msg += '\tusing %s options\n'%options
442     msg += '\tlist_file %s\n'%list_file
443 spiga 1.30 print msg
444 spiga 1.13 try:
445     Source_SE, Destination_SE = self.initializeApi( protocol )
446     except Exception, ex:
447 fanzago 1.69 for filetocopy in list_file:
448     results.update( self.updateReport(filetocopy, '-1', str(ex)))
449     return results
450 ewv 1.14
451 fanzago 1.81 self.hostname = Destination_SE.hostname
452     if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop','local']:
453 spiga 1.13 try:
454 mcinquil 1.32 self.createDir( Destination_SE, Destination_SE.protocol )
455 mcinquil 1.47 except OperationException, ex:
456 fanzago 1.69 for filetocopy in list_file:
457 fanzago 1.70 results.update( self.updateReport(filetocopy, '60316', str(ex)))
458 fanzago 1.69 return results
459 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
460     except MissingCommand, ex:
461     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
462 fanzago 1.69 for filetocopy in list_file:
463     results.update( self.updateReport(filetocopy, '10041', msg))
464     return results
465     except Exception, ex:
466     msg = "ERROR %s" %(str(ex))
467     for filetocopy in list_file:
468     results.update( self.updateReport(filetocopy, '-1', msg))
469     return results
470 spiga 1.1 ## prepare for real copy ##
471 spiga 1.8 try :
472     sbi = SBinterface( Source_SE, Destination_SE )
473     sbi_dest = SBinterface(Destination_SE)
474 spiga 1.20 sbi_source = SBinterface(Source_SE)
475 spiga 1.11 except ProtocolMismatch, ex:
476 spiga 1.30 msg = "ERROR : Unable to create SBinterface with %s protocol"%protocol
477     msg += str(ex)
478 fanzago 1.69 for filetocopy in list_file:
479     results.update( self.updateReport(filetocopy, '-1', msg))
480     return results
481 spiga 1.1
482 fanzago 1.74
483 ewv 1.7 ## loop over the complete list of files
484     for filetocopy in list_file:
485 spiga 1.30 if self.debug : print '\tStart real copy for %s'%filetocopy
486 spiga 1.13 try :
487 spiga 1.34 ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
488 spiga 1.13 except Exception, ex:
489 spiga 1.58 ErCode = '60307'
490 spiga 1.18 msg = str(ex)
491 ewv 1.7 if ErCode == '0':
492 spiga 1.19 ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
493 fanzago 1.65 if (ErCode == '0') and (backup == 'yes'):
494     ErCode = '60308'
495 spiga 1.30 if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
496 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
497     return results
498 ewv 1.7
499 spiga 1.1
500     def storageInterface( self, endpoint, protocol ):
501     """
502 ewv 1.7 Create the storage interface.
503 spiga 1.1 """
504 spiga 1.30 if self.debug : print 'storageInterface():\n'
505 spiga 1.1 try:
506     interface = SElement( FullPath(endpoint), protocol )
507 spiga 1.11 except ProtocolUnknown, ex:
508 spiga 1.30 msg = "ERROR : Unable to create interface with %s protocol"%protocol
509     msg += str(ex)
510 spiga 1.13 raise Exception(msg)
511 spiga 1.1
512     return interface
513    
514     def createDir(self, Destination_SE, protocol):
515     """
516 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
517 ewv 1.7 this should be transparent at SE API level.
518 spiga 1.1 """
519 spiga 1.30 if self.debug : print 'createDir():\n'
520 ewv 1.14 msg = ''
521 spiga 1.1 try:
522     action = SBinterface( Destination_SE )
523     action.createDir()
524 spiga 1.30 if self.debug: print "\tThe directory has been created using protocol %s"%protocol
525 spiga 1.11 except TransferException, ex:
526 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
527     msg += str(ex)
528 spiga 1.11 if self.debug :
529 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
530     dbgmsg += '\t'+str(ex.output)+'\n'
531     print dbgmsg
532 spiga 1.29 raise Exception(msg)
533 spiga 1.11 except OperationException, ex:
534 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
535     msg += str(ex)
536     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
537 spiga 1.29 raise Exception(msg)
538 spiga 1.31 except MissingDestination, ex:
539     msg = "ERROR: problem with the directory creation using %s protocol "%protocol
540     msg += str(ex)
541     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
542     raise Exception(msg)
543     except AlreadyExistsException, ex:
544     if self.debug: print "\tThe directory already exist"
545 fanzago 1.74 pass
546     except Exception, ex:
547     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
548     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
549     raise Exception(msg)
550 spiga 1.13 return msg
551 spiga 1.1
552 spiga 1.34 def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
553 spiga 1.1 """
554 spiga 1.20 Check both if source file exist AND
555     if destination file ALREADY exist.
556 ewv 1.7 """
557 spiga 1.30 if self.debug : print 'checkFileExist():\n'
558 spiga 1.8 ErCode = '0'
559     msg = ''
560 spiga 1.20 f_tocopy=filetocopy
561 fanzago 1.81 if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
562 spiga 1.1 try:
563 mcinquil 1.72 checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
564 belforte 1.90 if self.debug : print '\tCheck for local file %s existance executed \n'%f_tocopy
565 spiga 1.20 except OperationException, ex:
566 belforte 1.90 msg ='ERROR: problems checking source file %s existance'%filetocopy
567 spiga 1.30 msg += str(ex)
568 spiga 1.20 if self.debug :
569 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
570     dbgmsg += '\t'+str(ex.output)+'\n'
571     print dbgmsg
572 spiga 1.20 raise Exception(msg)
573     except WrongOption, ex:
574 belforte 1.90 msg ='ERROR: problems checking source file %s existance'%filetocopy
575 spiga 1.30 msg += str(ex)
576 spiga 1.20 if self.debug :
577 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
578     dbgmsg += '\t'+str(ex.output)+'\n'
579     print dbgmsg
580 spiga 1.20 raise Exception(msg)
581 spiga 1.31 except MissingDestination, ex:
582 belforte 1.90 msg ='ERROR: problems checking source file %s existance'%filetocopy
583 spiga 1.31 msg += str(ex)
584     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
585     raise Exception(msg)
586 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
587     except MissingCommand, ex:
588     ErCode = '10041'
589     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
590     return ErCode, msg
591 spiga 1.20 if not checkSource :
592     ErCode = '60302'
593 belforte 1.90 msg = "ERROR file %s does not exist"%os.path.basename(filetocopy)
594 spiga 1.20 return ErCode, msg
595 fanzago 1.81 f_tocopy=os.path.basename(filetocopy)
596 spiga 1.20 try:
597 mcinquil 1.72 check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
598 belforte 1.90 if self.debug : print '\tCheck for remote file %s existance executed \n'%f_tocopy
599 fanzago 1.81 if self.debug : print '\twith exit code = %s \n'%check
600 spiga 1.11 except OperationException, ex:
601 belforte 1.90 msg = 'ERROR: problems checking if file %s already exist'%filetocopy
602 spiga 1.30 msg += str(ex)
603 spiga 1.11 if self.debug :
604 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
605     dbgmsg += '\t'+str(ex.output)+'\n'
606     print dbgmsg
607 spiga 1.13 raise Exception(msg)
608 spiga 1.11 except WrongOption, ex:
609 belforte 1.90 msg = 'ERROR problems checking if file % already exists'%filetocopy
610 spiga 1.30 msg += str(ex)
611 spiga 1.11 if self.debug :
612 spiga 1.30 msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
613     msg += '\t'+str(ex.output)+'\n'
614 spiga 1.13 raise Exception(msg)
615 spiga 1.31 except MissingDestination, ex:
616 belforte 1.90 msg ='ERROR problems checking if destination file % exists'%filetocopy
617 spiga 1.31 msg += str(ex)
618     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
619     raise Exception(msg)
620 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
621     except MissingCommand, ex:
622     ErCode = '10041'
623     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
624     return ErCode, msg
625 ewv 1.14 if check :
626 ewv 1.7 ErCode = '60303'
627 spiga 1.20 msg = "file %s already exist"%os.path.basename(filetocopy)
628 ewv 1.14
629 spiga 1.13 return ErCode, msg
630 spiga 1.1
631 spiga 1.19 def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
632 spiga 1.1 """
633 ewv 1.7 call the copy API.
634 spiga 1.1 """
635 spiga 1.30 if self.debug : print 'makeCopy():\n'
636 ewv 1.7 path = os.path.dirname(filetocopy)
637 spiga 1.1 file_name = os.path.basename(filetocopy)
638     source_file = filetocopy
639     dest_file = file_name ## to be improved supporting changing file name TODO
640 spiga 1.8 if self.params['source'] == '' and path == '':
641 spiga 1.1 source_file = os.path.abspath(filetocopy)
642 spiga 1.8 elif self.params['destination'] =='':
643 spiga 1.24 destDir = self.params.get('destinationDir',os.getcwd())
644     dest_file = os.path.join(destDir,file_name)
645 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
646 ewv 1.7 source_file = file_name
647 spiga 1.8
648 spiga 1.1 ErCode = '0'
649     msg = ''
650 ewv 1.7
651 belforte 1.84 copy_option = option
652 spiga 1.71 if self.params['option'].find('space_token')>=0:
653 slacapra 1.64 space_token=self.params['option'].split('=')[1]
654 belforte 1.84 if protocol == 'srmv2': copy_option = '%s -space_token=%s'%(option,space_token)
655     if protocol == 'srm-lcg': copy_option = '%s -S %s'%(option,space_token)
656 spiga 1.1 try:
657 belforte 1.84 sbi.copy( source_file , dest_file , opt = copy_option, tout = self.subprocesstimeout['copy'])
658 spiga 1.11 except TransferException, ex:
659 spiga 1.30 msg = "Problem copying %s file" % filetocopy
660     msg += str(ex)
661 spiga 1.11 if self.debug :
662 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
663     dbgmsg += '\t'+str(ex.output)+'\n'
664 mcinquil 1.32 print dbgmsg
665 spiga 1.1 ErCode = '60307'
666 spiga 1.11 except WrongOption, ex:
667 spiga 1.30 msg = "Problem copying %s file" % filetocopy
668     msg += str(ex)
669 spiga 1.11 if self.debug :
670 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
671     dbgmsg += '\t'+str(ex.output)+'\n'
672 fanzago 1.48 print dbgmsg
673 spiga 1.44 except SizeZeroException, ex:
674     msg = "Problem copying %s file" % filetocopy
675     msg += str(ex)
676     if self.debug :
677     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
678     dbgmsg += '\t'+str(ex.output)+'\n'
679 fanzago 1.48 print dbgmsg
680 spiga 1.13 ErCode = '60307'
681 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
682     except MissingCommand, ex:
683     ErCode = '10041'
684     msg = "Problem copying %s file" % filetocopy
685     msg += str(ex)
686     if self.debug :
687     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
688     dbgmsg += '\t'+str(ex.output)+'\n'
689     print dbgmsg
690 mcinquil 1.54 except AuthorizationException, ex:
691     ErCode = '60307'
692     msg = "Problem copying %s file" % filetocopy
693     msg += str(ex)
694     if self.debug :
695     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
696     dbgmsg += '\t'+str(ex.output)+'\n'
697     print dbgmsg
698 mcinquil 1.72 except SEAPITimeout, ex:
699     ErCode = '60317'
700     msg = "Problem copying %s file" % filetocopy
701     msg += str(ex)
702     if self.debug :
703     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
704     dbgmsg += '\t'+str(ex.output)+'\n'
705     print dbgmsg
706    
707 spiga 1.24 if ErCode == '0' and protocol.find('srmv') == 0:
708 spiga 1.19 remote_file_size = -1
709     local_file_size = os.path.getsize( source_file )
710     try:
711 mcinquil 1.72 remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
712 spiga 1.30 if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
713 spiga 1.19 except TransferException, ex:
714 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
715     msg += str(ex)
716 spiga 1.19 if self.debug :
717 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
718     dbgmsg += '\t'+str(ex.output)+'\n'
719     print dbgmsg
720 spiga 1.19 ErCode = '60307'
721     except WrongOption, ex:
722 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
723     msg += str(ex)
724 spiga 1.19 if self.debug :
725 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
726     dbgmsg += '\t'+str(ex.output)+'\n'
727     print dbgmsg
728 spiga 1.19 ErCode = '60307'
729     if local_file_size != remote_file_size:
730     msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
731     ErCode = '60307'
732 ewv 1.14
733 spiga 1.27 if ErCode != '0':
734     try :
735 spiga 1.34 self.removeFile( sbi_dest, dest_file, option )
736 spiga 1.27 except Exception, ex:
737     msg += '\n'+str(ex)
738 spiga 1.1 return ErCode, msg
739 ewv 1.7
def removeFile( self, sbi_dest, filetocopy, option ):
    """
    Delete a (possibly partially staged) file through the destination
    storage interface.

    input:
        sbi_dest   : storage interface object, its delete() method is called
        filetocopy : path of the file to remove
        option     : forwarded to sbi_dest.delete() as 'opt'
    output:
        None
    raise:
        Exception when sbi_dest.delete() raises OperationException; the
        message contains the text of the original error.
    """
    if self.debug : print('removeFile():\n')

    # unless the destination protocol is 'local' only the file name,
    # without its directory part, is handed to delete()
    f_tocopy = filetocopy
    if self.dest_prot != 'local':
        f_tocopy = os.path.basename(filetocopy)

    try:
        sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
        if self.debug : print('\t deletion of file %s succeeded\n'%str(filetocopy))
    except OperationException:
        # the exception object is taken from sys.exc_info() so that the
        # clause is accepted by old and new Python versions alike
        ex = sys.exc_info()[1]
        msg ='ERROR: problems removing partially staged file %s'%filetocopy
        msg += str(ex)
        if self.debug :
            dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
            dbgmsg += '\t'+str(ex.output)+'\n'
            print(dbgmsg)
        raise Exception(msg)

    return
759    
def updateReport(self, file, erCode, reason):
    """
    Build the stage out report entry of one file.

    input:
        file   : key under which the entry is returned
        erCode : stage out exit code
        reason : text explaining the exit code
    output:
        { file : { 'erCode', 'reason', 'for_lfn', 'se_name', 'endpoint',
                   'surl_for_grid' } }

    Empty (false) values of self.params['for_lfn'], self.params['se_name'],
    self.params['surl_for_grid'] and self.hostname are replaced by '' on the
    object itself.
    """
    settings = self.params
    for key in ('for_lfn', 'se_name'):
        if not settings[key]:
            settings[key] = ''
    if not self.hostname:
        self.hostname = ''
    ### surl_for_grid is reported for copyData
    if not settings['surl_for_grid']:
        settings['surl_for_grid'] = ''

    # only exit codes '0' and '60308' keep the configured LFN base
    if erCode in ('0', '60308'):
        lfn_base = settings['for_lfn']
    else:
        lfn_base = '/copy_problem/'

    return {
        file: {
            'erCode': erCode,
            'reason': reason,
            'for_lfn': lfn_base,
            'se_name': settings['se_name'],
            'endpoint': self.hostname,
            'surl_for_grid': settings['surl_for_grid'],
        }
    }
782 ewv 1.7
def finalReport( self , results ):
    """
    Append the stage out summary to the shell script 'cmscpReport.sh'
    in the current directory.

    input:
        results : { file : info } where info holds at least 'reason' and
                  'erCode' and, when file is not empty, 'for_lfn' and
                  'se_name'
    output:
        None

    For every entry with a file name the script echoes the file, its LFN
    and its storage element; an empty 'for_lfn' makes it use the shell
    variables ${LFNBaseName} and $SE instead of fixed values.
    The script finally exports StageOutExitStatus: for entries with a file
    name only an 'erCode' different from '0' updates it, for an entry with
    an empty file name 'erCode' is always taken; the last update wins.
    """
    outFile = open('cmscpReport.sh',"a")
    try:
        cmscp_exit_status = 0
        lines = []
        for fname, info in results.items():
            # single quotes are blanked out because the reason is itself
            # wrapped in single quotes inside the echo command
            reason = "'%s'" % str(info['reason']).replace("'", " ")
            reason_line = 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason

            if not fname:
                lines.append(reason_line)
                cmscp_exit_status = info['erCode']
                continue

            if info['for_lfn'] == '':
                lfn = '${LFNBaseName}'+os.path.basename(fname)
                se = '$SE'
            else:
                lfn = info['for_lfn']+os.path.basename(fname)
                se = info['se_name']

            lines.append('echo "Report for File: '+fname+'"\n')
            lines.append('echo "LFN: '+lfn+'"\n')
            lines.append('echo "StorageElement: '+se+'"\n')
            lines.append(reason_line)
            lines.append('echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n')

            if info['erCode'] != '0':
                cmscp_exit_status = info['erCode']

        lines.append('\n')
        lines.append('export StageOutExitStatus='+str(cmscp_exit_status)+'\n')
        lines.append('echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n')
        outFile.write(str(''.join(lines)))
    finally:
        # close the script also when building the text fails
        outFile.close()
    return
827    
828    
def usage():
    """
    Print the command line help of cmscp on standard output.

    The parameter names listed here are the long options accepted by the
    script entry point.
    """
    msg="""
    cmscp:
        safe copy of local file to/from remote SE via lcg_cp/srmcp,
        including success checking version also for CAF using rfcp command to copy the output to SE

    accepted parameters:
       source           =
       destination      =
       inputFileList    =
       outputFileList   =
       protocol         =
       option           =
       middleware       =
       srm_version      =
       destinationDir   =
       for_lfn          =
       se_name          =
       local_stage      =  activate stage fall back
       debug            =  activate verbose print out
       help             =  print on line man and exit

    mandatory:
       * "source" and/or "destination" must always be defined
       * either "middleware" or "protocol" must always be defined
       * "inputFileList" must always be defined
       * if "local_stage" = 1 also "for_lfn" must be defined
    """
    print(msg)

    return
860 spiga 1.8
def HelpOptions(opts=()):
    """
    Check opts, print help if needed
    prepare dict = { opt : value }

    input:
        opts : sequence of (option, value) pairs as returned by
               getopt.getopt()
    output:
        dictionary { option name without leading dashes : value }

    The usage text is printed and the process exits with status 0 when
    opts is empty or when one of '-h', '-help', '--help' is found.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # help is tested before the name is stripped, so the single dash
        # spellings are recognised as well
        if opt in ('-h','-help','--help') :
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
877 spiga 1.1
if __name__ == '__main__' :
    # Script entry point: parse the long options, turn them into a
    # dictionary and run the copy.

    import getopt

    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
                  "protocol=","option=", "middleware=", "srm_version=", \
                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
    try:
        opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
    except getopt.GetoptError:
        # usage() is called directly: HelpOptions() without arguments exits
        # with status 0 and would hide the error status below
        print(sys.exc_info()[1])
        usage()
        sys.exit(2)

    # exits after printing the help when no option or a help option is given
    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception:
        # top level boundary: the error is only printed here
        print(str(sys.exc_info()[1]))
897     print str(ex)
898 spiga 1.1