ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.10
Committed: Sat Oct 4 10:10:21 2008 UTC (16 years, 6 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_0_pre7
Changes since 1.9: +1 -1 lines
Log Message:
typo

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3 spiga 1.8 import sys, string
4 spiga 1.1 import os, popen2
5 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
7    
8    
9     class cmscp:
10 spiga 1.8 def __init__(self, args):
11 spiga 1.1 """
12     cmscp
13    
14 ewv 1.7 safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
15 spiga 1.1 including success checking version also for CAF using rfcp command to copy the output to SE
16     input:
17     $1 middleware (CAF, LSF, LCG, OSG)
18 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
19     $3 if needed: file name (the output file name)
20     $5 remote SE (complete endpoint)
21 ewv 1.7 $6 srm version
22 spiga 1.1 output:
23     return 0 if all ok
24     return 60307 if srmcp failed
25     return 60303 if file already exists in the SE
26     """
27 spiga 1.8
28 spiga 1.1 #set default
29 spiga 1.8 self.params = {"source":'', "destination":'', "inputFileList":'', "outputFileList":'', \
30     "protocol":'', "option":'', "middleware":'', "srm_version":''}
31     self.debug = 0
32    
33     self.params.update( args )
34 ewv 1.7
35 spiga 1.1 return
36    
37 spiga 1.8 def processOptions( self ):
38 spiga 1.1 """
39 spiga 1.8 check command line parameter
40 spiga 1.1 """
41 spiga 1.8
42     if 'help' in self.params.keys(): HelpOptions()
43     if 'debug' in self.params.keys(): self.debug = 1
44 ewv 1.7
45     # source and dest cannot be undefined at same time
46 spiga 1.8 if not self.params['source'] and not self.params['destination'] :
47     HelpOptions()
48    
49 ewv 1.7 # if middleware is not defined --> protocol cannot be empty
50 spiga 1.8 if not self.params['middleware'] and not self.params['protocol'] :
51     HelpOptions()
52    
53 ewv 1.7 # input file must be defined
54 spiga 1.8 if not self.params['inputFileList'] : HelpOptions()
55 spiga 1.1 else:
56 spiga 1.8 file_to_copy=[]
57     if self.params['inputFileList'].find(','):
58     [file_to_copy.append(x.strip()) for x in self.params['inputFileList'].split(',')]
59 ewv 1.7 else:
60 spiga 1.8 file_to_copy.append(self.params['inputFileList'])
61     self.params['inputFileList'] = file_to_copy
62 ewv 1.7
63 spiga 1.1 ## TO DO:
64     #### add check for outFiles
65     #### add map {'inFileNAME':'outFileNAME'} to change out name
66    
67     return
68    
69 ewv 1.7 def run( self ):
70 spiga 1.1 """
71 ewv 1.7 Check if running on UI (no $middleware) or
72     on WN (on the Grid), and take different action
73 spiga 1.1 """
74 spiga 1.8
75     self.processOptions()
76     # stage out from WN
77     if self.params['middleware'] :
78     results = self.stager(self.params['middleware'],self.params['inputFileList'])
79     self.finalReport(results)
80     # Local interaction with SE
81 spiga 1.1 else:
82 spiga 1.8 results = self.copy(self.params['inputFilesList'], self.params['protocol'], self.protocols['option'] )
83     return results
84 spiga 1.1
85 spiga 1.8 def setProtocol( self, middleware ):
86 spiga 1.1 """
87     define the allowed potocols based on $middlware
88 ewv 1.7 which depend on scheduler
89 spiga 1.1 """
90 spiga 1.8 # default To be used with "middleware"
91     lcgOpt='-b -D srmv2 --vo cms -t 2400 --verbose'
92     srmOpt='-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3'
93     rfioOpt=''
94    
95     if middleware.lower() in ['osg','lcg']:
96     supported_protocol = [('srm-lcg',lcgOpt),\
97     ('srmv2',srmOpt)]
98     elif middleware.lower() in ['lsf','caf']:
99     supported_protocol = [('rfio',rfioOpt)]
100 spiga 1.1 else:
101 ewv 1.7 ## here we can add support for any kind of protocol,
102 spiga 1.1 ## maybe some local schedulers need something dedicated
103     pass
104     return supported_protocol
105    
106 spiga 1.8 def stager( self, middleware, list_files ):
107 spiga 1.1 """
108     Implement the logic for remote stage out
109     """
110 ewv 1.7 count=0
111 spiga 1.6 results={}
112 spiga 1.8 for prot, opt in self.setProtocol( middleware ):
113 ewv 1.7 if self.debug: print 'Trying stage out with %s utils \n'%prot
114 spiga 1.8 copy_results = self.copy( list_files, prot, opt )
115 ewv 1.7 list_retry = []
116     list_existing = []
117     list_ok = []
118 spiga 1.1 for file, dict in copy_results.iteritems():
119     er_code = dict['erCode']
120     if er_code == '60307': list_retry.append( file )
121     elif er_code == '60303': list_existing.append( file )
122     else:
123     list_ok.append(file)
124     reason = 'Copy succedeed with %s utils'%prot
125     upDict = self.updateReport(file, er_code, reason)
126 ewv 1.7 copy_results.update(upDict)
127 spiga 1.1 results.update(copy_results)
128 ewv 1.7 if len(list_ok) != 0:
129 spiga 1.1 msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
130 spiga 1.8 if self.debug : print msg
131 ewv 1.7 if len(list_ok) == len(list_files) :
132 spiga 1.1 break
133     else:
134 spiga 1.8 if self.debug : print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
135 spiga 1.1 if len(list_retry): list_files = list_retry
136 ewv 1.7 else: break
137     count =+1
138    
139     #### TODO Daniele
140 spiga 1.1 #check is something fails and created related dict
141 ewv 1.7 # backup = self.analyzeResults(results)
142    
143     # if backup :
144 spiga 1.1 # msg = 'WARNING: backup logic is under implementation\n'
145     # #backupDict = self.backup()
146 ewv 1.7 # ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
147 spiga 1.1 # results.update(backupDict)
148     # print msg
149     return results
150    
151     def initializeApi(self, protocol ):
152     """
153 ewv 1.7 Instantiate storage interface
154 spiga 1.1 """
155 ewv 1.7 source_prot = protocol
156     dest_prot = protocol
157 spiga 1.8 if not self.params['source'] : source_prot = 'local'
158     Source_SE = self.storageInterface( self.params['source'], source_prot )
159     if not self.params['destination'] : dest_prot = 'local'
160     Destination_SE = self.storageInterface( self.params['destination'], dest_prot )
161 spiga 1.1
162     if self.debug :
163 spiga 1.8 print '(source=%s, protocol=%s)'%(self.params['source'], source_prot)
164     print '(destination=%s, protocol=%s)'%(self.params['destination'], dest_prot)
165 spiga 1.1
166     return Source_SE, Destination_SE
167    
168 spiga 1.8 def copy( self, list_file, protocol, options ):
169 spiga 1.1 """
170 ewv 1.7 Make the real file copy using SE API
171 spiga 1.1 """
172     if self.debug :
173 ewv 1.7 print 'copy(): using %s protocol'%protocol
174 spiga 1.1 Source_SE, Destination_SE = self.initializeApi( protocol )
175    
176 ewv 1.7 # create remote dir
177 spiga 1.10 if protocol in ['gridftp','rfio']:
178 spiga 1.1 self.createDir( Destination_SE, protocol )
179    
180     ## prepare for real copy ##
181 spiga 1.8 try :
182     sbi = SBinterface( Source_SE, Destination_SE )
183     sbi_dest = SBinterface(Destination_SE)
184     except Exception, ex:
185     msg = ''
186     if self.debug : msg = str(ex)+'\n'
187     msg += "ERROR : Unable to create SBinterface with %s protocol\n"%protocol
188     raise msg
189 spiga 1.1
190     results = {}
191 ewv 1.7 ## loop over the complete list of files
192     for filetocopy in list_file:
193 spiga 1.1 if self.debug : print 'start real copy for %s'%filetocopy
194 ewv 1.7 ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
195     if ErCode == '0':
196 spiga 1.8 ErCode, msg = self.makeCopy( sbi, filetocopy , options )
197     if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy), ErCode)
198 spiga 1.1 results.update( self.updateReport(filetocopy, ErCode, msg))
199     return results
200 ewv 1.7
201 spiga 1.1
202     def storageInterface( self, endpoint, protocol ):
203     """
204 ewv 1.7 Create the storage interface.
205 spiga 1.1 """
206     try:
207     interface = SElement( FullPath(endpoint), protocol )
208     except Exception, ex:
209 ewv 1.7 msg = ''
210 spiga 1.1 if self.debug : msg = str(ex)+'\n'
211 ewv 1.7 msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
212 spiga 1.8 raise msg
213 spiga 1.1
214     return interface
215    
216     def checkDir(self, Destination_SE, protocol):
217     '''
218     ToBeImplemented NEEDED for castor
219 ewv 1.7 '''
220 spiga 1.1 return
221 ewv 1.7
222 spiga 1.1 def createDir(self, Destination_SE, protocol):
223     """
224 spiga 1.8 Create remote dir for gsiftp REALLY TEMPORARY
225 ewv 1.7 this should be transparent at SE API level.
226 spiga 1.1 """
227     ErCode = '0'
228 spiga 1.9 msg = ''
229 spiga 1.1 try:
230     action = SBinterface( Destination_SE )
231     action.createDir()
232     if self.debug: print "The directory has been created using protocol %s\n"%protocol
233     except Exception, ex:
234 ewv 1.7 msg = ''
235 spiga 1.1 if self.debug : msg = str(ex)+'\n'
236 spiga 1.8 msg = "ERROR: problem with the directory creation using %s protocol \n"%protocol
237 ewv 1.7 ErCode = '60316'
238 spiga 1.1
239 spiga 1.8 return ErCode, msg
240 spiga 1.1
241 spiga 1.8 def checkFileExist( self, sbi, filetocopy ):
242 spiga 1.1 """
243 ewv 1.7 Check if file to copy already exist
244     """
245 spiga 1.8 ErCode = '0'
246     msg = ''
247 spiga 1.1 try:
248     check = sbi.checkExists(filetocopy)
249     except Exception, ex:
250 ewv 1.7 msg = ''
251 spiga 1.1 if self.debug : msg = str(ex)+'\n'
252 spiga 1.8 msg +='problems checkig if file already exist'
253     raise msg
254     if check :
255 ewv 1.7 ErCode = '60303'
256 spiga 1.1 msg = "file %s already exist"%filetocopy
257    
258 ewv 1.7 return ErCode,msg
259 spiga 1.1
260 spiga 1.8 def makeCopy(self, sbi, filetocopy, option ):
261 spiga 1.1 """
262 ewv 1.7 call the copy API.
263 spiga 1.1 """
264 ewv 1.7 path = os.path.dirname(filetocopy)
265 spiga 1.1 file_name = os.path.basename(filetocopy)
266     source_file = filetocopy
267     dest_file = file_name ## to be improved supporting changing file name TODO
268 spiga 1.8 if self.params['source'] == '' and path == '':
269 spiga 1.1 source_file = os.path.abspath(filetocopy)
270 spiga 1.8 elif self.params['destination'] =='':
271 spiga 1.1 dest_file = os.path.join(os.getcwd(),file_name)
272 spiga 1.8 elif self.params['source'] != '' and self.params['destination'] != '' :
273 ewv 1.7 source_file = file_name
274 spiga 1.8
275 spiga 1.1 ErCode = '0'
276     msg = ''
277 ewv 1.7
278 spiga 1.1 try:
279 spiga 1.8 sbi.copy( source_file , dest_file , opt = option)
280     #if self.protocol == 'srm' : self.checkSize( sbi, filetocopy ) ## TODO
281 spiga 1.1 except Exception, ex:
282 ewv 1.7 msg = ''
283 spiga 1.1 if self.debug : msg = str(ex)+'\n'
284 ewv 1.7 msg = "Problem copying %s file" % filetocopy
285 spiga 1.1 ErCode = '60307'
286    
287     return ErCode, msg
288 ewv 1.7
289 spiga 1.1 '''
290     def checkSize()
291     """
292 ewv 1.7 Using srm needed a check of the ouptut file size.
293 spiga 1.1 """
294 ewv 1.7
295 spiga 1.1 echo "--> remoteSize = $remoteSize"
296     ## for local file
297     localSize=$(stat -c%s "$path_out_file")
298     echo "--> localSize = $localSize"
299     if [ $localSize != $remoteSize ]; then
300     echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
301     echo "Copy failed: removing remote file $destination"
302     srmrm $destination
303     cmscp_exit_status=60307
304 ewv 1.7
305    
306 spiga 1.1 echo "Problem copying $path_out_file to $destination with srmcp command"
307     StageOutExitStatusReason='remote and local file dimension not match'
308     echo "StageOutReport = `cat ./srmcp.report`"
309 ewv 1.7 '''
310     def backup(self):
311 spiga 1.1 """
312     Check infos from TFC using existing api obtaining:
313     1)destination
314     2)protocol
315     """
316     return
317    
318 spiga 1.8 def updateReport(self, file, erCode, reason, lfn='', se='' ):
319     """
320     Update the final stage out infos
321     """
322     jobStageInfo={}
323     jobStageInfo['erCode']=erCode
324     jobStageInfo['reason']=reason
325     jobStageInfo['lfn']=lfn
326     jobStageInfo['se']=se
327 spiga 1.1
328 spiga 1.8 report = { file : jobStageInfo}
329     return report
330 ewv 1.7
331 spiga 1.8 def finalReport( self , results ):
332     """
333     It a list of LFNs for each SE where data are stored.
334     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
335 spiga 1.1 """
336 spiga 1.8 outFile = open('cmscpReport.sh',"a")
337     cmscp_exit_status = 0
338     txt = ''
339     for file, dict in results.iteritems():
340     if dict['lfn']=='':
341     lfn = '$LFNBaseName/'+os.path.basename(file)
342     se = '$SE'
343     else:
344     lfn = dict['lfn']+os.pat.basename(file)
345     se = dict['se']
346     #dict['lfn'] # to be implemented
347     txt += 'echo "Report for File: '+file+'"\n'
348     txt += 'echo "LFN: '+lfn+'"\n'
349     txt += 'echo "StorageElement: '+se+'"\n'
350     txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
351     txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
352     if dict['erCode'] != '0':
353     cmscp_exit_status = dict['erCode']
354     txt += '\n'
355     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
356     txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
357     outFile.write(str(txt))
358     outFile.close()
359     return
360    
361    
362     def usage():
363    
364     msg="""
365     required parameters:
366     --source :: REMOTE :
367     --destination :: REMOTE :
368     --debug :
369     --inFile :: absPath : or name NOT RELATIVE PATH
370     --outFIle :: onlyNAME : NOT YET SUPPORTED
371    
372     optional parameters
373     """
374     print msg
375    
376     return
377    
378     def HelpOptions(opts=[]):
379     """
380     Check otps, print help if needed
381     prepare dict = { opt : value }
382     """
383     dict_args = {}
384     if len(opts):
385     for opt, arg in opts:
386     dict_args[opt.split('--')[1]] = arg
387     if opt in ('-h','-help','--help') :
388     usage()
389     sys.exit(0)
390     return dict_args
391     else:
392     usage()
393     sys.exit(0)
394 spiga 1.1
395     if __name__ == '__main__' :
396 spiga 1.8
397     import getopt
398    
399     allowedOpt = ["source=", "destination=", "inputFileList=", "outputFileList=", \
400     "protocol=","option=", "middleware=", "srm_version=", "debug", "help"]
401     try:
402     opts, args = getopt.getopt( sys.argv[1:], "", allowedOpt )
403     except getopt.GetoptError, err:
404     print err
405     HelpOptions()
406     sys.exit(2)
407    
408     dictArgs = HelpOptions(opts)
409 spiga 1.1 try:
410 spiga 1.8 cmscp_ = cmscp(dictArgs)
411 spiga 1.1 cmscp_.run()
412     except:
413     pass
414