ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.23
Committed: Mon Oct 23 00:24:32 2006 UTC (18 years, 6 months ago) by gutsche
Branch: MAIN
Changes since 1.22: +2 -2 lines
Log Message:
increased log output

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent initiates tasks on all published datasets.
4     ##H A task is an application to run on a dataset at a particular site.
5     ##H It keeps a record of tasks created and avoids submitting a task too
6     ##H frequently.
7     ##H
8     ##H Usage:
9     ##H
10 gutsche 1.21 ##H TaskSource
11     ##H
12     ##H -state agent state directory, including inbox
13     ##H -next next agent to pass the drops to; can be given several times
14     ##H -wait time to wait in seconds between work scans
15     ##H -secs_between_inits seconds between submission of same dataset/tier combination
16     ##H -siteconfig file with list of valid SE and their schedule limits
17     ##H -max-site-queue default value for maximum events scheduled per site
18 gutsche 1.22 ##H -max-jobs maximum number of jobs per project, -1 to deactivate
19 gutsche 1.21 ##H -scheduler scheduler
20     ##H -events number of events per job
21     ##H -pattern DBS discovery pattern
22     ##H -validtiers perl string pattern with valid tiers (DIGI|RECO)
23     ##H -simcfg CMSSW parameter-set for tier SIM
24     ##H -simoutput output (comma separated) for tier SIM
25     ##H -digicfg CMSSW parameter-set for tier DIGI
26     ##H -digioutput output (comma separated) for tier DIGI
27     ##H -recocfg CMSSW parameter-set for tier RECO
28     ##H -recooutput output (comma separated) for tier RECO
29 gutsche 1.1
# Strictures and warnings are enabled at file scope.  They used to be
# requested inside the BEGIN block, where they are lexically scoped to
# that block only and therefore never applied to the main script below.
use strict; use warnings;

# Work out the script name ($me) and the directory of the shared
# toolkit modules ($home) at compile time, and put the latter at the
# front of @INC so the "use Utils*" statements further down can find
# their modules.
BEGIN {
    $^W = 1;
    our $me = $0; $me =~ s|.*/||;
    # Directory part of $0.  When the script is started without any
    # directory component (e.g. "perl TaskSource") use the current
    # directory instead of treating the script name as a directory.
    our $home = ($0 =~ m|^(.*)/[^/]+$|) ? $1 : ".";
    $home ||= ".";
    $home .= "/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}
36    
37     ######################################################################
38     use UtilsHelp;
# Defaults for the agent; command line options below override them.
my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10);

# Options that take exactly one value, mapped to the agent parameter
# they set.  "-next" is handled separately because it accumulates.
my %valued_option = (
    '-state'              => 'DROPDIR',
    '-wait'               => 'WAITTIME',
    '-secs_between_inits' => 'SECS_BETWEEN_INITS',
    '-siteconfig'         => 'SITECONFIG',
    '-max-site-queue'     => 'MAX_SITE_QUEUE',
    '-max-jobs'           => 'MAX_JOBS',
    '-scheduler'          => 'SCHEDULER',
    '-events'             => 'EVENTS',
    '-pattern'            => 'PATTERN',
    '-validtiers'         => 'VALIDTIERS',
    '-simcfg'             => 'SIMCFG',
    '-simoutput'          => 'SIMOUTPUT',
    '-digicfg'            => 'DIGICFG',
    '-digioutput'         => 'DIGIOUTPUT',
    '-recocfg'            => 'RECOCFG',
    '-recooutput'         => 'RECOOUTPUT',
);

# Consume options from the front of @ARGV.  Parsing stops at the first
# word that is not a known option or that lacks its value; whatever is
# left over is reported as an error below.
while (scalar @ARGV) {
    my $option = $ARGV[0];
    my $has_value = scalar @ARGV > 1;
    if ($option eq '-next' && $has_value) {
        shift (@ARGV);
        push (@{$args{NEXTDIR}}, shift(@ARGV));
    } elsif (exists $valued_option{$option} && $has_value) {
        shift (@ARGV);
        $args{$valued_option{$option}} = shift(@ARGV);
    } elsif ($option eq '-h') {
        &usage();
    } else {
        last;
    }
}

# The state directory is mandatory and no stray arguments may remain.
die "Insufficient parameters, use -h for help.\n"
    if (@ARGV || !$args{DROPDIR});

# Create the agent and hand control to its processing loop.
TaskSource->new (%args)->process();
87    
88     ######################################################################
89     # Routines specific to this agent.
90     package TaskSource; use strict; use warnings; use base 'UtilsAgent';
91     use File::Path;
92     use UtilsCommand;
93     use UtilsLogging;
94     use UtilsTiming;
95     use UtilsNet;
96     use POSIX;
97    
# Constructor.  Builds the base agent object through the parent class
# and then fills in the parameters specific to this agent, taking each
# one from the constructor arguments when it is defined there and from
# the defaults table otherwise.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);

    # Agent specific parameters and their default values.
    my %defaults = (
        MAX_SITE_QUEUE     => undef,  # max number of jobs per site
        MAX_JOBS           => undef,  # max number of jobs per project, -1 to deactivate
        SECS_BETWEEN_INITS => 600,    # seconds between submissions of same dataset/tier
        SITECONFIG         => undef,  # path to siteconfig file
        EVENTS             => 1000,   # number of events per job
        PATTERN            => "*",    # DBS discovery pattern
        VALIDTIERS         => undef,  # perl string pattern with valid tiers (DIGI|RECO)
        SIMCFG             => undef,  # CMSSW parameter-set for SIM tier
        DIGICFG            => undef,  # CMSSW parameter-set for DIGI tier
        RECOCFG            => undef,  # CMSSW parameter-set for RECO tier
        SIMOUTPUT          => undef,  # output (comma separated) for SIM tier
        DIGIOUTPUT         => undef,  # output (comma separated) for DIGI tier
        RECOOUTPUT         => undef,  # output (comma separated) for RECO tier
        SCHEDULER          => "edg",  # standard scheduler
    );

    my %given = (@_);
    foreach my $name (keys %defaults) {
        if (defined $given{$name}) {
            $self->{$name} = $given{$name};
        } else {
            $self->{$name} = $defaults{$name};
        }
    }

    bless $self, $class;
    return $self;
}
122 gutsche 1.1
# One-time agent initialisation.
#
# Sets $self->{TASKREPO} to "<DROPDIR>/tasks" and creates that
# directory if it does not exist yet; dies if it cannot be created.
# Also probes the "links" browser for the options it supports and
# records the ones to use in $self->{LINKS_OPTS} (an array reference,
# empty when links is unavailable or supports neither option).
sub init
{
    my ($self) = @_;
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";

    # mkdir is a list operator, so "mkdir $dir || die" would never
    # reach the die; use explicit parentheses and low-precedence "or".
    if (! -d $self->{TASKREPO}) {
        mkdir ($self->{TASKREPO})
            or die "$self->{TASKREPO}: cannot create directory: $!\n";
    }

    # Determine which of -no-numbering / -dump-width links supports.
    # The help text is read once into memory: a file handle can only
    # be consumed once, so it cannot be searched twice directly.
    $self->{LINKS_OPTS} = [];
    my @links_help = ();
    if (open (my $help, "links -help 2>/dev/null |")) {
        @links_help = <$help>;
        # A non-zero exit status of "links -help" is not an error here.
        close ($help);
    }

    if ( grep(/-no-numbering/, @links_help) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300 -no-numbering 1));
    } elsif ( grep(/-dump-width/, @links_help) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
    }
}
141 gutsche 1.1
# Count the jobs recorded in the task repository, per site and per job
# state.  This does not distinguish job types, and the caller obtains
# it only once per scan so that datasets early in the list are not
# favoured over later ones.
#
# The repository is walked as <TASKREPO>/<dir>/<site>/<dir>/<task>.
# A task directory without JOB_CREATE_LOG.txt is counted under state
# "C".  Otherwise the first crab_*/share/db/jobs file of the task is
# read and every line is counted under the value of its second
# "|"-separated field.
#
# Returns a hash (as a list): site name => { state => count }.
sub getSiteStatus
{
    my ($self) = @_;
    my $repo = $self->{TASKREPO};
    my %counts = ();

    foreach my $sitedir (glob ("$repo/*/*")) {
        # The site name is the last path component.
        my ($name) = ($sitedir =~ m|.*/(.*)|);

        foreach my $task (glob ("$sitedir/*/*")) {
            if (-f "$task/JOB_CREATE_LOG.txt") {
                my $jobsdb = (glob ("$task/crab_*/share/db/jobs"))[0];
                next if ! defined $jobsdb;

                my $contents = &input($jobsdb) || '';
                foreach my $record (split(/\n/, $contents)) {
                    my @fields = split(/\|/, $record);
                    $counts{$name}{$fields[1]}++;
                }
            } else {
                $counts{$name}{C}++;
            }
        }
    }

    return %counts;
}
174 gutsche 1.1
{
    # Lookup table for the POSIX cksum CRC, built on first use.
    my @crc_table = ();

    # Compute the POSIX "cksum" CRC of a byte string: a 32-bit CRC with
    # polynomial 0x04C11DB7, processed most significant bit first, with
    # the length of the input appended (least significant octet first)
    # and the final value complemented.
    sub _posixCksum
    {
        my ($bytes) = @_;

        if (! @crc_table) {
            foreach my $index (0 .. 255) {
                my $entry = $index << 24;
                foreach my $bit (1 .. 8) {
                    $entry = ($entry & 0x80000000)
                        ? (($entry << 1) ^ 0x04C11DB7)
                        : ($entry << 1);
                    $entry &= 0xFFFFFFFF;
                }
                push (@crc_table, $entry);
            }
        }

        my $crc = 0;
        foreach my $octet (unpack ("C*", $bytes)) {
            $crc = (($crc << 8) & 0xFFFFFFFF)
                ^ $crc_table[(($crc >> 24) ^ $octet) & 0xFF];
        }
        for (my $length = length ($bytes); $length > 0; $length >>= 8) {
            $crc = (($crc << 8) & 0xFFFFFFFF)
                ^ $crc_table[(($crc >> 24) ^ ($length & 0xFF)) & 0xFF];
        }

        return (~$crc) & 0xFFFFFFFF;
    }
}

# Create a short numeric hash from a string.
#
# The result is the POSIX cksum CRC of the string followed by a
# newline, i.e. the value that "echo STRING | cksum" prints for an
# ordinary string.  It is computed in-process: the string is never
# handed to a shell (which would interpret metacharacters in it), and
# no temporary file or external command is needed.
#
#   $string - text to hash; undef is treated as the empty string
#
# Returns the checksum as a non-negative decimal integer.
sub createHashFromString
{
    my ($self, $string) = @_;
    $string = '' if ! defined $string;

    my $data = "$string\n";
    # Hash the UTF-8 octets if the string carries wide characters.
    utf8::encode ($data) if utf8::is_utf8 ($data);

    return _posixCksum ($data);
}
201    
# Work scan: discover published datasets and create a task for each
# dataset/site combination.
#
# Logs the current per-site load, runs $PYTHONSCRIPT/DBSDLSQuery.py
# with the configured discovery pattern and, for every line of the form
# "datasetpath site totalevents" it prints, calls createTask().  If a
# site configuration file is set, only sites listed in it are used.
# The number of events is capped at MAX_JOBS * EVENTS unless MAX_JOBS
# is undefined or -1.  Any error aborts the scan and is reported with
# alert().  The agent then naps for WAITTIME.
sub idle
{
    my ($self, @pending) = @_;
    &logmsg ("DBS/DLS discovery");
    eval {
        # Get status of how busy the sites are.  We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats) {
            my @load;
            foreach my $site (sort keys %sitestats) {
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                            sort keys %{$sitestats{$site}}));
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        # Query DBS/DLS for the published datasets.
        die "PYTHONSCRIPT environment variable is not set\n"
            if ! defined $ENV{PYTHONSCRIPT};
        my $cmd = $ENV{PYTHONSCRIPT} . "/DBSDLSQuery.py --pattern \"$self->{PATTERN}\"";
        open (my $published, "$cmd 2>/dev/null |") or die "cannot run `$cmd': $!\n";

        # A named loop variable is used because the calls made inside
        # the loop must not be able to clobber the current line.
        while (my $line = <$published>) {
            chomp ($line);
            &timeStart($self->{STARTTIME});
            my ($datapath, $site, $totalevents) = split(/ /, $line);
            # Skip lines that do not carry all three fields.
            next if ! defined $totalevents;

            # Read in siteconfig dynamically, so that edits to the file
            # take effect without restarting the agent.
            if ( $self->{SITECONFIG} ) {
                open (my $siteconfig, "<", $self->{SITECONFIG})
                    or die "$self->{SITECONFIG}: cannot read site configuration: $!\n";
                my @validsites = ();
                while (my $entry = <$siteconfig>) {
                    # Ignore comments and empty lines.
                    next if $entry =~ /^#/ || $entry =~ /^\n/;
                    my ($tempsite) = split(/ /, $entry);
                    chomp ($tempsite);
                    # Site names are literal text, not patterns.
                    push (@validsites, quotemeta ($tempsite));
                }
                close ($siteconfig);

                # A configuration without any site does not restrict
                # the sites (and must not yield a broken pattern).
                if (@validsites) {
                    my $validsites = "(" . join ("|", @validsites) . ")";
                    next if $site !~ /$validsites/;
                }
            }

            # Limit totalevents to $EVENTS * $MAX_JOBS.  The limit is
            # inactive when MAX_JOBS was not given or is -1; an
            # undefined value must not be treated as a limit of zero.
            if (defined $self->{MAX_JOBS} && $self->{MAX_JOBS} != -1) {
                if ( $self->{MAX_JOBS} * $self->{EVENTS} < $totalevents ) {
                    $totalevents = $self->{MAX_JOBS} * $self->{EVENTS};
                }
            }
            $self->createTask($datapath, $site, $totalevents, %sitestats);
        }

        # Closing the pipe reaps the child and reveals a failed query.
        close ($published)
            or &alert ("`$cmd' did not finish successfully: status $?");
    };
    do { chomp ($@); &alert ($@); } if $@;

    $self->nap ($self->{WAITTIME});
}
258    
# Create a new task for one dataset at one site, unless filtered out.
#
#   $datasetpath - dataset path of the form /dataset/tier/owner
#   $site        - site name as reported by the discovery query
#   $totalevents - total number of events to process
#   %sitestats   - per-site job counts as returned by getSiteStatus()
#
# Nothing is done when the tier is not among the valid tiers, when the
# site already has more queued jobs (states S and C) than its limit,
# or when the previous generation of the same task was created less
# than SECS_BETWEEN_INITS seconds ago.  Otherwise CrabJobs is run to
# create the task, and a drop pointing to it is passed on to the next
# agent.  Dies if CrabJobs fails or the drop directory cannot be
# created.
sub createTask
{
    my ($self, $datasetpath, $site, $totalevents, %sitestats) = @_;

    my (undef, $dataset, $tier, $owner) = split(/\//, $datasetpath);

    # Filter on tier.  ("return", not "next": this is a subroutine.)
    return if ($self->{VALIDTIERS} && $tier !~ /$self->{VALIDTIERS}/);

    # Select parameter-set and output for the tier.
    my ($cfg, $output);
    if ($tier =~ /RECO/) {
        $cfg = $self->{RECOCFG};
        $output = $self->{RECOOUTPUT};
    } elsif ($tier =~ /DIGI/) {
        $cfg = $self->{DIGICFG};
        $output = $self->{DIGIOUTPUT};
    } elsif ($tier =~ /SIM/) {
        $cfg = $self->{SIMCFG};
        $output = $self->{SIMOUTPUT};
    } else {
        &logmsg ("No valid tier, defaults for cfg and output");
        $cfg = "robot.cfg";
        $output = "";
    }

    # Read site specific queue limits and short site names from the
    # siteconfig file; each entry is "site shortsite limit".
    my %sitelimits = ();
    my %siteNameTranslation = ();
    if ( $self->{SITECONFIG} ) {
        if (open (my $siteconfig, "<", $self->{SITECONFIG})) {
            while (my $entry = <$siteconfig>) {
                # Ignore comments and empty lines.
                next if $entry =~ /^#/ || $entry =~ /^\n/;
                chomp ($entry);
                my ($tempsite, $tempshortsite, $templimit) = split(/ /, $entry);
                next if ! defined $tempsite;
                $sitelimits{$tempsite} = $templimit;
                $siteNameTranslation{$tempsite} = $tempshortsite;
            }
            close ($siteconfig);
        } else {
            # Carry on with the default limit and untranslated name.
            &alert ("$self->{SITECONFIG}: cannot read site configuration: $!");
        }
    }

    # Without a translation fall back to the full site name, so that
    # the task path never contains an empty component.
    my $shortsite = defined $siteNameTranslation{$site}
        ? $siteNameTranslation{$site} : $site;

    # If the site is too busy already, do nothing.  Take the site
    # specific limit or, if there is none, the default maximum.  The
    # comparison has to be numeric: as strings "9" sorts after "10".
    my $pending = ($sitestats{$shortsite}{S} || 0);
    $pending += ($sitestats{$shortsite}{C} || 0);
    my $max_queue = $sitelimits{$site} || $self->{MAX_SITE_QUEUE};
    return if ( $pending > $max_queue );

    # Find out what is already pending for this task.  First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$shortsite/$tier";

    # Create short unique taskbase from $datestamp.$site.$dataset.$tier.$owner
    my $taskbase = $self->createHashFromString ("$datestamp.$site.$dataset.$tier.$owner");
    chomp($taskbase);

    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $self->{SECS_BETWEEN_INITS})))
    {
        # Directory of this script; "." when started without a path.
        my $mydir = ($0 =~ m|^(.*)/[^/]+$|) ? $1 : ".";
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobs",
                           "-cfg", "$mydir/$cfg",
                           "-datasetpath", $datasetpath,
                           "-output", $output,
                           "-totalevents", $totalevents,
                           "-whitelist", $site,
                           "-eventsperjob", $self->{EVENTS},
                           "-scheduler", $self->{SCHEDULER},
                           "-jobname", "$taskdir/$drop");
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # mkdir is a list operator, so "mkdir $dir || die" would never
        # reach the die; use parentheses and low-precedence "or".
        my $dropdir = "$self->{WORKDIR}/$drop";
        mkdir ($dropdir) or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop")) {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop $datestamp $site $dataset $tier $owner @{[&formatElapsedTime($self->{STARTTIME})]} success");
        } else {
            &alert ("$drop $datestamp $site $dataset $tier $owner: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }

    return;
}