ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.76
Committed: Mon Jun 21 15:41:53 2010 UTC (14 years, 10 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_3
Changes since 1.75: +10 -8 lines
Log Message:
Adding fixed timeout for all commands (including copy data)

File Contents

# User Rev Content
1 edelmann 1.56 #!/usr/bin/env python
2 mcinquil 1.16 import sys, os
3 fanzago 1.74 try:
4     import json
5     except:
6     import simplejson as json
7 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
8 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
9 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
10 spiga 1.1
11    
12     class cmscp:
13 spiga 1.8 def __init__(self, args):
14 spiga 1.1 """
15     cmscp
16 spiga 1.46 safe copy of local file to/from remote SE via lcg_cp/srmcp,
17 spiga 1.1 including success checking version also for CAF using rfcp command to copy the output to SE
18     input:
19     $1 middleware (CAF, LSF, LCG, OSG)
20 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
21     $3 if needed: file name (the output file name)
22     $5 remote SE (complete endpoint)
23 ewv 1.7 $6 srm version
24 fanzago 1.74 --for_lfn $LFNBaseName
25 spiga 1.1 output:
26     return 0 if all ok
27     return 60307 if srmcp failed
28     return 60303 if file already exists in the SE
29     """
30 spiga 1.8
31 fanzago 1.74 #### FEDE added SE_NAME as parameters
32 spiga 1.1 #set default
33 spiga 1.24 self.params = {"source":'', "destination":'','destinationDir':'', "inputFileList":'', "outputFileList":'', \
34 fanzago 1.74 "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2', "for_lfn":'', "se_name":'' }
35 ewv 1.14 self.debug = 0
36 fanzago 1.74 #### for fallback copy
37 spiga 1.46 self.local_stage = 0
38 spiga 1.8 self.params.update( args )
39 mcinquil 1.76 ## timeout needed for subprocess command of SEAPI
40     ## they should be a bit higher then the corresponding passed by command line
41     ## default values
42     self.subprocesstimeout = { \
43     'copy': 3600, \
44     'exists': 1200, \
45     'delete': 1200, \
46     'size': 1200 \
47     }
48    
49 spiga 1.1 return
50    
51 spiga 1.8 def processOptions( self ):
52 spiga 1.1 """
53 spiga 1.8 check command line parameter
54 spiga 1.1 """
55 ewv 1.14 if 'help' in self.params.keys(): HelpOptions()
56     if 'debug' in self.params.keys(): self.debug = 1
57 spiga 1.45 if 'local_stage' in self.params.keys(): self.local_stage = 1
58 ewv 1.7
59     # source and dest cannot be undefined at same time
60 spiga 1.8 if not self.params['source'] and not self.params['destination'] :
61 spiga 1.36 HelpOptions()
62 ewv 1.7 # if middleware is not defined --> protocol cannot be empty
63 spiga 1.8 if not self.params['middleware'] and not self.params['protocol'] :
64     HelpOptions()
65    
66 ewv 1.7 # input file must be defined
67 spiga 1.35 if not self.params['inputFileList'] :
68 spiga 1.36 HelpOptions()
69 spiga 1.1 else:
70 spiga 1.8 file_to_copy=[]
71 ewv 1.14 if self.params['inputFileList'].find(','):
72 spiga 1.8 [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
73 ewv 1.7 else:
74 spiga 1.8 file_to_copy.append(self.params['inputFileList'])
75     self.params['inputFileList'] = file_to_copy
76 ewv 1.7
77 fanzago 1.74 #if not self.params['lfn'] and self.local_stage == 1 : HelpOptions()
78     if not self.params['for_lfn'] and self.local_stage == 1 : HelpOptions()
79 fanzago 1.38
80 spiga 1.1 ## TO DO:
81     #### add check for outFiles
82     #### add map {'inFileNAME':'outFileNAME'} to change out name
83    
84    
85 ewv 1.7 def run( self ):
86 spiga 1.1 """
87 ewv 1.7 Check if running on UI (no $middleware) or
88     on WN (on the Grid), and take different action
89 spiga 1.1 """
90 mcinquil 1.37 self.processOptions()
91 spiga 1.30 if self.debug: print 'calling run() : \n'
92 spiga 1.8 # stage out from WN
93     if self.params['middleware'] :
94 spiga 1.36 results = self.stager(self.params['middleware'],self.params['inputFileList'])
95 fanzago 1.74 self.writeJsonFile(results)
96 spiga 1.35 self.finalReport(results)
97 spiga 1.8 # Local interaction with SE
98 spiga 1.1 else:
99 spiga 1.36 results = self.copy(self.params['inputFileList'], self.params['protocol'], self.params['option'] )
100 fanzago 1.74 self.writeJsonFile(results)
101 spiga 1.35 return results
102 fanzago 1.74
103     def writeJsonFile( self, results ):
104     """
105     write a json file containing copy results for each file
106     """
107     if self.debug:
108     print 'in writeJsonFile() : \n'
109     print "---->>>> in writeJsonFile results = ", results
110     fp = open('resultCopyFile', 'w')
111     json.dump(results, fp)
112     fp.close()
113     if self.debug:
114     print ' reading resultCopyFile : \n'
115     lp = open('resultCopyFile', "r")
116     inputDict = json.load(lp)
117     lp.close()
118     print " inputDict = ", inputDict
119     return
120 spiga 1.1
121 mcinquil 1.57 def checkLcgUtils( self ):
122     """
123     _checkLcgUtils_
124     check the lcg-utils version and report
125     """
126     import commands
127     cmd = "lcg-cp --version | grep lcg_util"
128     status, output = commands.getstatusoutput( cmd )
129     num_ver = -1
130     if output.find("not found") == -1 or status == 0:
131     temp = output.split("-")
132     version = ""
133     if len(temp) >= 2:
134     version = output.split("-")[1]
135     temp = version.split(".")
136     if len(temp) >= 1:
137     num_ver = int(temp[0])*10
138     num_ver += int(temp[1])
139     return num_ver
140    
141 spiga 1.8 def setProtocol( self, middleware ):
142 spiga 1.1 """
143     define the allowed potocols based on $middlware
144 ewv 1.7 which depend on scheduler
145 spiga 1.1 """
146 spiga 1.8 # default To be used with "middleware"
147 fanzago 1.38 if self.debug:
148     print 'setProtocol() :\n'
149     print '\tmiddleware = %s utils \n'%middleware
150 mcinquil 1.72
151 spiga 1.13 lcgOpt={'srmv1':'-b -D srmv1 -t 2400 --verbose',
152     'srmv2':'-b -D srmv2 -t 2400 --verbose'}
153 mcinquil 1.57 if self.checkLcgUtils() >= 17:
154 mcinquil 1.72 lcgOpt={'srmv1':'-b -D srmv1 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose',
155     'srmv2':'-b -D srmv2 --srm-timeout 2400 --sendreceive-timeout 2400 --connect-timeout 300 --verbose'}
156 mcinquil 1.57
157 spiga 1.13 srmOpt={'srmv1':' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
158 spiga 1.71 'srmv2':' -report=./srmcp.report -retry_timeout=480000 -retry_num=3 -storagetype=permanent '}
159 spiga 1.8 rfioOpt=''
160    
161 ewv 1.14 supported_protocol = None
162 spiga 1.40 if middleware.lower() in ['osg','lcg','condor','sge']:
163 spiga 1.12 supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
164 spiga 1.30 (self.params['srm_version'],srmOpt[self.params['srm_version']])]
165 spiga 1.8 elif middleware.lower() in ['lsf','caf']:
166 ewv 1.14 supported_protocol = [('rfio',rfioOpt)]
167 mcinquil 1.63 elif middleware.lower() in ['pbs']:
168     supported_protocol = [('rfio',rfioOpt),('local','')]
169 edelmann 1.53 elif middleware.lower() in ['arc']:
170     supported_protocol = [('srmv2','-debug'),('srmv1','-debug')]
171 spiga 1.1 else:
172 ewv 1.7 ## here we can add support for any kind of protocol,
173 spiga 1.1 ## maybe some local schedulers need something dedicated
174     pass
175     return supported_protocol
176 spiga 1.28
177 fanzago 1.41
178 fanzago 1.74 #def checkCopy (self, copy_results, len_list_files, prot, lfn='', se=''):
179     def checkCopy (self, copy_results, len_list_files, prot):
180 fanzago 1.41 """
181     Checks the status of copy and update result dictionary
182 spiga 1.28 """
183 fanzago 1.74
184 spiga 1.28 list_retry = []
185 fanzago 1.41 list_not_existing = []
186 fanzago 1.74 list_already_existing = []
187 fanzago 1.75 list_fallback = []
188 spiga 1.28 list_ok = []
189 fanzago 1.41
190     if self.debug:
191     print 'in checkCopy() :\n'
192     for file, dict in copy_results.iteritems():
193     er_code = dict['erCode']
194     if er_code == '0':
195     list_ok.append(file)
196     reason = 'Copy succedeed with %s utils'%prot
197     dict['reason'] = reason
198 fanzago 1.75 elif er_code == '60308':
199     list_fallback.append( file )
200     reason = 'Copy succedeed with %s utils'%prot
201     dict['reason'] = reason
202 fanzago 1.41 elif er_code == '60302':
203     list_not_existing.append( file )
204 fanzago 1.74 elif er_code == '60303':
205     list_already_existing.append( file )
206     else :
207 fanzago 1.41 list_retry.append( file )
208    
209     if self.debug:
210     print "\t file %s \n"%file
211     print "\t dict['erCode'] %s \n"%dict['erCode']
212     print "\t dict['reason'] %s \n"%dict['reason']
213    
214 fanzago 1.74 #if (lfn != '') and (se != ''):
215     # upDict = self.updateReport(file, er_code, dict['reason'], lfn, se)
216     #else:
217     # upDict = self.updateReport(file, er_code, dict['reason'])
218    
219     upDict = self.updateReport(file, er_code, dict['reason'])
220 fanzago 1.41
221     copy_results.update(upDict)
222    
223     msg = ''
224     if len(list_ok) != 0:
225     msg += '\tCopy of %s succedeed with %s utils\n'%(str(list_ok),prot)
226     if len(list_ok) != len_list_files :
227 fanzago 1.75 if len(list_fallback)!=0:
228     msg += '\tCopy of %s succedeed with %s utils in the fallback SE\n'%(str(list_fallback),prot)
229 fanzago 1.74 if len(list_retry)!=0:
230     msg += '\tCopy of %s failed using %s for files \n'%(str(list_retry),prot)
231     if len(list_not_existing)!=0:
232     msg += '\tCopy of %s failed using %s : files not found \n'%(str(list_not_existing),prot)
233     if len(list_already_existing)!=0:
234     msg += '\tCopy of %s failed using %s : files already existing\n'%(str(list_already_existing),prot)
235 fanzago 1.41 if self.debug : print msg
236    
237 fanzago 1.74 return copy_results, list_ok, list_retry
238    
239     def check_for_retry_localSE (self, copy_results):
240     """
241     Checks the status of copy and create the list of file to copy to CloseSE
242     """
243     list_retry_localSE = []
244    
245     if self.debug:
246     print 'in check_for_retry_localSE() :\n'
247     print "\t results in check local = ", copy_results
248     for file, dict in copy_results.iteritems():
249     er_code = dict['erCode']
250     if er_code != '0' and er_code != '60302' and er_code != '60308':
251     list_retry_localSE.append( file )
252    
253     if self.debug:
254     print "\t file %s \n"%file
255     print "\t dict['erCode'] %s \n"%dict['erCode']
256     print "\t dict['reason'] %s \n"%dict['reason']
257    
258     return list_retry_localSE
259    
260 fanzago 1.41
261 spiga 1.45 def LocalCopy(self, list_retry, results):
262 fanzago 1.41 """
263 spiga 1.45 Tries the stage out to the CloseSE
264 fanzago 1.38 """
265 fanzago 1.41 if self.debug:
266 spiga 1.45 print 'in LocalCopy() :\n'
267 fanzago 1.41 print '\t list_retry %s utils \n'%list_retry
268     print '\t len(list_retry) %s \n'%len(list_retry)
269    
270     list_files = list_retry
271     self.params['inputFilesList']=list_files
272    
273     ### copy backup
274     from ProdCommon.FwkJobRep.SiteLocalConfig import loadSiteLocalConfig
275     siteCfg = loadSiteLocalConfig()
276     seName = siteCfg.localStageOut.get("se-name", None)
277     catalog = siteCfg.localStageOut.get("catalog", None)
278     implName = siteCfg.localStageOut.get("command", None)
279     if (implName == 'srm'):
280     implName='srmv1'
281     self.params['srm_version']=implName
282     ##### to be improved ###############
283     if (implName == 'rfcp'):
284     self.params['middleware']='lsf'
285     ####################################
286    
287     self.params['protocol']=implName
288     tfc = siteCfg.trivialFileCatalog()
289    
290     if self.debug:
291     print '\t siteCFG %s \n'%siteCfg
292     print '\t seName %s \n'%seName
293     print '\t catalog %s \n'%catalog
294     print "\t self.params['protocol'] %s \n"%self.params['protocol']
295     print '\t tfc %s '%tfc
296     print "\t self.params['inputFilesList'] %s \n"%self.params['inputFilesList']
297    
298 fanzago 1.69 #if (str(self.params['lfn']).find("/store/") != -1):
299     # temp = str(self.params['lfn']).split("/store/")
300     # self.params['lfn']= "/store/temp/" + temp[1]
301 fanzago 1.74 if (str(self.params['for_lfn']).find("/store/") == 0):
302     temp = str(self.params['for_lfn']).replace("/store/","/store/temp/",1)
303     self.params['for_lfn']= temp
304 fanzago 1.69
305 fanzago 1.74 if ( self.params['for_lfn'][-1] != '/' ) : self.params['for_lfn'] = self.params['for_lfn'] + '/'
306 fanzago 1.65
307 fanzago 1.41 file_backup=[]
308     for input in self.params['inputFilesList']:
309 fanzago 1.74 #file = self.params['lfn'] + os.path.basename(input)
310     file = self.params['for_lfn'] + os.path.basename(input)
311 fanzago 1.41 surl = tfc.matchLFN(tfc.preferredProtocol, file)
312     file_backup.append(surl)
313     if self.debug:
314 fanzago 1.74 #print '\t lfn %s \n'%self.params['lfn']
315     print '\t for_lfn %s \n'%self.params['for_lfn']
316 fanzago 1.41 print '\t file %s \n'%file
317     print '\t surl %s \n'%surl
318    
319     destination=os.path.dirname(file_backup[0])
320 fanzago 1.59 if ( destination[-1] != '/' ) : destination = destination + '/'
321 fanzago 1.41 self.params['destination']=destination
322 fanzago 1.74 ###########
323    
324     self.params['se_name']=seName
325     #print "######################################################"
326     #print "in local copy self.params['se_name'] = ", self.params['se_name']
327     ###print "in local copy self.params['lfn'] = ", self.params['lfn']
328     #print "in local copy self.params['for_lfn'] = ", self.params['for_lfn']
329     #print "######################################################"
330 fanzago 1.41
331     if self.debug:
332     print "\t self.params['destination']%s \n"%self.params['destination']
333     print "\t self.params['protocol'] %s \n"%self.params['protocol']
334     print "\t self.params['option']%s \n"%self.params['option']
335    
336     for prot, opt in self.setProtocol( self.params['middleware'] ):
337 spiga 1.45 if self.debug: print '\tIn LocalCopy trying the stage out with %s utils \n'%prot
338 fanzago 1.65 localCopy_results = self.copy( self.params['inputFileList'], prot, opt, backup='yes' )
339 spiga 1.45 if localCopy_results.keys() == [''] or localCopy_results.keys() == '' :
340     results.update(localCopy_results)
341 fanzago 1.41 else:
342 fanzago 1.74 localCopy_results, list_ok, list_retry = self.checkCopy(localCopy_results, len(list_files), prot)
343 spiga 1.45 results.update(localCopy_results)
344 fanzago 1.41 if len(list_ok) == len(list_files) :
345     break
346     if len(list_retry):
347     list_files = list_retry
348     else: break
349     if self.debug:
350 spiga 1.45 print "\t localCopy_results = %s \n"%localCopy_results
351 spiga 1.28
352 fanzago 1.41 return results
353    
354 spiga 1.8 def stager( self, middleware, list_files ):
355 spiga 1.1 """
356     Implement the logic for remote stage out
357     """
358 spiga 1.30
359 fanzago 1.38 if self.debug:
360     print 'stager() :\n'
361     print '\tmiddleware %s\n'%middleware
362 spiga 1.45 print '\tlist_files %s\n'%list_files
363 fanzago 1.38
364 spiga 1.6 results={}
365 spiga 1.8 for prot, opt in self.setProtocol( middleware ):
366 fanzago 1.74 if self.debug:
367     print '\tTrying the stage out with %s utils \n'%prot
368     print '\tand options %s\n'%opt
369    
370 spiga 1.8 copy_results = self.copy( list_files, prot, opt )
371 fanzago 1.74 #print "in stager copy_results = ", copy_results
372 spiga 1.30 if copy_results.keys() == [''] or copy_results.keys() == '' :
373 spiga 1.13 results.update(copy_results)
374     else:
375 fanzago 1.74 copy_results, list_ok, list_retry = self.checkCopy(copy_results, len(list_files), prot)
376 spiga 1.13 results.update(copy_results)
377     if len(list_ok) == len(list_files) :
378     break
379 fanzago 1.74 if len(list_retry):
380 fanzago 1.41 list_files = list_retry
381     else: break
382 fanzago 1.74
383 spiga 1.45 if self.local_stage:
384 fanzago 1.74 #print "results before check local = ", results
385     list_retry_localSE = self.check_for_retry_localSE(results)
386     #print "results after check local = ", results
387 fanzago 1.55 if len(list_retry_localSE):
388 fanzago 1.74 if self.debug:
389     print "\t list_retry_localSE %s \n"%list_retry_localSE
390 fanzago 1.55 results = self.LocalCopy(list_retry_localSE, results)
391 fanzago 1.74
392 fanzago 1.38 if self.debug:
393     print "\t results %s \n"%results
394 spiga 1.1 return results
395    
396     def initializeApi(self, protocol ):
397     """
398 ewv 1.7 Instantiate storage interface
399 spiga 1.1 """
400 spiga 1.30 if self.debug : print 'initializeApi() :\n'
401 spiga 1.20 self.source_prot = protocol
402     self.dest_prot = protocol
403     if not self.params['source'] : self.source_prot = 'local'
404     Source_SE = self.storageInterface( self.params['source'], self.source_prot )
405     if not self.params['destination'] : self.dest_prot = 'local'
406     Destination_SE = self.storageInterface( self.params['destination'], self.dest_prot )
407 spiga 1.1
408     if self.debug :
409 spiga 1.30 msg = '\t(source=%s, protocol=%s)'%(self.params['source'], self.source_prot)
410     msg += '\t(destination=%s, protocol=%s)'%(self.params['destination'], self.dest_prot)
411 mcinquil 1.32 print msg
412 spiga 1.1
413     return Source_SE, Destination_SE
414    
415 fanzago 1.65 def copy( self, list_file, protocol, options, backup='no' ):
416 spiga 1.1 """
417 ewv 1.7 Make the real file copy using SE API
418 spiga 1.1 """
419 mcinquil 1.37 msg = ""
420 fanzago 1.74 results = {}
421 spiga 1.1 if self.debug :
422 spiga 1.30 msg = 'copy() :\n'
423     msg += '\tusing %s protocol\n'%protocol
424     print msg
425 spiga 1.13 try:
426     Source_SE, Destination_SE = self.initializeApi( protocol )
427     except Exception, ex:
428 fanzago 1.69 for filetocopy in list_file:
429     results.update( self.updateReport(filetocopy, '-1', str(ex)))
430     return results
431 fanzago 1.74
432     prot = Destination_SE.protocol
433     self.hostname=Destination_SE.hostname
434     #print "in copy hostname = ", self.hostname
435     #print "prot = ", prot
436 ewv 1.14
437 ewv 1.7 # create remote dir
438 mcinquil 1.32 if Destination_SE.protocol in ['gridftp','rfio','srmv2']:
439 spiga 1.13 try:
440 mcinquil 1.32 self.createDir( Destination_SE, Destination_SE.protocol )
441 mcinquil 1.47 except OperationException, ex:
442 fanzago 1.69 for filetocopy in list_file:
443 fanzago 1.70 results.update( self.updateReport(filetocopy, '60316', str(ex)))
444 fanzago 1.69 return results
445 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
446     except MissingCommand, ex:
447     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
448 fanzago 1.69 for filetocopy in list_file:
449     results.update( self.updateReport(filetocopy, '10041', msg))
450     return results
451     except Exception, ex:
452     msg = "ERROR %s" %(str(ex))
453     for filetocopy in list_file:
454     results.update( self.updateReport(filetocopy, '-1', msg))
455     return results
456    
457 spiga 1.1 ## prepare for real copy ##
458 spiga 1.8 try :
459     sbi = SBinterface( Source_SE, Destination_SE )
460     sbi_dest = SBinterface(Destination_SE)
461 spiga 1.20 sbi_source = SBinterface(Source_SE)
462 spiga 1.11 except ProtocolMismatch, ex:
463 spiga 1.30 msg = "ERROR : Unable to create SBinterface with %s protocol"%protocol
464     msg += str(ex)
465 fanzago 1.69 for filetocopy in list_file:
466     results.update( self.updateReport(filetocopy, '-1', msg))
467     return results
468 spiga 1.1
469 fanzago 1.74 self.hostname = Destination_SE.hostname
470    
471 ewv 1.7 ## loop over the complete list of files
472     for filetocopy in list_file:
473 spiga 1.30 if self.debug : print '\tStart real copy for %s'%filetocopy
474 spiga 1.13 try :
475 spiga 1.34 ErCode, msg = self.checkFileExist( sbi_source, sbi_dest, filetocopy, options )
476 spiga 1.13 except Exception, ex:
477 spiga 1.58 ErCode = '60307'
478 spiga 1.18 msg = str(ex)
479 ewv 1.7 if ErCode == '0':
480 spiga 1.19 ErCode, msg = self.makeCopy( sbi, filetocopy , options, protocol,sbi_dest )
481 fanzago 1.65 if (ErCode == '0') and (backup == 'yes'):
482     ErCode = '60308'
483 spiga 1.30 if self.debug : print '\tCopy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
484 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
485     return results
486 ewv 1.7
487 spiga 1.1
488     def storageInterface( self, endpoint, protocol ):
489     """
490 ewv 1.7 Create the storage interface.
491 spiga 1.1 """
492 spiga 1.30 if self.debug : print 'storageInterface():\n'
493 spiga 1.1 try:
494     interface = SElement( FullPath(endpoint), protocol )
495 spiga 1.11 except ProtocolUnknown, ex:
496 spiga 1.30 msg = "ERROR : Unable to create interface with %s protocol"%protocol
497     msg += str(ex)
498 spiga 1.13 raise Exception(msg)
499 spiga 1.1
500     return interface
501    
502     def createDir(self, Destination_SE, protocol):
503     """
504 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
505 ewv 1.7 this should be transparent at SE API level.
506 spiga 1.1 """
507 spiga 1.30 if self.debug : print 'createDir():\n'
508 ewv 1.14 msg = ''
509 spiga 1.1 try:
510     action = SBinterface( Destination_SE )
511     action.createDir()
512 spiga 1.30 if self.debug: print "\tThe directory has been created using protocol %s"%protocol
513 spiga 1.11 except TransferException, ex:
514 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
515     msg += str(ex)
516 spiga 1.11 if self.debug :
517 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
518     dbgmsg += '\t'+str(ex.output)+'\n'
519     print dbgmsg
520 spiga 1.29 raise Exception(msg)
521 spiga 1.11 except OperationException, ex:
522 spiga 1.30 msg = "ERROR: problem with the directory creation using %s protocol "%protocol
523     msg += str(ex)
524     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
525 spiga 1.29 raise Exception(msg)
526 spiga 1.31 except MissingDestination, ex:
527     msg = "ERROR: problem with the directory creation using %s protocol "%protocol
528     msg += str(ex)
529     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
530     raise Exception(msg)
531     except AlreadyExistsException, ex:
532     if self.debug: print "\tThe directory already exist"
533 fanzago 1.74 pass
534     except Exception, ex:
535     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
536     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
537     raise Exception(msg)
538 spiga 1.13 return msg
539 spiga 1.1
540 spiga 1.34 def checkFileExist( self, sbi_source, sbi_dest, filetocopy, option ):
541 spiga 1.1 """
542 spiga 1.20 Check both if source file exist AND
543     if destination file ALREADY exist.
544 ewv 1.7 """
545 spiga 1.30 if self.debug : print 'checkFileExist():\n'
546 spiga 1.8 ErCode = '0'
547     msg = ''
548 spiga 1.20 f_tocopy=filetocopy
549     if self.source_prot != 'local':f_tocopy = os.path.basename(filetocopy)
550 spiga 1.1 try:
551 mcinquil 1.72 checkSource = sbi_source.checkExists( f_tocopy , opt=option, tout = self.subprocesstimeout['exists'] )
552 spiga 1.30 if self.debug : print '\tCheck for local file %s exist succeded \n'%f_tocopy
553 spiga 1.20 except OperationException, ex:
554 spiga 1.30 msg ='ERROR: problems checkig if source file %s exist'%filetocopy
555     msg += str(ex)
556 spiga 1.20 if self.debug :
557 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
558     dbgmsg += '\t'+str(ex.output)+'\n'
559     print dbgmsg
560 spiga 1.20 raise Exception(msg)
561     except WrongOption, ex:
562 spiga 1.30 msg ='ERROR problems checkig if source file % exist'%filetocopy
563     msg += str(ex)
564 spiga 1.20 if self.debug :
565 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
566     dbgmsg += '\t'+str(ex.output)+'\n'
567     print dbgmsg
568 spiga 1.20 raise Exception(msg)
569 spiga 1.31 except MissingDestination, ex:
570     msg ='ERROR problems checkig if source file % exist'%filetocopy
571     msg += str(ex)
572     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
573     raise Exception(msg)
574 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
575     except MissingCommand, ex:
576     ErCode = '10041'
577     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
578     return ErCode, msg
579 spiga 1.20 if not checkSource :
580     ErCode = '60302'
581     msg = "ERROR file %s do not exist"%os.path.basename(filetocopy)
582     return ErCode, msg
583     f_tocopy=filetocopy
584     if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
585     try:
586 mcinquil 1.72 check = sbi_dest.checkExists( f_tocopy, opt=option, tout = self.subprocesstimeout['exists'] )
587 spiga 1.30 if self.debug : print '\tCheck for remote file %s exist succeded \n'%f_tocopy
588 spiga 1.11 except OperationException, ex:
589 spiga 1.30 msg = 'ERROR: problems checkig if file %s already exist'%filetocopy
590     msg += str(ex)
591 spiga 1.11 if self.debug :
592 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
593     dbgmsg += '\t'+str(ex.output)+'\n'
594     print dbgmsg
595 spiga 1.13 raise Exception(msg)
596 spiga 1.11 except WrongOption, ex:
597 spiga 1.30 msg = 'ERROR problems checkig if file % already exist'%filetocopy
598     msg += str(ex)
599 spiga 1.11 if self.debug :
600 spiga 1.30 msg += '\t'+msg+'\n\t'+str(ex.detail)+'\n'
601     msg += '\t'+str(ex.output)+'\n'
602 spiga 1.13 raise Exception(msg)
603 spiga 1.31 except MissingDestination, ex:
604     msg ='ERROR problems checkig if source file % exist'%filetocopy
605     msg += str(ex)
606     if self.debug : print '\t'+msg+'\n\t'+str(ex.detail)+'\n'
607     raise Exception(msg)
608 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
609     except MissingCommand, ex:
610     ErCode = '10041'
611     msg = "ERROR %s %s" %(str(ex), str(ex.detail))
612     return ErCode, msg
613 ewv 1.14 if check :
614 ewv 1.7 ErCode = '60303'
615 spiga 1.20 msg = "file %s already exist"%os.path.basename(filetocopy)
616 ewv 1.14
617 spiga 1.13 return ErCode, msg
618 spiga 1.1
619 spiga 1.19 def makeCopy(self, sbi, filetocopy, option, protocol, sbi_dest ):
620 spiga 1.1 """
621 ewv 1.7 call the copy API.
622 spiga 1.1 """
623 spiga 1.30 if self.debug : print 'makeCopy():\n'
624 ewv 1.7 path = os.path.dirname(filetocopy)
625 spiga 1.1 file_name = os.path.basename(filetocopy)
626     source_file = filetocopy
627     dest_file = file_name ## to be improved supporting changing file name TODO
628 spiga 1.8 if self.params['source'] == '' and path == '':
629 spiga 1.1 source_file = os.path.abspath(filetocopy)
630 spiga 1.8 elif self.params['destination'] =='':
631 spiga 1.24 destDir = self.params.get('destinationDir',os.getcwd())
632     dest_file = os.path.join(destDir,file_name)
633 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
634 ewv 1.7 source_file = file_name
635 spiga 1.8
636 spiga 1.1 ErCode = '0'
637     msg = ''
638 ewv 1.7
639 spiga 1.71 if self.params['option'].find('space_token')>=0:
640 slacapra 1.64 space_token=self.params['option'].split('=')[1]
641     if protocol == 'srmv2': option = '%s -space_token=%s'%(option,space_token)
642     if protocol == 'srm-lcg': option = '%s -S %s'%(option,space_token)
643 spiga 1.1 try:
644 mcinquil 1.72 sbi.copy( source_file , dest_file , opt = option, tout = self.subprocesstimeout['copy'])
645 spiga 1.11 except TransferException, ex:
646 spiga 1.30 msg = "Problem copying %s file" % filetocopy
647     msg += str(ex)
648 spiga 1.11 if self.debug :
649 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
650     dbgmsg += '\t'+str(ex.output)+'\n'
651 mcinquil 1.32 print dbgmsg
652 spiga 1.1 ErCode = '60307'
653 spiga 1.11 except WrongOption, ex:
654 spiga 1.30 msg = "Problem copying %s file" % filetocopy
655     msg += str(ex)
656 spiga 1.11 if self.debug :
657 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
658     dbgmsg += '\t'+str(ex.output)+'\n'
659 fanzago 1.48 print dbgmsg
660 spiga 1.44 except SizeZeroException, ex:
661     msg = "Problem copying %s file" % filetocopy
662     msg += str(ex)
663     if self.debug :
664     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
665     dbgmsg += '\t'+str(ex.output)+'\n'
666 fanzago 1.48 print dbgmsg
667 spiga 1.13 ErCode = '60307'
668 mcinquil 1.50 ## when the client commands are not found (wrong env or really missing)
669     except MissingCommand, ex:
670     ErCode = '10041'
671     msg = "Problem copying %s file" % filetocopy
672     msg += str(ex)
673     if self.debug :
674     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
675     dbgmsg += '\t'+str(ex.output)+'\n'
676     print dbgmsg
677 mcinquil 1.54 except AuthorizationException, ex:
678     ErCode = '60307'
679     msg = "Problem copying %s file" % filetocopy
680     msg += str(ex)
681     if self.debug :
682     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
683     dbgmsg += '\t'+str(ex.output)+'\n'
684     print dbgmsg
685 mcinquil 1.72 except SEAPITimeout, ex:
686     ErCode = '60317'
687     msg = "Problem copying %s file" % filetocopy
688     msg += str(ex)
689     if self.debug :
690     dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
691     dbgmsg += '\t'+str(ex.output)+'\n'
692     print dbgmsg
693    
694 spiga 1.24 if ErCode == '0' and protocol.find('srmv') == 0:
695 spiga 1.19 remote_file_size = -1
696     local_file_size = os.path.getsize( source_file )
697     try:
698 mcinquil 1.72 remote_file_size = sbi_dest.getSize( dest_file, opt=option, tout = self.subprocesstimeout['size'] )
699 spiga 1.30 if self.debug : print '\t Check of remote size succeded for file %s\n'%dest_file
700 spiga 1.19 except TransferException, ex:
701 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
702     msg += str(ex)
703 spiga 1.19 if self.debug :
704 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
705     dbgmsg += '\t'+str(ex.output)+'\n'
706     print dbgmsg
707 spiga 1.19 ErCode = '60307'
708     except WrongOption, ex:
709 spiga 1.30 msg = "Problem checking the size of %s file" % filetocopy
710     msg += str(ex)
711 spiga 1.19 if self.debug :
712 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
713     dbgmsg += '\t'+str(ex.output)+'\n'
714     print dbgmsg
715 spiga 1.19 ErCode = '60307'
716     if local_file_size != remote_file_size:
717     msg = "File size dosn't match: local size = %s ; remote size = %s " % (local_file_size, remote_file_size)
718     ErCode = '60307'
719 ewv 1.14
720 spiga 1.27 if ErCode != '0':
721     try :
722 spiga 1.34 self.removeFile( sbi_dest, dest_file, option )
723 spiga 1.27 except Exception, ex:
724     msg += '\n'+str(ex)
725 spiga 1.1 return ErCode, msg
726 ewv 1.7
727 spiga 1.34 def removeFile( self, sbi_dest, filetocopy, option ):
728 spiga 1.30 """
729     """
730     if self.debug : print 'removeFile():\n'
731 spiga 1.21 f_tocopy=filetocopy
732     if self.dest_prot != 'local':f_tocopy = os.path.basename(filetocopy)
733     try:
734 mcinquil 1.72 sbi_dest.delete( f_tocopy, opt=option, tout = self.subprocesstimeout['delete'] )
735 spiga 1.30 if self.debug : '\t deletion of file %s succeeded\n'%str(filetocopy)
736 spiga 1.21 except OperationException, ex:
737 spiga 1.30 msg ='ERROR: problems removing partially staged file %s'%filetocopy
738     msg += str(ex)
739 spiga 1.21 if self.debug :
740 spiga 1.30 dbgmsg = '\t'+msg+'\n\t'+str(ex.detail)+'\n'
741     dbgmsg += '\t'+str(ex.output)+'\n'
742     print dbgmsg
743 spiga 1.21 raise Exception(msg)
744    
745     return
746    
747 fanzago 1.74 #def updateReport(self, file, erCode, reason, lfn='', se='' ):
748     def updateReport(self, file, erCode, reason):
749 spiga 1.8 """
750     Update the final stage out infos
751     """
752     jobStageInfo={}
753     jobStageInfo['erCode']=erCode
754     jobStageInfo['reason']=reason
755 fanzago 1.74 #if not self.params['lfn']: self.params['lfn']=''
756     if not self.params['for_lfn']: self.params['for_lfn']=''
757     if not self.params['se_name']: self.params['se_name']=''
758     if not self.hostname: self.hostname=''
759     if (erCode != '0') and (erCode != '60308'):
760     #jobStageInfo['lfn']='/copy_problem/'
761     jobStageInfo['for_lfn']='/copy_problem/'
762     else:
763     jobStageInfo['for_lfn']=self.params['for_lfn']
764     jobStageInfo['se_name']=self.params['se_name']
765     jobStageInfo['endpoint']=self.hostname
766 spiga 1.1
767 spiga 1.8 report = { file : jobStageInfo}
768     return report
769 ewv 1.7
770 spiga 1.8 def finalReport( self , results ):
771     """
772     It a list of LFNs for each SE where data are stored.
773     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
774 spiga 1.1 """
775 spiga 1.8 outFile = open('cmscpReport.sh',"a")
776     cmscp_exit_status = 0
777     txt = ''
778 ewv 1.14 for file, dict in results.iteritems():
779 spiga 1.52 reason = str(dict['reason'])
780     if str(reason).find("'") > -1:
781     reason = " ".join(reason.split("'"))
782     reason="'%s'"%reason
783 ewv 1.14 if file:
784 fanzago 1.74 #if dict['lfn']=='':
785     if dict['for_lfn']=='':
786 fanzago 1.67 lfn = '${LFNBaseName}'+os.path.basename(file)
787 spiga 1.11 se = '$SE'
788 fanzago 1.66 LFNBaseName = '$LFNBaseName'
789 spiga 1.11 else:
790 fanzago 1.74 #lfn = dict['lfn']+os.path.basename(file)
791     lfn = dict['for_lfn']+os.path.basename(file)
792     #se = dict['se']
793     se = dict['se_name']
794 fanzago 1.66 LFNBaseName = os.path.dirname(lfn)
795     if (LFNBaseName[-1] != '/'):
796     LFNBaseName = LFNBaseName + '/'
797 fanzago 1.74
798    
799 spiga 1.11 #dict['lfn'] # to be implemented
800 fanzago 1.39 txt += 'echo "Report for File: '+file+'"\n'
801     txt += 'echo "LFN: '+lfn+'"\n'
802     txt += 'echo "StorageElement: '+se+'"\n'
803 spiga 1.49 txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
804 spiga 1.11 txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
805 fanzago 1.59
806 spiga 1.71
807     if dict['erCode'] != '0':
808     cmscp_exit_status = dict['erCode']
809 spiga 1.12 else:
810 spiga 1.49 txt += 'echo "StageOutExitStatusReason = %s" | tee -a $RUNTIME_AREA/$repo\n'%reason
811 spiga 1.12 cmscp_exit_status = dict['erCode']
812 spiga 1.8 txt += '\n'
813     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
814 fanzago 1.65 txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
815 spiga 1.8 outFile.write(str(txt))
816     outFile.close()
817     return
818    
819    
820     def usage():
821    
822     msg="""
823 spiga 1.46 cmscp:
824     safe copy of local file to/from remote SE via lcg_cp/srmcp,
825     including success checking version also for CAF using rfcp command to copy the output to SE
826 spiga 1.8
827 spiga 1.46 accepted parameters:
828     source =
829     destination =
830     inputFileList =
831     outputFileList =
832     protocol =
833     option =
834     middleware =
835     srm_version =
836     destinationDir =
837     lfn= =
838     local_stage = activate stage fall back
839     debug = activate verbose print out
840     help = print on line man and exit
841    
842     mandatory:
843     * "source" and/or "destination" must always be defined
844     * either "middleware" or "protocol" must always be defined
845     * "inputFileList" must always be defined
846     * if "local_stage" = 1 also "lfn" must be defined
847 spiga 1.8 """
848 ewv 1.14 print msg
849 spiga 1.8
850 ewv 1.14 return
851 spiga 1.8
852     def HelpOptions(opts=[]):
853     """
854     Check otps, print help if needed
855 ewv 1.14 prepare dict = { opt : value }
856 spiga 1.8 """
857     dict_args = {}
858     if len(opts):
859     for opt, arg in opts:
860 ewv 1.14 dict_args[opt.split('--')[1]] = arg
861 spiga 1.8 if opt in ('-h','-help','--help') :
862     usage()
863     sys.exit(0)
864     return dict_args
865     else:
866     usage()
867     sys.exit(0)
868 spiga 1.1
869     if __name__ == '__main__' :
870 spiga 1.8
871 ewv 1.14 import getopt
872 spiga 1.8
873     allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
874 spiga 1.25 "protocol=","option=", "middleware=", "srm_version=", \
875 fanzago 1.74 "destinationDir=", "for_lfn=", "local_stage", "debug", "help", "se_name="]
876 ewv 1.14 try:
877     opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
878 spiga 1.8 except getopt.GetoptError, err:
879     print err
880     HelpOptions()
881     sys.exit(2)
882 ewv 1.14
883 spiga 1.8 dictArgs = HelpOptions(opts)
884 spiga 1.1 try:
885 spiga 1.8 cmscp_ = cmscp(dictArgs)
886 spiga 1.1 cmscp_.run()
887 spiga 1.11 except Exception, ex :
888     print str(ex)
889 spiga 1.1