ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/MitProd/Processing/bin/downloadSample.py
Revision: 1.6.2.1
Committed: Fri Jun 14 19:34:38 2013 UTC (11 years, 10 months ago) by paus
Content type: text/x-python
Branch: Mit_025c_branch
CVS Tags: Mit_025c_branch2
Changes since 1.6: +2 -1 lines
Log Message:
PrepareForBackportVersion3.

File Contents

# Content
1 #!/usr/bin/env python
2 #---------------------------------------------------------------------------------------------------
3 # Script to automatically download a MIT dataset to our local cluster
4 #
5 # The download of the MIT dataset is organized in accordance with the dataset production logic. In
6 # general it is allowed to download the dataset from any location of a properly configured storage
7 # element. The script will do all the most obvious tests to ensure efficient and safe download. For
8 # performance reasons a checksum is not calculated. This omission is considered completely safe as
9 # failures will be identified in the analysis phase and the rare occasions will be more effective to
10 # fix by hand.
11 #
12 # At present the download proceeds in one thread (one file at a time) which for performance reasons
13 # might not be optimal.
14 #
15 # Author: C.Paus (July 1, 2008)
16 #---------------------------------------------------------------------------------------------------
17 # Missing but desired features:
18 # + accounting of size of each file
19 # + accounting of locally available files (avoid copying already existing files)
20 # + determine full list of files before starting to copy
21 # + minimal success check of the copy
22 # + calculate total data volume (to copy, already copied etc.)
23 # + add feature to check the castor status
24 # - add time estimates and progressions for copies
25 # - multi downloads to enhance performance
26 #---------------------------------------------------------------------------------------------------
27 import os,sys,getopt,re,string
28
29 dCacheDoor = 't2srv0012.cmsaf.mit.edu'
30
def Domain():
    """Return the network domain of the machine this script runs on.

    The node name reported by os.uname() is cut at its first dot: the part in front (the short
    host name) is dropped and everything behind it is returned. A node name without any dot
    yields an empty string.
    """
    nodeName = os.uname()[1]
    # partition() keeps everything after the first '.', exactly what joining f[1:] produced
    _, _, tail = nodeName.partition('.')
    return tail
35
def Seconds():
    """Return the current time as whole seconds since the Unix epoch.

    The clock is read directly through the standard library rather than by spawning the external
    'date +%s' command: no process is created per call, and the function can no longer fail with
    an unbound result when the command produces no output.

    Returns:
        int: seconds since 1970-01-01 00:00:00 UTC.
    """
    # function-scope import: the module-level import list of this script does not include 'time'
    import time
    return int(time.time())
40
def InSkipList(file,list):
    """Tell whether 'file' equals one of the entries of 'list'.

    Entries are compared with '=='; True is returned on the first match and False when no entry
    matches (including for an empty list).
    """
    return any(entry == file for entry in list)
46
def DecodeSrmLs(line):
    """Decode one '<size> <path>' line into a [ size, file ] list.

    The line is stripped and split on single blanks. The first field is returned unchanged as the
    size (a string); the second field is treated as a '/'-separated path of which only the last
    component is returned as the file name.
    """
    fields = line.strip().split(" ")
    size = fields[0]
    baseName = fields[1].split("/")[-1]
    return [ size, baseName ]
55
def DecodeRfDir(line):
    """Decode one blank separated directory listing line into a [ size, file ] list.

    The line is stripped and split on single blanks. The last field is the file name; the size is
    the fifth of the remaining fields (index 4) and is returned as a string. Lines with fewer
    than six fields raise an IndexError.
    """
    fields = line.strip().split(" ")
    baseName = fields[-1]
    # index into the fields with the file name removed, so a five field line still fails
    size = fields[:-1][4]
    return [ size, baseName ]
63
def BuildFileList(cmd):
    """Run a listing command and return its result as a { file name: size } dictionary.

    Every non-blank output line of 'cmd' has to read '<size> <path>', separated by a single
    blank. The path is reduced to its last '/'-separated component and the size is converted to
    an integer. Blank lines are skipped; any other malformed line still raises (IndexError or
    ValueError) so that a broken listing does not go unnoticed.

    Reads the module level 'debug' flag to decide whether to trace the command and each entry.

    Args:
        cmd: shell command line whose standard output is parsed.

    Returns:
        dict mapping the bare file name to its size as int.
    """
    fileList = {}

    if debug == 1:
        print(' Debug:: list: ' + cmd)

    # run the command and make sure the pipe gets closed again
    with os.popen(cmd) as pipe:
        lines = pipe.readlines()

    for line in lines:
        # only remove the line terminator: chopping the last character unconditionally would
        # truncate the file name of a final line that does not end with a newline
        line = line.rstrip('\n')
        if line.strip() == '':
            continue
        fields = line.split(" ")
        size = fields[0]
        name = fields[1].split("/")[-1]
        if debug == 1:
            print(' Debug:: adding: ' + name + ' with size ' + size)
        fileList[name] = int(size)

    return fileList
91
def _ReadStagerCache(cacheFile):
    """Read a stager cache file into a { full file path: status } dictionary.

    Each line is split on single blanks; the first field is the key and the last field the
    status. When a path shows up more than once the last line wins. An unreadable cache is
    reported and treated as empty.
    """
    cached = {}
    try:
        with open(cacheFile) as fileInput:
            for line in fileInput:
                f = line.rstrip('\n').split(" ")
                cached[f[0]] = f[-1]
    except IOError:
        print(' WARNING - could not read cached stager queries at ' + cacheFile)
    return cached


def BuildStagedFileList(storagePath,allFileList,cacheFile):
    """Determine the staging status of every file in 'allFileList'.

    The physical path is whatever follows the last '=' in 'storagePath'. When that path does not
    contain '/castor/' every file is simply marked 'STAGED'. For castor paths the status is
    looked up per file, either in the cache file (when it exists and the module level 'noCache'
    flag is 0) or by running 'stager_qry -M <full path>'. Files without a matching answer are
    marked 'undefined'.

    The cache is read once into a dictionary instead of running one 'grep' per file, which
    avoids a process per file and keeps file names out of a shell/regular expression context.

    Reads the module level flags 'debug' and 'noCache'.

    Args:
        storagePath: storage path, optionally of the form '<prefix>=<physical path>'.
        allFileList: dictionary whose keys are the file names to look up.
        cacheFile:   path of the stager query cache.

    Returns:
        dict mapping each file name to its status string.
    """
    # initialize the basics
    fileList = {}
    rfPath = storagePath.split("=")[-1]

    # if this is not castor, trick it and mark them as staged
    if not re.search('/castor/',rfPath):
        for file in allFileList:
            fileList[file] = 'STAGED'
        return fileList

    # here we deal with castor
    if debug == 1:
        print(' Debug:: rfpath: ' + rfPath)

    # the decision does not change while looping, so take it (and read the cache) only once
    useCache = os.path.exists(cacheFile) and noCache == 0
    cached = {}
    if useCache:
        print(' Using the cached stager queries at ' + cacheFile)
        cached = _ReadStagerCache(cacheFile)

    for file in allFileList:
        fullFile = rfPath + '/' + file
        if debug == 1:
            print(' Debug:: full file name: ' + fullFile)

        if useCache:
            fileList[file] = cached.get(fullFile,'undefined')
            continue

        fileList[file] = 'undefined'
        with os.popen('stager_qry -M ' + fullFile) as pipe:
            answers = pipe.readlines()
        for line in answers:
            f = line.rstrip('\n').split(" ")
            # only lines that start with exactly this file are an answer for it
            if f[0] == fullFile:
                fileList[file] = f[-1]

    return fileList
128
def CacheStagedFileList(cacheFile,storagePath,stagedFileList):
    """Write the staging status of all files to the cache file.

    One line per file is written in the form '<physical path>/<file> xyz@castorns <status>',
    where the physical path is whatever follows the last '=' in 'storagePath'. An existing cache
    file is overwritten. The file is opened in a 'with' block so that it is closed even when a
    write fails.

    Args:
        cacheFile:      path of the cache file to (over)write.
        storagePath:    storage path, optionally of the form '<prefix>=<physical path>'.
        stagedFileList: dictionary mapping file name to status string.

    Returns:
        None
    """
    print(' Caching stager query status to ' + cacheFile)
    rfPath = storagePath.split("=")[-1]
    with open(cacheFile,'w') as fileOutput:
        for file, status in stagedFileList.items():
            fileOutput.write(rfPath + '/' + file + ' xyz@castorns ' + status + '\n')
138
def CopyFile(storageEle,storagePath,storageUrl,file,localDir,fromCern):
    """Copy one file from the storage element into the local dataset directory.

    The copy tool depends on the storage element: 'rfcp' for 'srm-cms.cern.ch' (unless
    'fromCern' is set), 'dccp' through the dCache door for 'se01.cmsaf.mit.edu' and 'lcg-cp'
    with the storage URL otherwise. Files with a size below one byte are not copied.

    Besides its arguments the function reads these module level names: 'size' (size in bytes of
    the file being processed), 'localPath', 'mitCfg', 'version', 'mitDataset' (the destination is
    built from them; 'localDir' is only used for the progress message), 'dCacheDoor' and 'debug'.

    A non-zero exit status of the copy command is reported; it used to be dropped silently.

    Args:
        storageEle:  host name of the storage element.
        storagePath: storage path, optionally of the form '<prefix>=<physical path>'.
        storageUrl:  full URL of the remote directory (used by lcg-cp).
        file:        bare name of the file to copy.
        localDir:    local destination directory, shown in the progress message.
        fromCern:    when true 'rfcp' is not used even for 'srm-cms.cern.ch'.

    Returns:
        Wall clock seconds the copy command took, 0 when no copy was attempted.
    """
    deltaT = 0
    print(' working on file: ' + file + ' to ' + localDir +
          ' (size: %d MB) '%(int(size)//1024//1024))

    # all three copy flavours write to the same local destination
    target = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + '/' + file

    if storageEle == 'srm-cms.cern.ch' and not fromCern:
        rfPath = storagePath.split("=")[-1]
        cpy = 'rfcp ' + rfPath + '/' + file + ' ' + target
    elif storageEle == 'se01.cmsaf.mit.edu':
        rfPath = storagePath.split("=")[-1]
        cpy = 'dccp dcap://' + dCacheDoor + '/' + rfPath + '/' + file + ' ' + target
    else:
        cpy = 'lcg-cp ' + storageUrl + '/' + file + ' file:////' + target

    # Check whether the file size makes sense (zero length files are probably not yet ready to
    # be copied and will not be transferred)
    if size < 1:
        print(' WARNING - file size is <1b. Probably this file is not yet ready. Stop copy.')
    else:
        if debug == 1:
            print(' Debug:: copy: ' + cpy)
        start = Seconds()
        status = os.system(cpy)
        end = Seconds()
        deltaT = end - start
        if status != 0:
            print(' ERROR - copy command returned status %d: %s'%(status,cpy))

    return deltaT
178
def RecoverFile(storageEle,storagePath,storageUrl,file,localDir):
    """Copy one file from the local dataset directory back to the storage element.

    This is the reverse of the download: 'rfcp' is used for 'srm-cms.cern.ch', 'dccp' through
    the dCache door for 'se01.cmsaf.mit.edu' and 'lcg-cp' with the storage URL otherwise. Files
    with a size below one byte are not copied.

    Besides its arguments the function reads these module level names: 'size' (size in bytes of
    the file being processed), 'localPath', 'mitCfg', 'version', 'mitDataset' (the source is
    built from them; 'localDir' is only used for the progress message), 'dCacheDoor' and 'debug'.

    A non-zero exit status of the copy command is reported; it used to be dropped silently.

    Args:
        storageEle:  host name of the storage element.
        storagePath: storage path, optionally of the form '<prefix>=<physical path>'.
        storageUrl:  full URL of the remote directory (used by lcg-cp).
        file:        bare name of the file to copy back.
        localDir:    local source directory, shown in the progress message.

    Returns:
        Wall clock seconds the copy command took, 0 when no copy was attempted.
    """
    deltaT = 0
    print(' working on file: ' + file + ' from ' + localDir +
          ' (size: %d MB) '%(int(size)//1024//1024))

    # all three copy flavours read the same local file
    source = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + '/' + file

    if storageEle == 'srm-cms.cern.ch':
        rfPath = storagePath.split("=")[-1]
        cpy = 'rfcp ' + source + ' ' + rfPath + '/' + file
    elif storageEle == 'se01.cmsaf.mit.edu':
        rfPath = storagePath.split("=")[-1]
        cpy = 'dccp ' + source + ' dcap://' + dCacheDoor + '/' + rfPath + '/' + file
        print(' using dccp.... ' + cpy)
    else:
        cpy = 'lcg-cp ' + 'file:////' + source + ' ' + storageUrl + '/' + file

    # Check whether the file size makes sense (zero length files are probably not yet ready to
    # be copied and will not be transferred)
    if size < 1:
        print(' WARNING - file size is <1b. Probably this file is not yet ready. Stop recovery.')
    else:
        if debug == 1:
            print(' Debug:: copy: ' + cpy)
        start = Seconds()
        status = os.system(cpy)
        end = Seconds()
        deltaT = end - start
        if status != 0:
            print(' ERROR - recovery command returned status %d: %s'%(status,cpy))

    return deltaT
218
def StageFile(storagePath,storageUrl,file,fromCern=None):
    """Request staging of one file that is not yet available on disk.

    For the storage element 'srm-cms.cern.ch' (unless 'fromCern' is set) a 'stager_get -M' is
    issued for the physical file; in every other case the would-be 'lcg-cp' command is only
    echoed.

    The script calls this function with four arguments while it used to accept only three,
    which raised a TypeError as soon as a file had to be staged. 'fromCern' is therefore
    accepted as an optional fourth argument; when it is left out the module level 'fromCern'
    flag is used, as before.

    Besides its arguments the function reads these module level names: 'storageEle',
    'localPath', 'mitCfg', 'version', 'mitDataset' and 'debug'.

    Args:
        storagePath: storage path, optionally of the form '<prefix>=<physical path>'.
        storageUrl:  full URL of the remote directory.
        file:        bare name of the file to stage.
        fromCern:    optional; overrides the module level 'fromCern' flag when given.

    Returns:
        None
    """
    if fromCern is None:
        # the parameter hides the global of the same name, so fetch the global explicitly
        fromCern = globals().get('fromCern',False)

    print(' staging in file: ' + file)
    if storageEle == 'srm-cms.cern.ch' and not fromCern:
        rfPath = storagePath.split("=")[-1]
        stg = 'stager_get -M ' + rfPath + '/' + file
    else:
        stg = 'echo lcg-cp ' + storageUrl + '/' + file + ' file:////' + localPath + '/' \
              + mitCfg + '/' + version + '/' + mitDataset + '/' + file

    if debug == 1:
        print(' Debug:: stage: ' + stg)
    os.system(stg)
233
234 #===================================================================================================
235 # Main starts here
236 #===================================================================================================
# Define string to explain usage of the script (every option accepted below is listed, including
# --pattern and --noCache which used to be missing from the help text)
usage  = "Usage: downloadSample.py --cmsDataset=<name> | --mitDataset=<name>\n"
usage += " --mitCfg=<name>\n"
usage += " --version=<version>\n"
usage += " --cmssw=<name>\n"
usage += " --pattern=<expression>\n"
usage += " --localStorageUrl=<name>\n"
usage += " --localPath=<dir>\n"
usage += " --noCache\n"
usage += " --skip=<file list>\n"
usage += " --fromCern\n"
usage += " --forceCopy\n"
usage += " --backward\n"
usage += " --stopOnError\n"
usage += " --debug\n"
usage += " --test\n"
usage += " --help\n"

# Define the valid options which can be specified and check out the command line
valid = ['cmsDataset=','mitDataset=','mitCfg=','version=','cmssw=','pattern=','localStorageUrl=',
         'localPath=','noCache','skip=',
         'fromCern','forceCopy','backward','stopOnError',
         'debug','test','help']
try:
    opts, args = getopt.getopt(sys.argv[1:], "", valid)
except getopt.GetoptError as ex:
    # unknown option or missing argument: show the help text and the reason, then give up
    print(usage)
    print(str(ex))
    sys.exit(1)
264
# --------------------------------------------------------------------------------------------------
# Get all parameters for the production
# --------------------------------------------------------------------------------------------------
# Set defaults for each option
cmsDataset = None
mitDataset = None
skip = ''
skipList = []
mitCfg = 'filefi'
version = '023'
cmssw = ''
blockLocal = 0                       # set to 1 once --localPath was given explicitly
localStorageUrl = ''
localPath = '/mnt/hadoop/cmsprod'
pattern = ''
noCache = 0
backward = ''                        # extra option handed to 'sort' (' -r ' for --backward)
fromCern = False
stopOnError = False
forceCopy = False
debug = 0
test = 0
cmsswCfg = 'cmssw.cfg'

# Read new values from the command line (each option name matches exactly one branch; the
# --forceCopy test that used to appear twice is kept only once)
for opt, arg in opts:
    if opt == '--help':
        print(usage)
        sys.exit(0)
    elif opt == '--cmsDataset':
        cmsDataset = arg
    elif opt == '--mitDataset':
        mitDataset = arg
    elif opt == '--mitCfg':
        mitCfg = arg
    elif opt == '--version':
        version = arg
    elif opt == '--cmssw':
        cmssw = arg
    elif opt == '--pattern':
        pattern = arg
    elif opt == '--localStorageUrl':
        localStorageUrl = arg
    elif opt == '--localPath':
        blockLocal = 1
        localPath = arg
    elif opt == '--skip':
        skip = arg
        skipList = skip.split(',')
    elif opt == '--noCache':
        noCache = 1
    elif opt == '--stopOnError':
        stopOnError = True
    elif opt == '--backward':
        backward = ' -r '
    elif opt == '--fromCern':
        fromCern = True
    elif opt == '--forceCopy':
        forceCopy = True
    elif opt == '--debug':
        debug = 1
    elif opt == '--test':
        test = 1
330
# Deal with obvious problems
if cmsDataset is None and mitDataset is None:
    # either of the two options identifies the dataset, so the message names both
    raise RuntimeError('Neither --cmsDataset nor --mitDataset option provided.'
                       + ' One of them is required.')

# directory holding the configuration of the requested production
cfgDir = os.environ['MIT_PROD_DIR'] + '/' + mitCfg + '/' + version

seFile = cfgDir + '/' + 'seTable'
if not os.path.exists(seFile):
    raise RuntimeError('Storage element file not found: %s' % seFile)

# the cmssw configuration may be called cmssw.cfg or cmssw.py, try them in that order
cmsswFile = cfgDir + '/' + cmsswCfg
if not os.path.exists(cmsswFile):
    cmsswCfg = 'cmssw.py'
    cmsswFile = cfgDir + '/' + cmsswCfg
    if not os.path.exists(cmsswFile):
        raise RuntimeError(' XXXX ERROR no valid configuration found XXXX')
349
# Resolve the other mitCfg parameters from the configuration file
# NOTE: the Productions file is read relative to the current working directory, while the
#       seTable and cmssw files checked before are looked up below $MIT_PROD_DIR.
cmd = 'cat ' + './' + mitCfg + '/' + version + '/' + 'Productions'
if cmssw != '':
    cmd = cmd + '.' + cmssw

# the name that was not given on the command line starts empty and is filled from the table
join = 0
if cmsDataset == None:
    cmsDataset = ''
else:
    mitDataset = ''

fullLine = ''
bSlash = '\\'
for line in os.popen(cmd).readlines():  # run command
    # only remove the line terminator; chopping the last character unconditionally would
    # damage a final line that does not end with a newline
    line = line.rstrip('\n')
    # get rid of empty, blank or commented lines
    if line.strip() == '' or line[0] == '#':
        continue

    # join lines
    if join == 1:
        fullLine += line
    else:
        fullLine = line

    # determine if finished or more is coming
    if fullLine[-1] == bSlash:
        join = 1
        fullLine = fullLine[:-1]
        continue
    join = 0

    # columns used: 0 = CMS dataset, 1 = MIT dataset, 2 = number of events, 4 = local path
    names = fullLine.split()            # splitting every blank
    if not names:
        continue
    if names[0] == cmsDataset or names[1] == mitDataset:
        cmsDataset = names[0]           # this is the equivalent CMS name of the dataset
        mitDataset = names[1]           # this is the equivalent MIT name of the dataset
        nevents = int(names[2])         # number of events to be used in the production
        # the table only sets the local path when --localPath was not given explicitly
        if names[4] != "-" and blockLocal == 0:
            localPath = names[4]

if mitDataset == "":
    print("ERROR - dataset not defined.")
    # this is a failure, so do not report success to the calling shell
    sys.exit(1)
405
406 #cmd = 'grep ' + cmsDataset + ' ' + mitCfg + '/' + version + '/' + 'Productions'
407 #for file in os.popen(cmd).readlines(): # run command
408 # line = file[:-1] # strip '\n'
409 # # test whether there is a directory
410 # names = line.split() # splitting every blank
411 # mitDataset = names[1] # this is the equivalent MIT name of the dataset
412 # nevents = int(names[2]) # number of events to be used in the production
413
# Say what we do now
print('\n Preparing dataset for transfer: ' + cmsDataset + ' [MIT: ' + mitDataset + ']\n')

# --------------------------------------------------------------------------------------------------
# Deal with storage element area
# --------------------------------------------------------------------------------------------------
pMitDset = re.compile('XX-MITDATASET-XX')
pMitCfg = re.compile('XX-MITCFG-XX')
pMitVers = re.compile('XX-MITVERSION-XX')
# decide on the foreseen default storage place (where are we running)
storageTag = 'T2_US_MIT'
domain = Domain()
if re.search('mit.edu',domain):
    storageTag = 'T2_US_MIT'
elif re.search('cern.ch',domain):
    storageTag = 'T0_CH_CERN'

# look up the storage element in the table; lines read '<tag>:<element>:<path>:<user dir>'
seEntryFound = False
storageEle = ''
storagePath = ''
userRemoteDir = ''
cmd = 'grep ^' + storageTag + ' ' + seFile
for line in os.popen(cmd).readlines():  # run command
    print(' LINE: ' + line)
    line = line[:-1]                    # strip '\n'
    line = line.replace(' ','')
    f = line.split(':')
    storageEle = f[1]
    storagePath = f[2]
    userRemoteDir = f[3]
    seEntryFound = True
    print(' Storage -- Ele: ' + storageEle
          + ' Path: ' + storagePath + ' UserDir: ' + userRemoteDir)

# Hardwire
if fromCern:
    storageEle = 'srm-cms.cern.ch'
    storagePath = '/srm/managerv2?SFN=/castor/cern.ch'
    userRemoteDir = "/user/p/paus/" + mitCfg + "/" + version + "/" + mitDataset
elif not seEntryFound and localStorageUrl == '':
    # without a table entry there is no storage element to talk to; say so instead of
    # failing later on an undefined variable
    print(' ERROR - no entry for ' + storageTag + ' found in ' + seFile)
    sys.exit(1)

# determine the storage URL
storageUrl = 'srm://' + storageEle + ':8443' + storagePath
if userRemoteDir != '':
    storagePath += userRemoteDir
    storageUrl += userRemoteDir

# an explicitly given URL overrides everything derived from the table
if localStorageUrl != '':
    storageEle = ''
    storagePath = ''
    storageUrl = localStorageUrl

print(' --> StorageUrl: ' + storageUrl)
461
#---------------------------------------------------------------------------------------------------
# create the local storage area
#---------------------------------------------------------------------------------------------------
print(' Make local path: ' + localPath)
localDir = localPath + '/' + mitCfg + '/' + version + '/' + mitDataset
mkd = 'mkdir -p ' + localDir
status = os.system(mkd)

if status != 0:
    print(' ERROR - could not create local directory ' + localDir)
    sys.exit(1)

print(' --> LocalDir: ' + localDir)

# determine the free space (in bytes) on the volume that holds the local directory
free = None
cmd = 'df --block-size=1 ' + localDir + ' | tr -s \' \' | tail -1'
for line in os.popen(cmd).readlines():  # run command
    f = line.strip().split(" ")
    # df moves the numbers to a line of their own when the file system name is long. Such a
    # line starts with the block count, so 'Available' is the third field; a complete line
    # starts with the file system name, which shifts 'Available' to the fourth field.
    # (The former test 'line[0:0] == "/"' compared an empty string and could never be true.)
    if f[0].isdigit():
        free = int(f[2])
    else:
        free = int(f[3])

if free is None:
    print(' ERROR - could not determine the free space for ' + localDir)
    sys.exit(1)
484
#---------------------------------------------------------------------------------------------------
# create a list of all files to be copied
#---------------------------------------------------------------------------------------------------
# the physical path is whatever follows the last '=' of the storage path
f = storagePath.split('=')
path = f.pop()
cmd = 'list ' + path + ' | grep root | sort ' + backward
if fromCern:
    cmd = 'srmls ' + storageUrl + '|grep root|sort ' + backward + '|tr -s \' \'|cut -d\' \' -f 2-3'

if pattern != "":
    cmd += ' | grep ' + pattern

print(' Find file: ' + cmd)
cacheFile = '/tmp/.cache_' + mitDataset
allFileList = BuildFileList(cmd)
stagedFileList = BuildStagedFileList(storagePath,allFileList,cacheFile)
cacheStaged = CacheStagedFileList(cacheFile,storagePath,stagedFileList)

# now the files which are already available locally; the command is shown after it has been
# put together (it used to be printed before, which repeated the remote command from above)
cmd = 'list ' + localPath + '/' + mitCfg + '/' + version + '/' + mitDataset + ' | grep root'
print('List: ' + cmd)
doneFileList = BuildFileList(cmd)
514
#---------------------------------------------------------------------------------------------------
# go through the lists: first check files are consistent, then copy the remaining files
#---------------------------------------------------------------------------------------------------
# conversion factor from bytes to GB and the counters for the data volumes
b2G = 1.0/(1024.*1024.*1024)
nTotal = 0
totalDataVolume = 0
nDone = 0
doneDataVolume = 0

# a file counts as done only when it exists locally with exactly the remote size
for file, size in allFileList.items():
    nTotal += 1
    totalDataVolume += size
    if file not in doneFileList or doneFileList[file] != size:
        continue
    nDone += 1
    doneDataVolume += size

nToCopy = nTotal - nDone
volumeToCopy = totalDataVolume - doneDataVolume

print(' ')
print(' Summary of data volume\n')
print(' --> number of files to copy: %8d (total: %d) '%(nToCopy,nTotal))
print(' --> volume to copy [GB]: %8.2f (total: %.2f) '%(b2G*volumeToCopy,b2G*totalDataVolume))
print(' --> free volume [GB]: %8.2f '%(b2G*free))
print(' ')

# only 85% of the free space is considered usable
if free*0.85 < volumeToCopy:
    print(' ERROR - probably no enough space on volume. See above (some safety assumed)!')
    sys.exit(1)
543
# check every locally available file against the remote list
for file, size in doneFileList.items():
    if file not in allFileList:
        # the file exists only locally: copy it back to the storage element
        print(' ERROR - file from done list is not in the all files list. File: ' + file)
        print(' RECOVER - File: ' + file)
        sizeMb = size/1024./1024.
        deltaT = RecoverFile(storageEle,storagePath,storageUrl,file,localDir)
        if deltaT > 0:
            print(' time required [sec]: %7d rate [MB/sec]: %9.3f'%(deltaT,sizeMb/deltaT))
        else:
            print(' time required [sec]: %7d rate [MB/sec]: ?'%(deltaT))
        continue

    # the file exists on both sides: the sizes have to agree
    remoteSize = allFileList[file]
    if remoteSize != size:
        print(' ERROR - file sizes did not match: ' + file +
              ' [ local: %10d, remote: %10d ]'%(size,remoteSize))
        if stopOnError:
            sys.exit(1)
563
564 #sys.exit(1)
565
# copy all files which are not yet available locally and keep track of the performance
totalSizeMb = 0.
totalTimeSc = 0.
for file, size in allFileList.items():
    if debug == 1:
        print(' Debug:: ' + file + ' -> size %d'%size)

    # The data volumes are not accumulated here: they were already summed up for the summary
    # printed before this loop and are not used afterwards (the former statements counted the
    # total twice and overwrote the done volume through a '= +size' typo).
    if file in doneFileList:
        print(' --> done, size match: %10d - %s'%(size,file))
        continue

    if InSkipList(file,skipList):
        print(' --> skipping file: %10d - %s'%(size,file))
        continue

    print(' --> copying file: %10d - %s (castor stat: %s)'%(size,file,stagedFileList[file]))
    if test == 1:
        print(' testing only.')
    elif stagedFileList[file] == "STAGED" or forceCopy:
        sizeMb = size/1024./1024.
        deltaT = CopyFile(storageEle,storagePath,storageUrl,file,localDir,fromCern)
        if deltaT > 0:
            print(' time required [sec]: %7d rate [MB/sec]: %9.3f'%(deltaT,sizeMb/deltaT))
        else:
            print(' time required [sec]: %7d rate [MB/sec]: ?'%(deltaT))
        totalTimeSc += deltaT
        totalSizeMb += sizeMb
    else:
        # the file is not on disk yet: ask for it to be staged and move on
        print(' skipping file: %s'%(stagedFileList[file]))
        # StageFile takes three arguments and reads the module level 'fromCern' flag itself;
        # passing the flag as a fourth argument raised a TypeError
        StageFile(storagePath,storageUrl,file)

print('')
if totalTimeSc > 0:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] %9.3f'%
          (totalSizeMb/1024.,totalTimeSc,totalSizeMb/totalTimeSc))
else:
    print(' Performance: volume copied [GB] %9.3f; time [sec] %9d; -> rate [MB/sec] ?'%
          (totalSizeMb/1024.,totalTimeSc))
print('')