ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/downloadSample.py
Revision: 1.1.2.1
Committed: Sat Jun 5 01:49:21 2010 UTC (14 years, 11 months ago) by paus
Content type: text/x-python
Branch: Mit_013c
Changes since 1.1: +528 -0 lines
Log Message:
first reasonably working production environment

File Contents

# User Rev Content
1 paus 1.1.2.1 #!/usr/bin/env python
2     #---------------------------------------------------------------------------------------------------
3     # Script to automatically download a MIT dataset to our local cluster
4     #
5     # The download of the MIT dataset is organized in accordance with the dataset production logic. In
6     # general it is allowed to download the dataset from any location of a properly configured storage
7     # element. The script will do all most obvious tests to ensure efficient and safe download. For
8     # performance reason a checksum is not calculated. This ommission is considered completely safe as
9     # failures will be identified in the analysis phase and the rare occasions will be more effective to
10     # fix by hand.
11     #
12     # At present the download proceeds in one thread (one file at a time) which for performance reasons
13     # might not be optimal.
14     #
15     # Author: C.Paus (July 1, 2008)
16     #---------------------------------------------------------------------------------------------------
17     # Missing but desired features:
18     # + accounting of size of each file
19     # + accounting of locally available files (avoid copying already existing files)
20     # + determine full list of files before starting to copy
21     # + minimal success check of the copy
22     # + calculate total data volume (to copy, already copied etc.)
23     # + add feature to check the castor status
24     # - add time estimates and progressions for copies
25     # - multi downloads to enhance performance
26     #---------------------------------------------------------------------------------------------------
27     import os,sys,getopt,re,string
28    
29     def Seconds():
30     for secs in os.popen('date +%s').readlines():
31     secs = int(secs[:-1])
32     return secs
33    
34     def InSkipList(file,list):
35     for entry in list:
36     if entry == file:
37     return True
38     return False
39    
40     def DecodeSrmLs(line):
41     line = line.strip()
42     f = line.split(" ")
43     size = f[0]
44     f = f[1].split("/")
45     file = f.pop()
46     list = [ size, file ]
47     return list
48    
49     def DecodeRfDir(line):
50     line = line.strip()
51     f = line.split(" ")
52     file = f.pop()
53     size = f[4]
54     list = [ size, file ]
55     return list
56    
57     def BuildFileList(cmd):
58     isSrm = 1
59     f = cmd.split(" ")
60     if f[0] == 'rfdir':
61     isSrm = 0
62    
63     fileList = {}
64    
65     if debug == 1:
66     print ' Debug:: list: ' + cmd
67     for line in os.popen(cmd).readlines(): # run command
68     line = line[:-1] # strip '\n'
69     #print ' Line: ' + line
70     f = line.split(" ")
71     ##if isSrm == 1:
72     ## f = DecodeSrmLs(line)
73     ##else:
74     ## f = DecodeRfDir(line)
75     size = f[0]
76     file = f[1]
77     f = file.split("/")
78     file = f[-1]
79     if debug == 1:
80     print ' Debug:: adding: ' + file + ' with size ' + size
81     fileList[file] = int(size)
82    
83     return fileList
84    
85     def BuildStagedFileList(storagePath,allFileList,cacheFile):
86    
87     # initialize the basics
88     fileList = {}
89     f = storagePath.split("=");
90     rfPath = f[-1]
91    
92     # if this is not castor, trick it and mark them as staged
93     if not (re.search('/castor/',rfPath)):
94     for file, size in allFileList.iteritems():
95     fileList[file] = 'STAGED'
96     return fileList
97    
98     # here we deal with castor
99     if debug == 1:
100     print ' Debug:: rfpath: ' + rfPath
101     if os.path.exists(cacheFile) and noCache == 0:
102     print ' Using the cached stager queries at ' + cacheFile
103     for file, size in allFileList.iteritems():
104     fullFile = rfPath + '/' + file
105     if debug == 1:
106     print ' Debug:: full file name: ' + fullFile
107     if os.path.exists(cacheFile) and noCache == 0:
108     cmd = 'grep ' + file + ' ' + cacheFile
109     else:
110     cmd = 'stager_qry -M ' + fullFile
111     fileList[file] = 'undefined'
112     for line in os.popen(cmd).readlines(): # run command
113     line = line[:-1]
114     f = line.split(" ")
115     if f[0] == fullFile:
116     f = line.split(" ")
117     status = f[-1]
118     fileList[file] = status
119    
120     return fileList
121    
122     def CacheStagedFileList(cacheFile,storagePath,stagedFileList):
123     print ' Caching stager query status to ' + cacheFile
124     f = storagePath.split("=");
125     rfPath = f[-1]
126     fileOutput = open(cacheFile,'w')
127     for file, status in stagedFileList.iteritems():
128     line = rfPath + '/' + file + ' xyz@castorns ' + status + '\n'
129     fileOutput.write(line)
130     fileOutput.close()
131    
132     def CopyFile(storageEle,storagePath,storageUrl,file,localDir):
133     deltaT = 0
134     print ' working on file: ' + file + ' to ' + localDir + \
135     ' (size: %d MB) '%(int(size)/1024/1024)
136     if storageEle == 'srm-cms.cern.ch':
137     f = storagePath.split("=");
138     rfPath = f[-1]
139     cpy = 'rfcp ' + rfPath + '/' + file + ' ' + localPath + '/' \
140     + mitCfg + '/' + version + '/' + mitDataset + '/' + file
141     #print ' using rfcp.... ' + cpy
142     #sys.exit(0)
143     else:
144     #storageUrl = 'srm://' + storageEle + ':8443' + storagePath
145     cpy = 'lcg-cp ' + storageUrl + '/' + file + ' file:////' + localPath + '/' \
146     + mitCfg + '/' + version + '/' + mitDataset + '/' + file
147    
148     # Check whether the file size make sense (zero length files are probably not yet ready to
149     # copy and will not be transfered
150     if size < 1:
151     print ' WARNING - file size is <1b. Probably this file is not yet ready. Stop copy.'
152     else:
153     if debug == 1:
154     print ' Debug:: copy: ' + cpy
155     start = Seconds()
156     status = os.system(cpy)
157     end = Seconds()
158     deltaT = end - start
159    
160     return deltaT
161    
162     def StageFile(storagePath,storageUrl,file):
163     print ' staging in file: ' + file
164     if storageEle == 'srm-cms.cern.ch':
165     f = storagePath.split("=");
166     rfPath = f[-1]
167     stg = 'stager_get -M ' + rfPath + '/' + file
168     else:
169     #storageUrl = 'srm://' + storageEle + ':8443' + storagePath
170     stg = 'echo lcg-cp ' + storageUrl + '/' + file + ' file:////' + localPath + '/' \
171     + mitCfg + '/' + version + '/' + mitDataset + '/' + file
172    
173     if debug == 1:
174     print ' Debug:: stage: ' + stg
175     status = os.system(stg)
176    
177     #===================================================================================================
178     # Main starts here
179     #===================================================================================================
180     # Define string to explain usage of the script
181     usage = "Usage: downloadSample.py --cmsDataset=<name> | --mitDataset=<name>\n"
182     usage += " --mitCfg=<name>\n"
183     usage += " --version=<version>\n"
184     usage += " --cmssw=<name>\n"
185     usage += " --localStorageUrl=<name>\n"
186     usage += " --localPath=<dir>\n"
187     usage += " --skip=<file list>\n"
188     usage += " --backward\n"
189     usage += " --debug\n"
190     usage += " --help\n"
191    
192     # Define the valid options which can be specified and check out the command line
193     valid = ['cmsDataset=','mitDataset=','mitCfg=','version=','cmssw=','pattern=','localStorageUrl=',
194     'localPath=','noCache','skip=',
195     'forceCopy','backward',
196     'debug','help']
197     try:
198     opts, args = getopt.getopt(sys.argv[1:], "", valid)
199     except getopt.GetoptError, ex:
200     print usage
201     print str(ex)
202     sys.exit(1)
203    
204     # --------------------------------------------------------------------------------------------------
205     # Get all parameters for the production
206     # --------------------------------------------------------------------------------------------------
207     # Set defaults for each option
208     cmsDataset = None
209     mitDataset = None
210     skip = ''
211     skipList = []
212     mitCfg = 'filler'
213     version = '012'
214     cmssw = ''
215     blockLocal = 0
216     localStorageUrl = ''
217     localPath = '/server/02b/mitprod'
218     pattern = ''
219     noCache = 0
220     backward = ''
221     forceCopy = False
222     debug = 0
223     cmsswCfg = 'cmssw.cfg'
224    
225     # Read new values from the command line
226     for opt, arg in opts:
227     if opt == '--help':
228     print usage
229     sys.exit(0)
230     if opt == '--cmsDataset':
231     cmsDataset = arg
232     if opt == '--mitDataset':
233     mitDataset = arg
234     if opt == '--mitCfg':
235     mitCfg = arg
236     if opt == '--version':
237     version = arg
238     if opt == '--cmssw':
239     cmssw = arg
240     if opt == '--pattern':
241     pattern = arg
242     if opt == '--localStorageUrl':
243     localStorageUrl = arg
244     if opt == '--localPath':
245     blockLocal = 1
246     localPath = arg
247     if opt == '--skip':
248     skip = arg
249     skipList = skip.split(',')
250     if opt == '--noCache':
251     noCache = 1
252     if opt == '--backward':
253     backward = ' -r '
254     if opt == '--forceCopy':
255     forceCopy = True
256     if opt == '--debug':
257     debug = 1
258    
259     # Deal with obvious problems
260     if cmsDataset == None and mitDataset == None:
261     cmd = '--cmsDataset option not provided. This is required.'
262     raise RuntimeError, cmd
263    
264     crabFile = mitCfg + '/' + version + '/' + 'crab.cfg'
265     if not os.path.exists(crabFile):
266     cmd = 'Crab file not found: %s' % crabFile
267     raise RuntimeError, cmd
268     cmsswFile = mitCfg + '/' + version + '/' + cmsswCfg
269     if not os.path.exists(cmsswFile):
270     cmd = 'Cmssw file not found: %s' % cmsswFile
271     cmsswCfg = 'cmssw.py'
272     cmsswFile = mitCfg + '/' + version + '/' + cmsswCfg
273     if not os.path.exists(cmsswFile):
274     cmd = 'Cmssw file not found: %s' % cmsswFile
275     cmd = ' XXXX ERROR no valid configuration found XXXX'
276     raise RuntimeError, cmd
277    
278     # Resolve the other mitCfg parameters from the configuration file
279     cmd = 'cat ' + mitCfg + '/' + version + '/' + 'Productions'
280     if cmssw != '':
281     cmd = cmd + '.' + cmssw
282    
283     join = 0
284     if cmsDataset == None:
285     cmsDataset = ''
286     else:
287     mitDataset = ''
288    
289     fullLine = ''
290     bSlash = '\\';
291     for line in os.popen(cmd).readlines(): # run command
292     line = line[:-1]
293     #print 'Line: "' + line + '"'
294     # get ride of empty or commented lines
295     if line == '' or line[0] == '#':
296     continue
297    
298     # join lines
299     if join == 1:
300     fullLine += line
301     else:
302     fullLine = line
303    
304     # determine if finished or more is coming
305     if fullLine[-1] == bSlash:
306     join = 1
307     fullLine = fullLine[:-1]
308     else:
309     join = 0
310     # test whether there is a directory
311     names = fullLine.split() # splitting every blank
312     #print "FullLine: " + fullLine
313     #print "Datasets: " + mitDataset + ' -> ' + cmsDataset + "\n"
314     if names[0] == cmsDataset:
315     mitDataset = names[1] # this is the equivalent MIT name of the dataset
316     nevents = int(names[2]) # number of events to be used in the production
317     if names[4] != "-" and blockLocal == 0:
318     localPath = names[4]
319     #print "\n Sample Info: " + fullLine + "\n"
320     #print "\n Local path : " + localPath + ' -> ' + names[4] + "\n"
321     if names[1] == mitDataset:
322     cmsDataset = names[0] # this is the equivalent CMS name of the dataset
323     nevents = int(names[2]) # number of events to be used in the production
324     if names[4] != "-" and blockLocal == 0:
325     localPath = names[4]
326     #print "\n Sample Info: " + fullLine + "\n"
327     #print "\n Local path : " + localPath + ' -> ' + names[4] + "\n"
328    
329     if mitDataset == "":
330     print "ERROR - dataset not defined."
331     sys.exit(0)
332    
333     #cmd = 'grep ' + cmsDataset + ' ' + mitCfg + '/' + version + '/' + 'Productions'
334     #for file in os.popen(cmd).readlines(): # run command
335     # line = file[:-1] # strip '\n'
336     # # test whether there is a directory
337     # names = line.split() # splitting every blank
338     # mitDataset = names[1] # this is the equivalent MIT name of the dataset
339     # nevents = int(names[2]) # number of events to be used in the production
340    
341     # Say what we do now
342     print '\n Preparing dataset for transfer: ' + cmsDataset + ' [MIT: ' + mitDataset + ']\n'
343    
344     # --------------------------------------------------------------------------------------------------
345     # Deal with storage element area
346     # --------------------------------------------------------------------------------------------------
347     pMitDset = re.compile('XX-MITDATASET-XX')
348     pMitCfg = re.compile('XX-MITCFG-XX')
349     pMitVers = re.compile('XX-MITVERSION-XX')
350     # find the forseen storage place
351     crabFile = mitCfg + '/' + version + '/' + 'crab.cfg'
352     cmd = 'grep ^storage_element ' + crabFile
353     for file in os.popen(cmd).readlines(): # run command
354     line = file[:-1] # strip '\n'
355     # decode the storage element name
356     names = line.split("=") # splitting every '='
357     storageEle = names.pop()
358     storageEle = re.sub("\s", "",storageEle)
359     # Compile search and replacement sequences just for the path
360     cmd = 'grep ^storage_path ' + crabFile
361     for file in os.popen(cmd).readlines(): # run command
362     line = file[:-1] # strip '\n'
363     line = pMitDset.sub(mitDataset,line);
364     line = pMitCfg .sub(mitCfg, line);
365     line = pMitVers.sub(version, line);
366     # decode the storage directory name
367     names = line.split("=") # splitting every '='
368     names = names[1:]
369     storagePath = "=".join(names)
370     storagePath = re.sub("\s", "",storagePath)
371     storageUrl = 'srm://' + storageEle + ':8443' + storagePath
372    
373     cmd = 'grep ^user_remote_dir ' + crabFile
374     for file in os.popen(cmd).readlines(): # run command
375     line = file[:-1] # strip '\n'
376     line = pMitDset.sub(mitDataset,line);
377     line = pMitCfg .sub(mitCfg, line);
378     line = pMitVers.sub(version, line);
379     # decode the storage directory name
380     names = line.split("=") # splitting every '='
381     names = names[1:]
382     userRemoteDir = "=".join(names)
383     userRemoteDir = re.sub("\s","",userRemoteDir)
384     userRemoteDir = re.sub("/XX-CRABID-XX","",userRemoteDir)
385    
386     if userRemoteDir != '':
387     storagePath += userRemoteDir
388     storageUrl += userRemoteDir
389    
390     if localStorageUrl != '':
391     storageEle = ''
392     storagePath = ''
393     storageUrl = localStorageUrl
394    
395     print ' --> StorageUrl: ' + storageUrl
396    
397     #---------------------------------------------------------------------------------------------------
398     # create the local storage area
399     #---------------------------------------------------------------------------------------------------
400     print ' Make local path: ' + localPath
401     localDir = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset
402     mkd = 'mkdir -p ' + localDir
403     status = os.system(mkd)
404    
405     if status != 0:
406     print ' ERROR - could not create local directory ' + localDir
407     sys.exit(1)
408    
409     print ' --> LocalDir: ' + localDir
410    
411     cmd = 'df --block-size=1 ' + localDir + ' | tr -s \' \' | tail -1'
412     for line in os.popen(cmd).readlines(): # run command
413     line = line.strip()
414     f = line.split(" ")
415     if line[0:0] == '/' or line[0:4] == 'fuse':
416     free = int(f[3])
417     else:
418     free = int(f[2])
419    
420     #---------------------------------------------------------------------------------------------------
421     # create a list af all files to be copied
422     #---------------------------------------------------------------------------------------------------
423     cmd = ''
424     f = storagePath.split('=')
425     path = f.pop()
426     cmd = 'list ' + path + ' | grep root | sort ' + backward
427    
428     ##if storageEle == 'srm.cern.ch' or storageEle == 'srm-cms.cern.ch':
429     ## cmd = 'rfdir ' + path + ' | grep root | tr -s \' \' | sort ' + backward
430     ##else:
431     ## cmd = 'list ' + path + ' | grep root | sort ' + backward
432     ## #cmd = 'srmls ' + storageUrl + ' | grep root | sort ' + backward
433    
434     if pattern != "":
435     cmd += ' | grep ' + pattern
436    
437     print ' Find file: ' + cmd
438     cacheFile = '/tmp/.cache_' + mitDataset
439     allFileList = BuildFileList(cmd)
440     stagedFileList = BuildStagedFileList(storagePath,allFileList,cacheFile)
441     cacheStaged = CacheStagedFileList(cacheFile,storagePath,stagedFileList)
442    
443     #cmd = 'find ' + localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + \
444     # ' -maxdepth 1 -type f -printf "%s %f\n"'
445     print 'List: ' + cmd
446     cmd = 'list ' + localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + ' | grep root'
447     doneFileList = BuildFileList(cmd)
448    
449     #---------------------------------------------------------------------------------------------------
450     # go through the lists: first check files are consistent, then copy the remaining files
451     #---------------------------------------------------------------------------------------------------
452     # initialize data volumes
453     b2G = 1.0/(1024.*1024.*1024)
454     nTotal = 0
455     totalDataVolume = 0
456     nDone = 0
457     doneDataVolume = 0
458    
459     for file, size in allFileList.iteritems():
460     nTotal += 1
461     totalDataVolume += size
462     if (file in doneFileList) and (doneFileList[file] == size):
463     nDone += 1
464     doneDataVolume += size
465    
466     print ' '
467     print ' Summary of data volume\n'
468     print ' --> number of files to copy: %8d (total: %d) '%(nTotal-nDone,nTotal)
469     print ' --> volume to copy [GB]: %8.2f (total: %.2f) '%(b2G*(totalDataVolume-doneDataVolume), \
470     b2G*totalDataVolume)
471     print ' --> free volume [GB]: %8.2f '%(b2G*free)
472     print ' '
473    
474     if free*0.85 < (totalDataVolume-doneDataVolume):
475     print ' ERROR - probably no enough space on volume. See above (some safety assumed)!'
476     sys.exit(1)
477    
478     for file, size in doneFileList.iteritems():
479     if file in allFileList:
480     #print ' --> file is done: ' + file
481     if allFileList[file] != size:
482     print ' ERROR - file sizes did not match: ' + file + \
483     ' [ local: %10d, remote: %10d ]'%(size,allFileList[file])
484     sys.exit(1)
485     else:
486     print ' ERROR - file from done list is not in the all files list. File: ' + file
487     sys.exit(1)
488    
489     totalSizeMb = 0.
490     totalTimeSc = 0.
491     for file, size in allFileList.iteritems():
492     if debug == 1:
493     print ' Debug:: ' + file + ' -> size %d'%size
494    
495     totalDataVolume += size
496     if file in doneFileList:
497     print ' --> done, size match: %10d - %s'%(size,file)
498     doneDataVolume = +size
499     else:
500     if not InSkipList(file,skipList):
501     print ' --> copying file: %10d - %s (castor stat: %s)'% \
502     (size,file,stagedFileList[file])
503     if stagedFileList[file] == "STAGED" or forceCopy:
504    
505     sizeMb = size/1024./1024.
506     deltaT = CopyFile(storageEle,storagePath,storageUrl,file,localDir)
507     if deltaT > 0:
508     print ' time required [sec]: %7d rate [MB/sec]: %9.3f'%\
509     (deltaT,sizeMb/deltaT)
510     else:
511     print ' time required [sec]: %7d rate [MB/sec]: ?'%(deltaT)
512     totalTimeSc += deltaT
513     totalSizeMb += sizeMb
514     else:
515     print ' skipping file: %s'%(stagedFileList[file])
516     StageFile(storagePath,storageUrl,file)
517    
518     else:
519     print ' --> skipping file: %10d - %s'%(size,file)
520    
521     print ''
522     if totalTimeSc > 0:
523     print ' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] %9.3f'%\
524     (totalSizeMb/1024.,totalTimeSc,totalSizeMb/totalTimeSc)
525     else:
526     print ' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] ?'%\
527     (totalSizeMb/1024.,totalTimeSc)
528     print ''