ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.13
Committed: Thu Oct 9 14:10:18 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_pre8
Changes since 1.12: +51 -44 lines
Log Message:
Fix the utils failure handling; adjustments to the command options.

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 spiga 1.8 import sys, string
4 spiga 1.1 import os, popen2
5 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
7 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
8 spiga 1.1
9    
class cmscp:
    """
    Safe copy of files between the local area and a storage element (SE)
    through the ProdCommon SE API (SElement / SBinterface).

    The behaviour is driven by the ``params`` dictionary:
        source         remote source endpoint ('' means local files)
        destination    remote destination endpoint ('' means current dir)
        inputFileList  comma separated list of files to copy
        outputFileList not yet supported
        protocol       protocol for a direct copy (no middleware)
        option         options handed to the copy command (direct copy)
        middleware     CAF, LSF, LCG or OSG: stage out from a worker node
        srm_version    srmv1 or srmv2 (default srmv2)
    The presence of the keys 'debug' / 'help' switches on the verbose
    output / prints the usage.

    Error codes used in the reports (always strings):
        '0'      all ok
        '60303'  file already exists in the SE
        '60307'  copy failed
        '60316'  creation of the remote directory failed
        '-1'     storage interface problem or failed existence check

    Exceptions are read with sys.exc_info() so that the handlers do not
    depend on the ``except X, ex`` / ``except X as ex`` syntax.
    """

    def __init__(self, args):
        """
        Store the options.

        args: dict {option name: value}; its entries override the defaults.
        """
        # defaults for every option that is read later on
        self.params = {"source": '', "destination": '', "inputFileList": '',
                       "outputFileList": '', "protocol": '', "option": '',
                       "middleware": '', "srm_version": 'srmv2'}
        self.debug = 0

        self.params.update(args)

    def processOptions(self):
        """
        Check the options and turn 'inputFileList' into a list of names.

        HelpOptions() (usage + exit) is called when the options are not
        usable: source and destination both empty, neither middleware nor
        protocol given, or no input file.
        """
        if 'help' in self.params: HelpOptions()
        if 'debug' in self.params: self.debug = 1

        # source and dest cannot be undefined at same time
        if not self.params['source'] and not self.params['destination']:
            HelpOptions()

        # if middleware is not defined --> protocol cannot be empty
        if not self.params['middleware'] and not self.params['protocol']:
            HelpOptions()

        # input file must be defined
        if not self.params['inputFileList']:
            HelpOptions()

        raw_list = self.params['inputFileList']
        # a string is split on commas (splitting a name without commas
        # simply gives a one element list); anything else is taken as an
        # already prepared sequence of names
        if hasattr(raw_list, 'split'):
            names = raw_list.split(',')
        else:
            names = list(raw_list)
        # drop the empty names left by leading/trailing/double commas
        file_to_copy = [name.strip() for name in names if name.strip()]
        if not file_to_copy:
            HelpOptions()
        self.params['inputFileList'] = file_to_copy

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

    def run(self):
        """
        Check if running on UI (no middleware) or on WN (on the Grid),
        and take different action.

        return: dict {file name: stage out infos}, see updateReport()
        """
        self.processOptions()
        if self.params['middleware']:
            # stage out from WN
            results = self.stager(self.params['middleware'],
                                  self.params['inputFileList'])
            self.finalReport(results)
        else:
            # Local interaction with SE
            results = self.copy(self.params['inputFileList'],
                                self.params['protocol'],
                                self.params['option'])
        return results

    def setProtocol(self, middleware):
        """
        Define the allowed protocols based on the middleware, which
        depends on the scheduler.

        return: list of (protocol, options) in the order to be tried
        raise:  Exception for a middleware or srm_version not supported
        """
        # default To be used with "middleware"
        lcgOpt = {'srmv1': '-b -D srmv1 -t 2400 --verbose',
                  'srmv2': '-b -D srmv2 -t 2400 --verbose'}
        srmOpt = {'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                  'srmv2': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
        rfioOpt = ''

        flavour = middleware.lower()
        if flavour in ['osg', 'lcg']:
            version = self.params['srm_version']
            if version not in lcgOpt:
                raise Exception("ERROR : srm_version %s is not supported\n" % version)
            return [('srm-lcg', lcgOpt[version]),
                    (version, srmOpt[version])]
        if flavour in ['lsf', 'caf']:
            return [('rfio', rfioOpt)]

        ## here we can add support for any kind of protocol,
        ## maybe some local schedulers need something dedicated
        raise Exception("ERROR : middleware %s is not supported\n" % middleware)

    def stager(self, middleware, list_files):
        """
        Implement the logic for remote stage out: try the protocols
        allowed for the middleware one after the other, retrying with the
        next one only the files whose copy failed.

        return: dict {file name: stage out infos}; a '' key holds a
                failure that is not related to a single file
        """
        results = {}
        for prot, opt in self.setProtocol(middleware):
            if self.debug: print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, opt)

            # only the '' key: the protocol failed as a whole, keep the
            # reason and try the same files with the next protocol
            if list(copy_results.keys()) == ['']:
                results.update(copy_results)
                continue

            # this protocol gave an answer for each file: a global failure
            # recorded for a previous protocol is not the final state
            results.pop('', None)

            list_retry = []
            list_existing = []
            list_ok = []
            # iterate on a copy, the dictionary is updated in the loop
            for name, infos in list(copy_results.items()):
                er_code = infos['erCode']
                if er_code == '0':
                    list_ok.append(name)
                    reason = 'Copy succedeed with %s utils' % prot
                    copy_results.update(
                        self.updateReport(name, er_code, reason,
                                          infos.get('lfn', ''), infos.get('se', '')))
                elif er_code == '60303':
                    list_existing.append(name)
                else:
                    list_retry.append(name)
            results.update(copy_results)

            if list_ok and self.debug:
                print('Copy of %s succedeed with %s utils\n' % (str(list_ok), prot))
            if len(list_ok) == len(list_files):
                break
            if self.debug:
                print('Copy of files %s failed using %s...\n'
                      % (str(list_retry) + str(list_existing), prot))
            # files already in the SE are not retried
            if not list_retry:
                break
            list_files = list_retry

        #### TODO Daniele
        # backup logic is under implementation: check if something fails,
        # create the related dict and stage out on a backup destination.
        # NOTE: it must return a dict which contains also LFN and SE name.
        return results

    def initializeApi(self, protocol):
        """
        Instantiate storage interface for source and destination; the
        side without an endpoint uses the 'local' protocol.

        return: (source SE, destination SE)
        """
        source_prot = protocol
        dest_prot = protocol
        if not self.params['source']: source_prot = 'local'
        if not self.params['destination']: dest_prot = 'local'

        Source_SE = self.storageInterface(self.params['source'], source_prot)
        Destination_SE = self.storageInterface(self.params['destination'], dest_prot)

        if self.debug:
            print('(source=%s, protocol=%s)' % (self.params['source'], source_prot))
            print('(destination=%s, protocol=%s)' % (self.params['destination'], dest_prot))

        return Source_SE, Destination_SE

    def copy(self, list_file, protocol, options):
        """
        Make the real file copy using SE API.

        return: dict {file name: stage out infos}; when the failure is
                not related to a single file the only key is ''
        """
        if self.debug:
            print('copy(): using %s protocol' % protocol)
        try:
            Source_SE, Destination_SE = self.initializeApi(protocol)
        except Exception:
            return self.updateReport('', '-1', str(sys.exc_info()[1]))

        # create remote dir
        if protocol in ['gridftp', 'rfio']:
            try:
                dir_error = self.createDir(Destination_SE, protocol)
            except Exception:
                return self.updateReport('', '60316', str(sys.exc_info()[1]))
            # createDir() reports its failures with the returned message
            if dir_error:
                return self.updateReport('', '60316', dir_error)

        ## prepare for real copy ##
        try:
            sbi = SBinterface(Source_SE, Destination_SE)
            sbi_dest = SBinterface(Destination_SE)
        except ProtocolMismatch:
            msg = str(sys.exc_info()[1]) + '\n'
            msg += "ERROR : Unable to create SBinterface with %s protocol\n" % protocol
            return self.updateReport('', '-1', msg)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug: print('start real copy for %s' % filetocopy)
            try:
                ErCode, msg = self.checkFileExist(sbi_dest, os.path.basename(filetocopy))
            except Exception:
                # keep what has been collected so far and go on: every
                # file must get its own entry in the report
                results.update(
                    self.updateReport(filetocopy, '-1', str(sys.exc_info()[1])))
                continue
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, options)
            if self.debug:
                print('Copy results for %s is %s' % (os.path.basename(filetocopy), ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def storageInterface(self, endpoint, protocol):
        """
        Create the storage interface.

        raise: Exception if the protocol is not known by the SE API
        """
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except ProtocolUnknown:
            msg = ''
            if self.debug: msg = str(sys.exc_info()[1]) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            raise Exception(msg)

        return interface

    def _errorText(self, ex, summary, extras=('detail', 'output')):
        """
        Build an error message: str(ex), in debug mode the listed
        attributes of ex (one per line), then the summary.
        """
        msg = str(ex)
        if self.debug:
            for attr in extras:
                msg += str(getattr(ex, attr, '')) + '\n'
        msg += summary
        return msg

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp REALLY TEMPORARY
        this should be transparent at SE API level.

        return: '' if the directory has been created, otherwise the
                error message
        """
        msg = ''
        summary = "ERROR: problem with the directory creation using %s protocol \n" % protocol
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug:
                print("The directory has been created using protocol %s\n" % protocol)
        except TransferException:
            msg = self._errorText(sys.exc_info()[1], summary)
        except OperationException:
            msg = self._errorText(sys.exc_info()[1], summary, ('detail',))

        return msg

    def checkFileExist(self, sbi, filetocopy):
        """
        Check if file to copy already exist.

        return: ('0', '') if the file is not there,
                ('60303', message) if it already exists
        raise:  Exception if the check itself fails
        """
        try:
            check = sbi.checkExists(filetocopy)
        except OperationException:
            raise Exception(self._errorText(
                sys.exc_info()[1],
                'ERROR: problems checkig if file %s already exist' % filetocopy))
        except WrongOption:
            raise Exception(self._errorText(
                sys.exc_info()[1],
                'ERROR problems checkig if file %s already exist' % filetocopy))

        if check:
            return '60303', "file %s already exist" % filetocopy
        return '0', ''

    def makeCopy(self, sbi, filetocopy, option):
        """
        Call the copy API.

        return: ('0', '') if the copy succeeded, ('60307', message) if not
        """
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name  ## to be improved supporting changing file name TODO
        if self.params['source'] == '' and path == '':
            # local file given by name only
            source_file = os.path.abspath(filetocopy)
        elif self.params['destination'] == '':
            # copy from remote to the current directory
            dest_file = os.path.join(os.getcwd(), file_name)
        elif self.params['source'] != '' and self.params['destination'] != '':
            # remote to remote
            source_file = file_name

        ErCode = '0'
        msg = ''

        try:
            sbi.copy(source_file, dest_file, opt=option)
        except (TransferException, WrongOption):
            msg = self._errorText(sys.exc_info()[1],
                                  "Problem copying %s file" % filetocopy)
            ErCode = '60307'

        ## TO BE IMPLEMENTED if NEEDED
        ## NOTE: SE API Already available
        # if self.protocol.find('srm') : self.checkSize( sbi, filetocopy )

        return ErCode, msg

    def backup(self):
        """
        Check infos from TFC using existing api obtaining:
        1)destination
        2)protocol
        (not implemented yet)
        """
        return

    def updateReport(self, file, erCode, reason, lfn='', se=''):
        """
        Update the final stage out infos.

        return: dict {file: {'erCode', 'reason', 'lfn', 'se'}}
        """
        jobStageInfo = {'erCode': erCode,
                        'reason': reason,
                        'lfn': lfn,
                        'se': se}
        return {file: jobStageInfo}

    def finalReport(self, results):
        """
        Append to cmscpReport.sh (current directory) the shell commands
        which report LFN, SE and exit status of the stage out.
        It a list of LFNs for each SE where data are stored.
        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
        """
        outFile = open('cmscpReport.sh', "a")
        try:
            cmscp_exit_status = 0
            txt = ''
            for name, infos in results.items():
                if name:
                    if infos['lfn'] == '':
                        lfn = '$LFNBaseName/' + os.path.basename(name)
                        se = '$SE'
                    else:
                        lfn = infos['lfn'] + os.path.basename(name)
                        se = infos['se']
                    txt += 'echo "Report for File: ' + name + '"\n'
                    txt += 'echo "LFN: ' + lfn + '"\n'
                    txt += 'echo "StorageElement: ' + se + '"\n'
                    txt += 'echo "StageOutExitStatusReason =' + infos['reason'] + '" | tee -a $RUNTIME_AREA/$repo\n'
                    txt += 'echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n'
                    if infos['erCode'] != '0':
                        cmscp_exit_status = infos['erCode']
                else:
                    # failure not related to a single file
                    cmscp_exit_status = infos['erCode']
            txt += '\n'
            txt += 'export StageOutExitStatus=' + str(cmscp_exit_status) + '\n'
            txt += 'echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n'
            outFile.write(str(txt))
        finally:
            outFile.close()
375    
376    
def usage():
    """
    Print the command line help on stdout.

    The option names are the ones accepted by the getopt specification
    used in the main section of the script.
    """
    msg = """
    usage: cmscp.py --inputFileList <file[,file...]> [options]

    required parameters:
    --inputFileList  :: comma separated list of files: absPath or name, NOT RELATIVE PATH
    --source         :: REMOTE : source endpoint
    --destination    :: REMOTE : destination endpoint
                        (at least one of --source and --destination must be given)
    --middleware     :: CAF, LSF, LCG or OSG
    --protocol       :: protocol to be used, mandatory if --middleware is not given

    optional parameters:
    --option         :: options for the copy command
    --srm_version    :: srmv1 or srmv2 (default srmv2)
    --outputFileList :: onlyNAME : NOT YET SUPPORTED
    --debug          :: verbose output
    --help           :: print this help
    """
    print(msg)
392    
def HelpOptions(opts=None):
    """
    Check opts, print help if needed,
    prepare dict = { opt : value }

    opts: list of (option, value) pairs as returned by getopt.getopt;
          the leading dashes are removed from the option names.
    Without options, or when a help option is found, the usage is
    printed and the program exits with status 0.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # look for the help request before touching the name, so that the
        # short forms (no '--' prefix) cannot break the name extraction
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
409 spiga 1.1
if __name__ == '__main__':
    # Command line entry point: parse the long options, build the
    # {option: value} dictionary and run the copy.

    import getopt

    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=", "debug", "help"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError:
        # wrong command line: say why, show the help and leave with
        # status 2 (HelpOptions() would exit with 0 before reaching it)
        print(str(sys.exc_info()[1]))
        usage()
        sys.exit(2)

    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception:
        # any failure is only printed; the exit status is not changed here
        print(str(sys.exc_info()[1]))
429 spiga 1.1