ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.95
Committed: Fri Jan 11 14:02:19 2013 UTC (12 years, 3 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3
Changes since 1.94: +13 -5 lines
Log Message:
final fix for https://savannah.cern.ch/bugs/index.php?99708

File Contents

# User Rev Content
1 edelmann 1.56 #!/usr/bin/env python
2 mcinquil 1.16 import sys, os
3 fanzago 1.85 import time, random
4 fanzago 1.74 try:
5     import json
6     except:
7     import simplejson as json
8 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
9 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
10 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
11 belforte 1.94 from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
12    
13 spiga 1.1
14    
15     class cmscp:
16 spiga 1.8 def __init__(self, args):
17 spiga 1.1 """
18     cmscp
19 spiga 1.46 safe copy of local file to/from remote SE via lcg_cp/srmcp,
20 spiga 1.1 including success checking version also for CAF using rfcp command to copy the output to SE
21     input:
22     $1 middleware (CAF, LSF, LCG, OSG)
23 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
24     $3 if needed: file name (the output file name)
25     $5 remote SE (complete endpoint)
26 ewv 1.7 $6 srm version
27 fanzago 1.74 --for_lfn $LFNBaseName
28 spiga 1.1 output:
29     return 0 if all ok
30     return 60307 if srmcp failed
31     return 60303 if file already exists in the SE
32     """
33 spiga 1.24 self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
34 fanzago 1.83 "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'', "surl_for_grid":''}
35 ewv 1.14 self.debug = 0
36 fanzago 1.74 #### for fallback copy
37 spiga 1.46 self.local_stage = 0
38 spiga 1.8 self.params.update( args )
39 mcinquil 1.76 ## timeout needed for subprocess command of SEAPI
40     ## they should be a bit higher then the corresponding passed by command line
41     ## default values
42     self.subprocesstimeout = { \
43     'copy': 3600, \
44     'exists': 1200, \
45     'delete': 1200, \
46     'size': 1200 \
47     }
48    
49 spiga 1.1 return
50    
51 spiga 1.8 def processOptions( self ):
52 spiga 1.1 """
53 spiga 1.8 check command line parameter
54 spiga 1.1 """
55 ewv 1.14 if 'help' in self.params.keys(): HelpOptions()
56     if 'debug' in self.params.keys(): self.debug = 1
57 spiga 1.45 if 'local_stage' in self.params.keys(): self.local_stage = 1
58 ewv 1.7
59     # source and dest cannot be undefined at same time
60 spiga 1.8 if not self.params['source'] and not self.params['destination'] :
61 spiga 1.36 HelpOptions()
62 ewv 1.7 # if middleware is not defined --> protocol cannot be empty
63 spiga 1.8 if not self.params['middleware'] and not self.params['protocol'] :
64     HelpOptions()
65    
66 ewv 1.7 # input file must be defined
67 spiga 1.35 if not self.params['inputFileList'] :
68 spiga 1.36 HelpOptions()
69 spiga 1.1 else:
70 spiga 1.8 file_to_copy=[]
71 ewv 1.14 if self.params['inputFileList'].find(','):
72 spiga 1.8 [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
73 ewv 1.7 else:
74 spiga 1.8 file_to_copy.append(self.params['inputFileList'])
75     self.params['inputFileList'] = file_to_copy
76 ewv 1.7
77 fanzago 1.74 if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
78 fanzago 1.38
79 spiga 1.1
# Entry point of the copy: the parameters are validated by
# processOptions(), then the files listed in self.params['inputFileList']
# are copied and the per-file results are dumped by writeJsonFile().
80 ewv 1.7 def run( self ):
81 spiga 1.1 """
82 ewv 1.7 Check if running on UI (no $middleware) or
83     on WN (on the Grid), and take different action
84 spiga 1.1 """
85 mcinquil 1.37 self.processOptions()
86 spiga 1.30 if self.debug: print 'calling run() : \n'
87 spiga 1.8 # stage out from WN
# with a middleware the copy goes through stager(), and finalReport()
# is then called on the results
88     if self.params['middleware'] :
89 spiga 1.36 results = self.stager(self.params['middleware'],self.params['inputFileList'])
90 fanzago 1.74 self.writeJsonFile(results)
91 spiga 1.35 self.finalReport(results)
92 spiga 1.8 # Local interaction with SE
# without middleware one plain copy() is made, using the protocol and
# the option given in the parameters
93 spiga 1.1 else:
94 spiga 1.36 results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
95 fanzago 1.74 self.writeJsonFile(results)
# NOTE(review): this listing lost the indentation, so it cannot be told
# from here if the return belongs to the else branch only (the
# middleware branch would then return None) - confirm on the real file
96 spiga 1.35 return results
97 fanzago 1.74
98     def writeJsonFile( self, results ):
99     """
100     write a json file containing copy results for each file
101     """
102     if self.debug:
103     print 'in writeJsonFile() : \n'
104     print "---->>>> in writeJsonFile results = ", results
105 spiga 1.86 jsonOut = "resultCopyFile"
106 fanzago 1.88 if os.getenv("RUNTIME_AREA"):
107 spiga 1.86 jsonOut = "%s/resultCopyFile"%os.getenv("RUNTIME_AREA")
108     fp = open(jsonOut, 'w')
109 fanzago 1.74 json.dump(results, fp)
110     fp.close()
111     if self.debug:
112     print ' reading resultCopyFile : \n'
113 spiga 1.86 lp = open(jsonOut, "r")
114 fanzago 1.74 inputDict = json.load(lp)
115     lp.close()
116     print " inputDict = ", inputDict
117     return
118 spiga 1.1
def checkLcgUtils( self ):
    """
    _checkLcgUtils_
    check the lcg-utils version and report

    Runs "lcg-cp --version | grep lcg_util" and reads the text between
    the first and the second '-' of the output as <major>.<minor>[...].

    output:
        major*10 + minor as an integer, or -1 when the version cannot be
        worked out (command missing, unexpected or not numeric output)
    """
    try:
        import commands
    except ImportError:
        # no commands module on python 3: subprocess has the same
        # getstatusoutput() there
        import subprocess as commands
    cmd = "lcg-cp --version | grep lcg_util"
    status, output = commands.getstatusoutput( cmd )
    num_ver = -1
    if output.find("not found") == -1 or status == 0:
        fields = output.split("-")
        if len(fields) >= 2:
            numbers = fields[1].split(".")
            # major AND minor are read, so two fields are needed (the old
            # check on one field only ended in an IndexError)
            if len(numbers) >= 2:
                try:
                    num_ver = int(numbers[0])*10 + int(numbers[1])
                except ValueError:
                    # not a number: the version stays unknown
                    num_ver = -1
    return num_ver
138    
139 spiga 1.8 def setProtocol( self, middleware ):
140 spiga 1.1 """
141     define the allowed potocols based on $middlware
142 ewv 1.7 which depend on scheduler
143 spiga 1.1 """
144 spiga 1.8 # default To be used with "middleware"
145 fanzago 1.38 if self.debug:
146     print 'setProtocol() :\n'
147     print '\tmiddleware = %s utils \n'%middleware
148 mcinquil 1.72
149 spiga 1.13 lcgOpt={'srmv1':'-b -D srmv1 -t 2400 --verbose',
150     'srmv2':'-b -D srmv2 -t 2400 --verbose'}
151 mcinquil 1.57 if self.checkLcgUtils() >= 17:
152 mcinquil 1.72 lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
153     'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
154 mcinquil 1.57
155 spiga 1.13 srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
156 spiga 1.71 'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
157 spiga 1.8 rfioOpt=''
158 fanzago 1.91 #### FEDE FOR XROOTD #########
159     xrootdOpt=''
160     #############################
161 spiga 1.8
162 ewv 1.14 supported_protocol = None
163 spiga 1.40 if middleware.lower() in ['osg','lcg','condor','sge']:
164 belforte 1.90 supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']])]#,\
165 spiga 1.89 # (self.params['srm_version'],srmOpt[self.params['srm_version']])]
166 fanzago 1.91 #elif middleware.lower() in ['lsf','caf']:
167     elif middleware.lower() in ['lsf']:
168 ewv 1.14 supported_protocol = [('rfio',rfioOpt)]
169 mcinquil 1.63 elif middleware.lower() in ['pbs']:
170     supported_protocol = [('rfio',rfioOpt),('local','')]
171 edelmann 1.53 elif middleware.lower() in ['arc']:
172     supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
173 fanzago 1.91 #### FEDE FOR XROOTD ##########
174     elif middleware.lower() in ['caf']:
175     if self.params['protocol']:
176     supported_protocol = [(self.params['protocol'], '')]
177     else:
178     supported_protocol = [('rfio',rfioOpt)]
179     #######################################
180 spiga 1.1 else:
181 ewv 1.7 ## here we can add support for any kind of protocol,
182 spiga 1.1 ## maybe some local schedulers need something dedicated
183     pass
184 belforte 1.95
185     # find out where we run and where we stageout
186     siteCfg = loadSiteLocalConfig()
187     localSeName = siteCfg.localStageOutSEName()
188     remoteSeName = self.params['se_name']
189     seIsLocal = localSeName == remoteSeName
190     print "******************************************************"
191     print "localSeName = ",localSeName
192     print "remoteSeName = ", remoteSeName
193     print "******************************************************"
194    
195 belforte 1.94 # force lstore protocol for Vanderbilt, waiting for better
196     # long term solution
197 belforte 1.95 if seIsLocal and localSeName.endswith('vanderbilt.edu') :
198     print "*** I am at, and writing-to, Vanderbilt. Try lstore first ***"
199 belforte 1.94 supported_protocol.insert(0, ('lstore','') )
200    
201 spiga 1.1 return supported_protocol
202 spiga 1.28
203 fanzago 1.41
204 fanzago 1.74 def checkCopy (self, copy_results, len_list_files, prot):
205 fanzago 1.41 """
206     Checks the status of copy and update result dictionary
207 spiga 1.28 """
208 fanzago 1.74
209 spiga 1.28 list_retry = []
210 fanzago 1.41 list_not_existing = []
211 fanzago 1.74 list_already_existing = []
212 fanzago 1.75 list_fallback = []
213 spiga 1.28 list_ok = []
214 fanzago 1.41
215     if self.debug:
216     print 'in checkCopy() :\n'
217 fanzago 1.81
218 fanzago 1.41 for file, dict in copy_results.iteritems():
219     er_code = dict['erCode']
220     if er_code == '0':
221     list_ok.append(file)
222     reason = 'Copy succedeed with %s utils'%prot
223     dict['reason'] = reason
224 fanzago 1.75 elif er_code == '60308':
225     list_fallback.append( file )
226     reason = 'Copy succedeed with %s utils'%prot
227     dict['reason'] = reason
228 fanzago 1.41 elif er_code == '60302':
229     list_not_existing.append( file )
230 fanzago 1.74 elif er_code == '60303':
231     list_already_existing.append( file )
232     else :
233 fanzago 1.41 list_retry.append( file )
234 fanzago 1.81
235 fanzago 1.41 if self.debug:
236     print "\t file %s \n"%file
237     print "\t dict['erCode'] %s \n"%dict['erCode']
238     print "\t dict['reason'] %s \n"%dict['reason']
239    
240 fanzago 1.74 upDict = self.updateReport(file, er_code, dict['reason'])
241 fanzago 1.41
242     copy_results.update(upDict)
243    
244     msg = ''
245     if len(list_ok) != 0:
246     msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
247     if len(list_ok) != len_list_files :
248 fanzago 1.75 if len(list_fallback)!=0:
249     msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
250 fanzago 1.74 if len(list_retry)!=0:
251     msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
252     if len(list_not_existing)!=0:
253     msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
254     if len(list_already_existing)!=0:
255     msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
256 fanzago 1.41 if self.debug : print msg
257 fanzago 1.78 return copy_results, list_ok, list_retry, list_fallback
258 fanzago 1.74
259     def check_for_retry_localSE (self, copy_results):
260     """
261     Checks the status of copy and create the list of file to copy to CloseSE
262     """
263     list_retry_localSE = []
264    
265     if self.debug:
266     print 'in check_for_retry_localSE() :\n'
267     print "\t results in check local = ", copy_results
268     for file, dict in copy_results.iteritems():
269     er_code = dict['erCode']
270     if er_code != '0' and er_code != '60302' and er_code != '60308':
271     list_retry_localSE.append( file )
272    
273     if self.debug:
274     print "\t file %s \n"%file
275     print "\t dict['erCode'] %s \n"%dict['erCode']
276     print "\t dict['reason'] %s \n"%dict['reason']
277    
278     return list_retry_localSE
279    
280 fanzago 1.41
281 spiga 1.45 def LocalCopy(self, list_retry, results):
282 fanzago 1.41 """
283 spiga 1.45 Tries the stage out to the CloseSE
284 fanzago 1.38 """
285 fanzago 1.41 if self.debug:
286 spiga 1.45 print 'in LocalCopy() :\n'
287 fanzago 1.41 print '\t list_retry %s utils \n'%list_retry
288     print '\t len(list_retry) %s \n'%len(list_retry)
289    
290 fanzago 1.81 list_files = list_retry
291 fanzago 1.78 self.params['inputFileList']=list_files
292    
293 fanzago 1.41 ### copy backup
294     from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
295     siteCfg = loadSiteLocalConfig()
296     catalog = siteCfg.localStageOut.get("catalog", None)
297 fanzago 1.81 tfc = siteCfg.trivialFileCatalog()
298     seName = siteCfg.localStageOutSEName()
299     if seName is None:
300     seName = ""
301     option = siteCfg.localStageOutOption()
302     if option is None:
303     option = ""
304     implName = siteCfg.localStageOutCommand()
305     if implName is None:
306     implName = ""
307 fanzago 1.80
308 fanzago 1.41 if (implName == 'srm'):
309 fanzago 1.80 protocol = 'srmv2'
310     elif (implName == 'srmv2-lcg'):
311     protocol = 'srm-lcg'
312 fanzago 1.81 option = option + ' -b -D srmv2 '
313 fanzago 1.80 elif (implName == 'rfcp-CERN'):
314     protocol = 'rfio'
315     elif (implName == 'rfcp'):
316     protocol = 'rfio'
317 fanzago 1.81 elif (implName == 'cp'):
318     protocol = 'local'
319 fanzago 1.80 else: protocol = implName
320 fanzago 1.82
321     self.params['protocol']=protocol
322     self.params['option']=option
323    
324 fanzago 1.41 if self.debug:
325     print '\t siteCFG %s \n'%siteCfg
326     print '\t catalog %s \n'%catalog
327 fanzago 1.81 print '\t tfc %s '%tfc
328     print '\t fallback seName %s \n'%seName
329 fanzago 1.80 print "\t fallback protocol %s \n"%protocol
330     print "\t fallback option %s \n"%option
331 fanzago 1.78 print "\t self.params['inputFileList'] %s \n"%self.params['inputFileList']
332 fanzago 1.41
333 fanzago 1.74 if (str(self.params['for_lfn']).find("/store/") == 0):
334     temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
335     self.params['for_lfn']= temp
336 fanzago 1.69
337 fanzago 1.74 if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
338 fanzago 1.65
339 fanzago 1.41 file_backup=[]
340 fanzago 1.83 file_backup_surlgrid=[]
341 fanzago 1.78 for input in self.params['inputFileList']:
342 fanzago 1.74 file = self.params['for_lfn'] + os.path.basename(input)
343 fanzago 1.41 surl = tfc.matchLFN(tfc.preferredProtocol, file)
344 fanzago 1.83
345     ###### FEDE TEST_FOR_SURL_GRID
346     surl_for_grid = tfc.matchLFN('srmv2', file)
347     if (surl_for_grid == None):
348     surl_for_grid = tfc.matchLFN('srmv2-lcg', file)
349     if (surl_for_grid == None):
350     surl_for_grid = tfc.matchLFN('srm', file)
351     if surl_for_grid:
352     file_backup_surlgrid.append(surl_for_grid)
353     ######
354    
355 fanzago 1.41 file_backup.append(surl)
356     if self.debug:
357 fanzago 1.74 print '\t for_lfn %s \n'%self.params['for_lfn']
358 fanzago 1.41 print '\t file %s \n'%file
359     print '\t surl %s \n'%surl
360    
361     destination=os.path.dirname(file_backup[0])
362 fanzago 1.59 if ( destination[-1] != '/' ) : destination = destination + '/'
363 fanzago 1.41 self.params['destination']=destination
364 fanzago 1.83
365 fanzago 1.74 self.params['se_name']=seName
366 fanzago 1.83
367     ######
368     if (len(file_backup_surlgrid)>0) :
369     surl_for_grid=os.path.dirname(file_backup_surlgrid[0])
370     if ( surl_for_grid[-1] != '/' ) : surl_for_grid = surl_for_grid + '/'
371     else:
372     surl_for_grid=''
373    
374     print "surl_for_grid = ", surl_for_grid
375     self.params['surl_for_grid']=surl_for_grid
376     #####
377    
378 fanzago 1.41 if self.debug:
379 fanzago 1.82 print '\tIn LocalCopy trying the stage out with: \n'
380     print "\tself.params['destination'] %s \n"%self.params['destination']
381     print "\tself.params['protocol'] %s \n"%self.params['protocol']
382     print "\tself.params['option'] %s \n"%self.params['option']
383 fanzago 1.80
384 fanzago 1.82 localCopy_results = self.copy( self.params['inputFileList'], self.params['protocol'], self.params['option'], backup='yes' )
385 fanzago 1.80
386     if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
387     results.update(localCopy_results)
388     else:
389 fanzago 1.82 localCopy_results, list_ok, list_retry, list_fallback = self.checkCopy(localCopy_results, len(list_files), self.params['protocol'])
390 fanzago 1.80
391     results.update(localCopy_results)
392     if self.debug:
393     print "\t localCopy_results = %s \n"%localCopy_results
394 fanzago 1.41 return results
395    
396 spiga 1.8 def stager( self, middleware, list_files ):
397 spiga 1.1 """
398     Implement the logic for remote stage out
399     """
400 spiga 1.30
401 fanzago 1.38 if self.debug:
402     print 'stager() :\n'
403     print '\tmiddleware %s\n'%middleware
404 spiga 1.45 print '\tlist_files %s\n'%list_files
405 fanzago 1.38
406 spiga 1.6 results={}
407 spiga 1.8 for prot, opt in self.setProtocol( middleware ):
408 fanzago 1.74 if self.debug:
409     print '\tTrying the stage out with %s utils \n'%prot
410     print '\tand options %s\n'%opt
411    
412 spiga 1.8 copy_results = self.copy( list_files, prot, opt )
413 spiga 1.30 if copy_results.keys() == [''] or copy_results.keys() == '' :
414 spiga 1.13 results.update(copy_results)
415     else:
416 fanzago 1.78 copy_results, list_ok, list_retry, list_fallback = self.checkCopy(copy_results, len(list_files), prot)
417 spiga 1.13 results.update(copy_results)
418     if len(list_ok) == len(list_files) :
419     break
420 fanzago 1.74 if len(list_retry):
421 fanzago 1.41 list_files = list_retry
422 fanzago 1.85 #### FEDE added ramdom time before the retry copy with other protocol
423 fanzago 1.87 sec = 240 * random.random()
424     sec = sec + 60
425 fanzago 1.85 time.sleep(sec)
426 fanzago 1.41 else: break
427 fanzago 1.74
428 spiga 1.45 if self.local_stage:
429 fanzago 1.74 list_retry_localSE = self.check_for_retry_localSE(results)
430 fanzago 1.55 if len(list_retry_localSE):
431 fanzago 1.74 if self.debug:
432     print "\t list_retry_localSE %s \n"%list_retry_localSE
433 fanzago 1.55 results = self.LocalCopy(list_retry_localSE, results)
434 fanzago 1.74
435 fanzago 1.38 if self.debug:
436     print "\t results %s \n"%results
437 spiga 1.1 return results
438    
439     def initializeApi(self, protocol ):
440     """
441 ewv 1.7 Instantiate storage interface
442 spiga 1.1 """
443 spiga 1.30 if self.debug : print 'initializeApi() :\n'
444 spiga 1.20 self.source_prot = protocol
445     self.dest_prot = protocol
446     if not self.params['source'] : self.source_prot = 'local'
447     Source_SE = self.storageInterface( self.params['source'], self.source_prot )
448 fanzago 1.77 if not self.params['destination'] :
449     self.dest_prot = 'local'
450     Destination_SE = self.storageInterface( self.params['destinationDir'], self.dest_prot )
451     else:
452     Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
453 spiga 1.1
454     if self.debug :
455 spiga 1.30 msg = '\t(source=%s, protocol=%s)'%(self.params['source'], self.source_prot)
456     msg += '\t(destination=%s, protocol=%s)'%(self.params['destination'], self.dest_prot)
457 fanzago 1.77 msg += '\t(destinationDir=%s, protocol=%s)'%(self.params['destinationDir'], self.dest_prot)
458 mcinquil 1.32 print msg
459 spiga 1.1
460     return Source_SE, Destination_SE
461    
462 fanzago 1.65 def copy( self, list_file, protocol, options, backup='no' ):
463 spiga 1.1 """
464 ewv 1.7 Make the real file copy using SE API
465 spiga 1.1 """
466 mcinquil 1.37 msg = ""
467 fanzago 1.74 results = {}
468 spiga 1.1 if self.debug :
469 spiga 1.30 msg = 'copy() :\n'
470     msg += '\tusing %s protocol\n'%protocol
471 fanzago 1.81 msg += '\tusing %s options\n'%options
472     msg += '\tlist_file %s\n'%list_file
473 spiga 1.30 print msg
474 spiga 1.13 try:
475     Source_SE, Destination_SE = self.initializeApi( protocol )
476     except Exception, ex:
477 fanzago 1.69 for filetocopy in list_file:
478     results.update( self.updateReport(filetocopy, '-1', str(ex)))
479     return results
480 ewv 1.14
481 fanzago 1.81 self.hostname = Destination_SE.hostname
482 belforte 1.94 if Destination_SE.protocol in ['gridftp','rfio','srmv2','hadoop','lstore','local']:
483 spiga 1.13 try:
484 mcinquil 1.32 self.createDir( Destination_SE, Destination_SE.protocol )
485 mcinquil 1.47 except OperationException, ex:
486 fanzago 1.69 for filetocopy in list_file:
487 fanzago 1.70 results.update( self.updateReport(filetocopy, '60316', str(ex)))
488 fanzago 1.69 return results
489 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
490     except MissingCommand, ex:
491     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
492 fanzago 1.69 for filetocopy in list_file:
493     results.update( self.updateReport(filetocopy, '10041', msg))
494     return results
495     except Exception, ex:
496     msg = "ERROR %s" %(str(ex))
497     for filetocopy in list_file:
498     results.update( self.updateReport(filetocopy, '-1', msg))
499     return results
500 spiga 1.1 ## prepare for real copy ##
501 spiga 1.8 try :
502     sbi = SBinterface( Source_SE, Destination_SE )
503     sbi_dest = SBinterface(Destination_SE)
504 spiga 1.20 sbi_source = SBinterface(Source_SE)
505 spiga 1.11 except ProtocolMismatch, ex:
506 spiga 1.30 msg = "ERROR : Unable to create SBinterface with %s protocol"%protocol
507     msg += str(ex)
508 fanzago 1.69 for filetocopy in list_file:
509     results.update( self.updateReport(filetocopy, '-1', msg))
510     return results
511 spiga 1.1
512 fanzago 1.74
513 ewv 1.7 ## loop over the complete list of files
514     for filetocopy in list_file:
515 spiga 1.30 if self.debug : print '\tStart real copy for %s'%filetocopy
516 spiga 1.13 try :
517 spiga 1.34 ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
518 spiga 1.13 except Exception, ex:
519 spiga 1.58 ErCode = '60307'
520 spiga 1.18 msg = str(ex)
521 ewv 1.7 if ErCode == '0':
522 spiga 1.19 ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
523 fanzago 1.65 if (ErCode == '0') and (backup == 'yes'):
524     ErCode = '60308'
525 spiga 1.30 if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
526 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
527     return results
528 ewv 1.7
529 spiga 1.1
530     def storageInterface( self, endpoint, protocol ):
531     """
532 ewv 1.7 Create the storage interface.
533 spiga 1.1 """
534 spiga 1.30 if self.debug : print 'storageInterface():\n'
535 spiga 1.1 try:
536     interface = SElement( FullPath(endpoint), protocol )
537 spiga 1.11 except ProtocolUnknown, ex:
538 spiga 1.30 msg = "ERROR : Unable to create interface with %s protocol"%protocol
539     msg += str(ex)
540 spiga 1.13 raise Exception(msg)
541 spiga 1.1
542     return interface
543    
544     def createDir(self, Destination_SE, protocol):
545     """
546 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
547 ewv 1.7 this should be transparent at SE API level.
548 spiga 1.1 """
549 spiga 1.30 if self.debug : print 'createDir():\n'
550 ewv 1.14 msg = ''
551 spiga 1.1 try:
552     action = SBinterface( Destination_SE )
553     action.createDir()
554 spiga 1.30 if self.debug: print "\tThe directory has been created using protocol %s"%protocol
555 spiga 1.11 except TransferException, ex:
556 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
557     msg += str(ex)
558 spiga 1.11 if self.debug :
559 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
560     dbgmsg += '\t'+str(ex.output)+'\n'
561     print dbgmsg
562 spiga 1.29 raise Exception(msg)
563 spiga 1.11 except OperationException, ex:
564 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
565     msg += str(ex)
566     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
567 spiga 1.29 raise Exception(msg)
568 spiga 1.31 except MissingDestination, ex:
569     msg = "ERROR: problem with the directory creation using %s protocol "%protocol
570     msg += str(ex)
571     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
572     raise Exception(msg)
573     except AlreadyExistsException, ex:
574     if self.debug: print "\tThe directory already exist"
575 fanzago 1.74 pass
576     except Exception, ex:
577     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
578     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
579     raise Exception(msg)
580 spiga 1.13 return msg
581 spiga 1.1
582 spiga 1.34 def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
583 spiga 1.1 """
584 spiga 1.20 Check both if source file exist AND
585     if destination file ALREADY exist.
586 ewv 1.7 """
587 spiga 1.30 if self.debug : print 'checkFileExist():\n'
588 spiga 1.8 ErCode = '0'
589     msg = ''
590 spiga 1.20 f_tocopy=filetocopy
591 fanzago 1.81 if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
592 spiga 1.1 try:
593 mcinquil 1.72 checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
594 belforte 1.90 if self.debug : print '\tCheck for local file %s existance executed \n'%f_tocopy
595 spiga 1.20 except OperationException, ex:
596 belforte 1.90 msg ='ERROR: problems checking source file %s existance'%filetocopy
597 spiga 1.30 msg += str(ex)
598 spiga 1.20 if self.debug :
599 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
600     dbgmsg += '\t'+str(ex.output)+'\n'
601     print dbgmsg
602 spiga 1.20 raise Exception(msg)
603     except WrongOption, ex:
604 belforte 1.90 msg ='ERROR: problems checking source file %s existance'%filetocopy
605 spiga 1.30 msg += str(ex)
606 spiga 1.20 if self.debug :
607 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
608     dbgmsg += '\t'+str(ex.output)+'\n'
609     print dbgmsg
610 spiga 1.20 raise Exception(msg)
611 spiga 1.31 except MissingDestination, ex:
612 belforte 1.90 msg ='ERROR: problems checking source file %s existance'%filetocopy
613 spiga 1.31 msg += str(ex)
614     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
615     raise Exception(msg)
616 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
617     except MissingCommand, ex:
618     ErCode = '10041'
619     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
620     return ErCode, msg
621 spiga 1.20 if not checkSource :
622     ErCode = '60302'
623 belforte 1.90 msg = "ERROR file %s does not exist"%os.path.basename(filetocopy)
624 spiga 1.20 return ErCode, msg
625 fanzago 1.81 f_tocopy=os.path.basename(filetocopy)
626 spiga 1.20 try:
627 mcinquil 1.72 check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
628 belforte 1.90 if self.debug : print '\tCheck for remote file %s existance executed \n'%f_tocopy
629 fanzago 1.81 if self.debug : print '\twith exit code = %s \n'%check
630 spiga 1.11 except OperationException, ex:
631 belforte 1.90 msg = 'ERROR: problems checking if file %s already exist'%filetocopy
632 spiga 1.30 msg += str(ex)
633 spiga 1.11 if self.debug :
634 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
635     dbgmsg += '\t'+str(ex.output)+'\n'
636     print dbgmsg
637 spiga 1.13 raise Exception(msg)
638 spiga 1.11 except WrongOption, ex:
639 belforte 1.90 msg = 'ERROR problems checking if file % already exists'%filetocopy
640 spiga 1.30 msg += str(ex)
641 spiga 1.11 if self.debug :
642 spiga 1.30 msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
643     msg += '\t'+str(ex.output)+'\n'
644 spiga 1.13 raise Exception(msg)
645 spiga 1.31 except MissingDestination, ex:
646 belforte 1.90 msg ='ERROR problems checking if destination file % exists'%filetocopy
647 spiga 1.31 msg += str(ex)
648     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
649     raise Exception(msg)
650 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
651     except MissingCommand, ex:
652     ErCode = '10041'
653     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
654     return ErCode, msg
655 ewv 1.14 if check :
656 ewv 1.7 ErCode = '60303'
657 spiga 1.20 msg = "file %s already exist"%os.path.basename(filetocopy)
658 ewv 1.14
659 spiga 1.13 return ErCode, msg
660 spiga 1.1
661 spiga 1.19 def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
662 spiga 1.1 """
663 ewv 1.7 call the copy API.
664 spiga 1.1 """
665 spiga 1.30 if self.debug : print 'makeCopy():\n'
666 ewv 1.7 path = os.path.dirname(filetocopy)
667 spiga 1.1 file_name = os.path.basename(filetocopy)
668     source_file = filetocopy
669     dest_file = file_name ## to be improved supporting changing file name TODO
670 spiga 1.8 if self.params['source'] == '' and path == '':
671 spiga 1.1 source_file = os.path.abspath(filetocopy)
672 spiga 1.8 elif self.params['destination'] =='':
673 spiga 1.24 destDir = self.params.get('destinationDir',os.getcwd())
674     dest_file = os.path.join(destDir,file_name)
675 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
676 ewv 1.7 source_file = file_name
677 spiga 1.8
678 spiga 1.1 ErCode = '0'
679     msg = ''
680 ewv 1.7
681 belforte 1.84 copy_option = option
682 spiga 1.71 if self.params['option'].find('space_token')>=0:
683 slacapra 1.64 space_token=self.params['option'].split('=')[1]
684 belforte 1.84 if protocol == 'srmv2': copy_option = '%s -space_token=%s'%(option,space_token)
685     if protocol == 'srm-lcg': copy_option = '%s -S %s'%(option,space_token)
686 spiga 1.1 try:
687 belforte 1.84 sbi.copy( source_file , dest_file , opt = copy_option, tout = self.subprocesstimeout['copy'])
688 spiga 1.11 except TransferException, ex:
689 spiga 1.30 msg = "Problem copying %s file" % filetocopy
690     msg += str(ex)
691 spiga 1.11 if self.debug :
692 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
693     dbgmsg += '\t'+str(ex.output)+'\n'
694 mcinquil 1.32 print dbgmsg
695 spiga 1.1 ErCode = '60307'
696 spiga 1.11 except WrongOption, ex:
697 spiga 1.30 msg = "Problem copying %s file" % filetocopy
698     msg += str(ex)
699 spiga 1.11 if self.debug :
700 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
701     dbgmsg += '\t'+str(ex.output)+'\n'
702 fanzago 1.93 print dbgmsg
703     ### FEDE added for savannah 97460###
704     ErCode = '60307'
705     ####################################
706 spiga 1.44 except SizeZeroException, ex:
707     msg = "Problem copying %s file" % filetocopy
708     msg += str(ex)
709     if self.debug :
710     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
711     dbgmsg += '\t'+str(ex.output)+'\n'
712 fanzago 1.48 print dbgmsg
713 spiga 1.13 ErCode = '60307'
714 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
715     except MissingCommand, ex:
716     ErCode = '10041'
717     msg = "Problem copying %s file" % filetocopy
718     msg += str(ex)
719     if self.debug :
720     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
721     dbgmsg += '\t'+str(ex.output)+'\n'
722     print dbgmsg
723 mcinquil 1.54 except AuthorizationException, ex:
724     ErCode = '60307'
725     msg = "Problem copying %s file" % filetocopy
726     msg += str(ex)
727     if self.debug :
728     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
729     dbgmsg += '\t'+str(ex.output)+'\n'
730     print dbgmsg
731 mcinquil 1.72 except SEAPITimeout, ex:
732     ErCode = '60317'
733     msg = "Problem copying %s file" % filetocopy
734     msg += str(ex)
735     if self.debug :
736     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
737     dbgmsg += '\t'+str(ex.output)+'\n'
738     print dbgmsg
739    
740 spiga 1.24 if ErCode == '0' and protocol.find('srmv') == 0:
741 spiga 1.19 remote_file_size = -1
742     local_file_size = os.path.getsize( source_file )
743     try:
744 mcinquil 1.72 remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
745 spiga 1.30 if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
746 spiga 1.19 except TransferException, ex:
747 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
748     msg += str(ex)
749 spiga 1.19 if self.debug :
750 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
751     dbgmsg += '\t'+str(ex.output)+'\n'
752     print dbgmsg
753 spiga 1.19 ErCode = '60307'
754     except WrongOption, ex:
755 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
756     msg += str(ex)
757 spiga 1.19 if self.debug :
758 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
759     dbgmsg += '\t'+str(ex.output)+'\n'
760     print dbgmsg
761 spiga 1.19 ErCode = '60307'
762     if local_file_size != remote_file_size:
763     msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
764     ErCode = '60307'
765 ewv 1.14
766 spiga 1.27 if ErCode != '0':
767     try :
768 spiga 1.34 self.removeFile( sbi_dest, dest_file, option )
769 spiga 1.27 except Exception, ex:
770     msg += '\n'+str(ex)
771 spiga 1.1 return ErCode, msg
772 ewv 1.7
773 spiga 1.34 def removeFile( self, sbi_dest, filetocopy, option ):
774 spiga 1.30 """
775     """
776     if self.debug : print 'removeFile():\n'
777 spiga 1.21 f_tocopy=filetocopy
778     if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
779     try:
780 mcinquil 1.72 sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
781 spiga 1.30 if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
782 spiga 1.21 except OperationException, ex:
783 spiga 1.30 msg ='ERROR: problems removing partially staged file %s'%filetocopy
784     msg += str(ex)
785 spiga 1.21 if self.debug :
786 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
787     dbgmsg += '\t'+str(ex.output)+'\n'
788     print dbgmsg
789 spiga 1.21 raise Exception(msg)
790    
791     return
792    
def updateReport(self, file, erCode, reason):
    """
    Build the stage out report entry for one file.

    input:
      file   : key used for the entry in the returned dictionary
      erCode : stage out exit code (string)
      reason : message stored next to the exit code
    output:
      { file : { 'erCode', 'reason', 'for_lfn', 'se_name',
                 'endpoint', 'surl_for_grid' } }

    Values of self.params['for_lfn'], self.params['se_name'],
    self.params['surl_for_grid'] and self.hostname that evaluate to false
    are replaced by '' in place.
    """
    # normalise unset values to empty strings (done in place)
    for key in ('for_lfn', 'se_name'):
        if not self.params[key]:
            self.params[key] = ''
    if not self.hostname:
        self.hostname = ''

    # every code other than '0' and '60308' is reported under '/copy_problem/'
    if erCode in ('0', '60308'):
        lfn_base = self.params['for_lfn']
    else:
        lfn_base = '/copy_problem/'

    # surl_for_grid is reported as well (added for copyData)
    if not self.params['surl_for_grid']:
        self.params['surl_for_grid'] = ''

    stage_info = {
        'erCode': erCode,
        'reason': reason,
        'for_lfn': lfn_base,
        'se_name': self.params['se_name'],
        'endpoint': self.hostname,
        'surl_for_grid': self.params['surl_for_grid'],
    }
    return {file: stage_info}
815 ewv 1.7
def finalReport( self , results ):
    """
    Write the stage out summary as a bash script named cmscpReport.sh.

    The script is created in $RUNTIME_AREA when that variable is set,
    otherwise in the current directory.

    input:
      results : { file : info } where info provides 'reason', 'erCode',
                'for_lfn' and 'se_name'
    output:
      None. The script echoes file name, LFN, storage element and failure
      reason of every entry, and exports StageOutExitStatus.

    Entries with an empty file name only contribute their reason and
    always set the exit status; the other entries set it only when their
    'erCode' is not '0'.
    """
    outFile = 'cmscpReport.sh'
    runtime_area = os.getenv("RUNTIME_AREA")
    if runtime_area:
        outFile = "%s/cmscpReport.sh" % runtime_area

    fp = open(outFile, "w")
    try:
        cmscp_exit_status = 0
        lines = ['#!/bin/bash\n']
        for fname, info in results.items():
            # single quotes in the reason would break the shell quoting below
            reason = "'%s'" % str(info['reason']).replace("'", " ")
            if fname:
                if info['for_lfn'] == '':
                    # no LFN known here: let the shell variables fill it in
                    lfn = '${LFNBaseName}' + os.path.basename(fname)
                    se = '$SE'
                else:
                    lfn = info['for_lfn'] + os.path.basename(fname)
                    se = info['se_name']

                lines.append('echo "Report for File: ' + fname + '"\n')
                lines.append('echo "LFN: ' + lfn + '"\n')
                lines.append('echo "StorageElement: ' + se + '"\n')
                lines.append('echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n' % reason)
                lines.append('echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n')

                if info['erCode'] != '0':
                    cmscp_exit_status = info['erCode']
            else:
                lines.append('echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n' % reason)
                cmscp_exit_status = info['erCode']
        lines.append('\n')
        lines.append('export StageOutExitStatus=' + str(cmscp_exit_status) + '\n')
        lines.append('echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n')
        fp.write(str(''.join(lines)))
    finally:
        # close the report even when an entry is malformed
        fp.close()

    if self.debug:
        print('--- reading cmscpReport.sh: \n')
        lp = open(outFile, "r")
        try:
            content = lp.read()
        finally:
            lp.close()
        print(" content =  %s" % content)
        print('--- end reading cmscpReport.sh')
    return
874    
875    
def usage():
    """
    Print the command line help of cmscp.

    The listed parameters mirror the long options accepted by the
    getopt call in the script section of this file.
    """
    msg="""
    cmscp:
        safe copy of local file to/from remote SE via lcg_cp/srmcp,
        including success checking version also for CAF using rfcp command to copy the output to SE

    accepted parameters:
       source           =
       destination      =
       inputFileList    =
       outputFileList   =
       protocol         =
       option           =
       middleware       =
       srm_version      =
       destinationDir   =
       for_lfn          =
       se_name          =
       local_stage      = activate stage fall back
       debug            = activate verbose print out
       help             = print on line man and exit

    mandatory:
       * "source" and/or "destination" must always be defined
       * either "middleware" or "protocol" must always be defined
       * "inputFileList" must always be defined
       * if "local_stage" is given also "for_lfn" must be defined
    """
    print(msg)

    return
907 spiga 1.8
def HelpOptions(opts=None):
    """
    Check opts, print help if needed
    prepare dict = { opt : value }

    input:
      opts : list of (option, value) pairs as returned by getopt.getopt
    output:
      dictionary mapping each option name, without leading dashes, to
      its value

    When opts is empty or missing, or when a help option is found, the
    usage is printed and the program exits with status 0.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # look for help first: single-dash spellings have no '--' to strip
        if opt in ('-h','-help','--help') :
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
924 spiga 1.1
925     if __name__ == '__main__' :
926 spiga 1.8
927 ewv 1.14 import getopt
928 spiga 1.8
929     allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
930 spiga 1.25 "protocol=","option=", "middleware=", "srm_version=", \
931 fanzago 1.74 "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
932 ewv 1.14 try:
933     opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
934 spiga 1.8 except getopt.GetoptError, err:
935     print err
936     HelpOptions()
937     sys.exit(2)
938 ewv 1.14
939 spiga 1.8 dictArgs = HelpOptions(opts)
940 spiga 1.1 try:
941 spiga 1.8 cmscp_ = cmscp(dictArgs)
942 spiga 1.1 cmscp_.run()
943 spiga 1.11 except Exception, ex :
944     print str(ex)
945 spiga 1.1