ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.18
Committed: Thu Oct 23 07:49:33 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_1_pre2
Changes since 1.17: +2 -1 lines
Log Message:
PCommon _0_12_5_CRAB_1

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 mcinquil 1.16 import sys, os
4 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
6 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
7 spiga 1.1
8    
class cmscp:
    """
    Copy a list of files between the local area and a Storage Element (SE)
    through the ProdCommon SE API, and report the outcome per file.

    Every outcome is stored in a "report" dictionary of the form
    { file_name : {'erCode': str, 'reason': str, 'lfn': str, 'se': str} }.
    A report keyed by the empty string '' describes a failure that is not
    tied to a single file (e.g. the storage interface could not be set up).

    Error codes used in this class (always strings):
        '0'     : all ok
        '60303' : file already exists on the destination
        '60307' : the copy failed
        '60316' : the remote directory could not be created
        '-1'    : the storage interface could not be initialized / queried
    """

    def __init__(self, args):
        """
        Store the command line parameters.

        input:
            args : dictionary { option_name : value } overriding the defaults
                   below (source, destination, inputFileList, outputFileList,
                   protocol, option, middleware, srm_version).
                   The presence of the keys 'debug' / 'help' is checked
                   later on by processOptions().
        """
        # set default
        self.params = {"source": '', "destination": '', "inputFileList": '',
                       "outputFileList": '', "protocol": '', "option": '',
                       "middleware": '', "srm_version": 'srmv2'}
        self.debug = 0

        self.params.update(args)

    def processOptions(self):
        """
        check command line parameter

        Prints the help and exits (through HelpOptions) if the parameters are
        not usable; otherwise turns params['inputFileList'] from a comma
        separated string into a list of stripped file names.
        """
        if 'help' in self.params:
            HelpOptions()
        if 'debug' in self.params:
            self.debug = 1

        # source and dest cannot be undefined at same time
        if not self.params['source'] and not self.params['destination']:
            HelpOptions()

        # if middleware is not defined --> protocol cannot be empty
        if not self.params['middleware'] and not self.params['protocol']:
            HelpOptions()

        # input file must be defined
        raw_list = self.params['inputFileList']
        if not raw_list:
            HelpOptions()
        elif hasattr(raw_list, 'split'):
            # split() copes with zero, one or many commas, so there is no
            # need to look for the separator first; empty entries (e.g. a
            # trailing comma) are dropped.
            names = [name.strip() for name in raw_list.split(',')]
            self.params['inputFileList'] = [name for name in names if name]
        else:
            # already a sequence (processOptions called twice): keep a copy
            self.params['inputFileList'] = list(raw_list)

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

        return

    def run(self):
        """
        Check if running on UI (no $middleware) or
        on WN (on the Grid), and take different action

        output:
            the report dictionary produced by stager() or copy()
        """
        self.processOptions()
        if self.params['middleware']:
            # stage out from WN
            results = self.stager(self.params['middleware'],
                                  self.params['inputFileList'])
            self.finalReport(results)
        else:
            # Local interaction with SE
            results = self.copy(self.params['inputFileList'],
                                self.params['protocol'],
                                self.params['option'])
        return results

    def setProtocol(self, middleware):
        """
        define the allowed protocols based on $middleware
        which depend on scheduler

        output:
            list of (protocol, options) tuples, in the order they have to be
            tried, or None if the middleware is not supported.
        """
        # default To be used with "middleware"
        lcgOpt = {'srmv1': '-b -D srmv1 -t 2400 --verbose',
                  'srmv2': '-b -D srmv2 -t 2400 --verbose'}
        srmOpt = {'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                  'srmv2': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
        rfioOpt = ''

        supported_protocol = None
        scheduler = middleware.lower()
        if scheduler in ['osg', 'lcg', 'condor']:
            version = self.params['srm_version']
            supported_protocol = [('srm-lcg', lcgOpt[version]),
                                  (version, srmOpt[version])]
        elif scheduler in ['lsf', 'caf']:
            supported_protocol = [('rfio', rfioOpt)]
        else:
            ## here we can add support for any kind of protocol,
            ## maybe some local schedulers need something dedicated
            pass
        return supported_protocol

    def stager(self, middleware, list_files):
        """
        Implement the logic for remote stage out

        Each protocol allowed for the middleware is tried in turn; the files
        whose copy failed are retried with the next protocol, the ones
        already existing on the destination are not.

        output:
            report dictionary, one entry per file (or a single '' entry if
            nothing could be attempted).
        """
        protocols = self.setProtocol(middleware)
        if not protocols:
            # setProtocol() returns None for an unknown middleware: report
            # the problem instead of failing while iterating over None.
            msg = "ERROR : middleware %s is not supported for the stage out" % middleware
            return self.updateReport('', '-1', msg)

        results = {}
        for prot, opt in protocols:
            if self.debug:
                print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, opt)

            if '' in copy_results:
                # The protocol failed as a whole (no per file information):
                # keep the error and try the same files with the next one.
                results.update(copy_results)
                continue

            list_retry = []
            list_existing = []
            list_ok = []
            for name, info in list(copy_results.items()):
                er_code = info['erCode']
                if er_code == '0':
                    list_ok.append(name)
                    reason = 'Copy succedeed with %s utils' % prot
                    copy_results.update(self.updateReport(name, er_code, reason))
                elif er_code == '60303':
                    list_existing.append(name)
                else:
                    list_retry.append(name)

            # per file results supersede a global failure of a previous protocol
            results.pop('', None)
            results.update(copy_results)

            if len(list_ok) != 0:
                msg = 'Copy of %s succedeed with %s utils\n' % (str(list_ok), prot)
                if self.debug:
                    print(msg)
            if len(list_ok) == len(list_files):
                break
            if self.debug:
                print('Copy of files %s failed using %s...\n' % (str(list_retry) + str(list_existing), prot))
            if not list_retry:
                break
            list_files = list_retry

        #### TODO Daniele
        #check is something fails and created related dict
        #  backup = self.analyzeResults(results)

        #  if backup :
        #      msg = 'WARNING: backup logic is under implementation\n'
        #      #backupDict = self.backup()
        #      ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
        #      results.update(backupDict)
        #      print msg
        return results

    def initializeApi(self, protocol):
        """
        Instantiate storage interface

        An empty source (or destination) parameter is accessed with the
        'local' protocol, the other side with the given protocol.

        output:
            (Source_SE, Destination_SE) as returned by storageInterface()
        """
        source_prot = protocol
        dest_prot = protocol
        if not self.params['source']:
            source_prot = 'local'
        Source_SE = self.storageInterface(self.params['source'], source_prot)
        if not self.params['destination']:
            dest_prot = 'local'
        Destination_SE = self.storageInterface(self.params['destination'], dest_prot)

        if self.debug:
            print('(source=%s,  protocol=%s)' % (self.params['source'], source_prot))
            print('(destination=%s,  protocol=%s)' % (self.params['destination'], dest_prot))

        return Source_SE, Destination_SE

    def copy(self, list_file, protocol, options):
        """
        Make the real file copy using SE API

        output:
            report dictionary with one entry per file, or a single entry
            keyed by '' if the storage interfaces could not be prepared.
        """
        if self.debug:
            print('copy(): using %s protocol' % protocol)
        try:
            Source_SE, Destination_SE = self.initializeApi(protocol)
        except Exception as ex:
            return self.updateReport('', '-1', str(ex))

        # create remote dir
        if protocol in ['gridftp', 'rfio']:
            try:
                self.createDir(Destination_SE, protocol)
            except Exception as ex:
                return self.updateReport('', '60316', str(ex))

        ## prepare for real copy  ##
        try:
            sbi = SBinterface(Source_SE, Destination_SE)
            sbi_dest = SBinterface(Destination_SE)
        except ProtocolMismatch as ex:
            msg = str(ex) + '\n'
            msg += "ERROR : Unable to create SBinterface with %s protocol\n" % protocol
            return self.updateReport('', '-1', msg)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug:
                print('start real copy for %s' % filetocopy)
            try:
                ErCode, msg = self.checkFileExist(sbi_dest, os.path.basename(filetocopy))
            except Exception as ex:
                # error codes are strings everywhere else in the reports
                ErCode = '-1'
                msg = str(ex)
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, options)
            if self.debug:
                print('Copy results for %s is %s' % (os.path.basename(filetocopy), ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def storageInterface(self, endpoint, protocol):
        """
        Create the storage interface.

        raises:
            Exception if the SE API does not know the protocol
        """
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except ProtocolUnknown as ex:
            msg = ''
            if self.debug:
                msg = str(ex) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            raise Exception(msg)

        return interface

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp REALLY TEMPORARY
        this should be transparent at SE API level.

        output:
            a message describing what happened (may be empty)
        raises:
            Exception if the SE API raises a TransferException
        """
        msg = ''
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug:
                msg += "The directory has been created using protocol %s\n" % protocol
        except TransferException as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol
            raise Exception(msg)
        except OperationException as ex:
            # NOTE(review): this failure is only described in the returned
            # message and not raised, so the caller goes on with the copy;
            # kept as it was -- confirm whether it should raise as above.
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol

        return msg

    def checkFileExist(self, sbi, filetocopy):
        """
        Check if file to copy already exist

        output:
            ('0', '') if the file is not there,
            ('60303', message) if it already exists
        raises:
            Exception if the check itself fails
        """
        ErCode = '0'
        msg = ''
        try:
            check = sbi.checkExists(filetocopy)
        except OperationException as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += 'ERROR: problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        except WrongOption as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += 'ERROR problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        if check:
            ErCode = '60303'
            msg = "file %s already exist" % filetocopy

        return ErCode, msg

    def makeCopy(self, sbi, filetocopy, option):
        """
        call the copy API.

        output:
            ('0', '') if the copy went fine,
            ('60307', message) if the SE API reported a problem
        """
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name  ## to be improved supporting changing file name  TODO
        if self.params['source'] == '' and path == '':
            # local file given by name only: use its absolute path
            source_file = os.path.abspath(filetocopy)
        elif self.params['destination'] == '':
            # copy to the local area: put the file in the current directory
            dest_file = os.path.join(os.getcwd(), file_name)
        elif self.params['source'] != '' and self.params['destination'] != '':
            # remote to remote: only the file name is meaningful
            source_file = file_name

        ErCode = '0'
        msg = ''

        try:
            sbi.copy(source_file, dest_file, opt=option)
        except (TransferException, WrongOption) as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += "Problem copying %s file" % filetocopy
            ErCode = '60307'

        ## TO BE IMPLEMENTED if NEEDED
        ## NOTE: SE API Already available
        #  if self.protocol.find('srm')  : self.checkSize( sbi, filetocopy )

        return ErCode, msg

    def backup(self):
        """
        Check infos from TFC using existing api obtaining:
        1)destination
        2)protocol

        Not implemented yet: it does nothing and returns None.
        """
        return

    def updateReport(self, file, erCode, reason, lfn='', se=''):
        """
        Update the final stage out infos

        output:
            { file : {'erCode':..., 'reason':..., 'lfn':..., 'se':...} }
        """
        jobStageInfo = {'erCode': erCode,
                        'reason': reason,
                        'lfn': lfn,
                        'se': se}
        return {file: jobStageInfo}

    def finalReport(self, results):
        """
        Append to cmscpReport.sh (in the current directory) the shell
        commands reporting, for each file, LFN, SE and stage out status,
        followed by the overall StageOutExitStatus: '0', or the error code
        of a failed entry.
        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
        """
        cmscp_exit_status = 0
        txt = ''
        for name, info in results.items():
            if name:
                if info['lfn'] == '':
                    lfn = '$LFNBaseName/' + os.path.basename(name)
                    se = '$SE'
                else:
                    lfn = info['lfn'] + os.path.basename(name)
                    se = info['se']
                # NOTE: the values are written unescaped inside double quotes
                txt += 'echo "Report for File: ' + name + '"\n'
                txt += 'echo "LFN: ' + lfn + '"\n'
                txt += 'echo "StorageElement: ' + se + '"\n'
                txt += 'echo "StageOutExitStatusReason =' + info['reason'] + '" | tee -a $RUNTIME_AREA/$repo\n'
                txt += 'echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n'
                if info['erCode'] != '0':
                    cmscp_exit_status = info['erCode']
            else:
                # failure not related to a single file
                cmscp_exit_status = info['erCode']
        txt += '\n'
        txt += 'export StageOutExitStatus=' + str(cmscp_exit_status) + '\n'
        txt += 'echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n'

        outFile = open('cmscpReport.sh', "a")
        try:
            outFile.write(str(txt))
        finally:
            # release the file handle also if the write fails
            outFile.close()
        return
377    
378    
def usage():
    """
    Print the command line help.

    The option names listed here are the ones accepted by the getopt call
    in the main section of this script.
    """
    msg = """
    required parameters:
    --source         :: REMOTE  : source endpoint      (source and destination
    --destination    :: REMOTE  : destination endpoint  cannot both be empty)
    --inputFileList  :: absPath : or name NOT RELATIVE PATH, comma separated list
    --middleware     ::         : needed if --protocol is not given
    --protocol       ::         : needed if --middleware is not given

    optional parameters
    --outputFileList :: onlyNAME : NOT YET SUPPORTED
    --option         ::          : options passed to the copy command
    --srm_version    ::          : default is srmv2
    --debug          :
    --help           :
    """
    print(msg)

    return
394 spiga 1.8
def HelpOptions(opts=None):
    """
    Check opts, print help if needed
    prepare dict = { opt : value }

    input:
        opts : list of (option, value) pairs as returned by getopt.getopt
    output:
        dictionary { option name without leading dashes : value }

    If opts is empty or missing, or if a help option is found, the usage
    message is printed and the script exits with status 0.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # Look for the help request before touching the option name, and
        # strip the dashes instead of indexing the result of split('--'),
        # which fails on a single-dash option such as '-h'.
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
411 spiga 1.1
if __name__ == '__main__':

    import getopt

    # long options only; the ones ending with '=' require a value
    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=", "debug", "help"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError as err:
        print(err)
        # HelpOptions() without arguments exits with status 0, which made
        # the exit below unreachable: print the help directly so that a bad
        # command line is reported with exit status 2.
        usage()
        sys.exit(2)

    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception as ex:
        # best effort: the error is only printed, the exit status stays 0
        print(str(ex))
431 spiga 1.1