ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.27
Committed: Mon Mar 5 21:45:56 2007 UTC (18 years, 1 month ago) by belforte
Branch: MAIN
Changes since 1.26: +11 -0 lines
Log Message:
support GEN tier

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent initiates tasks on all published datasets.
4     ##H A task is an application to run on a dataset at a particular site.
5     ##H It keeps a record of tasks created and avoids submitting a task too
6     ##H frequently.
7     ##H
8     ##H Usage:
9     ##H
10 gutsche 1.21 ##H TaskSource
11     ##H
12     ##H -state agent state directory, including inbox
13     ##H -next next agent to pass the drops to; can be given several times
14     ##H -wait time to wait in seconds between work scans
15     ##H -secs_between_inits seconds between submissions of the same dataset/site/tier combination
16     ##H -siteconfig file with list of valid SE and their schedule limits
17 gutsche 1.25 ##H -dbsdlspath path to DBS/DLS discovery ascii files
18 gutsche 1.21 ##H -max-site-queue default value for maximum events scheduled per site
19 gutsche 1.22 ##H -max-jobs maximum number of jobs per project, -1 to deactivate
20 gutsche 1.21 ##H -scheduler scheduler
21     ##H -events number of events per job
22     ##H -pattern DBS discovery pattern
23     ##H -validtiers perl string pattern with valid tiers (DIGI|RECO)
24     ##H -simcfg CMSSW parameter-set for tier SIM
25     ##H -simoutput output (comma separated) for tier SIM
26     ##H -digicfg CMSSW parameter-set for tier DIGI
27     ##H -digioutput output (comma separated) for tier DIGI
28     ##H -recocfg CMSSW parameter-set for tier RECO
29     ##H -recooutput output (comma separated) for tier RECO
30 belforte 1.27 ##H -gencfg CMSSW parameter-set for tier GEN
31     ##H -genoutput output (comma separated) for tier GEN
32 gutsche 1.1
# Strictures for the whole script. They must be at file scope: inside the
# BEGIN block (where they used to be) they only covered the block itself,
# not the option parsing that follows it.
use strict;
use warnings;

BEGIN {
    # Set the global warning flag ($^W, the equivalent of "perl -w").
    $^W = 1;

    # Script name without its directory.
    our $me = $0;
    $me =~ s|.*/||;

    # Locate the PHEDEX toolkit relative to this script:
    # <script dir>/../PHEDEX/Toolkit/Common. A $0 without any "/" (for
    # example "perl TaskSource") means the script is in the current
    # directory; previously the bare name itself was used as the directory.
    our $home = $0;
    if ($home =~ m|/|) {
        $home =~ s|/[^/]+$||;
        $home ||= ".";
    } else {
        $home = ".";
    }
    $home .= "/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}
39    
40     ######################################################################
41     use UtilsHelp;
# Default agent arguments; command-line options override them.
my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10);

# Options taking exactly one value, mapped to the %args key they set.
# "-sec_between_inits" is accepted as an alias for the spelling used by
# older versions of the help text.
my %optionKey = (
    '-state'              => 'DROPDIR',
    '-wait'               => 'WAITTIME',
    '-secs_between_inits' => 'SECS_BETWEEN_INITS',
    '-sec_between_inits'  => 'SECS_BETWEEN_INITS',
    '-siteconfig'         => 'SITECONFIG',
    '-dbsdlspath'         => 'DBSDLSPATH',
    '-max-site-queue'     => 'MAX_SITE_QUEUE',
    '-max-jobs'           => 'MAX_JOBS',
    '-scheduler'          => 'SCHEDULER',
    '-events'             => 'EVENTS',
    '-pattern'            => 'PATTERN',
    '-validtiers'         => 'VALIDTIERS',
    '-simcfg'             => 'SIMCFG',
    '-simoutput'          => 'SIMOUTPUT',
    '-digicfg'            => 'DIGICFG',
    '-digioutput'         => 'DIGIOUTPUT',
    '-recocfg'            => 'RECOCFG',
    '-recooutput'         => 'RECOOUTPUT',
    '-gencfg'             => 'GENCFG',
    '-genoutput'          => 'GENOUTPUT',
);

while (@ARGV) {
    my $opt = $ARGV[0];
    if ($opt eq '-h') {
        &usage();
    } elsif ($opt eq '-next' && @ARGV > 1) {
        # -next may be given several times; collect every target agent.
        shift(@ARGV);
        push(@{$args{NEXTDIR}}, shift(@ARGV));
    } elsif (exists $optionKey{$opt} && @ARGV > 1) {
        shift(@ARGV);
        $args{$optionKey{$opt}} = shift(@ARGV);
    } else {
        # Unknown option or missing value: stop here, reported below.
        last;
    }
}

if (@ARGV || !$args{DROPDIR}) {
    die "Insufficient parameters, use -h for help.\n";
}

TaskSource->new(%args)->process();
96    
97     ######################################################################
98     # Routines specific to this agent.
99     package TaskSource; use strict; use warnings; use base 'UtilsAgent';
100     use File::Path;
101     use UtilsCommand;
102     use UtilsLogging;
103     use UtilsTiming;
104     use UtilsNet;
105     use POSIX;
106    
# Constructor. Builds the base UtilsAgent object from the arguments, then
# copies every parameter listed in %params onto the object, taking the
# caller's value when it is defined and the default below otherwise.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);
    my %params = (MAX_SITE_QUEUE => undef,     # default max number of jobs per site
                  MAX_JOBS => undef,           # max number of jobs per project, -1 to deactivate
                  SECS_BETWEEN_INITS => 600,   # min seconds before re-creating the same task
                  SITECONFIG => undef,         # path to siteconfig file
                  DBSDLSPATH => "",            # path to DBSDLS discovery ascii files
                  EVENTS => 1000,              # number of events per job
                  PATTERN => "*",              # DBS discovery pattern
                  VALIDTIERS => undef,         # perl string pattern with valid tiers (DIGI|RECO)
                  SIMCFG => undef,             # CMSSW parameter-set for SIM tier
                  DIGICFG => undef,            # CMSSW parameter-set for DIGI tier
                  RECOCFG => undef,            # CMSSW parameter-set for RECO tier
                  GENCFG => undef,             # CMSSW parameter-set for GEN tier
                  SIMOUTPUT => undef,          # output (comma separated) for SIM tier
                  DIGIOUTPUT => undef,         # output (comma separated) for DIGI tier
                  RECOOUTPUT => undef,         # output (comma separated) for RECO tier
                  GENOUTPUT => undef,          # output (comma separated) for GEN tier
                  SCHEDULER => "edg");         # standard scheduler
    my %args = (@_);
    # A plain loop instead of map in void context: only the side effect matters.
    foreach my $key (keys %params) {
        $self->{$key} = defined $args{$key} ? $args{$key} : $params{$key};
    }
    bless $self, $class;
    return $self;
}
134 gutsche 1.1
# Agent initialisation: make sure the task repository DROPDIR/tasks exists
# (dies if it cannot be created), and record in LINKS_OPTS which dump
# options the installed "links" browser supports.
sub init
{
    my ($self) = @_;
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";
    # Low-precedence "or": with "||" the old code parsed as
    # mkdir("dir" || die ...), so the die could never fire.
    -d $self->{TASKREPO}
        or mkdir ($self->{TASKREPO})
        or die "$self->{TASKREPO}: cannot create directory: $!\n";

    # Determine if links supports -dump-width option. Read the help text
    # once: grep'ing the handle twice drained it on the first read, so the
    # -dump-width fallback could never match.
    $self->{LINKS_OPTS} = [];
    my @help = ();
    if (open (my $links_help, "links -help 2>/dev/null |")) {
        @help = <$links_help>;
        close ($links_help);
    }
    if (grep (/-no-numbering/, @help)) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300 -no-numbering 1));
    } elsif (grep (/-dump-width/, @help)) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
    }
}
153 gutsche 1.1
# Count the jobs of every site in the task repository, by job status.
# The repository layout is TASKREPO/<date>/<site>/<tier>/<task>; counts for
# the same <site> directory are summed over all dates. A task directory
# without JOB_CREATE_LOG.txt counts once under status "C"; otherwise each
# line of its crab_*/share/db/jobs file counts once under the status found
# in the line's second "|"-separated field.
#
# This is insensitive to the job type, and it is computed only once per
# scan so that no dataset is favoured over another -- once we decide to
# proceed for a site, we submit jobs for all datasets.
#
# Returns a hash: site directory name => { status => count }.
sub getSiteStatus
{
    my ($self) = @_;
    my %result = ();
    my $taskrepo = $self->{TASKREPO};
    foreach my $site (glob("$taskrepo/*/*")) {
        my ($sitename) = ($site =~ m|.*/(.*)|);
        foreach my $d (glob("$site/*/*")) {
            if (! -f "$d/JOB_CREATE_LOG.txt") {
                $result{$sitename}{C}++;
                next;
            }

            my $f = (glob("$d/crab_*/share/db/jobs"))[0];
            next if ! defined $f;

            foreach my $status (split(/\n/, &input($f) || '')) {
                my @statusarray = split('\|', $status);
                # Blank or malformed lines carry no status field; counting
                # them would use an undefined hash key.
                next if ! defined $statusarray[1];
                $result{$sitename}{$statusarray[1]}++;
            }
        }
    }

    return %result;
}
186 gutsche 1.1
# Return a short numeric hash of $string, used to build unique task names.
# The value is the POSIX cksum CRC of "$string\n". That is what the former
# implementation obtained by shelling out to "echo $string > tmpfile;
# cksum tmpfile", so names of existing task directories stay the same.
# Computing it in Perl avoids the temporary file, the subprocesses, and
# passing $string unquoted through a shell.
sub createHashFromString
{
    my ($self, $string) = @_;
    return _posixCksum("$string\n");
}

# POSIX "cksum" CRC of a byte string: CRC-32 with polynomial 0x04C11DB7,
# MSB first, initial value 0, over the data followed by its length
# (least significant byte first, only as many bytes as needed), and
# finally complemented. Returns an integer in 0 .. 2**32-1.
sub _posixCksum
{
    my ($data) = @_;

    # Lazily built lookup table. It is not initialised at file scope
    # because the agent's main loop is started before the package code
    # further down in this file is executed.
    our @CKSUM_TABLE;
    if (! @CKSUM_TABLE) {
        foreach my $i (0 .. 255) {
            my $crc = $i << 24;
            for (1 .. 8) {
                $crc = ($crc & 0x80000000) ? (($crc << 1) ^ 0x04C11DB7) : ($crc << 1);
                $crc &= 0xFFFFFFFF;
            }
            push (@CKSUM_TABLE, $crc);
        }
    }

    # cksum works on bytes: encode wide-character strings first.
    my $bytes = $data;
    utf8::encode($bytes) if utf8::is_utf8($bytes);

    my $crc = 0;
    foreach my $byte (unpack('C*', $bytes)) {
        $crc = (($crc << 8) & 0xFFFFFFFF) ^ $CKSUM_TABLE[(($crc >> 24) ^ $byte) & 0xFF];
    }
    for (my $len = length($bytes); $len > 0; $len >>= 8) {
        $crc = (($crc << 8) & 0xFFFFFFFF) ^ $CKSUM_TABLE[(($crc >> 24) ^ $len) & 0xFF];
    }
    return (~$crc) & 0xFFFFFFFF;
}
213    
# One pass of the agent work loop. Logs the current per-site load, reads
# the published datasets ("<datasetpath> <site> <totalevents>" per line)
# from a cached DBS/DLS ascii file or from a live DBSDLSQuery.py run, and
# calls createTask for each line whose site passes the SITECONFIG filter.
# Any error aborts the pass and is reported with alert; the agent then
# sleeps WAITTIME seconds.
sub idle
{
    my ($self, @pending) = @_;
    &logmsg ("DBS/DLS discovery");
    eval {
        # Get status of how busy the sites are. We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats) {
            my @load;
            foreach my $site (sort keys %sitestats) {
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                             sort keys %{$sitestats{$site}}));
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        # DBS/DLS discovery: if DBSDLSPATH contains dbsdls_query_<pattern>.txt,
        # where <pattern> is PATTERN with every "/" replaced by "_", read that
        # ascii file; otherwise run the discovery script.
        my $cleaned_pattern = $self->{PATTERN};
        $cleaned_pattern =~ s/\//_/g;
        my $patternfile = $self->{DBSDLSPATH} . "/dbsdls_query_" . $cleaned_pattern . ".txt";
        my $published;
        if (-e $patternfile) {
            &logmsg("Use DBS/DLS ascii file: $patternfile");
            open ($published, '<', $patternfile)
                or die "cannot open $patternfile: $!\n";
        } else {
            my $cmd = $ENV{PYTHONSCRIPT} . "/DBSDLSQuery.py --pattern \"$self->{PATTERN}\"";
            open ($published, "$cmd 2>/dev/null |") or die "cannot run `$cmd': $!\n";
        }

        # Site filter, re-read on every pass so that SITECONFIG edits take
        # effect without restarting the agent (it used to be re-read for
        # every single dataset line).
        my $validsites = $self->{SITECONFIG} ? $self->_validSitesPattern () : undef;

        while (my $line = <$published>) {
            chomp ($line);
            &timeStart($self->{STARTTIME});
            my ($datapath, $site, $totalevents) = split(/ /, $line);
            # Skip blank or truncated discovery lines.
            next if ! defined $totalevents;

            if (defined $validsites) {
                # An empty pattern means SITECONFIG lists no site at all.
                next if $validsites eq "";
                next if $site !~ /$validsites/;
            }

            # Cap the request at EVENTS * MAX_JOBS events. MAX_JOBS == -1
            # disables the cap, and so does an unset MAX_JOBS (it used to be
            # taken as 0, which requested zero events for every task).
            if (defined $self->{MAX_JOBS} && $self->{MAX_JOBS} != -1) {
                my $cap = $self->{MAX_JOBS} * $self->{EVENTS};
                $totalevents = $cap if $cap < $totalevents;
            }
            $self->createTask($datapath, $site, $totalevents, %sitestats);
        }
        close ($published);
    };
    do { chomp ($@); &alert ($@); } if $@;

    $self->nap ($self->{WAITTIME});
}

# Build the site filter regex "(site1|site2|...)" from the first
# space-separated column of the SITECONFIG file, ignoring lines that start
# with "#" and empty lines. Returns "" when no site line was found or the
# file could not be read; the caller then skips every dataset.
sub _validSitesPattern
{
    my ($self) = @_;
    my @sites = ();
    if (open (my $config, '<', $self->{SITECONFIG})) {
        while (my $line = <$config>) {
            next if $line =~ /^#/ || $line =~ /^\n/;
            my ($site) = split(/ /, $line);
            chomp ($site);
            push (@sites, $site);
        }
        close ($config);
    } else {
        &alert ("$self->{SITECONFIG}: cannot open site config: $!");
    }
    return @sites ? "(" . join("|", @sites) . ")" : "";
}
285    
# Create at most one CRAB task for dataset $datasetpath at $site.
#
# The tier of $datasetpath selects the CMSSW parameter-set and output list
# (RECO, DIGI, SIM, GEN, tested in that order; otherwise robot.cfg with no
# output). Nothing is done when the tier does not match VALIDTIERS, when
# the site already has more jobs in status "S" or "C" (per %sitestats) than
# its queue limit, or when the newest task with the same name hash was
# created less than SECS_BETWEEN_INITS seconds ago. Otherwise CrabJobs
# creates TASKREPO/<yymmdd>/<short site>/<tier>/<hash>.<NNN> and a drop
# pointing at it is relayed to the next agent.
#
# Dies if CrabJobs fails.
sub createTask
{
    my ($self) = shift(@_);
    my ($datasetpath, $site, $totalevents, %sitestats) = @_;

    # Presumably "/<dataset>/<tier>/<owner>"; the discarded first field is
    # whatever precedes the first "/".
    my (undef, $dataset, $tier, $owner) = split(/\//, $datasetpath);

    # Filter on tier. Early exits use "return": the former "next" left the
    # sub by jumping into the caller's loop.
    return if ($self->{VALIDTIERS} && $tier !~ /$self->{VALIDTIERS}/);

    my ($cfg, $output);
    if ($tier =~ /RECO/) {
        $cfg = $self->{RECOCFG};
        $output = $self->{RECOOUTPUT};
    } elsif ($tier =~ /DIGI/) {
        $cfg = $self->{DIGICFG};
        $output = $self->{DIGIOUTPUT};
    } elsif ($tier =~ /SIM/) {
        $cfg = $self->{SIMCFG};
        $output = $self->{SIMOUTPUT};
    } elsif ($tier =~ /GEN/) {
        $cfg = $self->{GENCFG};
        $output = $self->{GENOUTPUT};
    } else {
        &logmsg ("No valid tier, defaults for cfg and output");
        $cfg = "robot.cfg";
        $output = "";
    }

    # Per-site queue limits and short site names from SITECONFIG, whose
    # lines read "<site> <short name> <limit>"; lines starting with "#" and
    # empty lines are ignored. An unreadable file just means no overrides.
    my %sitelimits = ();
    my %siteNameTranslation = ();
    if ($self->{SITECONFIG} && open (my $config, '<', $self->{SITECONFIG})) {
        while (my $line = <$config>) {
            next if $line =~ /^#/ || $line =~ /^\n/;
            chomp (my @fields = split(/ /, $line));
            my ($tempsite, $tempshortsite, $templimit) = @fields;
            $sitelimits{$tempsite} = $templimit;
            $siteNameTranslation{$tempsite} = $tempshortsite;
        }
        close ($config);
    }

    # Short site name: names the task directory and keys %sitestats. Fall
    # back to the full site name when SITECONFIG provides none (an undefined
    # name used to produce ".../<date>//<tier>").
    my $shortsite = $siteNameTranslation{$site};
    $shortsite = $site if ! defined $shortsite || $shortsite eq '';

    # Skip the site if it is already too busy.
    my $pending = ($sitestats{$shortsite}{S} || 0);
    $pending += ($sitestats{$shortsite}{C} || 0);
    my $max_queue = $sitelimits{$site} || $self->{MAX_SITE_QUEUE};
    return if $pending > $max_queue;

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$shortsite/$tier";

    # Short unique taskbase from $datestamp.$site.$dataset.$tier.$owner.
    my $taskbase = $self->createHashFromString ("$datestamp.$site.$dataset.$tier.$owner");
    chomp($taskbase);

    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } glob("$taskdir/$taskbase.*");
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    my $lastcfg = "$taskdir/$taskbase.$curgen/crab.cfg";
    return if -f $lastcfg
        && (stat($lastcfg))[9] >= time() - $self->{SECS_BETWEEN_INITS};

    # Directory of this script; a $0 without "/" means the current directory.
    my $mydir = $0;
    if ($mydir =~ m|/|) {
        $mydir =~ s|/[^/]+$||;
    } else {
        $mydir = ".";
    }
    my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
    my $ret = &runcmd ("$mydir/CrabJobs",
                       "-cfg", "$mydir/$cfg",
                       "-datasetpath", $datasetpath,
                       "-output", $output,
                       "-totalevents", $totalevents,
                       "-whitelist", $site,
                       "-eventsperjob", $self->{EVENTS},
                       "-scheduler", $self->{SCHEDULER},
                       "-jobname", "$taskdir/$drop");
    die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

    &output ("$taskdir/$drop/TASK_INIT.txt",
             &mytimeofday () . "\n");

    my $dropdir = "$self->{WORKDIR}/$drop";
    # The former "mkdir "$dropdir" || die" could never die ("||" bound to
    # the string). Report the failure; the &output check below then takes
    # the existing failure branch.
    if (! -d $dropdir && ! mkdir ($dropdir)) {
        &alert ("$dropdir: cannot create: $!");
    }
    if (&output ("$dropdir/task", "$taskdir/$drop")) {
        &touch ("$dropdir/done");
        $self->relayDrop ($drop);
        &logmsg("stats: $drop $datestamp $site $dataset $tier $owner @{[&formatElapsedTime($self->{STARTTIME})]} success");
    } else {
        &alert ("$drop $datestamp $site $dataset $tier $owner: failed to create drop");
        &rmtree ([ "$self->{WORKDIR}/$drop" ]);
    }
}