ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/downloadSample.py
Revision: 1.4
Committed: Fri Jul 30 18:41:11 2010 UTC (14 years, 9 months ago) by paus
Content type: text/x-python
Branch: MAIN
CVS Tags: Mit_017pre3, Mit_017pre2, Mit_017pre1, Mit_016, Mit_015b, Mit_015a, Mit_015, Mit_014e, Mit_014d, Mit_014c
Changes since 1.3: +16 -6 lines
Log Message:
Cleaned up and updated version.

File Contents

# Content
1 #!/usr/bin/env python
2 #---------------------------------------------------------------------------------------------------
3 # Script to automatically download a MIT dataset to our local cluster
4 #
5 # The download of the MIT dataset is organized in accordance with the dataset production logic. In
6 # general it is allowed to download the dataset from any location of a properly configured storage
7 # element. The script will do all the most obvious tests to ensure efficient and safe download. For
8 # performance reasons a checksum is not calculated. This omission is considered completely safe as
9 # failures will be identified in the analysis phase and the rare occasions will be more effective to
10 # fix by hand.
11 #
12 # At present the download proceeds in one thread (one file at a time) which for performance reasons
13 # might not be optimal.
14 #
15 # Author: C.Paus (July 1, 2008)
16 #---------------------------------------------------------------------------------------------------
17 # Missing but desired features:
18 # + accounting of size of each file
19 # + accounting of locally available files (avoid copying already existing files)
20 # + determine full list of files before starting to copy
21 # + minimal success check of the copy
22 # + calculate total data volume (to copy, already copied etc.)
23 # + add feature to check the castor status
24 # - add time estimates and progressions for copies
25 # - multi downloads to enhance performance
26 #---------------------------------------------------------------------------------------------------
27 import os,sys,getopt,re,string
28
def Seconds():
    """Return the current Unix time as an integer number of seconds.

    The clock is read in-process instead of spawning 'date +%s': this saves one subprocess per
    call and cannot end up with an unbound result when the external command prints nothing.
    """
    import time                                   # local import: not part of the module header
    return int(time.time())
33
def InSkipList(file,list):
    """Return True when an entry of 'list' compares equal to 'file', False otherwise."""
    matches = [ entry for entry in list if entry == file ]
    return len(matches) > 0
39
def DecodeSrmLs(line):
    """Decode one listing line of the form '<size> <path>' into [ size, file name ].

    The line is stripped of surrounding white space and split at single blanks. The size is
    returned as the string found in the first field, the file name is the last '/' separated
    component of the second field.
    """
    fields = line.strip().split(" ")
    size = fields[0]
    baseName = fields[1].split("/")[-1]
    return [ size, baseName ]
48
def DecodeRfDir(line):
    """Decode one long-format directory listing line into [ size, file name ].

    The line is stripped and split at single blanks. The last field is taken as the file name
    and, of the fields in front of it, the fifth one as the size (returned as a string).
    """
    fields = line.strip().split(" ")
    name, attributes = fields[-1], fields[:-1]
    # the size is looked up among the fields remaining once the name has been taken off
    return [ attributes[4], name ]
56
def BuildFileList(cmd):
    """Run the listing command 'cmd' and return a dictionary { file name : size }.

    Every non-empty output line is expected to read '<size> <path>' with a single blank as
    separator. Only the last '/' separated component of <path> is kept as the file name and the
    size is converted to an integer. The module level flag 'debug' switches on extra printout.
    """
    fileList = {}

    if debug == 1:
        print(' Debug:: list: ' + cmd)
    for line in os.popen(cmd).readlines():                                  # run command
        # remove only a real line terminator, the last output line may come without one
        line = line.rstrip('\n')
        if line == '':
            continue                                       # nothing to decode on an empty line
        f = line.split(" ")
        size = f[0]
        file = f[1].split("/")[-1]
        if debug == 1:
            print(' Debug:: adding: ' + file + ' with size ' + size)
        fileList[file] = int(size)

    return fileList
84
def BuildStagedFileList(storagePath,allFileList,cacheFile):
    """Return a dictionary { file name : stager status } for every file in 'allFileList'.

    storagePath -- storage path, the part behind the last '=' is used as the directory
    allFileList -- dictionary whose keys are the file names to look at
    cacheFile   -- file with the status lines written by an earlier run

    If the directory does not contain '/castor/' all files are marked 'STAGED'. Otherwise the
    status is the last field of the line whose first field is the full file name, taken from
    the cache (when it exists and the module level flag 'noCache' is 0) or from
    'stager_qry -M'. Files without such a line are marked 'undefined'.
    """
    # initialize the basics
    fileList = {}
    rfPath = storagePath.split("=")[-1]

    # if this is not castor, trick it and mark them as staged
    if not re.search('/castor/',rfPath):
        for file in allFileList:
            fileList[file] = 'STAGED'
        return fileList

    # here we deal with castor
    if debug == 1:
        print(' Debug:: rfpath: ' + rfPath)

    # the source of the status is the same for all files: decide once, outside of the loop
    useCache = os.path.exists(cacheFile) and noCache == 0
    cachedStatus = {}
    if useCache:
        print(' Using the cached stager queries at ' + cacheFile)
        # read the cache a single time instead of starting one 'grep' process per file
        cacheInput = open(cacheFile)
        try:
            for line in cacheInput:
                f = line.rstrip('\n').split(" ")
                cachedStatus[f[0]] = f[-1]            # a later line overrules an earlier one
        finally:
            cacheInput.close()

    for file in allFileList:
        fullFile = rfPath + '/' + file
        if debug == 1:
            print(' Debug:: full file name: ' + fullFile)
        fileList[file] = 'undefined'
        if useCache:
            if fullFile in cachedStatus:
                fileList[file] = cachedStatus[fullFile]
        else:
            cmd = 'stager_qry -M ' + fullFile
            for line in os.popen(cmd).readlines():                          # run command
                f = line[:-1].split(" ")
                if f[0] == fullFile:
                    fileList[file] = f[-1]

    return fileList
121
def CacheStagedFileList(cacheFile,storagePath,stagedFileList):
    """Write the stager status of all files to 'cacheFile', one line per file.

    cacheFile      -- file to (over)write
    storagePath    -- storage path, the part behind the last '=' is used as the directory
    stagedFileList -- dictionary { file name : status }

    Each line reads '<directory>/<file> xyz@castorns <status>'. Nothing is returned.
    """
    print(' Caching stager query status to ' + cacheFile)
    rfPath = storagePath.split("=")[-1]
    fileOutput = open(cacheFile,'w')
    try:
        for file, status in stagedFileList.items():
            fileOutput.write(rfPath + '/' + file + ' xyz@castorns ' + status + '\n')
    finally:
        # make sure the file is closed (and flushed) even when a write fails
        fileOutput.close()
131
def CopyFile(storageEle,storagePath,storageUrl,file,localDir):
    """Copy one file of the dataset to the local area and return the time it took in seconds.

    storageEle  -- storage element name, selects the copy tool: 'rfcp' for srm-cms.cern.ch,
                   'dccp' for se01.cmsaf.mit.edu and 'lcg-cp' for everything else
    storagePath -- storage path, the part behind the last '=' is the remote directory
    storageUrl  -- complete url of the remote directory (used for lcg-cp only)
    file        -- name of the file to copy
    localDir    -- local directory, used for the printout only

    Besides its arguments the function reads the module level variables size, localPath,
    mitCfg, version, mitDataset and debug. Zero is returned when no copy is attempted.
    """
    deltaT = 0
    print(' working on file: ' + file + ' to ' + localDir + \
          ' (size: %d MB) '%(int(size)/1024/1024))

    # the destination is the same for all copy tools
    target = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + '/' + file

    if storageEle == 'srm-cms.cern.ch':
        rfPath = storagePath.split("=")[-1]
        cpy = 'rfcp ' + rfPath + '/' + file + ' ' + target
    elif storageEle == 'se01.cmsaf.mit.edu':
        rfPath = storagePath.split("=")[-1]
        cpy = 'dccp dcap://t2srv0005.cmsaf.mit.edu/' + rfPath + '/' + file + ' ' + target
    else:
        cpy = 'lcg-cp ' + storageUrl + '/' + file + ' file:////' + target

    # Check whether the file size makes sense (zero length files are probably not yet ready to
    # copy and will not be transferred)
    if size < 1:
        print(' WARNING - file size is <1b. Probably this file is not yet ready. Stop copy.')
    else:
        if debug == 1:
            print(' Debug:: copy: ' + cpy)
        start = Seconds()
        status = os.system(cpy)
        end = Seconds()
        deltaT = end - start
        # do not let a failed copy pass unnoticed, the return value only carries the time
        if status != 0:
            print(' WARNING - copy command returned non-zero status %d: %s'%(status,cpy))

    return deltaT
171
def StageFile(storagePath,storageUrl,file):
    """Issue the command that requests staging of one file.

    storagePath -- storage path, the part behind the last '=' is the remote directory
    storageUrl  -- complete url of the remote directory
    file        -- name of the file

    For the storage element srm-cms.cern.ch 'stager_get -M' is run; for any other storage
    element the corresponding lcg-cp command is only echoed. The storage element is not an
    argument: it is read from the module level variable storageEle, like localPath, mitCfg,
    version, mitDataset and debug. Nothing is returned.
    """
    print(' staging in file: ' + file)
    if storageEle != 'srm-cms.cern.ch':
        stg = 'echo lcg-cp ' + storageUrl + '/' + file + ' file:////' + localPath + '/' \
              + mitCfg + '/' + version + '/' + mitDataset + '/' + file
    else:
        remoteDir = storagePath.split("=")[-1]
        stg = 'stager_get -M ' + remoteDir + '/' + file

    if debug == 1:
        print(' Debug:: stage: ' + stg)
    os.system(stg)
186
#===================================================================================================
# Main starts here
#===================================================================================================
# Define string to explain usage of the script (every option accepted below has to show up here)
usage = "Usage: downloadSample.py --cmsDataset=<name> | --mitDataset=<name>\n"
usage += " --mitCfg=<name>\n"
usage += " --version=<version>\n"
usage += " --cmssw=<name>\n"
usage += " --pattern=<name>\n"
usage += " --localStorageUrl=<name>\n"
usage += " --localPath=<dir>\n"
usage += " --noCache\n"
usage += " --skip=<file list>\n"
usage += " --forceCopy\n"
usage += " --backward\n"
usage += " --debug\n"
usage += " --help\n"

# Define the valid options which can be specified and check out the command line
valid = ['cmsDataset=','mitDataset=','mitCfg=','version=','cmssw=','pattern=','localStorageUrl=',
         'localPath=','noCache','skip=',
         'forceCopy','backward',
         'debug','help']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError:
    ex = sys.exc_info()[1]                       # spelling accepted by old and new interpreters
    print(usage)
    print(str(ex))
    sys.exit(1)
213
# --------------------------------------------------------------------------------------------------
# Get all parameters for the production
# --------------------------------------------------------------------------------------------------
# Set defaults for each option
cmsDataset = None
mitDataset = None
skip = ''
skipList = []
mitCfg = 'filefi'
version = '014'
cmssw = ''
blockLocal = 0
localStorageUrl = ''
localPath = '/server/02b/mitprod'
pattern = ''
noCache = 0
backward = ''
forceCopy = False
debug = 0
cmsswCfg = 'cmssw.cfg'

# Read new values from the command line (an option name matches exactly one of the branches)
for opt, arg in opts:
    if opt == '--help':
        print(usage)
        sys.exit(0)
    elif opt == '--cmsDataset':
        cmsDataset = arg
    elif opt == '--mitDataset':
        mitDataset = arg
    elif opt == '--mitCfg':
        mitCfg = arg
    elif opt == '--version':
        version = arg
    elif opt == '--cmssw':
        cmssw = arg
    elif opt == '--pattern':
        pattern = arg
    elif opt == '--localStorageUrl':
        localStorageUrl = arg
    elif opt == '--localPath':
        # an explicitly given local path is flagged with blockLocal = 1
        blockLocal = 1
        localPath = arg
    elif opt == '--skip':
        # comma separated list of file names
        skip = arg
        skipList = skip.split(',')
    elif opt == '--noCache':
        noCache = 1
    elif opt == '--backward':
        # later appended to the 'sort' of the file listing
        backward = ' -r '
    elif opt == '--forceCopy':
        forceCopy = True
    elif opt == '--debug':
        debug = 1
268
# Deal with obvious problems
if cmsDataset is None and mitDataset is None:
    # either of the two names is sufficient, say so instead of asking for --cmsDataset only
    cmd = 'Neither --cmsDataset nor --mitDataset option provided. One of them is required.'
    raise RuntimeError(cmd)

# directory with the configuration of this production (MIT_PROD_DIR has to be defined)
cfgDir = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version + '/'

crabFile = cfgDir + 'crab.cfg'
if not os.path.exists(crabFile):
    cmd = 'Crab file not found: %s' % crabFile
    raise RuntimeError(cmd)

# the cmssw configuration is first looked for as cmssw.cfg, then as cmssw.py
cmsswFile = cfgDir + cmsswCfg
if not os.path.exists(cmsswFile):
    cmsswCfg = 'cmssw.py'
    cmsswFile = cfgDir + cmsswCfg
    if not os.path.exists(cmsswFile):
        cmd = ' XXXX ERROR no valid configuration found XXXX'
        raise RuntimeError(cmd)
287
# Resolve the other mitCfg parameters from the configuration file
cmd = 'cat ' + os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version + '/' + 'Productions'
if cmssw != '':
    cmd = cmd + '.' + cmssw

# the CMS name wins when it was given, the other name is looked up in the file read below
join = 0
if cmsDataset is None:
    cmsDataset = ''
else:
    mitDataset = ''

fullLine = ''
bSlash = '\\'
for line in os.popen(cmd).readlines():                                      # run command
    line = line[:-1]
    # get rid of empty or commented lines
    if line == '' or line[0] == '#':
        continue

    # join lines
    if join == 1:
        fullLine += line
    else:
        fullLine = line

    # determine if finished or more is coming
    if fullLine[-1] == bSlash:
        join = 1
        fullLine = fullLine[:-1]
    else:
        join = 0
        # the entry is complete: fields are CMS name, MIT name, number of events, (unused here)
        # and local path
        names = fullLine.split()                    # splitting every blank
        if names[0] == cmsDataset:
            mitDataset = names[1]                   # this is the equivalent MIT name of the dataset
            nevents = int(names[2])                 # number of events to be used in the production
            if names[4] != "-" and blockLocal == 0:
                localPath = names[4]
        if names[1] == mitDataset:
            cmsDataset = names[0]                   # this is the equivalent CMS name of the dataset
            nevents = int(names[2])                 # number of events to be used in the production
            if names[4] != "-" and blockLocal == 0:
                localPath = names[4]

if mitDataset == "":
    print("ERROR - dataset not defined.")
    sys.exit(1)                                     # an error has to give a non-zero exit status

# Say what we do now
print('\n Preparing dataset for transfer: ' + cmsDataset + ' [MIT: ' + mitDataset + ']\n')
353
# --------------------------------------------------------------------------------------------------
# Deal with storage element area
# --------------------------------------------------------------------------------------------------
# place holders in crab.cfg which are replaced by the values of this download
pMitDset = re.compile('XX-MITDATASET-XX')
pMitCfg = re.compile('XX-MITCFG-XX')
pMitVers = re.compile('XX-MITVERSION-XX')
# find the foreseen storage place
crabFile = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version + '/' + 'crab.cfg'
cmd = 'grep ^storage_element ' + crabFile
for file in os.popen(cmd).readlines():                                      # run command
    line = file[:-1]                                # strip '\n'
    # decode the storage element name
    names = line.split("=")                         # splitting every '='
    storageEle = names.pop()
    storageEle = re.sub(r"\s", "",storageEle)
# Compile search and replacement sequences just for the path
cmd = 'grep ^storage_path ' + crabFile
for file in os.popen(cmd).readlines():                                      # run command
    line = file[:-1]                                # strip '\n'
    line = pMitDset.sub(mitDataset,line)
    line = pMitCfg .sub(mitCfg, line)
    line = pMitVers.sub(version, line)
    # decode the storage directory name (the value itself may contain '=')
    names = line.split("=")                         # splitting every '='
    names = names[1:]
    storagePath = "=".join(names)
    storagePath = re.sub(r"\s", "",storagePath)
    storageUrl = 'srm://' + storageEle + ':8443' + storagePath

# crab.cfg need not define user_remote_dir: start from an empty value so the test below works
userRemoteDir = ''
cmd = 'grep ^user_remote_dir ' + crabFile
for file in os.popen(cmd).readlines():                                      # run command
    line = file[:-1]                                # strip '\n'
    line = pMitDset.sub(mitDataset,line)
    line = pMitCfg .sub(mitCfg, line)
    line = pMitVers.sub(version, line)
    # decode the storage directory name
    names = line.split("=")                         # splitting every '='
    names = names[1:]
    userRemoteDir = "=".join(names)
    userRemoteDir = re.sub(r"\s","",userRemoteDir)
    userRemoteDir = re.sub("/XX-CRABID-XX","",userRemoteDir)

if userRemoteDir != '':
    storagePath += userRemoteDir
    storageUrl += userRemoteDir

# a storage url given on the command line overrules everything found in crab.cfg
if localStorageUrl != '':
    storageEle = ''
    storagePath = ''
    storageUrl = localStorageUrl

print(' --> StorageUrl: ' + storageUrl)
406
#---------------------------------------------------------------------------------------------------
# create the local storage area
#---------------------------------------------------------------------------------------------------
print(' Make local path: ' + localPath)
localDir = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset
mkd = 'mkdir -p ' + localDir
status = os.system(mkd)

if status != 0:
    print(' ERROR - could not create local directory ' + localDir)
    sys.exit(1)

print(' --> LocalDir: ' + localDir)

# determine the free space (in bytes) from the last line of the df output
cmd = 'df --block-size=1 ' + localDir + ' | tr -s \' \' | tail -1'
for line in os.popen(cmd).readlines():                                      # run command
    line = line.strip()
    f = line.split(" ")
    # a line starting with the file system name has the available space in the fourth field,
    # otherwise the third field is used (the former test line[0:0] == '/' could never be true)
    if line.startswith('/') or line.startswith('fuse'):
        free = int(f[3])
    else:
        free = int(f[2])
429
#---------------------------------------------------------------------------------------------------
# create a list of all files to be copied
#---------------------------------------------------------------------------------------------------
# the remote directory is the part of the storage path behind the last '='
f = storagePath.split('=')
path = f.pop()
# NOTE(review): 'list' is an external command, its output is decoded by BuildFileList
cmd = 'list ' + path + ' | grep root | sort ' + backward

if pattern != "":
    cmd += ' | grep ' + pattern

print(' Find file: ' + cmd)
cacheFile = '/tmp/.cache_' + mitDataset
allFileList = BuildFileList(cmd)
stagedFileList = BuildStagedFileList(storagePath,allFileList,cacheFile)
cacheStaged = CacheStagedFileList(cacheFile,storagePath,stagedFileList)

# files which are already in the local area (print the command which is really used for it)
cmd = 'list ' + localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + ' | grep root'
print('List: ' + cmd)
doneFileList = BuildFileList(cmd)
458
#---------------------------------------------------------------------------------------------------
# go through the lists: first check files are consistent, then copy the remaining files
#---------------------------------------------------------------------------------------------------
# data volumes: everything there is and what is locally available with the very same size
b2G = 1.0/(1024.*1024.*1024)
nTotal = len(allFileList)
totalDataVolume = sum(allFileList.values())
matchedSizes = [ size for file, size in allFileList.items() if doneFileList.get(file) == size ]
nDone = len(matchedSizes)
doneDataVolume = sum(matchedSizes)

volumeToCopy = totalDataVolume - doneDataVolume

print(' ')
print(' Summary of data volume\n')
print(' --> number of files to copy: %8d (total: %d) '%(nTotal-nDone,nTotal))
print(' --> volume to copy [GB]: %8.2f (total: %.2f) '%(b2G*volumeToCopy,b2G*totalDataVolume))
print(' --> free volume [GB]: %8.2f '%(b2G*free))
print(' ')

# only 85% of the free space are considered usable
if free*0.85 < volumeToCopy:
    print(' ERROR - probably no enough space on volume. See above (some safety assumed)!')
    sys.exit(1)

# every local file has to be known remotely and has to have the remote size
for file, size in doneFileList.items():
    if file not in allFileList:
        print(' ERROR - file from done list is not in the all files list. File: ' + file)
        sys.exit(1)
    if allFileList[file] != size:
        print(' ERROR - file sizes did not match: ' + file + \
              ' [ local: %10d, remote: %10d ]'%(size,allFileList[file]))
        sys.exit(1)
498
# copy all files which are neither done nor to be skipped and keep track of the performance
# (the data volumes were already summed up and reported before: adding the sizes once more, as
# done here earlier with a mistyped '= +', only spoiled those numbers)
totalSizeMb = 0.
totalTimeSc = 0.
# the loop variable has to be called 'size': CopyFile reads it as a module level variable
for file, size in allFileList.items():
    if debug == 1:
        print(' Debug:: ' + file + ' -> size %d'%size)

    if file in doneFileList:
        print(' --> done, size match: %10d - %s'%(size,file))
    elif InSkipList(file,skipList):
        print(' --> skipping file: %10d - %s'%(size,file))
    else:
        print(' --> copying file: %10d - %s (castor stat: %s)'% \
              (size,file,stagedFileList[file]))
        if stagedFileList[file] == "STAGED" or forceCopy:

            sizeMb = size/1024./1024.
            deltaT = CopyFile(storageEle,storagePath,storageUrl,file,localDir)
            if deltaT > 0:
                print(' time required [sec]: %7d rate [MB/sec]: %9.3f'%\
                      (deltaT,sizeMb/deltaT))
            else:
                print(' time required [sec]: %7d rate [MB/sec]: ?'%(deltaT))
            totalTimeSc += deltaT
            totalSizeMb += sizeMb
        else:
            # not on disk yet: ask for staging, a later run will pick the file up
            print(' skipping file: %s'%(stagedFileList[file]))
            StageFile(storagePath,storageUrl,file)

print('')
if totalTimeSc > 0:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] %9.3f'%\
          (totalSizeMb/1024.,totalTimeSc,totalSizeMb/totalTimeSc))
else:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] ?'%\
          (totalSizeMb/1024.,totalTimeSc))
print('')