ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.38
Committed: Fri Feb 17 09:35:11 2012 UTC (13 years, 2 months ago) by samcms
Branch: MAIN
CVS Tags: HEAD
Changes since 1.37: +2 -0 lines
Log Message:
Insignificant change

File Contents

# Content
1 #!/usr/bin/env perl
2
3 ##H This drop box agent initiates tasks on all published datasets.
4 ##H A task is an application to run on a dataset at a particular site.
5 ##H It keeps a record of tasks created and avoids submitting a task too
6 ##H frequently.
7 ##H
8 ##H Usage:
9 ##H
10 ##H TaskSource
11 ##H
12 ##H -state agent state directory, including inbox
13 ##H -next next agent to pass the drops to; can be given several times
14 ##H -wait time to wait in seconds between work scans
15 ##H -secs_between_inits seconds between submission of same dataset/tier combination
16 ##H -siteconfig file with list of valid SE and their schedule limits
17 ##H -dbsdlspath path to DBS/DLS discovery ascii files
18 ##H -max-site-queue default value for maximum events scheduled per site
19 ##H -max-jobs maximum number of jobs per project, -1 to deactivate
20 ##H -scheduler scheduler
21 ##H -events number of events per job
22 ##H -pattern DBS discovery pattern
23 ##H -validtiers perl string pattern with valid tiers (DIGI|RECO)
24 ##H -cmssw CMSSW directory to use for CRAB
25 ##H -simcfg CMSSW parameter-set for tier SIM
26 ##H -simoutput output (comma separated) for tier SIM
27 ##H -digicfg CMSSW parameter-set for tier DIGI
28 ##H -digioutput output (comma separated) for tier DIGI
29 ##H -recocfg CMSSW parameter-set for tier RECO
30 ##H -recooutput output (comma separated) for tier RECO
31 ###H -gencfg CMSSW parameter-set for tier GEN
32 ###H -genoutput output (comma separated) for tier GEN
33
# Pragmas are lexically scoped: declared inside the BEGIN block they would
# stop applying at its closing brace, leaving the main script unchecked.
# Declare them at file scope so the whole script runs under strictures.
use strict; use warnings;

# Compile-time setup: work out the script's base name and directory and put
# the shared PHEDEX toolkit directory (located relative to the script) at
# the front of @INC so the Utils* modules below can be loaded.
BEGIN {
    $^W = 1;

    # Script name without any leading directory components.
    our $me = $0; $me =~ s|.*/||;

    # Directory the script was started from.  When $0 contains no slash the
    # script was found in the current directory, so fall back to ".".
    our $home = ($0 =~ m|^(.*)/[^/]+$|) ? $1 : ".";
    $home .= "/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}
40
41 ######################################################################
42 use UtilsHelp;
# Defaults handed to the agent; command line options override them.
my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10);

# Options that take exactly one value, mapped to the argument key they fill.
my %valued_option = (
    '-state'              => 'DROPDIR',
    '-wait'               => 'WAITTIME',
    '-secs_between_inits' => 'SECS_BETWEEN_INITS',
    '-siteconfig'         => 'SITECONFIG',
    '-dbsdlspath'         => 'DBSDLSPATH',
    '-max-site-queue'     => 'MAX_SITE_QUEUE',
    '-max-jobs'           => 'MAX_JOBS',
    '-scheduler'          => 'SCHEDULER',
    '-events'             => 'EVENTS',
    '-pattern'            => 'PATTERN',
    '-validtiers'         => 'VALIDTIERS',
    '-cmssw'              => 'CMSSW',
    '-simcfg'             => 'SIMCFG',
    '-simoutput'          => 'SIMOUTPUT',
    '-digicfg'            => 'DIGICFG',
    '-digioutput'         => 'DIGIOUTPUT',
    '-recocfg'            => 'RECOCFG',
    '-recooutput'         => 'RECOOUTPUT',
    '-usercfg'            => 'USERCFG',
    '-useroutput'         => 'USEROUTPUT',
    '-gencfg'             => 'GENCFG',
    '-genoutput'          => 'GENOUTPUT',
);

# Consume options from the front of @ARGV.  Parsing stops at the first
# unknown word or at an option that is missing its value; whatever is left
# in @ARGV afterwards is treated as an error below.
while (scalar @ARGV) {
    my $opt = $ARGV[0];
    if (exists $valued_option{$opt} && scalar @ARGV > 1) {
        shift(@ARGV);
        $args{$valued_option{$opt}} = shift(@ARGV);
    } elsif ($opt eq '-next' && scalar @ARGV > 1) {
        # -next may be repeated; collect every occurrence.
        shift(@ARGV);
        push(@{$args{NEXTDIR}}, shift(@ARGV));
    } elsif ($opt eq '-h') {
        &usage();
    } else {
        last;
    }
}

# Leftover arguments or a missing state directory are both fatal.
die "Insufficient parameters, use -h for help.\n"
    if (@ARGV || !$args{DROPDIR});

TaskSource->new(%args)->process();
103
104 ######################################################################
105 # Routines specific to this agent.
106 package TaskSource; use strict; use warnings; use base 'UtilsAgent';
107 use File::Path;
108 use UtilsCommand;
109 use UtilsLogging;
110 use UtilsTiming;
111 use UtilsNet;
112 use POSIX;
113
# Constructor.  Delegates to the parent class constructor with the caller's
# arguments, then fills in every agent-specific setting: a defined value
# supplied by the caller wins, otherwise the default listed below is used.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);
    my %args = (@_);

    my %defaults = (
        MAX_SITE_QUEUE     => undef,    # max number of jobs per site
        MAX_JOBS           => undef,    # max number of jobs per project, -1 to deactivate
        SECS_BETWEEN_INITS => 600,      # seconds between submissions of the same task
        SITECONFIG         => undef,    # path to siteconfig file
        DBSDLSPATH         => "",       # path to DBSDLS discovery ascii files
        EVENTS             => 1000,     # number of events per job
        PATTERN            => "*",      # DBS discovery pattern
        VALIDTIERS         => undef,    # perl string pattern with valid tiers (DIGI|RECO)
        CMSSW              => "_none_", # CMSSW directory to use
        SIMCFG             => undef,    # CMSSW parameter-set for SIM tier
        DIGICFG            => undef,    # CMSSW parameter-set for DIGI tier
        RECOCFG            => undef,    # CMSSW parameter-set for RECO tier
        USERCFG            => undef,    # CMSSW parameter-set for USER tier
        GENCFG             => undef,    # CMSSW parameter-set for GEN tier
        SIMOUTPUT          => undef,    # output (comma separated) for SIM tier
        DIGIOUTPUT         => undef,    # output (comma separated) for DIGI tier
        RECOOUTPUT         => undef,    # output (comma separated) for RECO tier
        USEROUTPUT         => undef,    # output (comma separated) for USER tier
        GENOUTPUT          => undef,    # output (comma separated) for GEN tier
        SCHEDULER          => "edg",    # standard scheduler
    );

    foreach my $key (keys %defaults) {
        if (defined $args{$key}) {
            $self->{$key} = $args{$key};
        } else {
            $self->{$key} = $defaults{$key};
        }
    }

    return bless($self, $class);
}
144
# One-time agent initialisation.
#
# Sets $self->{TASKREPO} to "<DROPDIR>/tasks" and creates that directory if
# it does not exist yet; dies when the directory cannot be created.  Also
# probes the "links" browser help output and records in
# $self->{LINKS_OPTS} (array ref) which dump options it supports.
sub init
{
    my ($self) = @_;
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";

    # Low-precedence "or" is required here: with "||" the die would bind to
    # mkdir's argument and a failed mkdir would go unnoticed.
    -d $self->{TASKREPO}
        or mkdir($self->{TASKREPO})
        or die "$self->{TASKREPO}: cannot create directory: $!\n";

    # Determine if links supports -dump-width option
    $self->{LINKS_OPTS} = [];

    # Read the help text once; a file handle can only be consumed a single
    # time, so both option checks must run against the saved lines.
    my @help_lines;
    if (open(my $links_help, "links -help 2>/dev/null |")) {
        @help_lines = <$links_help>;
        close($links_help);    # non-zero exit (e.g. links not installed) is fine
    }

    if ( grep(/-no-numbering/, @help_lines) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300 -no-numbering 1));
    } elsif ( grep(/-dump-width/, @help_lines) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
    }
}
163
# Tally the unfinished jobs known for each site in the task repository.
# The tally ignores the job type, and callers take it once per scan so
# that no dataset is favoured over another.
#
# The repository is walked as <TASKREPO>/<x>/<site>/<y>/<task>.  Tasks
# containing TASK_FINISHED.txt are skipped.  A task without
# JOB_CREATE_LOG.txt counts once under the key "C".  Otherwise every line
# of the first crab_*/share/db/jobs file is split on "|" and its second
# field is used as the counter key.
#
# Returns a hash: site name => { key => count }.
sub getSiteStatus
{
    my ($self) = @_;
    my $repo = $self->{TASKREPO};
    my %counts = ();

    foreach my $sitedir (glob("$repo/*/*")) {
        # The site name is the last path component.
        my ($name) = ($sitedir =~ m|.*/(.*)|);

        foreach my $task (glob("$sitedir/*/*")) {
            next if (-f "$task/TASK_FINISHED.txt");

            unless (-f "$task/JOB_CREATE_LOG.txt") {
                $counts{$name}{C} = ($counts{$name}{C} || 0) + 1;
                next;
            }

            my ($jobsfile) = glob("$task/crab_*/share/db/jobs");
            next unless defined $jobsfile;

            foreach my $record (split(/\n/, &input($jobsfile) || '')) {
                my @fields = split(/\|/, $record);
                $counts{$name}{$fields[1]} = ($counts{$name}{$fields[1]} || 0) + 1;
            }
        }
    }

    return %counts;
}
196
# Return a short numeric hash of $string: the checksum printed by the
# system "cksum" utility for the string followed by a newline.
#
# The string is written to a temporary file by Perl itself and cksum is
# started without a shell, so shell metacharacters, whitespace and glob
# characters in $string are hashed literally instead of being interpreted.
# For plain strings the result matches `echo $string | cksum`.
#
# Dies if the temporary file cannot be written or cksum cannot be run.
# Returns undef if cksum produced no output.
sub createHashFromString
{
    my ($self, $string) = @_;

    # Loaded here so the sub does not depend on a file-level import.
    require File::Temp;

    # The temporary file is removed automatically when $tmp goes out of scope.
    my $tmp = File::Temp->new(TEMPLATE => 'hash_XXXXXX', DIR => '/tmp');
    my $hash_file_name = $tmp->filename;
    print {$tmp} "$string\n"
        or die "$hash_file_name: cannot write: $!\n";
    close($tmp)
        or die "$hash_file_name: cannot close: $!\n";

    # List-form pipe open: no shell is involved.
    open(my $cksum, '-|', 'cksum', $hash_file_name)
        or die "cannot run `cksum $hash_file_name': $!\n";
    my $hash_output = <$cksum>;
    close($cksum);

    return undef unless defined $hash_output;
    chomp($hash_output);

    # cksum prints "<checksum> <size> <file>"; only the checksum is wanted.
    my ($hash) = split(/\s+/, $hash_output);
    return $hash;
}
223
# Periodic work scan: discover published datasets and create a task for
# each dataset/site combination that passes the site filter.
#
# The dataset list is read from the ascii file
# <DBSDLSPATH>/dbsdls_query_<pattern>.txt when it exists, otherwise from
# the output of $PYTHONSCRIPT/DBS2DLSQuery.py.  Every line holds
# whitespace separated fields: dataset path, site, total events and,
# optionally, events per job, seconds between inits, CMSSW directory and
# parameter-set; missing optional fields fall back to the agent settings.
#
# Any error raised during the scan is caught and reported through alert;
# the agent then naps for WAITTIME seconds.
sub idle
{
    &logmsg ("DBS/DLS discovery");
    my ($self, @pending) = @_;
    eval {
        # Get status of how busy the sites are.  We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats) {
            my @load;
            foreach my $site (sort keys %sitestats) {
                my $running   = ($sitestats{$site}{R} || 0);
                my $submitted = ($sitestats{$site}{B} || 0);
                my $scheduled = ($sitestats{$site}{S} || 0);
                my $created   = ($sitestats{$site}{C} || 0);
                my $pending   = $created + $submitted + $scheduled + $running;
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                            sort keys %{$sitestats{$site}}) . " L=$pending");
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        # DBS/DLS discovery
        # If an ascii DBS/DLS discovery file exists in DBSDLSPATH with the
        # name dbsdls_query_$cleaned_pattern.txt, where cleaned pattern is
        # the pattern with / replaced by _, take the ascii file; otherwise
        # run the discovery script.
        my $cleaned_pattern = $self->{PATTERN};
        $cleaned_pattern =~ s/\//_/g;
        my $patternfile = $self->{DBSDLSPATH} . "/dbsdls_query_" . $cleaned_pattern . ".txt";
        my $published;
        if ( -e $patternfile ) {
            &logmsg("Use DBS/DLS ascii file: $patternfile");
            open ($published, '<', $patternfile)
                or die "$patternfile: cannot open: $!\n";
        } else {
            my $cmd = $ENV{PYTHONSCRIPT} . "/DBS2DLSQuery.py --pattern \"$self->{PATTERN}\"";
            open ($published, "$cmd 2>/dev/null |") or die "cannot run `$cmd': $!\n";
        }

        while (my $entry = <$published>) {
            chomp $entry;
            &timeStart($self->{STARTTIME});
            my $sitese;
            my ($datapath, $site, $totalevents, $events, $secsinit, $cmssw,
                $mycfg) = split(/\s+/, $entry);

            # Blank or truncated lines carry nothing to submit.
            next unless defined $datapath && defined $site;

            # read in siteconfig dynamically
            if ( $self->{SITECONFIG} ) {
                # An unreadable siteconfig aborts the scan with one alert
                # instead of silently rejecting every dataset.
                open (my $siteconfig, '<', $self->{SITECONFIG})
                    or die "$self->{SITECONFIG}: cannot open: $!\n";
                my @shortsites;
                while (my $config_line = <$siteconfig>) {
                    # Skip comments and empty lines.
                    next if ($config_line =~ /^#/ || $config_line =~ /^\n/);
                    my ($tempsite, $tempshortsite, $templimit) = split(/\s+/, $config_line);
                    next unless defined $tempsite && defined $tempshortsite;
                    $sitese = $tempsite if ($site =~ /$tempshortsite/);
                    push (@shortsites, $tempshortsite);
                }
                close ($siteconfig);

                # No configured site at all: nothing is valid.
                next unless @shortsites;
                my $validsites = "(" . join("|", @shortsites) . ")";
                next if ($site !~ /$validsites/);
            }

            # if defined use number of events per job from pattern file
            $events   ||= $self->{EVENTS};
            $secsinit ||= $self->{SECS_BETWEEN_INITS};
            $cmssw    ||= $self->{CMSSW};
            $mycfg    ||= "";

            # Cap totalevents at EVENTS * MAX_JOBS.  The cap is off when
            # MAX_JOBS is -1 or was never configured; without the defined
            # check an unset MAX_JOBS would clamp every task to 0 events.
            if (defined $self->{MAX_JOBS} && $self->{MAX_JOBS} != -1) {
                if ( $self->{MAX_JOBS} * $events < $totalevents ) {
                    $totalevents = $self->{MAX_JOBS} * $events;
                }
            }
            $self->createTask($datapath, $site, $sitese, $totalevents, $events,
                              $secsinit, $cmssw, $mycfg, %sitestats);
        }
        close ($published);
    };
    do { chomp ($@); &alert ($@); } if $@;

    $self->nap ($self->{WAITTIME});
}
310
# Create one task (a CRAB job set) for a dataset at a site, unless the tier
# is filtered out, the site is already too busy, or the previous task for
# the same dataset was created less than $secsinit seconds ago.
#
# Arguments (after $self):
#   $datasetpath  dataset path in DBS2 format, split on "/" into dataset,
#                 owner and composite tier
#   $site         site name, passed to CrabJobs as -whitelistce
#   $sitese       storage element, passed to CrabJobs as -whitelistse
#   $totalevents  total number of events for the task
#   $events       events per job
#   $secsinit     minimum seconds between two tasks for the same dataset
#   $cmssw        CMSSW directory
#   $mycfg        explicit parameter-set; overrides the per-tier default
#   %sitestats    per-site job counters as returned by getSiteStatus
#
# Returns nothing.  Dies if the CrabJobs command fails or the drop
# directory cannot be created.
sub createTask
{
    my ($self) = shift(@_);
    my ($datasetpath, $site, $sitese, $totalevents, $events, $secsinit,
        $cmssw, $mycfg, %sitestats) = @_;

    # DBS2 name format: the path starts with "/", so the first field is
    # empty; the tier is the last "-" separated part of the final field.
    my (undef, $dataset, $owner, $comp_tier) = split(/\//, $datasetpath);
    my ($tier) = (split(/-/, $comp_tier))[-1] || $comp_tier;

    # filter on tier
    return if ($self->{VALIDTIERS} && $tier !~ /$self->{VALIDTIERS}/);

    # Pick parameter-set and output list: an explicit cfg wins, otherwise
    # the first known tier name contained in $tier decides.
    my ($cfg, $output);
    if ($mycfg) {
        $cfg = $mycfg;
    } else {
        my ($known) = grep { $tier =~ /$_/ } qw(RECO USER DIGI SIM GEN);
        if (defined $known) {
            $cfg    = $self->{"${known}CFG"};
            $output = $self->{"${known}OUTPUT"};
        } else {
            &logmsg ("No valid tier, defaults for cfg and output");
            $cfg    = "robot.cfg";
            $output = "";
        }
    }

    # take site specific or if not found default max site queue
    # read in from siteconfig file
    my %sitelimits = ();
    my %siteNameTranslation = ();
    if ( $self->{SITECONFIG} ) {
        # An unreadable file simply leaves both tables empty, so the
        # default queue limit applies.
        if (open (my $siteconfig, '<', $self->{SITECONFIG})) {
            while (my $config_line = <$siteconfig>) {
                next if ($config_line =~ /^#/ || $config_line =~ /^\n/);
                my ($tempsite, $tempshortsite, $templimit) = split(/\s+/, $config_line);
                next unless defined $tempshortsite;
                $sitelimits{$tempshortsite} = $templimit;
                $siteNameTranslation{$tempshortsite} = $tempshortsite;
            }
            close ($siteconfig);
        }
    }

    # Name used for the statistics lookup and the task directory.  Fall
    # back to the site itself when there is no translation, so the path
    # never gets an empty component.
    # NOTE(review): assumes $site equals the short site name of the
    # siteconfig file, as the limit lookup below already does -- confirm.
    my $sitename = $siteNameTranslation{$site};
    $sitename = $site unless defined $sitename;

    # If the site is too busy already, do not create the task.
    my $running   = ($sitestats{$sitename}{R} || 0);
    my $scheduled = ($sitestats{$sitename}{S} || 0);
    my $created   = ($sitestats{$sitename}{C} || 0);
    my $pending   = $created + $scheduled + $running;
    my $max_queue = $sitelimits{$site} || $self->{MAX_SITE_QUEUE};
    if ( $pending > $max_queue ) {
        &logmsg("$sitename : Task not created. number of R+S+C jobs ($running + $scheduled + $created = $pending) exceeds site limit of $site ($max_queue)");
        return;
    }

    # Find out what is already pending for this task.  First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$sitename/$tier";

    # create short unique taskbase from $datestamp.$site.$dataset.$tier.$owner
    my $taskbase = $self->createHashFromString ("$datestamp.$site.$dataset.$tier.$owner");
    chomp($taskbase);

    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $secsinit)))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobs",
                           "-cfg", "$mydir/$cfg",
                           "-datasetpath", $datasetpath,
                           "-output", $output,
                           "-totalevents", $totalevents,
                           "-whitelistce", $site,
                           "-whitelistse", $sitese,
                           "-cmssw", $cmssw,
                           "-eventsperjob", $events,
                           "-scheduler", $self->{SCHEDULER},
                           "-jobname", "$taskdir/$drop");
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # Low-precedence "or" so that the die is actually reachable; an
        # already existing directory is accepted.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d $dropdir
            or mkdir($dropdir)
            or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop")) {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop $datestamp $site $dataset $tier $owner @{[&formatElapsedTime($self->{STARTTIME})]} success");
        } else {
            &alert ("$drop $datestamp $site $dataset $tier $owner: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
    return;
}