ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/UserCode/OSUT3Analysis/Configuration/scripts/submitToCondor.py
Revision: 1.9
Committed: Tue Jul 2 23:50:19 2013 UTC (11 years, 10 months ago) by ahart
Content type: text/x-python
Branch: MAIN
CVS Tags: V02-03-01
Changes since 1.8: +16 -2 lines
Log Message:
Buffer the output from osusub so that we can extract the cluster numbers from it while displaying it at the same time.

File Contents

# Content
1 #!/usr/bin/env python
2 import os
3 import sys
4 import datetime
5 import shutil
6 import re
7 import subprocess
8 import signal
9 import fcntl
10 from optparse import OptionParser
11
12 from OSUT3Analysis.Configuration.configurationOptions import *
13 from OSUT3Analysis.Configuration.processingUtilities import *
14
15 parser = OptionParser()
16 parser = set_commandline_arguments(parser)
17
18 parser.remove_option("-o")
19 parser.remove_option("-t")
20 parser.remove_option("-q")
21 parser.remove_option("-n")
22 parser.remove_option("-u")
23 parser.remove_option("-e")
24 parser.remove_option("-r")
25 parser.remove_option("-R")
26 parser.remove_option("-d")
27 parser.remove_option("-b")
28 parser.remove_option("--2D")
29 parser.remove_option("-y")
30 parser.remove_option("-p")
31
32 parser.add_option("-s", "--skimDir", dest="skimDir",
33 help="condor directory containing skim")
34 parser.add_option("-a", "--skimChannel", dest="skimChannel",
35 help="channel from skim to use")
36 parser.add_option("-d", "--mergeDaemon", action="store_true", dest="mergeDaemon", default=False,
37 help="launch a daemon to merge output when jobs are done")
38
39 (arguments, args) = parser.parse_args()
40
41 if (arguments.skimDir and not arguments.skimChannel) or (not arguments.skimDir and arguments.skimChannel):
42 print "Both the skim directory (--skimDir) and channel (--skimChannel) must be given."
43 exit ()
44
45 if arguments.localConfig:
46 sys.path.append(os.getcwd())
47 exec("from " + arguments.localConfig.rstrip('.py') + " import *")
48
49 condor_dir = set_condor_submit_dir(arguments)
50 short_condor_dir = condor_dir
51 short_condor_dir = re.sub (r".*/([^/]*)", r"\1", short_condor_dir)
52 short_condor_dir = list (short_condor_dir)
53 short_condor_dir[0] = short_condor_dir[0].upper ()
54 short_condor_dir = "".join (short_condor_dir)
55
56 if not os.path.exists (condor_dir):
57 os.system("mkdir %s" % (condor_dir))
58
59 split_datasets = split_composite_datasets(datasets, composite_dataset_definitions)
60
61 clusters = ""
62 for dataset in split_datasets:
63 output_dir = "%s/%s" % (condor_dir, dataset)
64 command = "osusub -l %s -m %d -p %s %s %s %s %s" % (dataset, maxEvents[dataset], short_condor_dir, dataset_names[dataset], config_file, output_dir, nJobs[dataset])
65 if arguments.skimDir:
66 skim_dir = "condor/" + arguments.skimDir + "/" + dataset + "/" + arguments.skimChannel
67 if os.path.exists (skim_dir):
68 command = "osusub -d %s -l %s -m %d -p %s %s %s %s %s" % (dataset_names[dataset], dataset, maxEvents[dataset], short_condor_dir, skim_dir, config_file, output_dir, nJobs[dataset])
69 else:
70 print dataset + "/" + arguments.skimChannel + " not in skim directory. Skipping."
71 continue
72 print command
73 pid = os.getpid ()
74 p0 = subprocess.Popen (command.split (), bufsize=1, stdout=subprocess.PIPE)
75 flags = fcntl.fcntl (p0.stdout.fileno (), fcntl.F_GETFL, 0)
76 fcntl.fcntl (p0.stdout.fileno (), fcntl.F_SETFL, flags | os.O_NONBLOCK)
77 output = ""
78 while p0.poll () is None:
79 try:
80 tmpOutput = p0.stdout.read (1024)
81 print tmpOutput,
82 output += tmpOutput
83 except IOError:
84 pass
85 tmpOutput = p0.stdout.read (1024)
86 print tmpOutput,
87 output += tmpOutput
88 output = re.sub (r"[\f\n\r]", r"", output)
89 output = re.sub (r".*submitted to cluster (.*)\..*$", r"\1", output)
90 clusters += " " + output
91
92 if arguments.mergeDaemon:
93 command = "mergeOutput.py"
94 if arguments.condorDir:
95 command += " -c " + arguments.condorDir
96 pid = os.fork ()
97 if not pid:
98 signal.signal (signal.SIGHUP, signal.SIG_IGN)
99 pid = os.getpid ()
100 if arguments.localConfig:
101 shutil.copy (arguments.localConfig, "mergeDaemonOptions_" + str (pid) + ".py")
102 command += " -l mergeDaemonOptions_" + str (pid) + ".py"
103 os.execvp ("mergeDaemon.pl", ["mergeDaemon.pl", command] + clusters.split ())
104 else:
105 print "\nMerging daemon PID: " + str (pid)