ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.3
Committed: Fri Sep 26 07:38:41 2008 UTC (16 years, 7 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.2: +2 -2 lines
Log Message:
detination NOT dest...

File Contents

# Content
1 #!/usr/bin/env python
2
3 import sys, getopt, string
4 import os, popen2
5 from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6 from ProdCommon.Storage.SEAPI.SBinterface import *
7
8
9
class cmscp:
    """
    Stage out files through the SE API (ProdCommon SEAPI), trying the
    protocols allowed for the given middleware in turn, and write a shell
    report (cmscpReport.sh) describing the outcome for each file.
    """

    def __init__(self, argv):
        """
        cmscp

        safe copy of local files in the current directory to a remote SE
        (or between SEs) through the SE API, with success checking.

        argv: command line options (without the program name), parsed with getopt:
          --source=ENDPOINT       source endpoint; empty means local files
          --destination=ENDPOINT  destination endpoint; empty means the current directory
          --inputFileList=LIST    comma separated files to copy (absolute path or bare name)
          --outputFileList=LIST   comma separated output names (stored, not used yet)
          --protocol=PROT         protocol used when --middleware is not given
          --middleware=MW         CAF, LSF, LCG or OSG: selects the protocols to try
          --srm_version=V         stored in self.srmv
          --debug                 verbose output
          --help                  print usage and exit

        Per-file error codes in the report:
          '0'     copy succeeded
          '60307' copy failed
          '60303' file already exists on the destination
        """
        # defaults
        self.debug = 0
        self.source = ''
        self.destination = ''
        self.file_to_copy = []
        self.remote_file_name = []
        self.protocol = ''
        self.middleware = ''
        self.srmv = ''

        try:
            opts, _args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=",
                                                   "outputFileList=", "protocol=", "middleware=",
                                                   "srm_version=", "debug", "help"])
        except getopt.GetoptError:
            print(self.usage())
            sys.exit(2)

        self.setAndCheck(opts)

    def _splitList(self, value):
        """Split a comma separated option value into stripped, non-empty names."""
        return [name.strip() for name in value.split(',') if name.strip()]

    def setAndCheck( self, opts ):
        """
        Set and check command line parameters.

        opts: list of (option, value) pairs as returned by getopt.
        Prints the usage and exits (status 2, as for getopt errors) when a
        required parameter is missing; --help exits with status 0.
        """
        if not opts :
            print(self.usage())
            sys.exit(2)
        # default so a missing --inputFileList reaches the usage check below
        # instead of raising NameError
        infile = ''
        for opt, arg in opts :
            if opt == "--help" :
                print(self.usage())
                sys.exit()
            elif opt == "--debug" :
                self.debug = 1
            elif opt == "--source" :
                self.source = arg
            elif opt == "--destination":
                self.destination = arg
            elif opt == "--inputFileList":
                infile = arg
            elif opt == "--outputFileList":
                # parsed for the future in->out name mapping (see TODO below)
                self.remote_file_name.extend(self._splitList(arg))
            elif opt == "--protocol":
                self.protocol = arg
            elif opt == "--middleware":
                self.middleware = arg
            elif opt == "--srm_version":
                self.srmv = arg

        # source and dest cannot be undefined at same time
        if self.source == '' and self.destination == '':
            print(self.usage())
            sys.exit(2)
        # if middleware is not defined --> protocol cannot be empty
        if self.middleware == '' and self.protocol == '':
            print(self.usage())
            sys.exit(2)
        # input file must be defined (a single name or a comma separated list)
        self.file_to_copy.extend(self._splitList(infile))
        if not self.file_to_copy:
            print(self.usage())
            sys.exit(2)

        ## TO DO:
        #### add check for outFiles
        #### add map {'inFileNAME':'outFileNAME'} to change out name

    def run( self ):
        """
        Copy the files and write the final report.

        With --middleware (running on a worker node) the copy goes through
        stager(), which tries each supported protocol in turn; otherwise
        (e.g. from a UI) the files are copied once with --protocol.
        """
        if self.middleware :
            results = self.stager()
        else:
            results = self.copy( self.file_to_copy, self.protocol )

        self.finalReport(results, self.middleware)

    def setProtocol( self ):
        """
        Return the list of protocols to try, in order, for self.middleware.

        OSG/LCG -> ['srm-lcg', 'srmv2']; LSF/CAF -> ['rfio'] (case-insensitive).
        Any other middleware falls back to the explicit --protocol, if one was
        given, otherwise an empty list (nothing will be attempted).
        """
        middleware = self.middleware.lower()
        if middleware in ['osg', 'lcg']:
            supported_protocol = ['srm-lcg', 'srmv2']
        elif middleware in ['lsf', 'caf']:
            supported_protocol = ['rfio']
        else:
            ## here we can add support for any kind of protocol,
            ## maybe some local schedulers need something dedicated
            supported_protocol = []
            if self.protocol:
                supported_protocol.append(self.protocol)
        return supported_protocol

    def stager( self ):
        """
        Implement the logic for remote stage out.

        Each protocol from setProtocol() is tried in order. Files that failed
        with '60307' are retried with the next protocol; files that already
        exist ('60303') are not retried. Later attempts overwrite earlier
        entries for the same file.

        Returns a dict {file: report-dict} as built by updateReport().
        """
        protocols = self.setProtocol()
        list_files = self.file_to_copy
        results = {}
        for prot in protocols:
            if self.debug: print('Trying stage out with %s utils \n' % prot)
            copy_results = self.copy( list_files, prot )
            list_retry = []
            list_ok = []
            # iterate over a snapshot: successful entries are rewritten below
            for fname, report in list(copy_results.items()):
                er_code = report['erCode']
                if er_code == '60307':
                    list_retry.append(fname)
                elif er_code == '0':
                    list_ok.append(fname)
                    reason = 'Copy succedeed with %s utils' % prot
                    copy_results.update(self.updateReport(fname, er_code, reason))
            results.update(copy_results)
            if len(list_ok) == len(list_files):
                break
            if not list_retry:
                # remaining failures are not retryable (e.g. already existing)
                break
            list_files = list_retry

        #### TODO Daniele
        # check if something fails and create the related dict, e.g.
        #   backup = self.analyzeResults(results)
        #   if backup: results.update(self.backup())
        # NOTE: backup() must return a dict containing also LFN and SE name.
        return results

    def initializeApi(self, protocol ):
        """
        Instantiate the source and destination storage interfaces.

        An empty self.source / self.destination means the local filesystem
        ('local' protocol) on that side. Returns (Source_SE, Destination_SE).
        """
        source_prot = protocol
        dest_prot = protocol
        if self.source == '' : source_prot = 'local'
        Source_SE = self.storageInterface( self.source, source_prot )
        if self.destination == '' : dest_prot = 'local'
        Destination_SE = self.storageInterface( self.destination, dest_prot )

        if self.debug :
            print('(source=%s, protocol=%s)' % (self.source, source_prot))
            print('(destination=%s, protocol=%s)' % (self.destination, dest_prot))

        return Source_SE, Destination_SE

    def copy( self, list_file, protocol ):
        """
        Make the real file copy using the SE API.

        list_file: files to copy; protocol: protocol to use for this attempt.
        Files already present on the destination are not copied.
        Returns a dict {file: report-dict}.
        """
        if self.debug :
            print('copy(): using %s protocol' % protocol)
        Source_SE, Destination_SE = self.initializeApi( protocol )

        # create remote dir; the return code is not acted upon here
        if protocol in ['gridftp', 'rfio']:
            self.createDir( Destination_SE, protocol )

        ## prepare for real copy ##
        sbi = SBinterface( Source_SE, Destination_SE )
        sbi_dest = SBinterface(Destination_SE)

        results = {}
        ## loop over the complete list of files
        for filetocopy in list_file:
            if self.debug : print('start real copy for %s' % filetocopy)
            ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy), protocol )
            if ErCode == '0':
                ErCode, msg = self.makeCopy( sbi, filetocopy, protocol )
            if self.debug : print('Copy results for %s is %s' % ( os.path.basename(filetocopy), ErCode))
            results.update( self.updateReport(filetocopy, ErCode, msg))
        return results

    def updateReport(self, file, erCode, reason, lfn='', se='' ):
        """
        Build the stage-out report entry for one file.

        Returns {file: {'erCode': ..., 'reason': ..., 'lfn': ..., 'se': ...}}.
        """
        jobStageInfo = {}
        jobStageInfo['erCode'] = erCode
        jobStageInfo['reason'] = reason
        jobStageInfo['lfn'] = lfn
        jobStageInfo['se'] = se

        return { file : jobStageInfo }

    def finalReport( self , results, middleware ):
        """
        Report the stage-out results.

        With a middleware, append shell commands to ./cmscpReport.sh that echo
        the LFN / SE / reason of every file and export StageOutExitStatus
        (the last non-'0' error code, or 0). Without a middleware, print the
        per-file results.
        TO_DO: return a clear list of LFNs for each SE where data are stored,
        allowing "crab -copyLocal" or better "crab -copyOutput".
        """
        if not middleware:
            for fname, report in results.items():
                print('error code = %s for file %s' % (report, fname))
            return

        cmscp_exit_status = 0
        lines = []
        for fname, report in results.items():
            if report['lfn'] == '':
                # placeholders expanded by the shell that sources the report
                lfn = '$LFNBaseName/' + os.path.basename(fname)
                se = '$SE'
            else:
                # join with a separator, as the placeholder branch above does
                lfn = os.path.join(report['lfn'], os.path.basename(fname))
                se = report['se']
            lines.append('echo "Report for File: ' + fname + '"\n')
            lines.append('echo "LFN: ' + lfn + '"\n')
            lines.append('echo "StorageElement: ' + se + '"\n')
            lines.append('echo "StageOutExitStatusReason =' + report['reason'] + '" | tee -a $RUNTIME_AREA/$repo\n')
            lines.append('echo "StageOutSE = ' + se + '" >> $RUNTIME_AREA/$repo\n')
            if report['erCode'] != '0':
                cmscp_exit_status = report['erCode']
            lines.append('\n')
        lines.append('export StageOutExitStatus=' + str(cmscp_exit_status) + '\n')
        lines.append('echo "StageOutExitStatus = ' + str(cmscp_exit_status) + '" | tee -a $RUNTIME_AREA/$repo\n')

        outFile = open('cmscpReport.sh', "a")
        try:
            outFile.write(''.join(lines))
        finally:
            outFile.close()

    def storageInterface( self, endpoint, protocol ):
        """
        Create the storage interface for endpoint with the given protocol.

        On failure an error message is printed and the original exception is
        re-raised (previously an UnboundLocalError hid the real cause).
        """
        try:
            interface = SElement( FullPath(endpoint), protocol )
        except Exception:
            ex = sys.exc_info()[1]
            msg = ''
            if self.debug : msg = str(ex) + '\n'
            msg += "ERROR : Unable to create interface with %s protocol\n" % protocol
            print(msg)
            raise

        return interface

    def checkDir(self, Destination_SE, protocol):
        '''
        ToBeImplemented NEEDED for castor
        '''
        return

    def createDir(self, Destination_SE, protocol):
        """
        Create remote dir for gsiftp/rfio REALLY TEMPORARY
        this should be transparent at SE API level.

        Returns (ErCode, msg): ('0', '') on success, ('60316', error text) on failure.
        """
        ErCode = '0'
        msg = ''
        try:
            action = SBinterface( Destination_SE )
            action.createDir()
            if self.debug: print("The directory has been created using protocol %s\n" % protocol)
        except Exception:
            ex = sys.exc_info()[1]
            msg = "ERROR: problem with the directory creation using %s protocol \n" % protocol
            ErCode = '60316'
            if self.debug: print(str(ex) + '\n' + msg)

        return ErCode, msg

    def checkFileExist(self, sbi, filetocopy, protocol=''):
        """
        Check if the file to copy already exists on the destination.

        Returns ('60303', message) if it exists, ('0', '') otherwise. If the
        check itself fails, the file is treated as not existing (best effort)
        and the copy is attempted.
        protocol: only used in the debug message; defaults to self.protocol.
        """
        ErCode = '0'
        msg = ''
        check = False
        try:
            check = sbi.checkExists(filetocopy)
        except Exception:
            ex = sys.exc_info()[1]
            if self.debug:
                print(str(ex) + '\n' + "ERROR: problem with check File Exist using %s protocol \n"
                      % (protocol or self.protocol))
        if check :
            ErCode = '60303'
            msg = "file %s already exist" % filetocopy
            print(msg)

        return ErCode, msg

    def makeCopy(self, sbi, filetocopy, protocol='' ):
        """
        Call the copy API for one file.

        protocol: protocol used for this copy (defaults to self.protocol);
        'srm' additionally triggers checkSize().
        Returns ('0', '') on success or ('60307', message) on failure.
        """
        protocol = protocol or self.protocol
        path = os.path.dirname(filetocopy)
        file_name = os.path.basename(filetocopy)
        source_file = filetocopy
        dest_file = file_name ## to be improved supporting changing file name TODO
        if self.source == '' and path == '':
            source_file = os.path.abspath(filetocopy)
        elif self.destination == '':
            dest_file = os.path.join(os.getcwd(), file_name)
        elif self.source != '' and self.destination != '' :
            source_file = file_name
        ErCode = '0'
        msg = ''
        try:
            sbi.copy( source_file , dest_file )
            if protocol == 'srm' : self.checkSize( sbi, filetocopy )
        except Exception:
            ex = sys.exc_info()[1]
            msg = ''
            if self.debug : msg = str(ex) + '\n'
            msg += "Problem copying %s file with %s command" % ( filetocopy, protocol )
            ErCode = '60307'

        return ErCode, msg

    def checkSize(self, sbi, filetocopy):
        """
        Using srm needed a check of the output file size. NOT IMPLEMENTED YET:
        currently a no-op.

        Planned logic (from the previous shell implementation): compare the
        local file size with the remote one; on mismatch remove the remote
        file (srmrm), report 60307 with reason
        'remote and local file dimension not match'.
        """
        return

    def backup(self):
        """
        Check infos from TFC using existing api obtaining:
        1)destination
        2)protocol
        """
        return

    def usage(self):
        """Return the command line help text."""
        msg = """
required parameters:
--source :: REMOTE : (at least one of --source / --destination)
--destination :: REMOTE :
--inputFileList :: comma separated list, absPath or name NOT RELATIVE PATH
--middleware :: CAF, LSF, LCG, OSG (or give --protocol)
--protocol :: required when --middleware is not given

optional parameters
--outputFileList :: onlyNAME : NOT YET SUPPORTED
--srm_version :
--debug :
--help :
"""
        return msg
399
if __name__ == '__main__' :
    try:
        cmscp_ = cmscp(sys.argv[1:])
        cmscp_.run()
    except SystemExit:
        # sys.exit() from option parsing carries the intended exit status;
        # the previous bare "except:" swallowed it and always exited 0.
        raise
    except Exception:
        # Stay best-effort (do not abort the caller with an uncaught
        # exception), but leave the cause on stderr instead of hiding it.
        import traceback
        traceback.print_exc()
406