ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.75
Committed: Wed Jun 9 17:17:03 2010 UTC (14 years, 10 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3_pre3, CRAB_2_7_3_pre3_beta, CRAB_2_7_3_pre2, CRAB_2_7_3_pre2_beta
Changes since 1.74: +7 -0 lines
Log Message:
fix check for fallback

File Contents

# User Rev Content
1 edelmann 1.56 #!/usr/bin/env python
2 mcinquil 1.16 import sys, os
3 fanzago 1.74 try:
4     import json
5     except:
6     import simplejson as json
7 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
8 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
9 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
10 spiga 1.1
11    
12     class cmscp:
def __init__(self, args):
    """
    cmscp
    safe copy of local file to/from remote SE via lcg_cp/srmcp,
    including success checking version also for CAF using rfcp command to copy the output to SE
    input:
       args : dictionary of parameters; it is merged over the defaults set
              below, so unknown keys are kept and known keys are overridden.
       The original command line interface was described as:
       $1 middleware (CAF, LSF, LCG, OSG)
       $2 local file (the absolute path of output file or just the name if it's in top dir)
       $3 if needed: file name (the output file name)
       $5 remote SE (complete endpoint)
       $6 srm version
       --for_lfn $LFNBaseName
    output:
       return 0 if all ok
       return 60307 if srmcp failed
       return 60303 if file already exists in the SE
    """

    #### FEDE added SE_NAME as parameters
    # set default
    self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
                   "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'' }
    self.debug = 0
    #### for fallback copy
    self.local_stage = 0
    # updateReport() reads self.hostname, and copy() reports through it
    # before a destination host is known when initializeApi() fails; start
    # from an empty endpoint so that path cannot hit a missing attribute.
    self.hostname = ''
    self.params.update( args )
    # timeouts handed to the SEAPI calls; setProtocol() fills in real values
    self.subprocesstimeout = {'copy': None, 'exists': None, 'delete': None, 'size': None}
    return
41    
def processOptions( self ):
    """
    check command line parameter

    Sets self.debug / self.local_stage from the presence of the 'debug' and
    'local_stage' keys and turns the comma separated 'inputFileList' string
    into a list of stripped names.
    HelpOptions() is defined outside this section; it is called for every
    invalid combination (presumably it prints the usage and exits - confirm).
    """
    if 'help' in self.params: HelpOptions()
    if 'debug' in self.params: self.debug = 1
    if 'local_stage' in self.params: self.local_stage = 1

    # source and dest cannot be undefined at same time
    if not self.params['source'] and not self.params['destination'] :
        HelpOptions()
    # if middleware is not defined --> protocol cannot be empty
    if not self.params['middleware'] and not self.params['protocol'] :
        HelpOptions()

    # input file must be defined
    if not self.params['inputFileList'] :
        HelpOptions()
    else:
        # split(',') already yields a one element list when there is no
        # comma, so no separate single-file branch is needed (the old
        # str.find() test was truthy for "not found" and falsy for a
        # leading comma, i.e. the wrong way round).
        self.params['inputFileList'] = [x.strip() for x in self.params['inputFileList'].split(',')]

    # the fallback copy needs the LFN base name to build its destination
    if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
70 fanzago 1.38
71 spiga 1.1 ## TO DO:
72     #### add check for outFiles
73     #### add map {'inFileNAME':'outFileNAME'} to change out name
74    
75    
def run( self ):
    """
    Check if running on UI (no $middleware) or
    on WN (on the Grid), and take different action

    With a middleware the files are staged out through stager() and the
    final report is written; without it the files are copied directly with
    the configured protocol. In both cases the results are dumped to the
    json file and returned.
    """
    self.processOptions()
    if self.debug: print('calling run() : \n')
    if self.params['middleware'] :
        # stage out from WN
        results = self.stager(self.params['middleware'],self.params['inputFileList'])
        self.writeJsonFile(results)
        self.finalReport(results)
    else:
        # Local interaction with SE
        results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
        self.writeJsonFile(results)
    # single exit: both paths hand the per-file results back to the caller
    return results
93 fanzago 1.74
def writeJsonFile( self, results ):
    """
    write a json file containing copy results for each file

    The file is 'resultCopyFile' in the current working directory; in debug
    mode it is read back and echoed.
    """
    if self.debug:
        print('in writeJsonFile() : \n')
        print("---->>>> in writeJsonFile results =  %s" % (results,))
    fp = open('resultCopyFile', 'w')
    try:
        json.dump(results, fp)
    finally:
        # close even when the results cannot be serialised
        fp.close()
    if self.debug:
        print(' reading resultCopyFile : \n')
        lp = open('resultCopyFile', "r")
        try:
            inputDict = json.load(lp)
        finally:
            lp.close()
        print(" inputDict =  %s" % (inputDict,))
    return
111 spiga 1.1
def checkLcgUtils( self ):
    """
    _checkLcgUtils_
    check the lcg-utils version and report

    Runs "lcg-cp --version | grep lcg_util" and parses the text between the
    first and second '-' of the output as "major.minor[...]".
    output:
       major*10 + minor as an integer, or -1 when the version cannot be
       determined (command missing or unexpected output)
    """
    try:
        import commands
    except ImportError:
        # interpreters without the 'commands' module provide the same
        # getstatusoutput() in subprocess
        import subprocess as commands
    cmd = "lcg-cp --version | grep lcg_util"
    status, output = commands.getstatusoutput( cmd )
    num_ver = -1
    if output.find("not found") == -1 or status == 0:
        temp = output.split("-")
        if len(temp) >= 2:
            fields = temp[1].split(".")
            try:
                num_ver = int(fields[0])*10 + int(fields[1])
            except (IndexError, ValueError):
                # no minor number or a non numeric token: version unknown
                num_ver = -1
    return num_ver
131    
def setProtocol( self, middleware ):
    """
    define the allowed protocols based on $middleware
    which depend on scheduler

    Also sets the SEAPI subprocess timeouts as a side effect.
    output:
       list of (protocol, options) pairs in the order they have to be
       tried, or None when the middleware is not known
    """
    # default To be used with "middleware"
    if self.debug:
        print('setProtocol() :\n')
        print('\tmiddleware = %s utils \n'%middleware)

    ## timeout needed for subprocess command of SEAPI
    ## they should be a bit higher then the corresponding passed by command line
    self.subprocesstimeout['copy'] = 3600
    for action in ('exists', 'delete', 'size'):
        self.subprocesstimeout[action] = 1200

    # lcg-utils from version 1.7 on takes the split timeout switches
    if self.checkLcgUtils() >= 17:
        lcg_options = {'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
                       'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
    else:
        lcg_options = {'srmv1':'-b -D srmv1 -t 2400 --verbose',
                       'srmv2':'-b -D srmv2 -t 2400 --verbose'}

    srm_options = {'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                   'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
    rfio_options = ''

    scheduler = middleware.lower()
    if scheduler in ['osg','lcg','condor','sge']:
        version = self.params['srm_version']
        return [('srm-lcg', lcg_options[version]),
                (version, srm_options[version])]
    if scheduler in ['lsf','caf']:
        return [('rfio', rfio_options)]
    if scheduler in ['pbs']:
        return [('rfio', rfio_options), ('local','')]
    if scheduler in ['arc']:
        return [('srmv2','-debug'), ('srmv1','-debug')]
    ## here we can add support for any kind of protocol,
    ## maybe some local schedulers need something dedicated
    return None
174 spiga 1.28
175 fanzago 1.41
176 fanzago 1.74 #def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
def checkCopy (self, copy_results, len_list_files, prot):
    """
    Checks the status of copy and update result dictionary

    input:
       copy_results   : {file: {'erCode': ..., 'reason': ...}} as built by copy()
       len_list_files : number of files that were asked for
       prot           : protocol used, only for the messages
    output:
       (copy_results, list of files copied with code '0', list of files to retry)
    """
    # files grouped by the codes that get a dedicated message;
    # every other code means "retry"
    grouped = {'0': [], '60308': [], '60302': [], '60303': []}
    to_retry = []

    if self.debug:
        print('in checkCopy() :\n')

    for name, info in list(copy_results.items()):
        code = info['erCode']
        grouped.get(code, to_retry).append(name)
        if code in ('0', '60308'):
            info['reason'] = 'Copy succedeed with %s utils'%prot

        if self.debug:
            print("\t file %s \n"%name)
            print("\t dict['erCode'] %s \n"%info['erCode'])
            print("\t dict['reason'] %s \n"%info['reason'])

        # rebuild the entry through updateReport() so it carries the
        # LFN / SE information as well
        copy_results.update(self.updateReport(name, code, info['reason']))

    copied = grouped['0']
    pieces = []
    if len(copied) != 0:
        pieces.append('\tCopy of %s succedeed with %s utils\n'%(str(copied),prot))
    if len(copied) != len_list_files :
        for group, template in ((grouped['60308'], '\tCopy of %s succedeed with %s utils in the fallback SE\n'),
                                (to_retry, '\tCopy of %s failed using %s for files \n'),
                                (grouped['60302'], '\tCopy of %s failed using %s : files not found \n'),
                                (grouped['60303'], '\tCopy of %s failed using %s : files already existing\n')):
            if len(group) != 0:
                pieces.append(template%(str(group),prot))
    if self.debug : print(''.join(pieces))

    return copy_results, copied, to_retry
236    
def check_for_retry_localSE (self, copy_results):
    """
    Checks the status of copy and create the list of file to copy to CloseSE

    A file is selected unless its code is '0', '60302' or '60308'.
    """
    if self.debug:
        print('in check_for_retry_localSE() :\n')
        print("\t results in check local = " + ' ' + str(copy_results))

    settled_codes = ('0', '60302', '60308')
    candidates = []
    for name, info in copy_results.items():
        if info['erCode'] not in settled_codes:
            candidates.append( name )

        if self.debug:
            print("\t file %s \n"%name)
            print("\t dict['erCode'] %s \n"%info['erCode'])
            print("\t dict['reason'] %s \n"%info['reason'])

    return candidates
257    
258 fanzago 1.41
def LocalCopy(self, list_retry, results):
    """
    Tries the stage out to the CloseSE

    The protocol and the destination come from the site local configuration:
    the localStageOut "command" selects the protocol, and the trivial file
    catalog maps for_lfn + file name to the destination path. An LFN that
    starts with /store/ is moved under /store/temp/.
    input:
       list_retry : files still to be copied
       results    : results collected so far, updated in place
    output:
       results, with the outcome of the fallback copy merged in
    """
    if self.debug:
        print('in LocalCopy() :\n')
        print('\t list_retry %s utils \n'%list_retry)
        print('\t len(list_retry) %s \n'%len(list_retry))

    list_files = list_retry
    self.params['inputFilesList']=list_files

    ### copy backup
    from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
    siteCfg = loadSiteLocalConfig()
    seName = siteCfg.localStageOut.get("se-name", None)
    catalog = siteCfg.localStageOut.get("catalog", None)
    implName = siteCfg.localStageOut.get("command", None)
    if (implName == 'srm'):
        implName='srmv1'
        self.params['srm_version']=implName
    ##### to be improved ###############
    if (implName == 'rfcp'):
        self.params['middleware']='lsf'
    ####################################

    self.params['protocol']=implName
    tfc = siteCfg.trivialFileCatalog()

    if self.debug:
        print('\t siteCFG %s \n'%siteCfg)
        print('\t seName %s \n'%seName)
        print('\t catalog %s \n'%catalog)
        print("\t self.params['protocol'] %s \n"%self.params['protocol'])
        print('\t tfc %s '%tfc)
        print("\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList'])

    if (str(self.params['for_lfn']).find("/store/") == 0):
        temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
        self.params['for_lfn']= temp

    # endswith() also copes with an empty string, unlike [-1]
    if not self.params['for_lfn'].endswith('/'):
        self.params['for_lfn'] = self.params['for_lfn'] + '/'

    file_backup=[]
    for input_name in self.params['inputFilesList']:
        lfn = self.params['for_lfn'] + os.path.basename(input_name)
        surl = tfc.matchLFN(tfc.preferredProtocol, lfn)
        file_backup.append(surl)
        if self.debug:
            print('\t for_lfn %s \n'%self.params['for_lfn'])
            print('\t file %s \n'%lfn)
            print('\t surl %s \n'%surl)

    # all the files go to the same directory: take it from the first one
    destination=os.path.dirname(file_backup[0])
    if not destination.endswith('/'): destination = destination + '/'
    self.params['destination']=destination

    self.params['se_name']=seName

    if self.debug:
        print("\t self.params['destination']%s \n"%self.params['destination'])
        print("\t self.params['protocol'] %s \n"%self.params['protocol'])
        print("\t self.params['option']%s \n"%self.params['option'])

    for prot, opt in self.setProtocol( self.params['middleware'] ):
        if self.debug: print('\tIn LocalCopy trying the stage out with %s utils \n'%prot)
        # Copy only the files still pending. The retry list used to be
        # stored under 'inputFilesList' while the copy read 'inputFileList',
        # so every input file was copied again at each attempt and the
        # narrowing of list_files below had no effect.
        localCopy_results = self.copy( list_files, prot, opt, backup='yes' )
        if self.debug:
            print("\t localCopy_results = %s \n"%localCopy_results)
        if list(localCopy_results.keys()) == ['']:
            results.update(localCopy_results)
        else:
            localCopy_results, list_ok, list_retry = self.checkCopy(localCopy_results, len(list_files), prot)
            results.update(localCopy_results)
            if len(list_ok) == len(list_files) :
                break
            if len(list_retry):
                list_files = list_retry
            else: break

    return results
351    
def stager( self, middleware, list_files ):
    """
    Implement the logic for remote stage out

    Every protocol allowed for the middleware is tried in turn on the files
    that are still to be retried; when local_stage is set, the files that
    still need it are then sent to the CloseSE through LocalCopy().
    output:
       dictionary with the copy results of each file
    """
    if self.debug:
        print('stager() :\n')
        print('\tmiddleware %s\n'%middleware)
        print('\tlist_files %s\n'%list_files)

    results = {}
    pending = list_files
    for prot, opt in self.setProtocol( middleware ):
        if self.debug:
            print('\tTrying the stage out with %s utils \n'%prot)
            print('\tand options %s\n'%opt)

        outcome = self.copy( pending, prot, opt )
        if list(outcome.keys()) == ['']:
            # nothing that can be checked: keep it and try the next protocol
            results.update(outcome)
            continue

        outcome, copied, failed = self.checkCopy(outcome, len(pending), prot)
        results.update(outcome)
        if len(copied) == len(pending) or not len(failed):
            break
        pending = failed

    if self.local_stage:
        fallback_files = self.check_for_retry_localSE(results)
        if len(fallback_files):
            if self.debug:
                print("\t list_retry_localSE %s \n"%fallback_files)
            results = self.LocalCopy(fallback_files, results)

    if self.debug:
        print("\t results %s \n"%results)
    return results
393    
def initializeApi(self, protocol ):
    """
    Instantiate storage interface

    An empty source (or destination) endpoint is handled with the 'local'
    protocol; the protocols actually used are kept in self.source_prot and
    self.dest_prot.
    output:
       (source storage element, destination storage element)
    """
    if self.debug : print('initializeApi() :\n')

    self.source_prot = protocol
    self.dest_prot = protocol

    source_endpoint = self.params['source']
    if not source_endpoint:
        self.source_prot = 'local'
    source_se = self.storageInterface( source_endpoint, self.source_prot )

    destination_endpoint = self.params['destination']
    if not destination_endpoint:
        self.dest_prot = 'local'
    destination_se = self.storageInterface( destination_endpoint, self.dest_prot )

    if self.debug :
        print('\t(source=%s, protocol=%s)'%(self.params['source'], self.source_prot)
              + '\t(destination=%s, protocol=%s)'%(self.params['destination'], self.dest_prot))

    return source_se, destination_se
412    
def copy( self, list_file, protocol, options, backup='no' ):
    """
    Make the real file copy using SE API

    input:
       list_file : files to copy
       protocol  : protocol used to build the storage interfaces
       options   : options handed to the existence check and to the copy
       backup    : 'yes' when copying to the fallback SE; a successful copy
                   is then reported with code '60308' instead of '0'
    output:
       dictionary {file: report} built with updateReport(). A failure while
       setting up the storage interfaces or the remote directory gives the
       same code to every file.
    """
    results = {}

    def report_all(code, text):
        # same verdict for every requested file
        for name in list_file:
            results.update( self.updateReport(name, code, text))
        return results

    if self.debug :
        print('copy() :\n' + '\tusing %s protocol\n'%protocol)

    try:
        Source_SE, Destination_SE = self.initializeApi( protocol )
    except Exception:
        return report_all('-1', str(sys.exc_info()[1]))

    self.hostname = Destination_SE.hostname

    # create remote dir
    if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
        try:
            self.createDir( Destination_SE, Destination_SE.protocol )
        except OperationException:
            return report_all('60316', str(sys.exc_info()[1]))
        ## when the client commands are not found (wrong env or really missing)
        except MissingCommand:
            problem = sys.exc_info()[1]
            return report_all('10041', "ERROR %s %s" %(str(problem), str(problem.detail)))
        except Exception:
            return report_all('-1', "ERROR %s" %(str(sys.exc_info()[1])))

    ## prepare for real copy ##
    try :
        sbi = SBinterface( Source_SE, Destination_SE )
        sbi_dest = SBinterface(Destination_SE)
        sbi_source = SBinterface(Source_SE)
    except ProtocolMismatch:
        text = "ERROR : Unable to create SBinterface with %s protocol"%protocol
        text += str(sys.exc_info()[1])
        return report_all('-1', text)

    ## loop over the complete list of files
    for filetocopy in list_file:
        if self.debug : print('\tStart real copy for %s'%filetocopy)
        try :
            code, text = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
        except Exception:
            code, text = '60307', str(sys.exc_info()[1])
        if code == '0':
            code, text = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
        if (code == '0') and (backup == 'yes'):
            # success on the fallback SE has its own code
            code = '60308'
        if self.debug : print('\tCopy results for %s is %s'%( os.path.basename(filetocopy), code))
        results.update( self.updateReport(filetocopy, code, text))
    return results
484 ewv 1.7
485 spiga 1.1
def storageInterface( self, endpoint, protocol ):
    """
    Create the storage interface.

    output:
       the SElement built for the endpoint with the given protocol
    raises:
       Exception when the protocol is not known to the SE API
    """
    if self.debug : print('storageInterface():\n')
    try:
        return SElement( FullPath(endpoint), protocol )
    except ProtocolUnknown:
        reason = str(sys.exc_info()[1])
        raise Exception(("ERROR : Unable to create interface with %s protocol"%protocol) + reason)
499    
def createDir(self, Destination_SE, protocol):
    """
    Create remote dir for gsiftp REALLY TEMPORARY
    this should be transparent at SE API level.

    An already existing directory is not an error.
    output:
       '' (the message stays empty when nothing went wrong)
    raises:
       Exception with a descriptive message for every other failure.
       NOTE(review): because everything is re-raised as a plain Exception,
       the OperationException / MissingCommand handlers around the call in
       copy() cannot be reached - confirm whether that is intended.
    """
    if self.debug : print('createDir():\n')
    msg = ''
    try:
        action = SBinterface( Destination_SE )
        action.createDir()
        if self.debug: print("\tThe directory has been created using protocol %s"%protocol)
    except (TransferException, OperationException, MissingDestination):
        ex = sys.exc_info()[1]
        msg = "ERROR: problem with the directory creation using %s protocol "%protocol
        msg += str(ex)
        if self.debug :
            dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
            if hasattr(ex, 'output'):
                dbgmsg += '\t'+str(ex.output)+'\n'
            print(dbgmsg)
        raise Exception(msg)
    except AlreadyExistsException:
        if self.debug: print("\tThe directory already exist")
    except Exception:
        ex = sys.exc_info()[1]
        # a generic exception has no 'detail' attribute: reading it
        # unconditionally replaced the real error with an AttributeError
        detail = str(getattr(ex, 'detail', ''))
        msg = "ERROR %s %s" %(str(ex), detail)
        if self.debug : print('\t'+msg+'\n\t'+detail+'\n')
        raise Exception(msg)
    return msg
537 spiga 1.1
def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
    """
    Check both if source file exist AND
    if destination file ALREADY exist.

    output:
       (ErCode, msg):
         '0'     : source is there and destination is free
         '60302' : the source file does not exist
         '60303' : the destination file already exists
         '10041' : the client command is missing
    raises:
       Exception when the existence check itself fails
       (OperationException, WrongOption or MissingDestination from the SE API)
    """
    if self.debug : print('checkFileExist():\n')

    def probe(sbi, prot, error_text, done_text):
        # Ask one storage interface whether the file is there.
        # Returns (ErCode, msg, exists); exists is None when ErCode != '0'.
        f_tocopy = filetocopy
        if prot != 'local': f_tocopy = os.path.basename(filetocopy)
        try:
            exists = sbi.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
            if self.debug : print(done_text%f_tocopy)
        except (OperationException, WrongOption, MissingDestination):
            ex = sys.exc_info()[1]
            # one message for the three failures: the old per-exception
            # texts used '% exist' / '% already', which are not valid
            # format directives and made the handlers themselves fail
            msg = error_text + str(ex)
            if self.debug :
                dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
                if hasattr(ex, 'output'):
                    dbgmsg += '\t'+str(ex.output)+'\n'
                print(dbgmsg)
            raise Exception(msg)
        ## when the client commands are not found (wrong env or really missing)
        except MissingCommand:
            ex = sys.exc_info()[1]
            return '10041', "ERROR %s %s" %(str(ex), str(ex.detail)), None
        return '0', '', exists

    ErCode, msg, found = probe(sbi_source, self.source_prot,
                               'ERROR: problems checkig if source file %s exist'%filetocopy,
                               '\tCheck for local file %s exist succeded \n')
    if ErCode != '0':
        return ErCode, msg
    if not found :
        return '60302', "ERROR file %s do not exist"%os.path.basename(filetocopy)

    ErCode, msg, found = probe(sbi_dest, self.dest_prot,
                               'ERROR: problems checkig if file %s already exist'%filetocopy,
                               '\tCheck for remote file %s exist succeded \n')
    if ErCode != '0':
        return ErCode, msg
    if found :
        return '60303', "file %s already exist"%os.path.basename(filetocopy)

    return '0', ''
616 spiga 1.1
def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
    """
    call the copy API.

    For the srmv* protocols the size of the remote file is compared with the
    local one after the copy. Whenever the result is not '0' the destination
    file is removed again.
    output:
       (ErCode, msg):
         '0'     : copy done
         '60307' : copy failed or file size mismatch
         '60317' : the copy timed out
         '10041' : the client command is missing
    """
    if self.debug : print('makeCopy():\n')
    path = os.path.dirname(filetocopy)
    file_name = os.path.basename(filetocopy)
    source_file = filetocopy
    dest_file = file_name ## to be improved supporting changing file name TODO
    if self.params['source'] == '' and path == '':
        source_file = os.path.abspath(filetocopy)
    elif self.params['destination'] =='':
        # NOTE(review): when 'destinationDir' is present but empty the
        # os.getcwd() default is not used - confirm this is wanted
        destDir = self.params.get('destinationDir',os.getcwd())
        dest_file = os.path.join(destDir,file_name)
    elif self.params['source'] != '' and self.params['destination'] != '' :
        source_file = file_name

    ErCode = '0'
    msg = ''

    def failure(text):
        # message for the exception being handled; in debug mode it is
        # echoed together with the exception detail and output
        ex = sys.exc_info()[1]
        text += str(ex)
        if self.debug :
            dbgmsg = '\t'+text+'\n\t'+str(ex.detail)+'\n'
            dbgmsg += '\t'+str(ex.output)+'\n'
            print(dbgmsg)
        return text

    if self.params['option'].find('space_token')>=0:
        fields = self.params['option'].split('=')
        # without a '=' there is no token value to forward
        if len(fields) > 1:
            space_token = fields[1]
            if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
            if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
    try:
        sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
    except (TransferException, WrongOption, SizeZeroException, AuthorizationException):
        # WrongOption used to leave ErCode at '0', so a copy that never
        # started was reported as successful
        msg = failure("Problem copying %s file" % filetocopy)
        ErCode = '60307'
    ## when the client commands are not found (wrong env or really missing)
    except MissingCommand:
        msg = failure("Problem copying %s file" % filetocopy)
        ErCode = '10041'
    except SEAPITimeout:
        msg = failure("Problem copying %s file" % filetocopy)
        ErCode = '60317'

    if ErCode == '0' and protocol.find('srmv') == 0:
        remote_file_size = -1
        local_file_size = os.path.getsize( source_file )
        try:
            remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
            if self.debug : print('\t Check of remote size succeded for file %s\n'%dest_file)
        except (TransferException, WrongOption):
            msg = failure("Problem checking the size of %s file" % filetocopy)
            ErCode = '60307'
        if local_file_size != remote_file_size:
            msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
            ErCode = '60307'

    if ErCode != '0':
        # do not leave a partial file behind; a failing clean up is only
        # appended to the message
        try :
            self.removeFile( sbi_dest, dest_file, option )
        except Exception:
            msg += '\n'+str(sys.exc_info()[1])
    return ErCode, msg
724 ewv 1.7
def removeFile( self, sbi_dest, filetocopy, option ):
    """
    Delete a (partially) staged file from the destination storage.

    For a non local destination protocol only the base name of the file is
    passed to the storage interface.
    raises:
       Exception when the SE API reports an OperationException
    """
    if self.debug : print('removeFile():\n')
    f_tocopy=filetocopy
    if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
    try:
        sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
        # this message was a bare string expression and was never printed
        if self.debug : print('\t deletion of file %s succeeded\n'%str(filetocopy))
    except OperationException:
        ex = sys.exc_info()[1]
        msg ='ERROR: problems removing partially staged file %s'%filetocopy
        msg += str(ex)
        if self.debug :
            dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
            dbgmsg += '\t'+str(ex.output)+'\n'
            print(dbgmsg)
        raise Exception(msg)

    return
744    
745 fanzago 1.74 #def updateReport(self, file, erCode, reason, lfn='', se='' ):
def updateReport(self, file, erCode, reason):
    """
    Build the stage-out record for a single file.

    Unset (false) values of self.params['for_lfn'], self.params['se_name']
    and self.hostname are replaced in place by ''.

    return:
        { file : {'erCode', 'reason', 'for_lfn', 'se_name', 'endpoint'} }
        where 'for_lfn' is '/copy_problem/' unless erCode is '0' or '60308'
    """
    # normalise the values reported below so they are always strings
    for key in ('for_lfn', 'se_name'):
        if not self.params[key]:
            self.params[key] = ''
    if not self.hostname:
        self.hostname = ''

    if erCode == '0' or erCode == '60308':
        reported_lfn = self.params['for_lfn']
    else:
        # any other code marks the copy as failed
        reported_lfn = '/copy_problem/'

    stage_info = {
        'erCode': erCode,
        'reason': reason,
        'for_lfn': reported_lfn,
        'se_name': self.params['se_name'],
        'endpoint': self.hostname,
    }
    return {file: stage_info}
767 ewv 1.7
def finalReport(self, results):
    """
    Append the stage-out summary to the shell snippet 'cmscpReport.sh'
    in the current directory.

    input:
        results : { file name : info } where info provides the keys
                  'reason', 'erCode', 'for_lfn' and 'se_name'
                  (presumably the records produced by updateReport();
                  verify against the caller)

    For every named file the snippet echoes the file, its LFN, the storage
    element and the quoted reason; an entry with an empty file name only
    reports the reason.  The snippet ends by exporting StageOutExitStatus.
    """
    exit_status = 0
    chunks = []
    for fname, info in results.items():
        # single quotes inside the reason would break the quoting of the
        # generated echo command, so they are replaced by blanks
        reason = "'%s'" % " ".join(str(info['reason']).split("'"))
        reason_line = ('echo "StageOutExitStatusReason = %s" '
                       '| tee -a $RUNTIME_AREA/$repo\n' % reason)
        if fname:
            if info['for_lfn'] == '':
                # left to the shell: expanded when the snippet is sourced
                lfn = '${LFNBaseName}' + os.path.basename(fname)
                se = '$SE'
            else:
                lfn = info['for_lfn'] + os.path.basename(fname)
                se = info['se_name']
            # NOTE: the former local "LFNBaseName" was never used and its
            # "[-1]" lookup crashed when the LFN had no directory part.
            chunks.append('echo "Report for File: ' + fname + '"\n')
            chunks.append('echo "LFN: ' + lfn + '"\n')
            chunks.append('echo "StorageElement: ' + se + '"\n')
            chunks.append(reason_line)
            chunks.append('echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n')
            if info['erCode'] != '0':
                exit_status = info['erCode']
        else:
            chunks.append(reason_line)
            exit_status = info['erCode']

    chunks.append('\n')
    chunks.append('export StageOutExitStatus=' + str(exit_status) + '\n')
    chunks.append('echo "StageOutExitStatus = ' + str(exit_status)
                  + '" | tee -a $RUNTIME_AREA/$repo\n')
    txt = ''.join(chunks)

    # open only once the text is complete and always release the handle
    outFile = open('cmscpReport.sh', "a")
    try:
        outFile.write(str(txt))
    finally:
        outFile.close()
    return
816    
817    
def usage():
    """
    Print the command line help of cmscp.

    The listed parameters mirror the long options accepted by the
    command line parser of this script.
    """
    msg = """
    cmscp:
        safe copy of local file to/from remote SE via lcg_cp/srmcp,
        including success checking version also for CAF using rfcp command to copy the output to SE

    accepted parameters:
        source           =
        destination      =
        inputFileList    =
        outputFileList   =
        protocol         =
        option           =
        middleware       =
        srm_version      =
        destinationDir   =
        for_lfn          =
        se_name          =
        local_stage      = activate stage fall back
        debug            = activate verbose print out
        help             = print on line man and exit

    mandatory:
        * "source" and/or "destination" must always be defined
        * either "middleware" or "protocol" must always be defined
        * "inputFileList" must always be defined
        * if "local_stage" = 1 also "for_lfn" must be defined
    """
    print(msg)

    return
849 spiga 1.8
def HelpOptions(opts=None):
    """
    Turn the (option, value) pairs produced by getopt into a dictionary.

    input:
        opts : list of (option, value) tuples; leading dashes of the option
               are dropped to build the key
    return:
        { option name : value }

    Prints the usage and exits with status 0 when no option is given or
    when one of -h / -help / --help is found.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # check for help first: the old key extraction split on '--' and
        # failed with IndexError on single-dash options such as '-h'
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
866 spiga 1.1
if __name__ == '__main__':
    # Script entry point: parse the long options, build the cmscp object
    # from them and run the copy.

    import getopt

    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=",
                  "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError:
        # sys.exc_info() keeps the syntax valid for old and new interpreters
        err = sys.exc_info()[1]
        print(err)
        # Call usage() directly: HelpOptions() without arguments exits with
        # status 0, which made the error exit code below unreachable.
        usage()
        sys.exit(2)

    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception:
        # best effort: report the problem and end normally, as before
        print(str(sys.exc_info()[1]))
887 spiga 1.1