ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/downloadSample.py
Revision: 1.4
Committed: Fri Jul 30 18:41:11 2010 UTC (14 years, 9 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_017pre3, Mit_017pre2, Mit_017pre1, Mit_016, Mit_015b, Mit_015a, Mit_015, Mit_014e, Mit_014d, Mit_014c
Changes since 1.3: +16 -6 lines
Log Message:
Cleaned up and updated version.

File Contents

# User Rev Content
1 paus 1.2 #!/usr/bin/env python
2     #---------------------------------------------------------------------------------------------------
3     # Script to automatically download a MIT dataset to our local cluster
4     #
5     # The download of the MIT dataset is organized in accordance with the dataset production logic. In
6     # general it is allowed to download the dataset from any location of a properly configured storage
7     # element. The script performs the most obvious tests to ensure an efficient and safe download. For
8     # performance reasons a checksum is not calculated. This omission is considered completely safe as
9     # failures will be identified in the analysis phase and the rare occasions will be more effective to
10     # fix by hand.
11     #
12     # At present the download proceeds in one thread (one file at a time) which for performance reasons
13     # might not be optimal.
14     #
15     # Author: C.Paus (July 1, 2008)
16     #---------------------------------------------------------------------------------------------------
17     # Missing but desired features:
18     # + accounting of size of each file
19     # + accounting of locally available files (avoid copying already existing files)
20     # + determine full list of files before starting to copy
21     # + minimal success check of the copy
22     # + calculate total data volume (to copy, already copied etc.)
23     # + add feature to check the castor status
24     # - add time estimates and progressions for copies
25     # - multi downloads to enhance performance
26     #---------------------------------------------------------------------------------------------------
27     import os,sys,getopt,re,string
28    
def Seconds():
    """Return the current Unix time in whole seconds (int).

    The clock is read through the standard library rather than by spawning
    `date +%s`; this avoids one subprocess per call and cannot end up with an
    unbound result when the pipe produces no output.
    """
    import time
    return int(time.time())
33    
def InSkipList(file,list):
    """Return True if `file` is equal to one of the entries of `list`, else False.

    The parameter names shadow builtins but are kept as they are part of the
    existing call interface.
    """
    return file in list
39    
def DecodeSrmLs(line):
    """Decode one '<size> <path>' listing line into [size, file name].

    The line is split on any run of whitespace, so leading blanks and repeated
    separators do not produce empty fields. The size is returned as the string
    found in the first field; the file name is the last '/'-separated component
    of the second field.

    Raises IndexError if the line has fewer than two fields.
    """
    fields = line.split()
    size = fields[0]
    name = fields[1].split("/")[-1]
    return [size, name]
48    
def DecodeRfDir(line):
    """Decode one long-format directory listing line into [size, file name].

    The line is split on any run of whitespace (column-aligned output contains
    repeated blanks). The file name is the last field and the size is the fifth
    field (index 4); the size is returned as a string.

    Raises IndexError if the line has fewer than six fields.
    """
    fields = line.split()
    name = fields.pop()
    size = fields[4]
    return [size, name]
56    
def BuildFileList(cmd):
    """Run the listing command `cmd` and return a dict {file name: size}.

    Every output line is expected to look like '<size> <path>'. The size is
    converted to int and the file name is the last '/'-separated component of
    the path. Lines with fewer than two fields (e.g. blank lines) are ignored.

    Reads the module-level flag `debug` to decide whether to trace its work.

    Raises ValueError if the first field of a line is not an integer.
    """
    fileList = {}

    if debug == 1:
        print(' Debug:: list: ' + cmd)

    # read everything first so the pipe is closed even if decoding fails
    pipe = os.popen(cmd)
    try:
        lines = pipe.readlines()
    finally:
        pipe.close()

    for line in lines:
        fields = line.split()
        if len(fields) < 2:
            continue
        size = fields[0]
        name = fields[1].split("/")[-1]
        if debug == 1:
            print(' Debug:: adding: ' + name + ' with size ' + size)
        fileList[name] = int(size)

    return fileList
84    
def BuildStagedFileList(storagePath,allFileList,cacheFile):
    """Return a dict {file name: stager status} for all files in `allFileList`.

    storagePath -- storage path; only the part after the last '=' is used
    allFileList -- dict keyed by file name (the values are not used here)
    cacheFile   -- file with previously cached stager query results

    If the path does not contain '/castor/' every file is marked 'STAGED'.
    Otherwise the status is the last field of the line whose first field is the
    full file name, taken from `cacheFile` when it exists and the module-level
    flag `noCache` is 0, or from the output of `stager_qry -M <file>` otherwise.
    Files for which no such line is found get the status 'undefined'.

    Reads the module-level flags `debug` and `noCache` (castor paths only).
    """
    rfPath = storagePath.split("=")[-1]

    # if this is not castor, trick it and mark them as staged
    if not re.search('/castor/', rfPath):
        return dict((name, 'STAGED') for name in allFileList)

    # here we deal with castor
    if debug == 1:
        print(' Debug:: rfpath: ' + rfPath)

    # decide once whether the cache is used and load it in a single pass; this
    # replaces one 'grep' subprocess per file
    useCache = os.path.exists(cacheFile) and noCache == 0
    cached = {}
    if useCache:
        print(' Using the cached stager queries at ' + cacheFile)
        with open(cacheFile) as cacheInput:
            for line in cacheInput:
                fields = line.split()
                if fields:
                    cached[fields[0]] = fields[-1]   # last entry wins

    fileList = {}
    for name in allFileList:
        fullFile = rfPath + '/' + name
        if debug == 1:
            print(' Debug:: full file name: ' + fullFile)

        status = 'undefined'
        if useCache:
            status = cached.get(fullFile, 'undefined')
        else:
            pipe = os.popen('stager_qry -M ' + fullFile)
            try:
                for line in pipe.readlines():
                    fields = line.split()
                    if fields and fields[0] == fullFile:
                        status = fields[-1]
            finally:
                pipe.close()
        fileList[name] = status

    return fileList
121    
def CacheStagedFileList(cacheFile,storagePath,stagedFileList):
    """Write the stager status of all files to `cacheFile` (overwriting it).

    cacheFile      -- name of the file to write
    storagePath    -- storage path; only the part after the last '=' is used
    stagedFileList -- dict {file name: status string}

    One line '<path>/<file> xyz@castorns <status>' is written per file, in
    file name order so that the cache content is reproducible. The file is
    closed even if writing fails. Returns None.
    """
    print(' Caching stager query status to ' + cacheFile)
    rfPath = storagePath.split("=")[-1]
    with open(cacheFile, 'w') as fileOutput:
        for name in sorted(stagedFileList):
            fileOutput.write(rfPath + '/' + name + ' xyz@castorns ' + stagedFileList[name] + '\n')
131    
def CopyFile(storageEle,storagePath,storageUrl,file,localDir):
    """Copy one remote file into `localDir` and return the time used in seconds.

    storageEle  -- storage element name; selects the copy tool:
                   'srm-cms.cern.ch' -> rfcp, 'se01.cmsaf.mit.edu' -> dccp,
                   anything else -> lcg-cp
    storagePath -- storage path; the part after the last '=' is the remote
                   directory used by rfcp and dccp
    storageUrl  -- remote URL of the directory, used by lcg-cp
    file        -- name of the file to copy
    localDir    -- local target directory

    NOTE: the size of the file is not a parameter; it is read from the
    module-level variable `size`, which the caller has to set beforehand.
    The module-level flag `debug` is read as well.

    Nothing is copied, and 0 is returned, when the size is below one byte. A
    copy command ending with a non-zero status is reported with a warning; the
    elapsed time is returned in any case.
    """
    deltaT = 0
    print(' working on file: ' + file + ' to ' + localDir + \
          ' (size: %d MB) '%(int(size)//1024//1024))

    # the target is given by the localDir parameter (no need to rebuild it from globals)
    target = localDir + '/' + file
    rfPath = storagePath.split("=")[-1]
    if storageEle == 'srm-cms.cern.ch':
        cpy = 'rfcp ' + rfPath + '/' + file + ' ' + target
    elif storageEle == 'se01.cmsaf.mit.edu':
        cpy = 'dccp dcap://t2srv0005.cmsaf.mit.edu/' + rfPath + '/' + file + ' ' + target
    else:
        cpy = 'lcg-cp ' + storageUrl + '/' + file + ' file:////' + target

    # Check whether the file size makes sense (zero length files are probably not yet ready to
    # copy and will not be transferred)
    if size < 1:
        print(' WARNING - file size is <1b. Probably this file is not yet ready. Stop copy.')
        return deltaT

    if debug == 1:
        print(' Debug:: copy: ' + cpy)
    start = Seconds()
    status = os.system(cpy)
    end = Seconds()
    deltaT = end - start
    if status != 0:
        print(' WARNING - copy command returned status %d: %s'%(status,cpy))

    return deltaT
171    
def StageFile(storagePath,storageUrl,file):
    """Request staging of one remote file and return the command's exit status.

    storagePath -- storage path; the part after the last '=' is the remote
                   directory handed to stager_get
    storageUrl  -- remote URL of the directory
    file        -- name of the file to stage

    For the storage element 'srm-cms.cern.ch' the command
    `stager_get -M <path>/<file>` is run. For every other storage element the
    equivalent lcg-cp command line is only echoed, nothing is transferred.

    Reads the module-level variables `storageEle`, `localPath`, `mitCfg`,
    `version`, `mitDataset` and `debug`.

    Returns the status reported by os.system(); a non-zero status is also
    reported with a warning (existing callers may ignore the return value).
    """
    print(' staging in file: ' + file)
    if storageEle == 'srm-cms.cern.ch':
        rfPath = storagePath.split("=")[-1]
        stg = 'stager_get -M ' + rfPath + '/' + file
    else:
        stg = 'echo lcg-cp ' + storageUrl + '/' + file + ' file:////' + localPath + '/' \
              + mitCfg + '/' + version + '/' + mitDataset + '/' + file

    if debug == 1:
        print(' Debug:: stage: ' + stg)
    status = os.system(stg)
    if status != 0:
        print(' WARNING - stage command returned status %d: %s'%(status,stg))
    return status
186    
#===================================================================================================
# Main starts here
#===================================================================================================
# Define string to explain usage of the script (one line per option accepted in 'valid' below)
usage  = "Usage: downloadSample.py --cmsDataset=<name> | --mitDataset=<name>\n"
usage += " --mitCfg=<name>\n"
usage += " --version=<version>\n"
usage += " --cmssw=<name>\n"
usage += " --pattern=<name>\n"
usage += " --localStorageUrl=<name>\n"
usage += " --localPath=<dir>\n"
usage += " --noCache\n"
usage += " --skip=<file list>\n"
usage += " --forceCopy\n"
usage += " --backward\n"
usage += " --debug\n"
usage += " --help\n"

# Define the valid options which can be specified and check out the command line
valid = ['cmsDataset=','mitDataset=','mitCfg=','version=','cmssw=','pattern=','localStorageUrl=',
         'localPath=','noCache','skip=',
         'forceCopy','backward',
         'debug','help']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError as ex:
    print(usage)
    print(str(ex))
    sys.exit(1)

# --------------------------------------------------------------------------------------------------
# Get all parameters for the production
# --------------------------------------------------------------------------------------------------
# Set defaults for each option
cmsDataset = None
mitDataset = None
skip = ''
skipList = []
mitCfg = 'filefi'
version = '014'
cmssw = ''
blockLocal = 0                         # set to 1 when --localPath is given explicitly
localStorageUrl = ''
localPath = '/server/02b/mitprod'
pattern = ''
noCache = 0
backward = ''                          # extra option handed to 'sort' ('' or ' -r ')
forceCopy = False
debug = 0
cmsswCfg = 'cmssw.cfg'

# Read new values from the command line
for opt, arg in opts:
    if opt == '--help':
        print(usage)
        sys.exit(0)
    elif opt == '--cmsDataset':
        cmsDataset = arg
    elif opt == '--mitDataset':
        mitDataset = arg
    elif opt == '--mitCfg':
        mitCfg = arg
    elif opt == '--version':
        version = arg
    elif opt == '--cmssw':
        cmssw = arg
    elif opt == '--pattern':
        pattern = arg
    elif opt == '--localStorageUrl':
        localStorageUrl = arg
    elif opt == '--localPath':
        blockLocal = 1
        localPath = arg
    elif opt == '--skip':
        skip = arg
        skipList = skip.split(',')
    elif opt == '--noCache':
        noCache = 1
    elif opt == '--backward':
        backward = ' -r '
    elif opt == '--forceCopy':
        forceCopy = True
    elif opt == '--debug':
        debug = 1
268    
# Deal with obvious problems
if cmsDataset is None and mitDataset is None:
    # either of the two options identifies the dataset
    raise RuntimeError('Neither --cmsDataset nor --mitDataset provided. One of them is required.')

if 'MIT_PROD_DIR' not in os.environ:
    raise RuntimeError('Environment variable MIT_PROD_DIR is not set.')
cfgDir = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version + '/'

crabFile = cfgDir + 'crab.cfg'
if not os.path.exists(crabFile):
    raise RuntimeError('Crab file not found: %s' % crabFile)

# the cmssw configuration may be 'cmssw.cfg' (default name) or, as a fallback, 'cmssw.py'
cmsswFile = cfgDir + cmsswCfg
if not os.path.exists(cmsswFile):
    cmsswCfg = 'cmssw.py'
    cmsswFile = cfgDir + cmsswCfg
    if not os.path.exists(cmsswFile):
        raise RuntimeError(' XXXX ERROR no valid configuration found XXXX')
287    
# Resolve the other mitCfg parameters from the configuration file
prodFile = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version + '/' + 'Productions'
if cmssw != '':
    prodFile = prodFile + '.' + cmssw

# only one of the two names is kept as search key, the other one is looked up
if cmsDataset == None:
    cmsDataset = ''
else:
    mitDataset = ''

prodLines = []
if os.path.exists(prodFile):
    with open(prodFile) as prodInput:
        prodLines = prodInput.readlines()
else:
    print(' WARNING - productions file not found: ' + prodFile)

join = 0
fullLine = ''
bSlash = '\\'
for line in prodLines:
    line = line.rstrip('\n')
    # get rid of empty or commented lines
    if line == '' or line[0] == '#':
        continue

    # join lines
    if join == 1:
        fullLine += line
    else:
        fullLine = line

    # a trailing backslash means the entry continues on the next line
    if fullLine[-1] == bSlash:
        join = 1
        fullLine = fullLine[:-1]
        continue
    join = 0

    # entry layout: <cms name> <mit name> <nevents> <?> <local path or '-'>
    names = fullLine.split()               # splitting every blank
    if len(names) < 3:
        continue                           # blank or incomplete entry
    if names[0] == cmsDataset:
        mitDataset = names[1]              # this is the equivalent MIT name of the dataset
        nevents = int(names[2])            # number of events to be used in the production
        if len(names) > 4 and names[4] != "-" and blockLocal == 0:
            localPath = names[4]
    if names[1] == mitDataset:
        cmsDataset = names[0]              # this is the equivalent CMS name of the dataset
        nevents = int(names[2])            # number of events to be used in the production
        if len(names) > 4 and names[4] != "-" and blockLocal == 0:
            localPath = names[4]

if mitDataset == "":
    print("ERROR - dataset not defined.")
    sys.exit(1)                            # this is a failure: do not report success

# Say what we do now
print('\n Preparing dataset for transfer: ' + cmsDataset + ' [MIT: ' + mitDataset + ']\n')
353    
# --------------------------------------------------------------------------------------------------
# Deal with storage element area
# --------------------------------------------------------------------------------------------------
pMitDset = re.compile('XX-MITDATASET-XX')
pMitCfg = re.compile('XX-MITCFG-XX')
pMitVers = re.compile('XX-MITVERSION-XX')

# defaults: a key missing from crab.cfg is detected below instead of causing a NameError
storageEle = ''
storagePath = ''
userRemoteDir = ''

# find the foreseen storage place
crabFile = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version + '/' + 'crab.cfg'
cmd = 'grep ^storage_element ' + crabFile
for line in os.popen(cmd).readlines():   # run command
    # the storage element name is what follows the last '='
    storageEle = re.sub(r"\s", "", line.split("=").pop())

cmd = 'grep ^storage_path ' + crabFile
for line in os.popen(cmd).readlines():   # run command
    line = pMitDset.sub(mitDataset,line)
    line = pMitCfg .sub(mitCfg, line)
    line = pMitVers.sub(version, line)
    # the storage directory is everything after the first '=' (it may contain '=' itself)
    storagePath = re.sub(r"\s", "", "=".join(line.split("=")[1:]))

cmd = 'grep ^user_remote_dir ' + crabFile
for line in os.popen(cmd).readlines():   # run command
    line = pMitDset.sub(mitDataset,line)
    line = pMitCfg .sub(mitCfg, line)
    line = pMitVers.sub(version, line)
    userRemoteDir = re.sub(r"\s", "", "=".join(line.split("=")[1:]))
    userRemoteDir = re.sub("/XX-CRABID-XX", "", userRemoteDir)

if localStorageUrl != '':
    # an explicitly given URL replaces everything found in crab.cfg
    storageEle = ''
    storagePath = ''
    storageUrl = localStorageUrl
else:
    if storageEle == '' or storagePath == '':
        print(' ERROR - storage_element or storage_path not found in ' + crabFile)
        sys.exit(1)
    storagePath += userRemoteDir
    storageUrl = 'srm://' + storageEle + ':8443' + storagePath

print(' --> StorageUrl: ' + storageUrl)
406    
#---------------------------------------------------------------------------------------------------
# create the local storage area
#---------------------------------------------------------------------------------------------------
print(' Make local path: ' + localPath)
localDir = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset
mkd = 'mkdir -p ' + localDir
status = os.system(mkd)

if status != 0:
    print(' ERROR - could not create local directory ' + localDir)
    sys.exit(1)

print(' --> LocalDir: ' + localDir)

# Free space in bytes available to unprivileged users on the volume holding localDir. This is
# asked from the file system directly; decoding 'df' output depends on whether df wraps long
# device names, and the previous test for an unwrapped line (line[0:0] == '/') could never match.
volume = os.statvfs(localDir)
free = volume.f_bavail * volume.f_frsize
429    
#---------------------------------------------------------------------------------------------------
# create a list of all files to be copied
#---------------------------------------------------------------------------------------------------
# remote files: the directory is the part of the storage path after the last '='
path = storagePath.split('=').pop()
cmd = 'list ' + path + ' | grep root | sort ' + backward
if pattern != "":
    cmd += ' | grep ' + pattern

print(' Find file: ' + cmd)
cacheFile = '/tmp/.cache_' + mitDataset
allFileList = BuildFileList(cmd)
stagedFileList = BuildStagedFileList(storagePath,allFileList,cacheFile)
CacheStagedFileList(cacheFile,storagePath,stagedFileList)

# local files: the same pattern is applied, otherwise local files outside of the requested
# subset would be reported as not being part of the remote list
cmd = 'list ' + localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + ' | grep root'
if pattern != "":
    cmd += ' | grep ' + pattern
print('List: ' + cmd)
doneFileList = BuildFileList(cmd)
458    
#---------------------------------------------------------------------------------------------------
# go through the lists: first check files are consistent, then copy the remaining files
#---------------------------------------------------------------------------------------------------
# initialize data volumes
b2G = 1.0/(1024.*1024.*1024)
nTotal = len(allFileList)
totalDataVolume = sum(allFileList.values())
nDone = 0
doneDataVolume = 0

# a file counts as done when it exists locally with the same size as the remote file
for file, size in allFileList.items():
    if doneFileList.get(file) == size:
        nDone += 1
        doneDataVolume += size

print(' ')
print(' Summary of data volume\n')
print(' --> number of files to copy: %8d (total: %d) '%(nTotal-nDone,nTotal))
print(' --> volume to copy [GB]: %8.2f (total: %.2f) '%(b2G*(totalDataVolume-doneDataVolume), \
                                                         b2G*totalDataVolume))
print(' --> free volume [GB]: %8.2f '%(b2G*free))
print(' ')

# keep a 15% safety margin on the free space
if free*0.85 < (totalDataVolume-doneDataVolume):
    print(' ERROR - probably no enough space on volume. See above (some safety assumed)!')
    sys.exit(1)

# every local file has to be known remotely and has to have the remote size
for file, size in doneFileList.items():
    if file not in allFileList:
        print(' ERROR - file from done list is not in the all files list. File: ' + file)
        sys.exit(1)
    if allFileList[file] != size:
        print(' ERROR - file sizes did not match: ' + file + \
              ' [ local: %10d, remote: %10d ]'%(size,allFileList[file]))
        sys.exit(1)

totalSizeMb = 0.
totalTimeSc = 0.
# The listing is kept in a dictionary, which does not preserve the order produced by 'sort';
# sort explicitly here so that --backward really reverses the order of the copies.
for file in sorted(allFileList, reverse=(backward != '')):
    # 'size' has to remain a module-level name: CopyFile() reads it as a global
    size = allFileList[file]
    if debug == 1:
        print(' Debug:: ' + file + ' -> size %d'%size)

    if file in doneFileList:
        print(' --> done, size match: %10d - %s'%(size,file))
        continue

    if InSkipList(file,skipList):
        print(' --> skipping file: %10d - %s'%(size,file))
        continue

    print(' --> copying file: %10d - %s (castor stat: %s)'% \
          (size,file,stagedFileList[file]))
    if stagedFileList[file] == "STAGED" or forceCopy:
        sizeMb = size/1024./1024.
        deltaT = CopyFile(storageEle,storagePath,storageUrl,file,localDir)
        if deltaT > 0:
            print(' time required [sec]: %7d rate [MB/sec]: %9.3f'%\
                  (deltaT,sizeMb/deltaT))
        else:
            print(' time required [sec]: %7d rate [MB/sec]: ?'%(deltaT))
        totalTimeSc += deltaT
        totalSizeMb += sizeMb
    else:
        # not on disk yet: ask for staging and pick the file up in a later run
        print(' skipping file: %s'%(stagedFileList[file]))
        StageFile(storagePath,storageUrl,file)

print('')
if totalTimeSc > 0:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] %9.3f'%\
          (totalSizeMb/1024.,totalTimeSc,totalSizeMb/totalTimeSc))
else:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] ?'%\
          (totalSizeMb/1024.,totalTimeSc))
print('')