ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cmscp.py
Revision: 1.3
Committed: Fri Sep 26 07:38:41 2008 UTC (16 years, 7 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.2: +2 -2 lines
Log Message:
detination NOT dest...

File Contents

# User Rev Content
1 spiga 1.1 #!/usr/bin/env python
2    
3     import sys, getopt, string
4     import os, popen2
5     from ProdCommon.Storage.SEAPI.SElement import SElement, FullPath
6     from ProdCommon.Storage.SEAPI.SBinterface import *
7    
8    
9    
10     class cmscp:
11     def __init__(self, argv):
12     """
13     cmscp
14    
15     safe copy of local file in current directory to remote SE via lcg_cp/srmcp,
16     including success checking version also for CAF using rfcp command to copy the output to SE
17     input:
18     $1 middleware (CAF, LSF, LCG, OSG)
19 spiga 1.2 $2 local file (the absolute path of output file or just the name if it's in top dir)
20     $3 if needed: file name (the output file name)
21     $5 remote SE (complete endpoint)
22     $6 srm version
23 spiga 1.1 output:
24     return 0 if all ok
25     return 60307 if srmcp failed
26     return 60303 if file already exists in the SE
27     """
28     #set default
29     self.debug = 0
30     self.source = ''
31     self.destination = ''
32     self.file_to_copy = []
33     self.remote_file_name = []
34     self.protocol = ''
35     self.middleware = ''
36     self.srmv = ''
37    
38     try:
39     opts, args = getopt.getopt(argv, "", ["source=", "destination=", "inputFileList=", "outputFileList=", \
40     "protocol=", "middleware=", "srm_version=", "debug", "help"])
41     except getopt.GetoptError:
42     print self.usage()
43     sys.exit(2)
44    
45     self.setAndCheck(opts)
46    
47     return
48    
49     def setAndCheck( self, opts ):
50     """
51     Set and check command line parameter
52     """
53     if not opts :
54     print self.usage()
55     sys.exit()
56     for opt, arg in opts :
57     if opt == "--help" :
58     print self.usage()
59     sys.exit()
60     elif opt == "--debug" :
61     self.debug = 1
62     elif opt == "--source" :
63     self.source = arg
64     elif opt == "--destination":
65     self.destination = arg
66     elif opt == "--inputFileList":
67     infile = arg
68     elif opt == "--outputFileList":
69     out_file
70     elif opt == "--protocol":
71     self.protocol = arg
72     elif opt == "--middleware":
73     self.middleware = arg
74     elif opt == "--srm_version":
75     self.srmv = arg
76    
77     # source and dest cannot be undefined at same time
78     if self.source == '' and self.destination == '':
79     print self.usage()
80     sys.exit()
81     # if middleware is not defined --> protocol cannot be empty
82     if self.middleware == '' and self.protocol == '':
83     print self.usage()
84     sys.exit()
85     # input file must be defined
86     if infile == '':
87     print self.usage()
88     sys.exit()
89     else:
90     if infile.find(','):
91     [self.file_to_copy.append(x.strip()) for x in infile.split(',')]
92     else:
93     self.file_to_copy.append(infile)
94    
95     ## TO DO:
96     #### add check for outFiles
97     #### add map {'inFileNAME':'outFileNAME'} to change out name
98    
99     return
100    
101     def run( self ):
102     """
103     Check if running on UI (no $middleware) or
104     on WN (on the Grid), and take different action
105     """
106     if self.middleware :
107     results = self.stager()
108     else:
109     results = self.copy( self.file_to_copy, self.protocol )
110    
111     self.finalReport(results,self.middleware)
112    
113     return
114    
115     def setProtocol( self ):
116     """
117     define the allowed potocols based on $middlware
118     which depend on scheduler
119     """
120     if self.middleware.lower() in ['osg','lcg']:
121     supported_protocol = ['srm-lcg','srmv2']
122     elif self.middleware.lower() in ['lsf','caf']:
123     supported_protocol = ['rfio']
124     else:
125     ## here we can add support for any kind of protocol,
126     ## maybe some local schedulers need something dedicated
127     pass
128     return supported_protocol
129    
130     def stager( self ):
131     """
132     Implement the logic for remote stage out
133     """
134     protocols = self.setProtocol()
135     count=0
136     list_files = self.file_to_copy
137     results={}
138     for prot in protocols:
139     if self.debug: print 'Trying stage out with %s utils \n'%prot
140     copy_results = self.copy( list_files, prot )
141     list_retry = []
142     list_existing = []
143     list_ok = []
144     for file, dict in copy_results.iteritems():
145     er_code = dict['erCode']
146     if er_code == '60307': list_retry.append( file )
147     elif er_code == '60303': list_existing.append( file )
148     else:
149     list_ok.append(file)
150     reason = 'Copy succedeed with %s utils'%prot
151     upDict = self.updateReport(file, er_code, reason)
152     copy_results.update(upDict)
153     results.update(copy_results)
154     if len(list_ok) != 0:
155     msg = 'Copy of %s succedeed with %s utils\n'%(str(list_ok),prot)
156     # print msg
157     if len(list_ok) == len(list_files) :
158     break
159     else:
160     # print 'Copy of files %s failed using %s...\n'%(str(list_retry)+str(list_existing),prot)
161     if len(list_retry): list_files = list_retry
162     else: break
163     count =+1
164    
165     #### TODO Daniele
166     #check is something fails and created related dict
167     # backup = self.analyzeResults(results)
168    
169     # if backup :
170     # msg = 'WARNING: backup logic is under implementation\n'
171     # #backupDict = self.backup()
172 spiga 1.2 # ### NOTE: IT MUST RETURN a DICT contains also LFN and SE Name
173 spiga 1.1 # results.update(backupDict)
174     # print msg
175     return results
176    
177     def initializeApi(self, protocol ):
178     """
179     Instantiate storage interface
180     """
181     source_prot = protocol
182     dest_prot = protocol
183     if self.source == '' : source_prot = 'local'
184     Source_SE = self.storageInterface( self.source, source_prot )
185     if self.destination == '' : dest_prot = 'local'
186     Destination_SE = self.storageInterface( self.destination, dest_prot )
187    
188     if self.debug :
189     print '(source=%s, protocol=%s)'%(self.source, source_prot)
190     print '(destination=%s, protocol=%s)'%(self.destination, dest_prot)
191    
192     return Source_SE, Destination_SE
193    
194     def copy( self, list_file, protocol ):
195     """
196     Make the real file copy using SE API
197     """
198     if self.debug :
199     print 'copy(): using %s protocol'%protocol
200     Source_SE, Destination_SE = self.initializeApi( protocol )
201    
202     # create remote dir
203     if protocol in ['gridftp','rfio']:
204     self.createDir( Destination_SE, protocol )
205    
206     ## prepare for real copy ##
207     sbi = SBinterface( Source_SE, Destination_SE )
208     sbi_dest = SBinterface(Destination_SE)
209    
210     results = {}
211     ## loop over the complete list of files
212     for filetocopy in list_file:
213     if self.debug : print 'start real copy for %s'%filetocopy
214     ErCode, msg = self.checkFileExist( sbi_dest, os.path.basename(filetocopy) )
215     if ErCode == '0':
216     ErCode, msg = self.makeCopy( sbi, filetocopy )
217     if self.debug : print 'Copy results for %s is %s'%( os.path.basename(filetocopy) ,ErCode)
218     results.update( self.updateReport(filetocopy, ErCode, msg))
219     return results
220    
221     def updateReport(self, file, erCode, reason, lfn='', se='' ):
222     """
223     Update the final stage out infos
224     """
225     jobStageInfo={}
226     jobStageInfo['erCode']=erCode
227     jobStageInfo['reason']=reason
228     jobStageInfo['lfn']=lfn
229     jobStageInfo['se']=se
230    
231     report = { file : jobStageInfo}
232     return report
233    
234     def finalReport( self , results, middleware ):
235     """
236     It should return a clear list of LFNs for each SE where data are stored.
237     allow "crab -copyLocal" or better "crab -copyOutput". TO_DO.
238     """
239     if middleware:
240     outFile = open('cmscpReport.sh',"a")
241     cmscp_exit_status = 0
242     txt = ''
243     for file, dict in results.iteritems():
244     if dict['lfn']=='':
245     lfn = '$LFNBaseName/'+os.path.basename(file)
246     se = '$SE'
247     else:
248     lfn = dict['lfn']+os.pat.basename(file)
249     se = dict['se']
250     #dict['lfn'] # to be implemented
251     txt += 'echo "Report for File: '+file+'"\n'
252     txt += 'echo "LFN: '+lfn+'"\n'
253     txt += 'echo "StorageElement: '+se+'"\n'
254     txt += 'echo "StageOutExitStatusReason ='+dict['reason']+'" | tee -a $RUNTIME_AREA/$repo\n'
255 spiga 1.2 txt += 'echo "StageOutSE = '+se+'" >> $RUNTIME_AREA/$repo\n'
256 spiga 1.1 if dict['erCode'] != '0':
257     cmscp_exit_status = dict['erCode']
258     txt += '\n'
259     txt += 'export StageOutExitStatus='+str(cmscp_exit_status)+'\n'
260     txt += 'echo "StageOutExitStatus = '+str(cmscp_exit_status)+'" | tee -a $RUNTIME_AREA/$repo\n'
261     outFile.write(str(txt))
262     outFile.close()
263     else:
264     for file, code in results.iteritems():
265     print 'error code = %s for file %s'%(code,file)
266     return
267    
268     def storageInterface( self, endpoint, protocol ):
269     """
270     Create the storage interface.
271     """
272     try:
273     interface = SElement( FullPath(endpoint), protocol )
274     except Exception, ex:
275     msg = ''
276     if self.debug : msg = str(ex)+'\n'
277     msg += "ERROR : Unable to create interface with %s protocol\n"%protocol
278     print msg
279    
280     return interface
281    
282     def checkDir(self, Destination_SE, protocol):
283     '''
284     ToBeImplemented NEEDED for castor
285     '''
286     return
287    
288     def createDir(self, Destination_SE, protocol):
289     """
290     Create remote dir for gsiftp/rfio REALLY TEMPORARY
291     this should be transparent at SE API level.
292     """
293     ErCode = '0'
294     msg_1 = ''
295     try:
296     action = SBinterface( Destination_SE )
297     action.createDir()
298     if self.debug: print "The directory has been created using protocol %s\n"%protocol
299     except Exception, ex:
300     msg = ''
301     if self.debug : msg = str(ex)+'\n'
302     msg_1 = "ERROR: problem with the directory creation using %s protocol \n"%protocol
303     msg += msg_1
304     ErCode = '60316'
305     #print msg
306    
307     return ErCode, msg_1
308    
309     def checkFileExist(self, sbi, filetocopy):
310     """
311     Check if file to copy already exist
312     """
313     try:
314     check = sbi.checkExists(filetocopy)
315     except Exception, ex:
316     msg = ''
317     if self.debug : msg = str(ex)+'\n'
318     msg += "ERROR: problem with check File Exist using %s protocol \n"%protocol
319     # print msg
320     ErCode = '0'
321     msg = ''
322     if check :
323     ErCode = '60303'
324     msg = "file %s already exist"%filetocopy
325     print msg
326    
327     return ErCode,msg
328    
329     def makeCopy(self, sbi, filetocopy ):
330     """
331     call the copy API.
332     """
333     path = os.path.dirname(filetocopy)
334     file_name = os.path.basename(filetocopy)
335     source_file = filetocopy
336     dest_file = file_name ## to be improved supporting changing file name TODO
337     if self.source == '' and path == '':
338     source_file = os.path.abspath(filetocopy)
339     elif self.destination =='':
340     dest_file = os.path.join(os.getcwd(),file_name)
341     elif self.source != '' and self.destination != '' :
342     source_file = file_name
343     ErCode = '0'
344     msg = ''
345     try:
346     pippo = sbi.copy( source_file , dest_file )
347     if self.protocol == 'srm' : self.checkSize( sbi, filetocopy )
348     except Exception, ex:
349     msg = ''
350     if self.debug : msg = str(ex)+'\n'
351     msg = "Problem copying %s file with %s command"%( filetocopy, protocol )
352     ErCode = '60307'
353     #print msg
354    
355     return ErCode, msg
356    
357     '''
358     def checkSize()
359     """
360     Using srm needed a check of the ouptut file size.
361     """
362    
363     echo "--> remoteSize = $remoteSize"
364     ## for local file
365     localSize=$(stat -c%s "$path_out_file")
366     echo "--> localSize = $localSize"
367     if [ $localSize != $remoteSize ]; then
368     echo "Local fileSize $localSize does not match remote fileSize $remoteSize"
369     echo "Copy failed: removing remote file $destination"
370     srmrm $destination
371     cmscp_exit_status=60307
372    
373    
374     echo "Problem copying $path_out_file to $destination with srmcp command"
375     StageOutExitStatusReason='remote and local file dimension not match'
376     echo "StageOutReport = `cat ./srmcp.report`"
377     '''
378     def backup(self):
379     """
380     Check infos from TFC using existing api obtaining:
381     1)destination
382     2)protocol
383     """
384     return
385    
386     def usage(self):
387    
388     msg="""
389     required parameters:
390 spiga 1.3 --source :: REMOTE :
391     --destination :: REMOTE :
392 spiga 1.1 --debug :
393     --inFile :: absPath : or name NOT RELATIVE PATH
394     --outFIle :: onlyNAME : NOT YET SUPPORTED
395    
396     optional parameters
397     """
398     return msg
399    
400     if __name__ == '__main__' :
401     try:
402     cmscp_ = cmscp(sys.argv[1:])
403     cmscp_.run()
404     except:
405     pass
406