ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/OSUT3Analysis/Configuration/scripts/submitToCondor.py
Revision: 1.12
Committed: Thu Aug 29 21:26:53 2013 UTC (11 years, 8 months ago) by ahart
Content type: text/x-python
Branch: MAIN
CVS Tags: HEAD
Changes since 1.11: +30 -18 lines
Log Message:
Checks for submission errors before launching merging daemon. If there were no
errors, the daemon gets launched automatically. If some jobs were submitted
successfully, the user is asked whether they want the daemon launched or not
for those jobs. If no jobs were submitted successfully, the daemon will simply
not launch.

File Contents

# User Rev Content
1 lantonel 1.1 #!/usr/bin/env python
2     import os
3     import sys
4     import datetime
5 ahart 1.7 import shutil
6     import re
7 ahart 1.8 import subprocess
8     import signal
9 ahart 1.9 import fcntl
10 lantonel 1.5 from optparse import OptionParser
11 lantonel 1.1
12 lantonel 1.2 from OSUT3Analysis.Configuration.configurationOptions import *
13     from OSUT3Analysis.Configuration.processingUtilities import *
14 lantonel 1.1
15 lantonel 1.5 parser = OptionParser()
16 lantonel 1.1 parser = set_commandline_arguments(parser)
17 ahart 1.7
18     parser.remove_option("-o")
19     parser.remove_option("-n")
20     parser.remove_option("-u")
21     parser.remove_option("-e")
22     parser.remove_option("-r")
23     parser.remove_option("-R")
24     parser.remove_option("-d")
25     parser.remove_option("-b")
26     parser.remove_option("--2D")
27     parser.remove_option("-y")
28     parser.remove_option("-p")
29    
30     parser.add_option("-s", "--skimDir", dest="skimDir",
31     help="condor directory containing skim")
32     parser.add_option("-a", "--skimChannel", dest="skimChannel",
33     help="channel from skim to use")
34 ahart 1.8 parser.add_option("-d", "--mergeDaemon", action="store_true", dest="mergeDaemon", default=False,
35     help="launch a daemon to merge output when jobs are done")
36 ahart 1.7
37 lantonel 1.5 (arguments, args) = parser.parse_args()
38 lantonel 1.1
39 ahart 1.7 if (arguments.skimDir and not arguments.skimChannel) or (not arguments.skimDir and arguments.skimChannel):
40     print "Both the skim directory (--skimDir) and channel (--skimChannel) must be given."
41     exit ()
42    
43 lantonel 1.4 if arguments.localConfig:
44 lantonel 1.1 sys.path.append(os.getcwd())
45 lantonel 1.4 exec("from " + arguments.localConfig.rstrip('.py') + " import *")
46 lantonel 1.1
47 lantonel 1.4 condor_dir = set_condor_submit_dir(arguments)
48 ahart 1.7 short_condor_dir = condor_dir
49     short_condor_dir = re.sub (r".*/([^/]*)", r"\1", short_condor_dir)
50     short_condor_dir = list (short_condor_dir)
51     short_condor_dir[0] = short_condor_dir[0].upper ()
52     short_condor_dir = "".join (short_condor_dir)
53 lantonel 1.1
54 ahart 1.7 if not os.path.exists (condor_dir):
55     os.system("mkdir %s" % (condor_dir))
56 lantonel 1.1
57 ahart 1.6 split_datasets = split_composite_datasets(datasets, composite_dataset_definitions)
58 lantonel 1.1
59 ahart 1.8 clusters = ""
60 ahart 1.12 submissionErrors = False
61 lantonel 1.1 for dataset in split_datasets:
62     output_dir = "%s/%s" % (condor_dir, dataset)
63 ahart 1.10 skim_dir = ""
64     skim_channel_dir = ""
65 ahart 1.7 command = "osusub -l %s -m %d -p %s %s %s %s %s" % (dataset, maxEvents[dataset], short_condor_dir, dataset_names[dataset], config_file, output_dir, nJobs[dataset])
66     if arguments.skimDir:
67 ahart 1.10 skim_dir = "condor/" + arguments.skimDir + "/" + dataset
68     skim_channel_dir = "condor/" + arguments.skimDir + "/" + dataset + "/" + arguments.skimChannel
69     if os.path.exists (skim_channel_dir):
70     command = "osusub -d %s -l %s -m %d -p %s %s %s %s %s" % (dataset_names[dataset], dataset, maxEvents[dataset], short_condor_dir, skim_channel_dir, config_file, output_dir, nJobs[dataset])
71 ahart 1.7 else:
72     print dataset + "/" + arguments.skimChannel + " not in skim directory. Skipping."
73     continue
74 lantonel 1.1 print command
75 ahart 1.9 pid = os.getpid ()
76     p0 = subprocess.Popen (command.split (), bufsize=1, stdout=subprocess.PIPE)
77     flags = fcntl.fcntl (p0.stdout.fileno (), fcntl.F_GETFL, 0)
78     fcntl.fcntl (p0.stdout.fileno (), fcntl.F_SETFL, flags | os.O_NONBLOCK)
79     output = ""
80     while p0.poll () is None:
81     try:
82     tmpOutput = p0.stdout.read (1024)
83 ahart 1.12 if re.search (r"[^ \f\n\r\t]", tmpOutput):
84     print tmpOutput,
85 ahart 1.9 output += tmpOutput
86     except IOError:
87     pass
88     tmpOutput = p0.stdout.read (1024)
89 ahart 1.12 if re.search (r"[^ \f\n\r\t]", tmpOutput):
90     print tmpOutput,
91 ahart 1.9 output += tmpOutput
92 ahart 1.8 output = re.sub (r"[\f\n\r]", r"", output)
93 ahart 1.12 if re.search (r"submitted to cluster", output):
94     output = re.sub (r".*submitted to cluster (.*)\..*$", r"\1", output)
95     clusters += " " + output
96     else:
97     submissionErrors = True
98 ahart 1.10 if arguments.skimDir and os.path.exists (skim_channel_dir + "/skimNumberOfEvents.txt") and os.path.exists (skim_dir + "/numberOfEvents.txt") and os.path.exists (skim_dir + "/crossSectionInPicobarn.txt"):
99     shutil.copy (skim_channel_dir + "/skimNumberOfEvents.txt", output_dir + "/skimNumberOfEvents.txt")
100     shutil.copy (skim_dir + "/numberOfEvents.txt", output_dir + "/originalNumberOfEvents.txt")
101     f = open (skim_channel_dir + "/skimNumberOfEvents.txt", "r")
102     skimNumberOfEvents = float (f.read ().rstrip ())
103     f.close ()
104     f = open (skim_dir + "/numberOfEvents.txt", "r")
105     numberOfEvents = float (f.read ().rstrip ())
106     f.close ()
107     f = open (skim_dir + "/crossSectionInPicobarn.txt", "r")
108     crossSectionInPicobarn = float (f.read ().rstrip ())
109     f.close ()
110     if numberOfEvents:
111     crossSectionInPicobarn *= skimNumberOfEvents / numberOfEvents
112     else:
113     crossSectionInPicobarn *= skimNumberOfEvents * numberOfEvents
114     f = open (output_dir + "/crossSectionInPicobarn.txt", "w")
115     f.write (str (crossSectionInPicobarn) + "\n")
116     f.close ()
117 ahart 1.8
118 ahart 1.12 if arguments.mergeDaemon and len (clusters) > 0:
119     response = "y"
120     if submissionErrors:
121     print "\nIt looks like there were errors during submission."
122     response = raw_input ("Launch merging daemon anyway? (y/N): ").lower ()
123     response = re.sub (r"[ \f\n\r\t]", r"", response)
124     if len (response) > 0 and response[0] == "y":
125     command = "mergeOutput.py"
126     if arguments.condorDir:
127     command += " -c " + arguments.condorDir
128     pid = os.fork ()
129     if not pid:
130     signal.signal (signal.SIGHUP, signal.SIG_IGN)
131     pid = os.getpid ()
132     if arguments.localConfig:
133     shutil.copy (arguments.localConfig, "mergeDaemonOptions_" + str (pid) + ".py")
134     command += " -l mergeDaemonOptions_" + str (pid) + ".py"
135     os.execvp ("mergeDaemon.pl", ["mergeDaemon.pl", command] + clusters.split ())
136     else:
137     print "\nMerging daemon PID: " + str (pid)