ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.15
Committed: Wed Oct 15 16:25:09 2008 UTC (16 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_1_pre1, CRAB_2_4_0_Tutorial, CRAB_2_4_0_Tutorial_pre1, CRAB_2_4_0
Changes since 1.14: +2 -1 lines
Log Message:
raise for creation dir error

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 spiga 1.8 import sys, string
4 spiga 1.1 import os, popen2
5 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
7 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
8 spiga 1.1
9    
class cmscp:
    """
    Safe copy of files between the local working area and a storage
    element (SE) through the ProdCommon SE API.

    Per-file results are dictionaries produced by updateReport(); the
    'erCode' values used in this class are:
        '0'     copy done
        '60303' file already exists on the destination
        '60307' the copy itself failed
        '60316' the remote directory could not be created
        '-1'    generic failure (storage interface could not be set up,
                existence check failed, no protocol available)
    A result stored under the empty key '' describes a failure that is
    not related to a single file.
    """

    def __init__(self, args):
        """
        Store the configuration.

        args is a dictionary { option : value } which overrides the
        defaults below; the recognised keys are
            source, destination : remote endpoints (empty means local)
            inputFileList       : comma separated list of files to copy
            outputFileList      : not used yet
            protocol, option    : protocol and copy options, used when
                                  no middleware is given
            middleware          : OSG, LCG, CONDOR, LSF or CAF
            srm_version         : srmv1 or srmv2 (default)
        The presence of the keys 'debug' and 'help' is checked by
        processOptions().
        """
        # set default
        self.params = {"source": '', "destination": '', "inputFileList": '',
                       "outputFileList": '', "protocol": '', "option": '',
                       "middleware": '', "srm_version": 'srmv2'}
        self.debug = 0

        self.params.update(args)

    def processOptions(self):
        """
        Check the parameters and turn 'inputFileList' into a list.

        HelpOptions() (which prints the usage and exits) is called when
        the configuration cannot be used.
        """
        if 'help' in self.params: HelpOptions()
        if 'debug' in self.params: self.debug = 1

        # source and dest cannot be undefined at same time
        if not self.params['source'] and not self.params['destination']:
            HelpOptions()

        # if middleware is not defined --> protocol cannot be empty
        if not self.params['middleware'] and not self.params['protocol']:
            HelpOptions()

        # input file must be defined
        if not self.params['inputFileList']:
            HelpOptions()
        else:
            names = self.params['inputFileList']
            # an already split list is accepted as it is, so that calling
            # this method twice does not fail
            if not isinstance(names, (list, tuple)):
                names = names.split(',')
            # split(',') also covers the single file case; empty entries
            # (leading, trailing or doubled commas) are dropped
            self.params['inputFileList'] = [x.strip() for x in names if x.strip()]

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

        return

    def run(self):
        """
        Check if running on UI (no $middleware) or
        on WN (on the Grid), and take different action.

        Returns the dictionary of the per-file results.
        """
        self.processOptions()
        if self.params['middleware']:
            # stage out from WN
            results = self.stager(self.params['middleware'], self.params['inputFileList'])
            self.finalReport(results)
        else:
            # Local interaction with SE
            results = self.copy(self.params['inputFileList'],
                                self.params['protocol'],
                                self.params['option'])
        return results

    def setProtocol(self, middleware):
        """
        Define the allowed protocols based on $middleware,
        which depends on the scheduler.

        Returns a list of (protocol, options) tuples in the order they
        have to be tried, or None if the middleware is not known.
        """
        # default To be used with "middleware"
        lcgOpt = {'srmv1': '-b -D srmv1 -t 2400 --verbose',
                  'srmv2': '-b -D srmv2 -t 2400 --verbose'}
        srmOpt = {'srmv1': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
                  'srmv2': ' -report ./srmcp.report -retry_timeout 480000 -retry_num 3 '}
        rfioOpt = ''

        supported_protocol = None
        scheduler = middleware.lower()
        if scheduler in ['osg', 'lcg', 'condor']:
            version = self.params['srm_version']
            supported_protocol = [('srm-lcg', lcgOpt[version]),
                                  (version, srmOpt[version])]
        elif scheduler in ['lsf', 'caf']:
            supported_protocol = [('rfio', rfioOpt)]
        else:
            ## here we can add support for any kind of protocol,
            ## maybe some local schedulers need something dedicated
            pass
        return supported_protocol

    def stager(self, middleware, list_files):
        """
        Implement the logic for remote stage out.

        The protocols given by setProtocol() are tried in order: files
        copied, or already existing on the destination, are not tried
        again, the others are retried with the next protocol.
        Returns the dictionary { file : report } of the last attempt
        made for each file.
        """
        protocols = self.setProtocol(middleware)
        if not protocols:
            # unknown middleware: report it instead of failing on the loop
            reason = 'ERROR : no stage out protocol available for middleware %s' % middleware
            return self.updateReport('', '-1', reason)

        results = {}
        for prot, opt in protocols:
            if self.debug: print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, opt)

            if list(copy_results.keys()) == ['']:
                # the protocol failed as a whole (no file was tried):
                # keep the same list of files for the next protocol
                results.update(copy_results)
                continue

            # per-file results are available now, a failure recorded for
            # a previous protocol as a whole is superseded
            results.pop('', None)

            list_retry = []
            list_existing = []
            list_ok = []
            # iterate on a snapshot since copy_results is updated below
            for name, info in list(copy_results.items()):
                er_code = info['erCode']
                if er_code == '0':
                    list_ok.append(name)
                    reason = 'Copy succedeed with %s utils' % prot
                    copy_results.update(self.updateReport(name, er_code, reason))
                elif er_code == '60303':
                    list_existing.append(name)
                else:
                    list_retry.append(name)
            results.update(copy_results)

            if list_ok:
                msg = 'Copy of %s succedeed with %s utils\n' % (str(list_ok), prot)
                if self.debug: print(msg)
            if len(list_ok) == len(list_files):
                break
            if self.debug:
                print('Copy of files %s failed using %s...\n' % (str(list_retry) + str(list_existing), prot))
            if not list_retry:
                break
            list_files = list_retry

        #### TODO Daniele
        # check if something fails and create the related dict
        # (backup logic, still to be implemented):
        #    backup = self.analyzeResults(results)
        #    if backup :
        #        backupDict = self.backup()
        #        ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
        #        results.update(backupDict)
        return results

    def initializeApi(self, protocol):
        """
        Instantiate the storage interfaces of source and destination.

        An empty 'source' (or 'destination') parameter is handled with
        the 'local' protocol. Exceptions raised by storageInterface()
        are not caught here.
        """
        source_prot = protocol
        dest_prot = protocol
        if not self.params['source']: source_prot = 'local'
        Source_SE = self.storageInterface(self.params['source'], source_prot)
        if not self.params['destination']: dest_prot = 'local'
        Destination_SE = self.storageInterface(self.params['destination'], dest_prot)

        if self.debug:
            print('(source=%s, protocol=%s)' % (self.params['source'], source_prot))
            print('(destination=%s, protocol=%s)' % (self.params['destination'], dest_prot))

        return Source_SE, Destination_SE

    def copy(self, list_file, protocol, options):
        """
        Make the real file copy using SE API.

        Returns a dictionary { file : report }. If the storage API can
        not be set up for this protocol the dictionary has the only key
        '' and no file is tried.
        """
        if self.debug:
            print('copy(): using %s protocol' % protocol)
        # sys.exc_info()[1] is the exception being handled; it is used
        # because it is valid for every python version
        try:
            Source_SE, Destination_SE = self.initializeApi(protocol)
        except Exception:
            return self.updateReport('', '-1', str(sys.exc_info()[1]))

        # create remote dir
        if protocol in ['gridftp', 'rfio']:
            try:
                self.createDir(Destination_SE, protocol)
            except Exception:
                return self.updateReport('', '60316', str(sys.exc_info()[1]))

        ## prepare for real copy ##
        try:
            sbi = SBinterface(Source_SE, Destination_SE)
            sbi_dest = SBinterface(Destination_SE)
        except ProtocolMismatch:
            msg = str(sys.exc_info()[1]) + '\n'
            msg += "ERROR : Unable to create SBinterface with %s protocol\n" % protocol
            return self.updateReport('', '-1', msg)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug: print('start real copy for %s' % filetocopy)
            try:
                ErCode, msg = self.checkFileExist(sbi_dest, os.path.basename(filetocopy))
            except Exception:
                # only this file is marked as failed: the results of the
                # files already processed must not be lost
                ErCode, msg = '-1', str(sys.exc_info()[1])
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, options)
            if self.debug:
                print('Copy results for %s is %s' % (os.path.basename(filetocopy), ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def storageInterface(self, endpoint, protocol):
        """
        Create the storage interface for endpoint.

        Raises Exception if the protocol is not known to the SE API.
        """
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except ProtocolUnknown:
            msg = ''
            if self.debug: msg = str(sys.exc_info()[1]) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            raise Exception(msg)

        return interface

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp REALLY TEMPORARY
        this should be transparent at SE API level.

        Returns a message describing what happened.
        Raises Exception on TransferException; an OperationException
        is only reported in the returned message.
        """
        msg = ''
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug: msg += "The directory has been created using protocol %s\n" % protocol
        except TransferException:
            ex = sys.exc_info()[1]
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol
            raise Exception(msg)
        except OperationException:
            ex = sys.exc_info()[1]
            msg = str(ex)
            if self.debug: msg += str(ex.detail) + '\n'
            msg += "ERROR: problem with the directory creation using %s protocol \n" % protocol

        return msg

    def checkFileExist(self, sbi, filetocopy):
        """
        Check if file to copy already exist.

        Returns (ErCode, msg): ('0', '') if the file is not there,
        ('60303', reason) if it is.
        Raises Exception if the check itself fails.
        """
        ErCode = '0'
        msg = ''
        try:
            check = sbi.checkExists(filetocopy)
        except (OperationException, WrongOption):
            ex = sys.exc_info()[1]
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += 'ERROR: problems checkig if file %s already exist' % filetocopy
            raise Exception(msg)
        if check:
            ErCode = '60303'
            msg = "file %s already exist" % filetocopy

        return ErCode, msg

    def makeCopy(self, sbi, filetocopy, option):
        """
        Call the copy API.

        Returns (ErCode, msg): ('0', '') if the copy is done,
        ('60307', reason) if the SE API raises TransferException or
        WrongOption.
        """
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name ## to be improved supporting changing file name TODO
        if self.params['source'] == '' and path == '':
            # local file given by name only
            source_file = os.path.abspath(filetocopy)
        elif self.params['destination'] == '':
            # local destination: the file goes in the current directory
            dest_file = os.path.join(os.getcwd(), file_name)
        elif self.params['source'] != '' and self.params['destination'] != '':
            # both sides remote: only the file name is used
            source_file = file_name

        ErCode = '0'
        msg = ''

        try:
            sbi.copy(source_file, dest_file, opt=option)
        except (TransferException, WrongOption):
            ex = sys.exc_info()[1]
            msg = str(ex)
            if self.debug:
                msg += str(ex.detail) + '\n'
                msg += str(ex.output) + '\n'
            msg += "Problem copying %s file" % filetocopy
            ErCode = '60307'

        ## TO BE IMPLEMENTED if NEEDED
        ## NOTE: SE API Already available
        # if self.protocol.find('srm') : self.checkSize( sbi, filetocopy )

        return ErCode, msg

    def backup(self):
        """
        Not implemented yet.

        Check infos from TFC using existing api obtaining:
        1)destination
        2)protocol
        """
        return

    def updateReport(self, file, erCode, reason, lfn='', se=''):
        """
        Build the stage out infos of one file.

        Returns { file : {'erCode':.., 'reason':.., 'lfn':.., 'se':..} }
        """
        jobStageInfo = {'erCode': erCode,
                        'reason': reason,
                        'lfn': lfn,
                        'se': se}
        return {file: jobStageInfo}

    def finalReport(self, results):
        """
        Append to cmscpReport.sh the shell commands which print the
        stage out report of each file and export StageOutExitStatus.

        The exit status is 0 unless at least one result has a non zero
        'erCode'. Results stored under the empty key (failures not
        related to a single file) only contribute to the exit status.
        TO_DO: a list of LFNs for each SE where data are stored, to
        allow "crab -copyLocal" or better "crab -copyOutput".
        """
        cmscp_exit_status = 0
        txt = ''
        # sorted, to write the report always in the same order
        for name in sorted(results):
            info = results[name]
            if name:
                if info['lfn'] == '':
                    # not known here: left to the variables of the script
                    lfn = '$LFNBaseName/' + os.path.basename(name)
                    se = '$SE'
                else:
                    lfn = info['lfn'] + os.path.basename(name)
                    se = info['se']
                # the reason can be any text coming from the copy tools:
                # escape what is special inside the double quotes
                reason = info['reason']
                for char in ('\\', '"', '$', '`'):
                    reason = reason.replace(char, '\\' + char)
                txt += 'echo "Report for File: ' + name + '"\n'
                txt += 'echo "LFN: ' + lfn + '"\n'
                txt += 'echo "StorageElement: ' + se + '"\n'
                txt += 'echo "StageOutExitStatusReason =' + reason + '" | tee -a $RUNTIME_AREA/$repo\n'
                txt += 'echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n'
                if info['erCode'] != '0':
                    cmscp_exit_status = info['erCode']
            else:
                cmscp_exit_status = info['erCode']
        txt += '\n'
        txt += 'export StageOutExitStatus=' + str(cmscp_exit_status) + '\n'
        txt += 'echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n'

        outFile = open('cmscpReport.sh', "a")
        try:
            outFile.write(str(txt))
        finally:
            outFile.close()
        return
377    
378    
def usage():
    """
    Print the command line help.

    The options listed are the ones accepted by the getopt call of the
    main section.
    """
    msg = """
    usage: cmscp.py --inputFileList <file>[,<file>...] [options]

    required parameters:
    --inputFileList  :: comma separated list of files to copy:
                        absPath or name, NOT RELATIVE PATH
    --source         :: REMOTE : endpoint to copy from
    --destination    :: REMOTE : endpoint to copy to
                        (at least one of --source and --destination)
    --middleware     :: OSG, LCG, CONDOR, LSF or CAF
    --protocol       :: protocol to use when --middleware is not given
                        (at least one of --middleware and --protocol)

    optional parameters:
    --option         :: options for the copy, used together with --protocol
    --srm_version    :: srmv1 or srmv2 (default srmv2)
    --outputFileList :: NOT YET SUPPORTED
    --debug          :: print what is going on
    --help           :: print this message
    """
    print(msg)

    return
394 spiga 1.8
def HelpOptions(opts=None):
    """
    Check opts, print help if needed and
    prepare dict = { opt : value }

    opts is the list of (option, value) pairs given by getopt.
    If it is empty or missing, or if it contains a help option, the
    usage is printed and the program exits with status 0.
    Returns the dictionary { option name without leading dashes : value }.
    """
    if not opts:
        usage()
        sys.exit(0)

    dict_args = {}
    for opt, arg in opts:
        # checked before the name is used, so that every spelling of the
        # help option is served
        if opt in ('-h', '-help', '--help'):
            usage()
            sys.exit(0)
        dict_args[opt.lstrip('-')] = arg
    return dict_args
411 spiga 1.1
if __name__ == '__main__':

    import getopt

    # long options only; the ones ending with '=' need a value
    allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=",
                  "protocol=", "option=", "middleware=", "srm_version=", "debug", "help"]
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", allowedOpt)
    except getopt.GetoptError:
        # sys.exc_info()[1] is the exception being handled; it is used
        # because it is valid for every python version
        print(str(sys.exc_info()[1]))
        # usage() and not HelpOptions(): the latter exits with status 0
        # and the error status below would never be reached
        usage()
        sys.exit(2)

    # prints the usage and exits if no option, or the help, is given
    dictArgs = HelpOptions(opts)
    try:
        cmscp_ = cmscp(dictArgs)
        cmscp_.run()
    except Exception:
        # NOTE(review): the failure is only printed and the exit status
        # stays 0 - confirm that the caller relies on cmscpReport.sh only
        print(str(sys.exc_info()[1]))
431 spiga 1.1