ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.7
Committed: Thu Oct 2 16:30:07 2008 UTC (16 years, 6 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.6: +115 -115 lines
Log Message:
Exception being thrown in exception handler

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3     import sys, getopt, string
4     import os, popen2
5 ewv 1.7 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6 spiga 1.1 from ProdCommon.Storage.SEAPI.SBinterface import *
7    
8    
9    
class cmscp:
    """
    Copy a list of files between the local area and a remote storage
    element (SE) through the ProdCommon SE API (SElement / SBinterface).

    Stage-out error codes used as strings throughout this class:
        '0'     : success
        '60303' : file already exists on the destination
        '60307' : copy failed (also used when the pre-copy existence check
                  itself fails, so that stager() retries the file)
        '60316' : remote directory creation failed

    The syntax used here (parenthesised single-argument print,
    sys.exc_info() instead of "except X, e") is accepted by both old and
    new Python interpreters.
    """

    def __init__(self, argv):
        """
        Set the defaults and parse the command line.

        argv is the argument list WITHOUT the program name. Recognised
        long options:
            --source <endpoint>         remote source endpoint
            --destination <endpoint>    remote destination endpoint
            --inputFileList <a,b,...>   comma separated files to copy
            --outputFileList <a,b,...>  comma separated output names
                                        (stored, not yet used)
            --protocol <name>           protocol used when no middleware
            --middleware <name>         OSG, LCG, LSF or CAF
            --srm_version <version>     srm version
            --debug                     verbose output
            --help                      print usage and exit

        An option getopt does not know prints the usage text and exits
        with status 2; the remaining checks are done by setAndCheck().
        """
        # set default
        self.debug = 0
        self.source = ''
        self.destination = ''
        self.file_to_copy = []
        self.remote_file_name = []
        self.protocol = ''
        self.middleware = ''
        self.srmv = ''

        # default for stage out approach. To be used with "middleware"
        self.lcgOpt = '-b -D srmv2 --vo cms -t 2400 --verbose'
        self.srmOpt = '-debug=true -report ./srmcp.report -retry_timeout 480000 -retry_num 3'
        self.rfioOpt = ''
        # default to be used with protocol
        self.opt = ''

        long_options = ["source=", "destination=", "inputFileList=",
                        "outputFileList=", "protocol=", "middleware=",
                        "srm_version=", "debug", "help"]
        try:
            # positional leftovers are not used by this tool
            opts, _ = getopt.getopt(argv, "", long_options)
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)

        self.setAndCheck(opts)

        return

    def _splitList(self, text):
        """
        Split a comma separated string into a list of stripped,
        non-empty items.
        """
        return [item.strip() for item in text.split(',') if item.strip()]

    def setAndCheck( self, opts ):
        """
        Set and check command line parameter.

        opts is the (option, value) list returned by getopt. The usage
        text is printed and the program exits when:
          - no option at all is given, or --help is given;
          - both --source and --destination are empty;
          - both --middleware and --protocol are empty;
          - --inputFileList is missing or contains no file name.
        """
        if not opts:
            print(self.usage())
            sys.exit()

        # Must be initialised: the option may be absent from the command line.
        infile = ''
        for opt, arg in opts:
            if opt == "--help":
                print(self.usage())
                sys.exit()
            elif opt == "--debug":
                self.debug = 1
            elif opt == "--source":
                self.source = arg
            elif opt == "--destination":
                self.destination = arg
            elif opt == "--inputFileList":
                infile = arg
            elif opt == "--outputFileList":
                # Output renaming is not supported yet; keep the names so
                # the future implementation can pick them up.
                self.remote_file_name.extend(self._splitList(arg))
            elif opt == "--protocol":
                self.protocol = arg
            elif opt == "--middleware":
                self.middleware = arg
            elif opt == "--srm_version":
                self.srmv = arg

        # source and dest cannot be undefined at same time
        if self.source == '' and self.destination == '':
            print(self.usage())
            sys.exit()
        # if middleware is not defined --> protocol cannot be empty
        if self.middleware == '' and self.protocol == '':
            print(self.usage())
            sys.exit()
        # input file must be defined
        files = self._splitList(infile)
        if not files:
            print(self.usage())
            sys.exit()
        self.file_to_copy.extend(files)

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

        return

    def run( self ):
        """
        Check if running on UI (no $middleware) or
        on WN (on the Grid), and take different action:
        with a middleware the protocols are tried in turn by stager(),
        otherwise a single copy() with the user protocol is done.
        The outcome is always handed to finalReport().
        """
        if self.middleware:
            results = self.stager()
        else:
            results = self.copy(self.file_to_copy, self.protocol, self.opt)

        self.finalReport(results, self.middleware)

        return

    def setProtocol( self ):
        """
        Define the allowed protocols based on $middleware
        which depend on scheduler.

        Returns the ordered list of protocols to try and sets self.OptMap
        (protocol -> command options). An unknown middleware yields an
        empty list and an empty map.
        """
        middleware = self.middleware.lower()
        if middleware in ['osg', 'lcg']:
            supported_protocol = ['srm-lcg', 'srmv2']
            self.OptMap = {'srm-lcg': self.lcgOpt,
                           'srmv2': self.srmOpt}
        elif middleware in ['lsf', 'caf']:
            supported_protocol = ['rfio']
            self.OptMap = {'rfio': self.rfioOpt}
        else:
            ## here we can add support for any kind of protocol,
            ## maybe some local schedulers need something dedicated
            supported_protocol = []
            self.OptMap = {}
        return supported_protocol

    def stager( self ):
        """
        Implement the logic for remote stage out.

        Each protocol returned by setProtocol() is tried in order. Files
        whose copy failed ('60307') are retried with the next protocol;
        files that already exist ('60303') are not retried. Returns the
        merged report dict {file: info} (see updateReport()).
        """
        protocols = self.setProtocol()
        list_files = self.file_to_copy
        results = {}
        for prot in protocols:
            if self.debug: print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy(list_files, prot, self.OptMap[prot])
            list_retry = []
            list_ok = []
            # iterate over a snapshot: copy_results is updated in the loop
            for name, info in list(copy_results.items()):
                er_code = info['erCode']
                if er_code == '60307':
                    list_retry.append(name)
                elif er_code != '60303':
                    list_ok.append(name)
                    reason = 'Copy succedeed with %s utils' % prot
                    copy_results.update(self.updateReport(name, er_code, reason))
            results.update(copy_results)
            if len(list_ok) == len(list_files):
                break
            if not list_retry:
                # only "already existing" files are left: nothing to retry
                break
            list_files = list_retry

        #### TODO Daniele
        # check if something fails and create the related dict; the backup
        # logic (self.backup) is still to be implemented and MUST return a
        # dict that also contains LFN and SE name.
        return results

    def initializeApi(self, protocol ):
        """
        Instantiate storage interface.

        Returns the (source, destination) storage elements; an empty
        endpoint is opened with the 'local' protocol.
        """
        source_prot = protocol
        dest_prot = protocol
        if self.source == '': source_prot = 'local'
        Source_SE = self.storageInterface(self.source, source_prot)
        if self.destination == '': dest_prot = 'local'
        Destination_SE = self.storageInterface(self.destination, dest_prot)

        if self.debug:
            print('(source=%s, protocol=%s)' % (self.source, source_prot))
            print('(destination=%s, protocol=%s)' % (self.destination, dest_prot))

        return Source_SE, Destination_SE

    def copy( self, list_file, protocol, opt):
        """
        Make the real file copy using SE API.

        list_file : files to copy
        protocol  : protocol name handed to the SE API
        opt       : option string forwarded to the copy command

        Returns {file: info} with one entry per file (see updateReport()).
        """
        if self.debug:
            print('copy(): using %s protocol' % protocol)
        Source_SE, Destination_SE = self.initializeApi(protocol)

        # create remote dir (result deliberately ignored: a real problem
        # shows up as a failure of the copy itself)
        if protocol in ['gridftp', 'rfio']:
            self.createDir(Destination_SE, protocol)

        ## prepare for real copy ##
        sbi = SBinterface(Source_SE, Destination_SE)
        sbi_dest = SBinterface(Destination_SE)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            base_name = os.path.basename(filetocopy)
            if self.debug: print('start real copy for %s' % filetocopy)
            ErCode, msg = self.checkFileExist(sbi_dest, base_name, protocol)
            if ErCode == '0':
                ErCode, msg = self.makeCopy(sbi, filetocopy, opt)
            if self.debug: print('Copy results for %s is %s' % (base_name, ErCode))
            results.update(self.updateReport(filetocopy, ErCode, msg))
        return results

    def updateReport(self, file, erCode, reason, lfn='', se='' ):
        """
        Update the final stage out infos.

        Returns a one-entry dict {file: {'erCode', 'reason', 'lfn', 'se'}}.
        """
        jobStageInfo = {'erCode': erCode,
                        'reason': reason,
                        'lfn': lfn,
                        'se': se}
        return {file: jobStageInfo}

    def finalReport( self , results, middleware ):
        """
        It should return a clear list of LFNs for each SE where data are stored.
        allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.

        With a middleware, shell commands describing every file are
        appended to ./cmscpReport.sh; the exported StageOutExitStatus is
        0, or the error code of a failed file if any. Without middleware
        the error code of each file is printed.
        """
        if middleware:
            cmscp_exit_status = 0
            txt = ''
            for name, info in results.items():
                if info['lfn'] == '':
                    lfn = '$LFNBaseName/' + os.path.basename(name)
                    se = '$SE'
                else:
                    lfn = info['lfn'] + os.path.basename(name)
                    se = info['se']
                txt += 'echo "Report for File: '+name+'"\n'
                txt += 'echo "LFN: '+lfn+'"\n'
                txt += 'echo "StorageElement: '+se+'"\n'
                txt += 'echo "StageOutExitStatusReason ='+info['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
                txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
                if info['erCode'] != '0':
                    cmscp_exit_status = info['erCode']
            txt += '\n'
            txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
            txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
            # Open only once the text is complete and always close the file.
            outFile = open('cmscpReport.sh', "a")
            try:
                outFile.write(str(txt))
            finally:
                outFile.close()
        else:
            for name, info in results.items():
                print('error code = %s for file %s' % (info['erCode'], name))
        return

    def storageInterface( self, endpoint, protocol ):
        """
        Create the storage interface.

        Returns the SElement for endpoint/protocol. On failure the error
        is printed and the original exception is re-raised.
        """
        try:
            interface = SElement(FullPath(endpoint), protocol)
        except Exception:
            ex = sys.exc_info()[1]
            msg = ''
            if self.debug: msg = str(ex) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            print(msg)
            # there is no interface to hand back: let the caller see the
            # real error
            raise

        return interface

    def checkDir(self, Destination_SE, protocol):
        '''
        ToBeImplemented NEEDED for castor
        '''
        return

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp/rfio REALLY TEMPORARY
        this should be transparent at SE API level.

        Returns (ErCode, message): ('0', '') on success, '60316' and an
        error text when the SE API raises.
        """
        ErCode = '0'
        msg_1 = ''
        try:
            action = SBinterface(Destination_SE)
            action.createDir()
            if self.debug: print("The directory has been created using protocol %s\n" % protocol)
        except Exception:
            ex = sys.exc_info()[1]
            msg_1 = "ERROR: problem with the directory creation using %s protocol \n" % protocol
            ErCode = '60316'
            # the exception detail is only shown in debug mode
            if self.debug: print(str(ex) + '\n' + msg_1)

        return ErCode, msg_1

    def checkFileExist(self, sbi, filetocopy, protocol=None):
        """
        Check if file to copy already exist.

        sbi        : storage interface of the destination
        filetocopy : file name to look for
        protocol   : protocol name used in the error text (defaults to
                     self.protocol)

        Returns (ErCode, message):
          ('0', '')      the file is not there, the copy can go on
          ('60303', ...) the file already exists
          ('60307', ...) the check itself failed; the file is reported as
                         failed so that it is not copied blindly
        """
        if protocol is None:
            protocol = self.protocol
        try:
            check = sbi.checkExists(filetocopy)
        except Exception:
            ex = sys.exc_info()[1]
            msg = ''
            if self.debug: msg = str(ex) + '\n'
            msg += "ERROR: problem with check File Exist using %s protocol" % protocol
            return '60307', msg

        ErCode = '0'
        msg = ''
        if check:
            ErCode = '60303'
            msg = "file %s already exist" % filetocopy
            print(msg)

        return ErCode, msg

    def makeCopy(self, sbi, filetocopy, opt ):
        """
        Call the copy API.

        sbi        : SBinterface built on (source, destination)
        filetocopy : file to copy (absolute path or bare name)
        opt        : option string forwarded to sbi.copy

        Returns ('0', '') on success or ('60307', reason) when the SE API
        raises; in debug mode the reason starts with the exception text.
        """
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name ## to be improved supporting changing file name TODO
        if self.source == '' and path == '':
            # local bare name: resolve it against the current directory
            source_file = os.path.abspath(filetocopy)
        elif self.destination == '':
            # remote -> local: write into the current directory
            dest_file = os.path.join(os.getcwd(), file_name)
        elif self.source != '' and self.destination != '':
            # remote -> remote: the endpoint already carries the path
            source_file = file_name
        ErCode = '0'
        msg = ''

        try:
            sbi.copy(source_file, dest_file, opt=opt)
            if self.protocol == 'srm': self.checkSize(sbi, filetocopy)
        except Exception:
            ex = sys.exc_info()[1]
            msg = ''
            if self.debug: msg = str(ex) + '\n'
            msg += "Problem copying %s file" % filetocopy
            ErCode = '60307'

        return ErCode, msg

    def checkSize(self, sbi, filetocopy):
        """
        Using srm needed a check of the output file size.

        NOT IMPLEMENTED YET: this placeholder does nothing, it only
        exists so that makeCopy() can call it without failing.
        """
        # Shell logic to be ported (kept from the original notes):
        #   echo "--> remoteSize = $remoteSize"
        #   ## for local file
        #   localSize=$(stat -c%s "$path_out_file")
        #   echo "--> localSize = $localSize"
        #   if [ $localSize != $remoteSize ]; then
        #       echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
        #       echo "Copy failed: removing remote file $destination"
        #       srmrm $destination
        #       cmscp_exit_status=60307
        #
        #       echo "Problem copying $path_out_file to $destination with srmcp command"
        #       StageOutExitStatusReason='remote and local file dimension not match'
        #       echo "StageOutReport = `cat ./srmcp.report`"
        return

    def backup(self):
        """
        Check infos from TFC using existing api obtaining:
        1)destination
        2)protocol

        Not implemented yet.
        """
        return

    def usage(self):
        """
        Return the help text listing the command line options.
        """
        msg = """
        required parameters:
        --source          :: REMOTE endpoint (at least one of --source / --destination)
        --destination     :: REMOTE endpoint (at least one of --source / --destination)
        --inputFileList   :: comma separated files : absPath or name NOT RELATIVE PATH
        --protocol        :: protocol to use (required when --middleware is not given)
        --middleware      :: OSG, LCG, LSF or CAF (required when --protocol is not given)

        optional parameters:
        --outputFileList  :: comma separated onlyNAME : NOT YET SUPPORTED
        --srm_version     :: srm version
        --debug           :: verbose output
        --help            :: print this message
        """
        return msg
410 spiga 1.1
if __name__ == '__main__' :
    # Script entry point: build the copier from the command line and run it.
    try:
        cmscp_ = cmscp(sys.argv[1:])
        cmscp_.run()
    except SystemExit:
        # usage/--help paths end through sys.exit(): keep their exit status
        raise
    except Exception:
        # Best effort is kept (no traceback, exit status unchanged), but the
        # failure is no longer discarded silently: report it on stderr.
        sys.stderr.write('cmscp: unexpected failure: %s\n' % str(sys.exc_info()[1]))
417