ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/downloadSample.py
Revision: 1.2
Committed: Sat Jun 5 02:36:28 2010 UTC (14 years, 11 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_014a
Changes since 1.1: +528 -0 lines
Log Message:
Wow I forgot all about my cvs.

File Contents

# Content
1 #!/usr/bin/env python
2 #---------------------------------------------------------------------------------------------------
3 # Script to automatically download a MIT dataset to our local cluster
4 #
5 # The download of the MIT dataset is organized in accordance with the dataset production logic. In
6 # general it is allowed to download the dataset from any location of a properly configured storage
7 # element. The script performs the most obvious tests to ensure an efficient and safe download. For
8 # performance reasons a checksum is not calculated. This omission is considered completely safe as
9 # failures will be identified in the analysis phase and the rare occasions will be more effective to
10 # fix by hand.
11 #
12 # At present the download proceeds in one thread (one file at a time) which for performance reasons
13 # might not be optimal.
14 #
15 # Author: C.Paus (July 1, 2008)
16 #---------------------------------------------------------------------------------------------------
17 # Missing but desired features:
18 # + accounting of size of each file
19 # + accounting of locally available files (avoid copying already existing files)
20 # + determine full list of files before starting to copy
21 # + minimal success check of the copy
22 # + calculate total data volume (to copy, already copied etc.)
23 # + add feature to check the castor status
24 # - add time estimates and progressions for copies
25 # - multi downloads to enhance performance
26 #---------------------------------------------------------------------------------------------------
27 import os,sys,getopt,re,string
28
def Seconds():
    """Return the current time as whole seconds since the Unix epoch.

    The value is taken from the standard library clock, so no external `date +%s`
    process is started and the result is always defined.

    Returns:
        int: seconds since the epoch, truncated to an integer.
    """
    # local import: 'time' is not part of the file-level import line
    import time
    return int(time.time())
33
def InSkipList(file,list):
    """Tell whether `file` compares equal to at least one entry of `list`.

    Returns:
        bool: True if a matching entry exists, False otherwise (also for an empty list).
    """
    matches = [entry for entry in list if entry == file]
    return len(matches) > 0
39
def DecodeSrmLs(line):
    """Split one '<size> <path>' listing line into its size and bare file name.

    The line is stripped and split on single blanks; the first field is taken as the
    size and the last '/'-separated component of the second field as the file name.

    Returns:
        list: [ size, file ], both as strings.
    """
    fields = line.strip().split(" ")
    return [ fields[0], fields[1].split("/")[-1] ]
48
def DecodeRfDir(line):
    """Split one blank-separated directory listing line into its size and file name.

    The line is stripped and split on single blanks. The last field is the file name;
    the size is the fifth of the remaining fields.

    Returns:
        list: [ size, file ], both as strings.
    """
    fields = line.strip().split(" ")
    # the size index is applied to the fields *without* the trailing file name
    leading, name = fields[:-1], fields[-1]
    return [ leading[4], name ]
56
def BuildFileList(cmd):
    """Run a listing command and collect its output as {file name: size}.

    Every output line is expected to carry the file size followed by the file path,
    separated by white space. Only the last '/'-separated component of the path is
    kept as the dictionary key. Lines with fewer than two fields (for instance blank
    lines) are ignored. Reads the module-level flag `debug`.

    Args:
        cmd: shell command line, executed through os.popen.

    Returns:
        dict: file name -> size as int.

    Raises:
        ValueError: if the size field of a line is not an integer.
    """
    fileList = {}

    if debug == 1:
        print(' Debug:: list: ' + cmd)
    for line in os.popen(cmd).readlines():            # run command
        # split() copes with the trailing newline as well as repeated or leading blanks
        fields = line.split()
        if len(fields) < 2:
            continue
        size = fields[0]
        file = fields[1].split("/")[-1]
        if debug == 1:
            print(' Debug:: adding: ' + file + ' with size ' + size)
        fileList[file] = int(size)

    return fileList
84
def BuildStagedFileList(storagePath,allFileList,cacheFile):
    """Determine the stager status of every file in `allFileList`.

    The part of `storagePath` after the last '=' is used as the storage directory. If
    that directory does not contain '/castor/', every file is simply marked 'STAGED'.
    Otherwise the status is looked up either in `cacheFile` (when it exists and the
    module-level flag `noCache` is 0) or by running 'stager_qry -M' per file. In both
    cases a line counts only if its first blank-separated field equals the full file
    name; the status is the last field of the last such line. Files without any
    matching line get the status 'undefined'. Reads the module-level flag `debug`.

    Args:
        storagePath: storage path setting; everything after the last '=' is the directory.
        allFileList: dictionary whose keys are the file names to look up.
        cacheFile:   path of the cache file with previously recorded stager statuses.

    Returns:
        dict: file name -> status string.
    """
    rfPath = storagePath.split("=")[-1]

    # if this is not castor, trick it and mark them as staged
    if not re.search('/castor/',rfPath):
        return dict([ (file, 'STAGED') for file in allFileList ])

    # here we deal with castor
    if debug == 1:
        print(' Debug:: rfpath: ' + rfPath)

    # read the cache once instead of starting one 'grep' process per file
    useCache = os.path.exists(cacheFile) and noCache == 0
    cached = {}
    if useCache:
        print(' Using the cached stager queries at ' + cacheFile)
        cacheInput = open(cacheFile)
        try:
            for line in cacheInput:
                f = line.rstrip('\n').split(" ")
                cached[f[0]] = f[-1]                  # later lines override earlier ones
        finally:
            cacheInput.close()

    fileList = {}
    for file in allFileList:
        fullFile = rfPath + '/' + file
        if debug == 1:
            print(' Debug:: full file name: ' + fullFile)
        status = 'undefined'
        if useCache:
            status = cached.get(fullFile,status)
        else:
            for line in os.popen('stager_qry -M ' + fullFile).readlines():
                f = line.rstrip('\n').split(" ")
                if f[0] == fullFile:
                    status = f[-1]
        fileList[file] = status

    return fileList
121
def CacheStagedFileList(cacheFile,storagePath,stagedFileList):
    """Write the stager status of every file to `cacheFile`, one line per file.

    Each line has the form '<directory>/<file> xyz@castorns <status>', where the
    directory is the part of `storagePath` after the last '='. An existing cache file
    is overwritten. The file is closed even if writing fails.

    Args:
        cacheFile:      path of the file to (over)write.
        storagePath:    storage path setting; everything after the last '=' is the directory.
        stagedFileList: dictionary file name -> status string.

    Returns:
        None
    """
    print(' Caching stager query status to ' + cacheFile)
    rfPath = storagePath.split("=")[-1]
    fileOutput = open(cacheFile,'w')
    try:
        for file, status in stagedFileList.items():
            fileOutput.write(rfPath + '/' + file + ' xyz@castorns ' + status + '\n')
    finally:
        fileOutput.close()
131
def CopyFile(storageEle,storagePath,storageUrl,file,localDir):
    """Copy one remote file into `localDir` and return the time the copy took.

    For the storage element 'srm-cms.cern.ch' the copy is done with 'rfcp' from the
    directory given after the last '=' in `storagePath`; for any other storage element
    'lcg-cp' is used with `storageUrl`. Files with a size below one byte are not copied.
    A warning is printed when the copy command exits with a non-zero status.

    NOTE(review): the file size is not a parameter; it is read from the module-level
    variable `size`, which the calling loop is expected to have set for this file. The
    module-level flag `debug` is read as well.

    Args:
        storageEle:  name of the storage element.
        storagePath: storage path setting; everything after the last '=' is the directory.
        storageUrl:  URL prefix of the remote directory (used for lcg-cp).
        file:        bare file name.
        localDir:    local target directory.

    Returns:
        int: elapsed seconds as measured with Seconds(); 0 if the copy was not attempted.
    """
    deltaT = 0
    print(' working on file: ' + file + ' to ' + localDir + \
          ' (size: %d MB) '%(int(size)//1024//1024))

    # Check whether the file size make sense (zero length files are probably not yet ready to
    # copy and will not be transfered
    if size < 1:
        print(' WARNING - file size is <1b. Probably this file is not yet ready. Stop copy.')
        return deltaT

    # the target is derived from the localDir parameter rather than rebuilt from globals
    target = localDir + '/' + file
    if storageEle == 'srm-cms.cern.ch':
        rfPath = storagePath.split("=")[-1]
        cpy = 'rfcp ' + rfPath + '/' + file + ' ' + target
    else:
        cpy = 'lcg-cp ' + storageUrl + '/' + file + ' file:////' + target

    if debug == 1:
        print(' Debug:: copy: ' + cpy)
    start = Seconds()
    status = os.system(cpy)
    end = Seconds()
    deltaT = end - start
    if status != 0:
        print(' WARNING - copy command returned status %d for file: %s'%(status,file))

    return deltaT
161
def StageFile(storagePath,storageUrl,file):
    """Issue the staging command for one remote file.

    When the module-level `storageEle` is 'srm-cms.cern.ch', 'stager_get -M' is run on
    the file below the directory given after the last '=' in `storagePath`. For any
    other storage element the corresponding lcg-cp command line is only echoed. The
    exit status of the command is not evaluated. Reads the module-level names
    `storageEle`, `localPath`, `mitCfg`, `version`, `mitDataset` and `debug`.

    Returns:
        None
    """
    print(' staging in file: ' + file)
    if storageEle != 'srm-cms.cern.ch':
        destination = '/'.join([ localPath, mitCfg, version, mitDataset, file ])
        stg = 'echo lcg-cp ' + storageUrl + '/' + file + ' file:////' + destination
    else:
        stg = 'stager_get -M ' + storagePath.split("=")[-1] + '/' + file

    if debug == 1:
        print(' Debug:: stage: ' + stg)
    os.system(stg)
176
#===================================================================================================
# Main starts here
#===================================================================================================
# Define string to explain usage of the script (every option listed in 'valid' below is shown)
usage  = "Usage: downloadSample.py --cmsDataset=<name> | --mitDataset=<name>\n"
usage += " --mitCfg=<name>\n"
usage += " --version=<version>\n"
usage += " --cmssw=<name>\n"
usage += " --pattern=<grep pattern>\n"
usage += " --localStorageUrl=<name>\n"
usage += " --localPath=<dir>\n"
usage += " --noCache\n"
usage += " --skip=<file list>\n"
usage += " --forceCopy\n"
usage += " --backward\n"
usage += " --debug\n"
usage += " --help\n"

# Define the valid options which can be specified and check out the command line
valid = ['cmsDataset=','mitDataset=','mitCfg=','version=','cmssw=','pattern=','localStorageUrl=',
         'localPath=','noCache','skip=',
         'forceCopy','backward',
         'debug','help']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError:
    # fetch the exception through sys.exc_info() so the syntax is valid in any python version
    ex = sys.exc_info()[1]
    print(usage)
    print(str(ex))
    sys.exit(1)
203
# --------------------------------------------------------------------------------------------------
# Get all parameters for the production
# --------------------------------------------------------------------------------------------------
# Defaults, used for every option that is not given on the command line
cmsDataset      = None
mitDataset      = None
skip            = ''
skipList        = []
mitCfg          = 'filler'
version         = '012'
cmssw           = ''
blockLocal      = 0                      # set to 1 once --localPath was given explicitly
localStorageUrl = ''
localPath       = '/server/02b/mitprod'
pattern         = ''
noCache         = 0
backward        = ''
forceCopy       = False
debug           = 0
cmsswCfg        = 'cmssw.cfg'

# Overwrite the defaults with whatever was specified on the command line
for opt, arg in opts:
    if   opt == '--help':
        print(usage)
        sys.exit(0)
    elif opt == '--cmsDataset':
        cmsDataset = arg
    elif opt == '--mitDataset':
        mitDataset = arg
    elif opt == '--mitCfg':
        mitCfg = arg
    elif opt == '--version':
        version = arg
    elif opt == '--cmssw':
        cmssw = arg
    elif opt == '--pattern':
        pattern = arg
    elif opt == '--localStorageUrl':
        localStorageUrl = arg
    elif opt == '--localPath':
        blockLocal = 1
        localPath = arg
    elif opt == '--skip':
        skip = arg
        skipList = skip.split(',')       # comma separated list of file names
    elif opt == '--noCache':
        noCache = 1
    elif opt == '--backward':
        backward = ' -r '                # later appended to the 'sort' of the file listing
    elif opt == '--forceCopy':
        forceCopy = True
    elif opt == '--debug':
        debug = 1
258
# Deal with obvious problems: a dataset has to be named and the configuration files must exist
if cmsDataset is None and mitDataset is None:
    raise RuntimeError('Neither --cmsDataset nor --mitDataset option provided. ' +
                       'One of them is required.')

crabFile = mitCfg + '/' + version + '/' + 'crab.cfg'
if not os.path.exists(crabFile):
    raise RuntimeError('Crab file not found: %s' % crabFile)

cmsswFile = mitCfg + '/' + version + '/' + cmsswCfg
if not os.path.exists(cmsswFile):
    # the default configuration name does not exist, fall back to 'cmssw.py'
    cmsswCfg = 'cmssw.py'
    cmsswFile = mitCfg + '/' + version + '/' + cmsswCfg
    if not os.path.exists(cmsswFile):
        raise RuntimeError(' XXXX ERROR no valid configuration found XXXX')
277
# Resolve the other mitCfg parameters from the configuration file
cmd = 'cat ' + mitCfg + '/' + version + '/' + 'Productions'
if cmssw != '':
    cmd = cmd + '.' + cmssw

# The dataset that was not specified is marked as empty and filled from the matching record;
# if --cmsDataset was given it takes precedence over --mitDataset
join = 0
if cmsDataset is None:
    cmsDataset = ''
else:
    mitDataset = ''

fullLine = ''
bSlash = '\\'
for line in os.popen(cmd).readlines():                # run command
    line = line.rstrip('\n')
    # get rid of empty or commented lines
    if line == '' or line[0] == '#':
        continue

    # join lines
    if join == 1:
        fullLine += line
    else:
        fullLine = line

    # determine if finished or more is coming (a trailing backslash continues the record)
    if fullLine[-1] == bSlash:
        join = 1
        fullLine = fullLine[:-1]
        continue
    join = 0

    # a record is: <cms dataset> <mit dataset> <number of events> <...> <local path or '-'>
    names = fullLine.split()                          # splitting every blank
    if len(names) < 3:
        continue                                      # incomplete record, nothing to match

    matchCms = (names[0] == cmsDataset)
    if matchCms:
        mitDataset = names[1]                         # this is the equivalent MIT name of the dataset
    # evaluated after the assignment above, exactly as the two consecutive tests did before
    matchMit = (names[1] == mitDataset)
    if matchMit:
        cmsDataset = names[0]                         # this is the equivalent CMS name of the dataset
    if matchCms or matchMit:
        nevents = int(names[2])                       # number of events to be used in the production
        # the fifth column is optional: only use it when present and not a '-' placeholder,
        # and never override a path given explicitly with --localPath
        if len(names) > 4 and names[4] != "-" and blockLocal == 0:
            localPath = names[4]

if mitDataset == "":
    print("ERROR - dataset not defined.")
    sys.exit(1)                                       # an error must not report success

# Say what we do now
print('\n Preparing dataset for transfer: ' + cmsDataset + ' [MIT: ' + mitDataset + ']\n')
343
# --------------------------------------------------------------------------------------------------
# Deal with storage element area
# --------------------------------------------------------------------------------------------------
pMitDset = re.compile('XX-MITDATASET-XX')
pMitCfg  = re.compile('XX-MITCFG-XX')
pMitVers = re.compile('XX-MITVERSION-XX')

# Start from empty values so that a key missing in the crab file leaves a defined (empty)
# setting instead of an unbound name further down
storageEle    = ''
storagePath   = ''
storageUrl    = ''
userRemoteDir = ''

# find the foreseen storage place
crabFile = mitCfg + '/' + version + '/' + 'crab.cfg'
cmd = 'grep ^storage_element ' + crabFile
for file in os.popen(cmd).readlines():                # run command
    line = file[:-1]                                  # strip '\n'
    # decode the storage element name (text after the last '=', all white space removed)
    names = line.split("=")                           # splitting every '='
    storageEle = names.pop()
    storageEle = re.sub(r"\s","",storageEle)

# The placeholders are replaced by the actual dataset, configuration and version
cmd = 'grep ^storage_path ' + crabFile
for file in os.popen(cmd).readlines():                # run command
    line = file[:-1]                                  # strip '\n'
    line = pMitDset.sub(mitDataset,line)
    line = pMitCfg .sub(mitCfg,    line)
    line = pMitVers.sub(version,   line)
    # decode the storage directory name (everything after the first '=', which may itself
    # contain further '=' characters)
    names = line.split("=")                           # splitting every '='
    names = names[1:]
    storagePath = "=".join(names)
    storagePath = re.sub(r"\s","",storagePath)
    storageUrl = 'srm://' + storageEle + ':8443' + storagePath

cmd = 'grep ^user_remote_dir ' + crabFile
for file in os.popen(cmd).readlines():                # run command
    line = file[:-1]                                  # strip '\n'
    line = pMitDset.sub(mitDataset,line)
    line = pMitCfg .sub(mitCfg,    line)
    line = pMitVers.sub(version,   line)
    # decode the remote directory name and drop the crab id placeholder
    names = line.split("=")                           # splitting every '='
    names = names[1:]
    userRemoteDir = "=".join(names)
    userRemoteDir = re.sub(r"\s","",userRemoteDir)
    userRemoteDir = re.sub("/XX-CRABID-XX","",userRemoteDir)

if userRemoteDir != '':
    storagePath += userRemoteDir
    storageUrl += userRemoteDir

# an explicitly given local storage URL replaces everything found in the crab file
if localStorageUrl != '':
    storageEle = ''
    storagePath = ''
    storageUrl = localStorageUrl

print(' --> StorageUrl: ' + storageUrl)
396
#---------------------------------------------------------------------------------------------------
# create the local storage area
#---------------------------------------------------------------------------------------------------
print(' Make local path: ' + localPath)
localDir = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset
mkd = 'mkdir -p ' + localDir
status = os.system(mkd)

if status != 0:
    print(' ERROR - could not create local directory ' + localDir)
    sys.exit(1)

print(' --> LocalDir: ' + localDir)

# Determine the free space [bytes] on the volume holding the local directory. Only the last
# line of the df output is looked at, with repeated blanks squeezed into one.
# NOTE(review): assumes the usual df column order (filesystem, size, used, available, ...)
# and that df moves the numbers to a separate line when the filesystem name is long -- confirm
# for the df version in use.
cmd = 'df --block-size=1 ' + localDir + ' | tr -s \' \' | tail -1'
free = 0                                              # stays 0 if df produces no usable output
for line in os.popen(cmd).readlines():                # run command
    f = line.strip().split(" ")
    if len(f) < 4:
        continue
    if f[0].isdigit():
        # wrapped output: the line starts with the size, so 'available' is the third field
        free = int(f[2])
    else:
        # the line starts with the filesystem name, so 'available' is the fourth field
        free = int(f[3])
419
#---------------------------------------------------------------------------------------------------
# create a list of all files to be copied
#---------------------------------------------------------------------------------------------------
# the remote directory is the part of the storage path after the last '='
f = storagePath.split('=')
path = f.pop()
# NOTE(review): 'list' is an external command that is not part of this script; its output is
# parsed by BuildFileList -- confirm it is available in the PATH
cmd = 'list ' + path + ' | grep root | sort ' + backward

if pattern != "":
    cmd += ' | grep ' + pattern

print(' Find file: ' + cmd)
cacheFile = '/tmp/.cache_' + mitDataset
allFileList = BuildFileList(cmd)
stagedFileList = BuildStagedFileList(storagePath,allFileList,cacheFile)
cacheStaged = CacheStagedFileList(cacheFile,storagePath,stagedFileList)

# list of the files which are already available locally; the command is printed only after it
# has been put together so that the message shows what is really executed
cmd = 'list ' + localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + ' | grep root'
print('List: ' + cmd)
doneFileList = BuildFileList(cmd)
448
#---------------------------------------------------------------------------------------------------
# go through the lists: first check files are consistent, then copy the remaining files
#---------------------------------------------------------------------------------------------------
# data volumes: everything on the remote side versus what is locally complete already
b2G = 1.0/(1024.*1024.*1024)
nTotal = len(allFileList)
totalDataVolume = sum(allFileList.values())
# a file counts as done when it exists locally with exactly the remote size
matchedSizes = [ remoteSize for name, remoteSize in allFileList.items()
                 if name in doneFileList and doneFileList[name] == remoteSize ]
nDone = len(matchedSizes)
doneDataVolume = sum(matchedSizes)

print(' ')
print(' Summary of data volume\n')
print(' --> number of files to copy: %8d (total: %d) '%(nTotal-nDone,nTotal))
print(' --> volume to copy [GB]: %8.2f (total: %.2f) '%(b2G*(totalDataVolume-doneDataVolume), \
                                                         b2G*totalDataVolume))
print(' --> free volume [GB]: %8.2f '%(b2G*free))
print(' ')

# only 85% of the free volume is considered usable
if free*0.85 < (totalDataVolume-doneDataVolume):
    print(' ERROR - probably no enough space on volume. See above (some safety assumed)!')
    sys.exit(1)

# every local file has to be known remotely and to agree in size, otherwise stop right here
for name, localSize in doneFileList.items():
    if name not in allFileList:
        print(' ERROR - file from done list is not in the all files list. File: ' + name)
        sys.exit(1)
    if allFileList[name] != localSize:
        print(' ERROR - file sizes did not match: ' + name + \
              ' [ local: %10d, remote: %10d ]'%(localSize,allFileList[name]))
        sys.exit(1)
488
# Copy all files which are not yet available locally and keep track of the performance. The
# total and done data volumes were determined before this loop and are not touched here.
totalSizeMb = 0.
totalTimeSc = 0.
# NOTE: the loop variable has to be called 'size' -- CopyFile reads the module-level 'size'
for file, size in allFileList.items():
    if debug == 1:
        print(' Debug:: ' + file + ' -> size %d'%size)

    if file in doneFileList:
        print(' --> done, size match: %10d - %s'%(size,file))
        continue

    if InSkipList(file,skipList):
        print(' --> skipping file: %10d - %s'%(size,file))
        continue

    print(' --> copying file: %10d - %s (castor stat: %s)'% \
          (size,file,stagedFileList[file]))
    if stagedFileList[file] == "STAGED" or forceCopy:
        sizeMb = size/1024./1024.
        deltaT = CopyFile(storageEle,storagePath,storageUrl,file,localDir)
        if deltaT > 0:
            print(' time required [sec]: %7d rate [MB/sec]: %9.3f'%\
                  (deltaT,sizeMb/deltaT))
        else:
            # no measurable time elapsed, a rate cannot be given
            print(' time required [sec]: %7d rate [MB/sec]: ?'%(deltaT))
        totalTimeSc += deltaT
        totalSizeMb += sizeMb
    else:
        # file is not staged: do not copy now but request the staging
        print(' skipping file: %s'%(stagedFileList[file]))
        StageFile(storagePath,storageUrl,file)

print('')
if totalTimeSc > 0:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] %9.3f'%\
          (totalSizeMb/1024.,totalTimeSc,totalSizeMb/totalTimeSc))
else:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] ?'%\
          (totalSizeMb/1024.,totalTimeSc))
print('')