ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.12
Committed: Thu Oct 9 11:50:20 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.11: +10 -9 lines
Log Message:
syntax and typos

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 spiga 1.8 import sys, string
4 spiga 1.1 import os, popen2
5 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
7 spiga 1.11 from ProdCommon.Storage.SEAPI.Exceptions import *
8 spiga 1.1
9    
10     class cmscp:
11 spiga 1.8 def __init__(self, args):
12 spiga 1.1 """
13     cmscp
14    
15 ewv 1.7 safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
16 spiga 1.1 including success checking version also for CAF using rfcp command to copy the output to SE
17     input:
18     $1 middleware (CAF, LSF, LCG, OSG)
19 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
20     $3 if needed: file name (the output file name)
21     $5 remote SE (complete endpoint)
22 ewv 1.7 $6 srm version
23 spiga 1.1 output:
24     return 0 if all ok
25     return 60307 if srmcp failed
26     return 60303 if file already exists in the SE
27     """
28 spiga 1.8
29 spiga 1.1 #set default
30 spiga 1.8 self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
31 spiga 1.11 "protocol":'', "option":'', "middleware":'', "srm_version":'srmv2'}
32 spiga 1.8 self.debug = 0
33    
34     self.params.update( args )
35 ewv 1.7
36 spiga 1.1 return
37    
38 spiga 1.8 def processOptions( self ):
39 spiga 1.1 """
40 spiga 1.8 check command line parameter
41 spiga 1.1 """
42 spiga 1.8
43     if 'help' in self.params.keys(): HelpOptions()
44     if 'debug' in self.params.keys(): self.debug = 1
45 ewv 1.7
46     # source and dest cannot be undefined at same time
47 spiga 1.8 if not self.params['source'] and not self.params['destination'] :
48     HelpOptions()
49    
50 ewv 1.7 # if middleware is not defined --> protocol cannot be empty
51 spiga 1.8 if not self.params['middleware'] and not self.params['protocol'] :
52     HelpOptions()
53    
54 ewv 1.7 # input file must be defined
55 spiga 1.8 if not self.params['inputFileList'] : HelpOptions()
56 spiga 1.1 else:
57 spiga 1.8 file_to_copy=[]
58     if self.params['inputFileList'].find(','):
59     [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
60 ewv 1.7 else:
61 spiga 1.8 file_to_copy.append(self.params['inputFileList'])
62     self.params['inputFileList'] = file_to_copy
63 ewv 1.7
64 spiga 1.1 ## TO DO:
65     #### add check for outFiles
66     #### add map {'inFileNAME':'outFileNAME'} to change out name
67    
68     return
69    
70 ewv 1.7 def run( self ):
71 spiga 1.1 """
72 ewv 1.7 Check if running on UI (no $middleware) or
73     on WN (on the Grid), and take different action
74 spiga 1.1 """
75 spiga 1.8
76     self.processOptions()
77     # stage out from WN
78     if self.params['middleware'] :
79     results = self.stager(self.params['middleware'],self.params['inputFileList'])
80     self.finalReport(results)
81     # Local interaction with SE
82 spiga 1.1 else:
83 spiga 1.8 results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.protocols['option'] )
84     return results
85 spiga 1.1
86 spiga 1.8 def setProtocol( self, middleware ):
87 spiga 1.1 """
88     define the allowed potocols based on $middlware
89 ewv 1.7 which depend on scheduler
90 spiga 1.1 """
91 spiga 1.8 # default To be used with "middleware"
92 spiga 1.11 lcgOpt={'srmv1':'-b -D srmv1 --vo cms -t 2400 --verbose',
93     'srmv2':'-b -D srmv2 --vo cms -t 2400 --verbose'}
94     srmOpt={'srmv1':'-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -streams_num=1 ',
95     'srmv2':'-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3 -srm_protocol_version 2 '}
96 spiga 1.8 rfioOpt=''
97    
98     if middleware.lower() in ['osg','lcg']:
99 spiga 1.12 supported_protocol = [('srm-lcg',lcgOpt[self.params['srm_version']]),\
100     (self.params['srm_version'],srmOpt[self.params['srm_version']])]
101 spiga 1.8 elif middleware.lower() in ['lsf','caf']:
102     supported_protocol = [('rfio',rfioOpt)]
103 spiga 1.1 else:
104 ewv 1.7 ## here we can add support for any kind of protocol,
105 spiga 1.1 ## maybe some local schedulers need something dedicated
106     pass
107     return supported_protocol
108    
109 spiga 1.8 def stager( self, middleware, list_files ):
110 spiga 1.1 """
111     Implement the logic for remote stage out
112     """
113 ewv 1.7 count=0
114 spiga 1.6 results={}
115 spiga 1.8 for prot, opt in self.setProtocol( middleware ):
116 ewv 1.7 if self.debug: print 'Trying stage out with %s utils \n'%prot
117 spiga 1.8 copy_results = self.copy( list_files, prot, opt )
118 ewv 1.7 list_retry = []
119     list_existing = []
120     list_ok = []
121 spiga 1.1 for file, dict in copy_results.iteritems():
122     er_code = dict['erCode']
123     if er_code == '60307': list_retry.append( file )
124     elif er_code == '60303': list_existing.append( file )
125     else:
126     list_ok.append(file)
127     reason = 'Copy succedeed with %s utils'%prot
128     upDict = self.updateReport(file, er_code, reason)
129 ewv 1.7 copy_results.update(upDict)
130 spiga 1.1 results.update(copy_results)
131 ewv 1.7 if len(list_ok) != 0:
132 spiga 1.1 msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
133 spiga 1.8 if self.debug : print msg
134 ewv 1.7 if len(list_ok) == len(list_files) :
135 spiga 1.1 break
136     else:
137 spiga 1.8 if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
138 spiga 1.1 if len(list_retry): list_files = list_retry
139 ewv 1.7 else: break
140     count =+1
141    
142     #### TODO Daniele
143 spiga 1.1 #check is something fails and created related dict
144 ewv 1.7 # backup = self.analyzeResults(results)
145    
146     # if backup :
147 spiga 1.1 # msg = 'WARNING: backup logic is under implementation\n'
148     # #backupDict = self.backup()
149 ewv 1.7 # ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
150 spiga 1.1 # results.update(backupDict)
151     # print msg
152     return results
153    
154     def initializeApi(self, protocol ):
155     """
156 ewv 1.7 Instantiate storage interface
157 spiga 1.1 """
158 ewv 1.7 source_prot = protocol
159     dest_prot = protocol
160 spiga 1.8 if not self.params['source'] : source_prot = 'local'
161     Source_SE = self.storageInterface( self.params['source'], source_prot )
162     if not self.params['destination'] : dest_prot = 'local'
163     Destination_SE = self.storageInterface( self.params['destination'], dest_prot )
164 spiga 1.1
165     if self.debug :
166 spiga 1.8 print '(source=%s, protocol=%s)'%(self.params['source'], source_prot)
167     print '(destination=%s, protocol=%s)'%(self.params['destination'], dest_prot)
168 spiga 1.1
169     return Source_SE, Destination_SE
170    
171 spiga 1.8 def copy( self, list_file, protocol, options ):
172 spiga 1.1 """
173 ewv 1.7 Make the real file copy using SE API
174 spiga 1.1 """
175     if self.debug :
176 ewv 1.7 print 'copy(): using %s protocol'%protocol
177 spiga 1.1 Source_SE, Destination_SE = self.initializeApi( protocol )
178    
179 ewv 1.7 # create remote dir
180 spiga 1.10 if protocol in ['gridftp','rfio']:
181 spiga 1.1 self.createDir( Destination_SE, protocol )
182    
183     ## prepare for real copy ##
184 spiga 1.8 try :
185     sbi = SBinterface( Source_SE, Destination_SE )
186     sbi_dest = SBinterface(Destination_SE)
187 spiga 1.11 except ProtocolMismatch, ex:
188     msg = str(ex)+'\n'
189 spiga 1.8 msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
190     raise msg
191 spiga 1.1
192     results = {}
193 ewv 1.7 ## loop over the complete list of files
194     for filetocopy in list_file:
195 spiga 1.1 if self.debug : print 'start real copy for %s'%filetocopy
196 ewv 1.7 ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
197     if ErCode == '0':
198 spiga 1.8 ErCode, msg = self.makeCopy( sbi, filetocopy , options )
199     if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
200 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
201     return results
202 ewv 1.7
203 spiga 1.1
204     def storageInterface( self, endpoint, protocol ):
205     """
206 ewv 1.7 Create the storage interface.
207 spiga 1.1 """
208     try:
209     interface = SElement( FullPath(endpoint), protocol )
210 spiga 1.11 except ProtocolUnknown, ex:
211 ewv 1.7 msg = ''
212 spiga 1.1 if self.debug : msg = str(ex)+'\n'
213 ewv 1.7 msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
214 spiga 1.8 raise msg
215 spiga 1.1
216     return interface
217    
218     def createDir(self, Destination_SE, protocol):
219     """
220 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
221 ewv 1.7 this should be transparent at SE API level.
222 spiga 1.1 """
223     ErCode = '0'
224 spiga 1.9 msg = ''
225 spiga 1.1 try:
226     action = SBinterface( Destination_SE )
227     action.createDir()
228     if self.debug: print "The directory has been created using protocol %s\n"%protocol
229 spiga 1.11 except TransferException, ex:
230     msg = str(ex)
231     if self.debug :
232     msg += str(ex.detail)+'\n'
233 spiga 1.12 msg += str(ex.output)+'\n'
234 spiga 1.11 msg_rep = "ERROR: problem with the directory creation using %s protocol \n"%protocol
235 ewv 1.7 ErCode = '60316'
236 spiga 1.11 res = self.updateReport('', ErCode, msg_rep )
237     self.finalReport( res )
238     raise msg
239     except OperationException, ex:
240     msg = str(ex)
241     if self.debug : msg += str(ex.detail)+'\n'
242     print msg
243 spiga 1.1
244 spiga 1.8 return ErCode, msg
245 spiga 1.1
246 spiga 1.8 def checkFileExist( self, sbi, filetocopy ):
247 spiga 1.1 """
248 ewv 1.7 Check if file to copy already exist
249     """
250 spiga 1.8 ErCode = '0'
251     msg = ''
252 spiga 1.1 try:
253     check = sbi.checkExists(filetocopy)
254 spiga 1.11 except OperationException, ex:
255     msg = str(ex)
256     if self.debug :
257     msg += str(ex.detail)+'\n'
258 spiga 1.12 msg += str(ex.output)+'\n'
259 spiga 1.8 msg +='problems checkig if file already exist'
260     raise msg
261 spiga 1.11 except WrongOption, ex:
262     msg = str(ex)
263     if self.debug :
264     msg += str(ex.detail)+'\n'
265 spiga 1.12 msg += str(ex.output)+'\n'
266 spiga 1.11 raise msg
267 spiga 1.8 if check :
268 ewv 1.7 ErCode = '60303'
269 spiga 1.1 msg = "file %s already exist"%filetocopy
270    
271 ewv 1.7 return ErCode,msg
272 spiga 1.1
273 spiga 1.8 def makeCopy(self, sbi, filetocopy, option ):
274 spiga 1.1 """
275 ewv 1.7 call the copy API.
276 spiga 1.1 """
277 ewv 1.7 path = os.path.dirname(filetocopy)
278 spiga 1.1 file_name = os.path.basename(filetocopy)
279     source_file = filetocopy
280     dest_file = file_name ## to be improved supporting changing file name TODO
281 spiga 1.8 if self.params['source'] == '' and path == '':
282 spiga 1.1 source_file = os.path.abspath(filetocopy)
283 spiga 1.8 elif self.params['destination'] =='':
284 spiga 1.1 dest_file = os.path.join(os.getcwd(),file_name)
285 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
286 ewv 1.7 source_file = file_name
287 spiga 1.8
288 spiga 1.1 ErCode = '0'
289     msg = ''
290 ewv 1.7
291 spiga 1.1 try:
292 spiga 1.8 sbi.copy( source_file , dest_file , opt = option)
293 spiga 1.11 except TransferException, ex:
294     msg = str(ex)
295     if self.debug :
296     msg += str(ex.detail)+'\n'
297 spiga 1.12 msg += str(ex.output)+'\n'
298 spiga 1.11 msg += "Problem copying %s file" % filetocopy
299 spiga 1.1 ErCode = '60307'
300 spiga 1.11 except WrongOption, ex:
301     msg = str(ex)
302     if self.debug :
303     msg += str(ex.detail)+'\n'
304 spiga 1.12 msg += str(ex.output)+'\n'
305 spiga 1.11 raise msg
306    
307     ## TO BE IMPLEMENTED if NEEDED
308     ## NOTE: SE API Already available
309     # if self.protocol.find('srm') : self.checkSize( sbi, filetocopy )
310    
311 spiga 1.1 return ErCode, msg
312 ewv 1.7
313     def backup(self):
314 spiga 1.1 """
315     Check infos from TFC using existing api obtaining:
316     1)destination
317     2)protocol
318     """
319     return
320    
321 spiga 1.8 def updateReport(self, file, erCode, reason, lfn='', se='' ):
322     """
323     Update the final stage out infos
324     """
325     jobStageInfo={}
326     jobStageInfo['erCode']=erCode
327     jobStageInfo['reason']=reason
328     jobStageInfo['lfn']=lfn
329     jobStageInfo['se']=se
330 spiga 1.1
331 spiga 1.8 report = { file : jobStageInfo}
332     return report
333 ewv 1.7
334 spiga 1.8 def finalReport( self , results ):
335     """
336     It a list of LFNs for each SE where data are stored.
337     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
338 spiga 1.1 """
339 spiga 1.8 outFile = open('cmscpReport.sh',"a")
340     cmscp_exit_status = 0
341     txt = ''
342 spiga 1.11 for file, dict in results.iteritems():
343     if file:
344     if dict['lfn']=='':
345     lfn = '$LFNBaseName/'+os.path.basename(file)
346     se = '$SE'
347     else:
348     lfn = dict['lfn']+os.pat.basename(file)
349     se = dict['se']
350     #dict['lfn'] # to be implemented
351     txt += 'echo "Report for File: '+file+'"\n'
352     txt += 'echo "LFN: '+lfn+'"\n'
353     txt += 'echo "StorageElement: '+se+'"\n'
354     txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
355     txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
356     if dict['erCode'] != '0':
357     cmscp_exit_status = dict['erCode']
358     cmscp_exit_status = dict['erCode']
359 spiga 1.12 else:
360     cmscp_exit_status = dict['erCode']
361     cmscp_exit_status = dict['erCode']
362 spiga 1.8 txt += '\n'
363     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
364     txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
365     outFile.write(str(txt))
366     outFile.close()
367     return
368    
369    
370     def usage():
371    
372     msg="""
373     required parameters:
374     --source :: REMOTE :
375     --destination :: REMOTE :
376     --debug :
377     --inFile :: absPath : or name NOT RELATIVE PATH
378     --outFIle :: onlyNAME : NOT YET SUPPORTED
379    
380     optional parameters
381     """
382     print msg
383    
384     return
385    
386     def HelpOptions(opts=[]):
387     """
388     Check otps, print help if needed
389     prepare dict = { opt : value }
390     """
391     dict_args = {}
392     if len(opts):
393     for opt, arg in opts:
394     dict_args[opt.split('--')[1]] = arg
395     if opt in ('-h','-help','--help') :
396     usage()
397     sys.exit(0)
398     return dict_args
399     else:
400     usage()
401     sys.exit(0)
402 spiga 1.1
403     if __name__ == '__main__' :
404 spiga 1.8
405     import getopt
406    
407     allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
408     "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
409     try:
410     opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
411     except getopt.GetoptError, err:
412     print err
413     HelpOptions()
414     sys.exit(2)
415    
416     dictArgs = HelpOptions(opts)
417 spiga 1.1 try:
418 spiga 1.8 cmscp_ = cmscp(dictArgs)
419 spiga 1.1 cmscp_.run()
420 spiga 1.11 except Exception, ex :
421     print str(ex)
422 spiga 1.1