ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.17
Committed: Tue Oct 21 10:21:11 2008 UTC (16 years, 6 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
Changes since 1.16: +0 -2 lines
Log Message:
Removed not useful prints

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 mcinquil 1.16 import sys, os
4 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
5 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
6 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
7 spiga 1.1
8    
class cmscp:
    """
    Copy files between the local area and a remote Storage Element (SE)
    through the ProdCommon SE API (SElement / SBinterface).

    Every per-file outcome is a dictionary built by updateReport() with
    the keys 'erCode', 'reason', 'lfn' and 'se'.  Error codes are strings:
        '0'      copy succeeded
        '60303'  the file already exists on the destination
        '60307'  the copy itself failed
        '60316'  the remote directory could not be created
        '-1'     the storage interface could not be set up or queried
    A failure that is not tied to one file (interface or directory setup)
    is reported under the empty file name ''.
    """

    def __init__(self, args):
        """
        Store the run parameters.

        args is a dictionary that overrides the defaults below:
            source         endpoint to read from ('' means the local area)
            destination    endpoint to write to ('' means the local area)
            inputFileList  comma separated list of files to copy
            outputFileList not used yet
            protocol       protocol to use when no middleware is given
            option         options handed to the copy when no middleware is given
            middleware     scheduler flavour (OSG, LCG, condor, LSF, CAF)
            srm_version    'srmv1' or 'srmv2' (default 'srmv2')
        The presence of the keys 'debug' / 'help' is checked by
        processOptions().
        """
        # set default
        self.params = {"source": '', "destination": '', "inputFileList": '',
                       "outputFileList": '', "protocol": '', "option": '',
                       "middleware": '', "srm_version": 'srmv2'}
        self.debug = 0

        self.params.update(args)

    def processOptions(self):
        """
        Validate the parameters and normalise 'inputFileList'.

        HelpOptions() is called (it prints the usage and exits) when 'help'
        was requested or a mandatory parameter is missing.  On success
        self.params['inputFileList'] is a list of stripped, non-empty names.
        """
        if 'help' in self.params:
            HelpOptions()
        if 'debug' in self.params:
            self.debug = 1

        # source and dest cannot be undefined at same time
        if not self.params['source'] and not self.params['destination']:
            HelpOptions()

        # if middleware is not defined --> protocol cannot be empty
        if not self.params['middleware'] and not self.params['protocol']:
            HelpOptions()

        # input file must be defined
        raw_list = self.params['inputFileList']
        if not raw_list:
            HelpOptions()
            return

        # Always split: str.find() returns -1 (truthy) when there is no comma
        # and 0 (falsy) when the comma is the first character, so it cannot
        # be used as a test.  A value that is already a list (second call)
        # is kept as a list.
        if hasattr(raw_list, 'split'):
            raw_list = raw_list.split(',')
        names = [name.strip() for name in raw_list]
        self.params['inputFileList'] = [name for name in names if name]

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

    def run(self):
        """
        Entry point: stage out from a worker node when 'middleware' is set
        (and write the shell report), otherwise copy directly with the
        protocol and options given by the user.

        Returns the dictionary of per-file reports.
        """
        self.processOptions()
        if self.params['middleware']:
            # stage out from WN
            results = self.stager(self.params['middleware'],
                                  self.params['inputFileList'])
            self.finalReport(results)
        else:
            # Local interaction with SE
            results = self.copy(self.params['inputFileList'],
                                self.params['protocol'],
                                self.params['option'])
        return results

    def setProtocol(self, middleware):
        """
        Return the ordered list of (protocol, options) pairs to try for the
        given middleware, or None when the middleware is not known.

        The grid flavours use the options matching self.params['srm_version'].
        """
        # default To be used with "middleware"
        lcgOpt = {'srmv1': '-b -D srmv1 -t 2400 --verbose',
                  'srmv2': '-b -D srmv2 -t 2400 --verbose'}
        srmOpt = {'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                  'srmv2': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
        rfioOpt = ''

        flavour = middleware.lower()
        supported_protocol = None
        if flavour in ['osg', 'lcg', 'condor']:
            version = self.params['srm_version']
            supported_protocol = [('srm-lcg', lcgOpt[version]),
                                  (version, srmOpt[version])]
        elif flavour in ['lsf', 'caf']:
            supported_protocol = [('rfio', rfioOpt)]
        ## any other value: support for further protocols can be added here,
        ## maybe some local schedulers need something dedicated
        return supported_protocol

    def stager(self, middleware, list_files):
        """
        Implement the logic for remote stage out.

        Each protocol returned by setProtocol() is tried in turn.  Files that
        were copied, or that already exist on the destination, are not sent
        again; only the files that failed are retried with the next protocol.
        Returns the merged dictionary of per-file reports (empty when the
        middleware has no supported protocol).
        """
        results = {}
        # setProtocol() returns None for an unknown middleware
        for prot, opt in self.setProtocol(middleware) or []:
            if self.debug:
                print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, opt)

            if '' in copy_results:
                # The protocol could not be used at all: keep the error and
                # try the next protocol with the same list of files.
                results.update(copy_results)
                continue
            # This protocol was usable, so an earlier protocol-level error
            # is superseded by the per-file outcomes below.
            results.pop('', None)
            results.update(copy_results)

            list_retry = []
            list_existing = []
            list_ok = []
            for name, info in copy_results.items():
                er_code = info['erCode']
                if er_code == '0':
                    list_ok.append(name)
                    reason = 'Copy succedeed with %s utils' % prot
                    results.update(self.updateReport(name, er_code, reason))
                elif er_code == '60303':
                    list_existing.append(name)
                else:
                    list_retry.append(name)

            if self.debug:
                if list_ok:
                    print('Copy of %s succedeed with %s utils\n' % (str(list_ok), prot))
                if list_retry or list_existing:
                    print('Copy of files %s failed using %s...\n'
                          % (str(list_retry) + str(list_existing), prot))

            if not list_retry:
                break
            # Narrow the list even after a partial success, otherwise the
            # files already copied would be sent again and end up reported
            # as "already existing".
            list_files = list_retry

        #### TODO Daniele
        # check if something failed and build the related dict; the backup
        # logic (self.backup) is under implementation and MUST return a dict
        # that also contains LFN and SE name.
        return results

    def initializeApi(self, protocol):
        """
        Instantiate the storage interfaces for source and destination.

        An empty 'source' / 'destination' parameter selects the 'local'
        protocol for that side.  Returns (Source_SE, Destination_SE);
        storageInterface() raises Exception when an interface cannot be built.
        """
        source_prot = protocol
        dest_prot = protocol
        if not self.params['source']:
            source_prot = 'local'
        Source_SE = self.storageInterface(self.params['source'], source_prot)
        if not self.params['destination']:
            dest_prot = 'local'
        Destination_SE = self.storageInterface(self.params['destination'], dest_prot)

        if self.debug:
            print('(source=%s, protocol=%s)' % (self.params['source'], source_prot))
            print('(destination=%s, protocol=%s)' % (self.params['destination'], dest_prot))

        return Source_SE, Destination_SE

    def copy(self, list_file, protocol, options):
        """
        Make the real file copy using SE API.

        Returns a dictionary of reports keyed by file name.  When the
        interfaces or the remote directory cannot be set up, a single
        report keyed by '' is returned instead.
        """
        if self.debug:
            print('copy(): using %s protocol' % protocol)
        try:
            Source_SE, Destination_SE = self.initializeApi(protocol)
        except Exception as ex:
            return self.updateReport('', '-1', str(ex))

        # create remote dir
        if protocol in ['gridftp', 'rfio']:
            try:
                self.createDir(Destination_SE, protocol)
            except Exception as ex:
                return self.updateReport('', '60316', str(ex))

        ## prepare for real copy ##
        try:
            sbi = SBinterface(Source_SE, Destination_SE)
            sbi_dest = SBinterface(Destination_SE)
        except ProtocolMismatch as ex:
            return self.updateReport('', '-1', str(ex))

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug:
                print('start real copy for %s' % filetocopy)
            try:
                ErCode, msg = self.checkFileExist(sbi_dest, os.path.basename(filetocopy))
            except Exception as ex:
                # Record the failure and go on: returning here would drop
                # the reports of the files already processed.
                results.update(self.updateReport(filetocopy, '-1', str(ex)))
                continue
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, options)
            if self.debug:
                print('Copy results for %s is %s' % (os.path.basename(filetocopy), ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def storageInterface(self, endpoint, protocol):
        """
        Create the storage interface for one endpoint.

        Raises Exception with a readable message when the SE API reports
        ProtocolUnknown.
        """
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except ProtocolUnknown as ex:
            msg = ''
            if self.debug:
                msg = str(ex) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            raise Exception(msg)

        return interface

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp REALLY TEMPORARY
        this should be transparent at SE API level.

        Returns a message (non-empty only in debug mode) on success and
        raises Exception when the SE API reports a TransferException or an
        OperationException.
        """
        msg = ''
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug:
                msg += "The directory has been created using protocol %s\n" % protocol
        except TransferException as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol
            raise Exception(msg)
        except OperationException as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol
            # Raise like the TransferException branch: the caller ignores the
            # returned message, so the failure would otherwise go unnoticed.
            raise Exception(msg)

        return msg

    def checkFileExist(self, sbi, filetocopy):
        """
        Check if file to copy already exist on the destination.

        Returns (ErCode, msg): ('0', '') when the file is not there and
        ('60303', <message>) when it is.  Raises Exception when the check
        itself fails.
        """
        ErCode = '0'
        msg = ''
        try:
            check = sbi.checkExists(filetocopy)
        except OperationException as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += 'ERROR: problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        except WrongOption as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += 'ERROR problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        if check:
            ErCode = '60303'
            msg = "file %s already exist" % filetocopy

        return ErCode, msg

    def makeCopy(self, sbi, filetocopy, option):
        """
        Call the copy API for one file.

        The source and destination names are derived from the parameters:
          - no 'source' and a bare file name: the file is taken from the
            current directory (absolute path);
          - otherwise, no 'destination': the file is written in the current
            directory;
          - otherwise, both endpoints given: only the base name is used on
            the source side.
        Returns (ErCode, msg): ('0', '') on success, ('60307', <message>)
        when the SE API reports a TransferException or WrongOption.
        """
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name  ## to be improved supporting changing file name TODO
        if self.params['source'] == '' and path == '':
            source_file = os.path.abspath(filetocopy)
        elif self.params['destination'] == '':
            dest_file = os.path.join(os.getcwd(), file_name)
        elif self.params['source'] != '' and self.params['destination'] != '':
            source_file = file_name

        ErCode = '0'
        msg = ''

        try:
            sbi.copy(source_file, dest_file, opt=option)
        except (TransferException, WrongOption) as ex:
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += "Problem copying %s file" % filetocopy
            ErCode = '60307'

        ## TO BE IMPLEMENTED if NEEDED
        ## NOTE: SE API Already available
        # if self.protocol.find('srm') : self.checkSize( sbi, filetocopy )

        return ErCode, msg

    def backup(self):
        """
        Placeholder, not implemented yet.  Planned: check infos from TFC
        using existing api obtaining:
        1)destination
        2)protocol
        """
        return

    def updateReport(self, file, erCode, reason, lfn='', se=''):
        """
        Build the stage out report for one file.

        Returns {file: {'erCode': ..., 'reason': ..., 'lfn': ..., 'se': ...}}.
        """
        jobStageInfo = {
            'erCode': erCode,
            'reason': reason,
            'lfn': lfn,
            'se': se,
        }
        return {file: jobStageInfo}

    def finalReport(self, results):
        """
        Append to 'cmscpReport.sh' (current directory) the shell commands
        that publish the stage out outcome.

        For every named file the LFN, the SE and the reason are echoed; an
        empty 'lfn' falls back to the shell variables $LFNBaseName and $SE.
        StageOutExitStatus is 0 unless an entry failed: a non-zero code of a
        named file, or the code of the '' (protocol level) entry, replaces it.
        Allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
        """
        cmscp_exit_status = 0
        txt = ''
        for name, info in results.items():
            if name:
                if info['lfn'] == '':
                    lfn = '$LFNBaseName/' + os.path.basename(name)
                    se = '$SE'
                else:
                    lfn = info['lfn'] + os.path.basename(name)
                    se = info['se']
                # NOTE(review): 'reason' is written unescaped inside double
                # quotes; a message containing quotes, '$' or backticks would
                # alter the generated script - confirm what the SE API can emit.
                txt += 'echo "Report for File: ' + name + '"\n'
                txt += 'echo "LFN: ' + lfn + '"\n'
                txt += 'echo "StorageElement: ' + se + '"\n'
                txt += 'echo "StageOutExitStatusReason =' + info['reason'] + '" | tee -a $RUNTIME_AREA/$repo\n'
                txt += 'echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n'
                if info['erCode'] != '0':
                    cmscp_exit_status = info['erCode']
            else:
                cmscp_exit_status = info['erCode']
        txt += '\n'
        txt += 'export StageOutExitStatus=' + str(cmscp_exit_status) + '\n'
        txt += 'echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n'

        # The text is complete before the file is opened, and the handle is
        # released even when the write fails.
        with open('cmscpReport.sh', "a") as outFile:
            outFile.write(txt)
376    
377    
def usage():
    """
    Print the command line help.

    The option names listed here are the ones accepted by the getopt
    call of the script entry point.
    """
    msg = """
    required parameters:
    --source         :: REMOTE : endpoint to read from
    --destination    :: REMOTE : endpoint to write to
                        (at least one of --source / --destination)
    --inputFileList  :: absPath or name, NOT RELATIVE PATH
                        (comma separated list of files)
    --middleware     :: OSG, LCG, condor, LSF or CAF
    --protocol       :: protocol to use
                        (at least one of --middleware / --protocol)

    optional parameters
    --option         :: options used for the copy when --protocol is given
    --srm_version    :: srmv1 or srmv2 (default srmv2)
    --outputFileList :: onlyNAME : NOT YET SUPPORTED
    --debug          :: verbose output
    --help           :: print this message
    """
    print(msg)

    return
393 spiga 1.8
def HelpOptions(opts=None):
    """
    Check opts, print help if needed, prepare dict = { opt : value }.

    opts is the list of (option, value) pairs produced by getopt.  The
    leading dashes are removed from every option name.  The usage is
    printed and the process exits with status 0 when opts is empty or
    missing, or when one of '-h', '-help', '--help' is present.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # Test for help before touching the name: the single-dash spellings
        # contain no '--' to split on.
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
410 spiga 1.1
if __name__ == '__main__':
    # Script entry point: parse the long options, build the parameter
    # dictionary and run the copy.

    import getopt

    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=", "debug", "help"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError as err:
        print(err)
        # Print the usage directly: HelpOptions() without arguments exits
        # with status 0, which made the error exit below unreachable.
        usage()
        sys.exit(2)

    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception as ex:
        # Errors are only printed here; the exit status is left unchanged.
        print(str(ex))
430 spiga 1.1