ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/OSUT3Analysis/Configuration/scripts/submitToCondor.py
Revision: 1.12
Committed: Thu Aug 29 21:26:53 2013 UTC (11 years, 8 months ago) by ahart
Content type: text/x-python
Branch: MAIN
CVS Tags: HEAD
Changes since 1.11: +30 -18 lines
Log Message:
Checks for submission errors before launching merging daemon. If there were no
errors, the daemon gets launched automatically. If some jobs were submitted
successfully, the user is asked whether they want the daemon launched or not
for those jobs. If no jobs were submitted successfully, the daemon will simply
not launch.

File Contents

# Content
1 #!/usr/bin/env python
2 import os
3 import sys
4 import datetime
5 import shutil
6 import re
7 import subprocess
8 import signal
9 import fcntl
10 from optparse import OptionParser
11
12 from OSUT3Analysis.Configuration.configurationOptions import *
13 from OSUT3Analysis.Configuration.processingUtilities import *
14
15 parser = OptionParser()
16 parser = set_commandline_arguments(parser)
17
18 parser.remove_option("-o")
19 parser.remove_option("-n")
20 parser.remove_option("-u")
21 parser.remove_option("-e")
22 parser.remove_option("-r")
23 parser.remove_option("-R")
24 parser.remove_option("-d")
25 parser.remove_option("-b")
26 parser.remove_option("--2D")
27 parser.remove_option("-y")
28 parser.remove_option("-p")
29
30 parser.add_option("-s", "--skimDir", dest="skimDir",
31 help="condor directory containing skim")
32 parser.add_option("-a", "--skimChannel", dest="skimChannel",
33 help="channel from skim to use")
34 parser.add_option("-d", "--mergeDaemon", action="store_true", dest="mergeDaemon", default=False,
35 help="launch a daemon to merge output when jobs are done")
36
37 (arguments, args) = parser.parse_args()
38
39 if (arguments.skimDir and not arguments.skimChannel) or (not arguments.skimDir and arguments.skimChannel):
40 print "Both the skim directory (--skimDir) and channel (--skimChannel) must be given."
41 exit ()
42
43 if arguments.localConfig:
44 sys.path.append(os.getcwd())
45 exec("from " + arguments.localConfig.rstrip('.py') + " import *")
46
47 condor_dir = set_condor_submit_dir(arguments)
48 short_condor_dir = condor_dir
49 short_condor_dir = re.sub (r".*/([^/]*)", r"\1", short_condor_dir)
50 short_condor_dir = list (short_condor_dir)
51 short_condor_dir[0] = short_condor_dir[0].upper ()
52 short_condor_dir = "".join (short_condor_dir)
53
54 if not os.path.exists (condor_dir):
55 os.system("mkdir %s" % (condor_dir))
56
57 split_datasets = split_composite_datasets(datasets, composite_dataset_definitions)
58
59 clusters = ""
60 submissionErrors = False
61 for dataset in split_datasets:
62 output_dir = "%s/%s" % (condor_dir, dataset)
63 skim_dir = ""
64 skim_channel_dir = ""
65 command = "osusub -l %s -m %d -p %s %s %s %s %s" % (dataset, maxEvents[dataset], short_condor_dir, dataset_names[dataset], config_file, output_dir, nJobs[dataset])
66 if arguments.skimDir:
67 skim_dir = "condor/" + arguments.skimDir + "/" + dataset
68 skim_channel_dir = "condor/" + arguments.skimDir + "/" + dataset + "/" + arguments.skimChannel
69 if os.path.exists (skim_channel_dir):
70 command = "osusub -d %s -l %s -m %d -p %s %s %s %s %s" % (dataset_names[dataset], dataset, maxEvents[dataset], short_condor_dir, skim_channel_dir, config_file, output_dir, nJobs[dataset])
71 else:
72 print dataset + "/" + arguments.skimChannel + " not in skim directory. Skipping."
73 continue
74 print command
75 pid = os.getpid ()
76 p0 = subprocess.Popen (command.split (), bufsize=1, stdout=subprocess.PIPE)
77 flags = fcntl.fcntl (p0.stdout.fileno (), fcntl.F_GETFL, 0)
78 fcntl.fcntl (p0.stdout.fileno (), fcntl.F_SETFL, flags | os.O_NONBLOCK)
79 output = ""
80 while p0.poll () is None:
81 try:
82 tmpOutput = p0.stdout.read (1024)
83 if re.search (r"[^ \f\n\r\t]", tmpOutput):
84 print tmpOutput,
85 output += tmpOutput
86 except IOError:
87 pass
88 tmpOutput = p0.stdout.read (1024)
89 if re.search (r"[^ \f\n\r\t]", tmpOutput):
90 print tmpOutput,
91 output += tmpOutput
92 output = re.sub (r"[\f\n\r]", r"", output)
93 if re.search (r"submitted to cluster", output):
94 output = re.sub (r".*submitted to cluster (.*)\..*$", r"\1", output)
95 clusters += " " + output
96 else:
97 submissionErrors = True
98 if arguments.skimDir and os.path.exists (skim_channel_dir + "/skimNumberOfEvents.txt") and os.path.exists (skim_dir + "/numberOfEvents.txt") and os.path.exists (skim_dir + "/crossSectionInPicobarn.txt"):
99 shutil.copy (skim_channel_dir + "/skimNumberOfEvents.txt", output_dir + "/skimNumberOfEvents.txt")
100 shutil.copy (skim_dir + "/numberOfEvents.txt", output_dir + "/originalNumberOfEvents.txt")
101 f = open (skim_channel_dir + "/skimNumberOfEvents.txt", "r")
102 skimNumberOfEvents = float (f.read ().rstrip ())
103 f.close ()
104 f = open (skim_dir + "/numberOfEvents.txt", "r")
105 numberOfEvents = float (f.read ().rstrip ())
106 f.close ()
107 f = open (skim_dir + "/crossSectionInPicobarn.txt", "r")
108 crossSectionInPicobarn = float (f.read ().rstrip ())
109 f.close ()
110 if numberOfEvents:
111 crossSectionInPicobarn *= skimNumberOfEvents / numberOfEvents
112 else:
113 crossSectionInPicobarn *= skimNumberOfEvents * numberOfEvents
114 f = open (output_dir + "/crossSectionInPicobarn.txt", "w")
115 f.write (str (crossSectionInPicobarn) + "\n")
116 f.close ()
117
118 if arguments.mergeDaemon and len (clusters) > 0:
119 response = "y"
120 if submissionErrors:
121 print "\nIt looks like there were errors during submission."
122 response = raw_input ("Launch merging daemon anyway? (y/N): ").lower ()
123 response = re.sub (r"[ \f\n\r\t]", r"", response)
124 if len (response) > 0 and response[0] == "y":
125 command = "mergeOutput.py"
126 if arguments.condorDir:
127 command += " -c " + arguments.condorDir
128 pid = os.fork ()
129 if not pid:
130 signal.signal (signal.SIGHUP, signal.SIG_IGN)
131 pid = os.getpid ()
132 if arguments.localConfig:
133 shutil.copy (arguments.localConfig, "mergeDaemonOptions_" + str (pid) + ".py")
134 command += " -l mergeDaemonOptions_" + str (pid) + ".py"
135 os.execvp ("mergeDaemon.pl", ["mergeDaemon.pl", command] + clusters.split ())
136 else:
137 print "\nMerging daemon PID: " + str (pid)