#!/usr/bin/env python
import os
import sys
import datetime
import shutil
import re
import subprocess
import signal
import fcntl
from optparse import OptionParser

from OSUT3Analysis.Configuration.configurationOptions import *
from OSUT3Analysis.Configuration.processingUtilities import *

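# Submit one set of condor jobs per dataset with osusub, optionally running
# over the output of an existing skim, and optionally forking a daemon that
# merges the job output once the submitted clusters finish.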
parser = OptionParser()
parser = set_commandline_arguments(parser)

# Remove the shared OSUT3Analysis options that do not apply to job submission.
parser.remove_option("-o")
parser.remove_option("-n")
parser.remove_option("-u")
parser.remove_option("-e")
parser.remove_option("-r")
parser.remove_option("-R")
parser.remove_option("-d")
parser.remove_option("-b")
parser.remove_option("--2D")
parser.remove_option("-y")
parser.remove_option("-p")

# Options specific to this script.
parser.add_option("-s", "--skimDir", dest="skimDir",
                  help="condor directory containing skim")
parser.add_option("-a", "--skimChannel", dest="skimChannel",
                  help="channel from skim to use")
parser.add_option("-d", "--mergeDaemon", action="store_true", dest="mergeDaemon", default=False,
                  help="launch a daemon to merge output when jobs are done")

(arguments, args) = parser.parse_args()

if (arguments.skimDir and not arguments.skimChannel) or (not arguments.skimDir and arguments.skimChannel):
    print "Both the skim directory (--skimDir) and channel (--skimChannel) must be given."
    sys.exit (1)

if arguments.localConfig:
    sys.path.append(os.getcwd())
    # Import the local configuration; strip only a trailing ".py" so that
    # module names ending in "p", "y", or "." are not mangled (as str.rstrip would).
    exec("from " + re.sub (r"\.py$", "", arguments.localConfig) + " import *")

condor_dir = set_condor_submit_dir(arguments)
# Reduce the condor directory to its basename and capitalize the first letter;
# this short name is passed to osusub via its -p option.
short_condor_dir = condor_dir
short_condor_dir = re.sub (r".*/([^/]*)", r"\1", short_condor_dir)
short_condor_dir = list (short_condor_dir)
short_condor_dir[0] = short_condor_dir[0].upper ()
short_condor_dir = "".join (short_condor_dir)

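# For illustration only (hypothetical values): a condor_dir of
# "condor/myAnalysis" would give a short_condor_dir of "MyAnalysis".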
if not os.path.exists (condor_dir):
    os.system("mkdir %s" % (condor_dir))

split_datasets = split_composite_datasets(datasets, composite_dataset_definitions)

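# Submit one osusub batch per component dataset, collecting the condor
# cluster numbers of successful submissions as we go.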
clusters = ""
submissionErrors = False
for dataset in split_datasets:
    output_dir = "%s/%s" % (condor_dir, dataset)
    skim_dir = ""
    skim_channel_dir = ""
    command = "osusub -l %s -m %d -p %s %s %s %s %s" % (dataset, maxEvents[dataset], short_condor_dir, dataset_names[dataset], config_file, output_dir, nJobs[dataset])
    if arguments.skimDir:
        # When running over a skim, point osusub at the skimmed channel
        # directory instead of the original dataset.
        skim_dir = "condor/" + arguments.skimDir + "/" + dataset
        skim_channel_dir = "condor/" + arguments.skimDir + "/" + dataset + "/" + arguments.skimChannel
        if os.path.exists (skim_channel_dir):
            command = "osusub -d %s -l %s -m %d -p %s %s %s %s %s" % (dataset_names[dataset], dataset, maxEvents[dataset], short_condor_dir, skim_channel_dir, config_file, output_dir, nJobs[dataset])
        else:
            print dataset + "/" + arguments.skimChannel + " not in skim directory. Skipping."
            continue
    print command
    pid = os.getpid ()
    p0 = subprocess.Popen (command.split (), bufsize=1, stdout=subprocess.PIPE)
    flags = fcntl.fcntl (p0.stdout.fileno (), fcntl.F_GETFL, 0)
    fcntl.fcntl (p0.stdout.fileno (), fcntl.F_SETFL, flags | os.O_NONBLOCK)
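    # osusub's stdout was made non-blocking above, so read() raises IOError
    # when no output is ready yet; the loop below simply retries until the
    # process exits, echoing and accumulating whatever it prints.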
    output = ""
    while p0.poll () is None:
        try:
            tmpOutput = p0.stdout.read (1024)
            if re.search (r"[^ \f\n\r\t]", tmpOutput):
                print tmpOutput,
                output += tmpOutput
        except IOError:
            pass
    # Drain any output left in the pipe after the process has exited.
    tmpOutput = p0.stdout.read (1024)
    if re.search (r"[^ \f\n\r\t]", tmpOutput):
        print tmpOutput,
        output += tmpOutput
    # Extract the cluster number from condor's "submitted to cluster N." line;
    # anything else is treated as a submission error.
    output = re.sub (r"[\f\n\r]", r"", output)
    if re.search (r"submitted to cluster", output):
        output = re.sub (r".*submitted to cluster (.*)\..*$", r"\1", output)
        clusters += " " + output
    else:
        submissionErrors = True
    if arguments.skimDir and os.path.exists (skim_channel_dir + "/skimNumberOfEvents.txt") and os.path.exists (skim_dir + "/numberOfEvents.txt") and os.path.exists (skim_dir + "/crossSectionInPicobarn.txt"):
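        # Copy the skimmed and original event counts alongside the output and
        # scale the cross section by the skim efficiency
        # (skimNumberOfEvents / numberOfEvents), so the value written out is
        # the effective cross section of the skimmed sample.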
        shutil.copy (skim_channel_dir + "/skimNumberOfEvents.txt", output_dir + "/skimNumberOfEvents.txt")
        shutil.copy (skim_dir + "/numberOfEvents.txt", output_dir + "/originalNumberOfEvents.txt")
        f = open (skim_channel_dir + "/skimNumberOfEvents.txt", "r")
        skimNumberOfEvents = float (f.read ().rstrip ())
        f.close ()
        f = open (skim_dir + "/numberOfEvents.txt", "r")
        numberOfEvents = float (f.read ().rstrip ())
        f.close ()
        f = open (skim_dir + "/crossSectionInPicobarn.txt", "r")
        crossSectionInPicobarn = float (f.read ().rstrip ())
        f.close ()
        if numberOfEvents:
            crossSectionInPicobarn *= skimNumberOfEvents / numberOfEvents
        else:
            # Degenerate case: with no original events recorded, this product
            # simply zeroes the cross section.
            crossSectionInPicobarn *= skimNumberOfEvents * numberOfEvents
        f = open (output_dir + "/crossSectionInPicobarn.txt", "w")
        f.write (str (crossSectionInPicobarn) + "\n")
        f.close ()

if arguments.mergeDaemon and len (clusters) > 0:
    response = "y"
    if submissionErrors:
        print "\nIt looks like there were errors during submission."
        response = raw_input ("Launch merging daemon anyway? (y/N): ").lower ()
        response = re.sub (r"[ \f\n\r\t]", r"", response)
    if len (response) > 0 and response[0] == "y":
        command = "mergeOutput.py"
        if arguments.condorDir:
            command += " -c " + arguments.condorDir
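        # Fork a child that ignores SIGHUP and replaces itself with
        # mergeDaemon.pl, which is handed the mergeOutput.py command built
        # above plus the condor cluster numbers to wait for; the parent just
        # reports the daemon's PID.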
        pid = os.fork ()
        if not pid:
            signal.signal (signal.SIGHUP, signal.SIG_IGN)
            pid = os.getpid ()
            if arguments.localConfig:
                # Copy the local config to a PID-stamped file for the daemon to use.
                shutil.copy (arguments.localConfig, "mergeDaemonOptions_" + str (pid) + ".py")
                command += " -l mergeDaemonOptions_" + str (pid) + ".py"
            os.execvp ("mergeDaemon.pl", ["mergeDaemon.pl", command] + clusters.split ())
        else:
            print "\nMerging daemon PID: " + str (pid)
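
# Example invocation (the script name is a placeholder, and the -l/-c flags are
# assumed to be defined by set_commandline_arguments for the local configuration
# and condor directory, respectively):
#   submitSkimJobs.py -l localOptions.py -c myCondorDir -s mySkimDir -a MyChannel -d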