ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.3
Committed: Wed May 10 08:03:07 2006 UTC (18 years, 11 months ago) by corvo
Branch: MAIN
Changes since 1.2: +107 -99 lines
Log Message:
added a call to DLSinfo to retrieve site hosting data

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent initiates tasks on all published datasets.
4     ##H A task is an application to run on a dataset at a particular site.
5     ##H It keeps a record of tasks created and avoids submitting a task too
6     ##H frequently.
7     ##H
8     ##H Usage:
9     ##H TaskSource
10     ##H -state DIRECTORY [-next NEXT] [-wait SECS] [-url URL-PUBLISHED]
11     ##H [-ignore-sites REGEXP] [-accept-sites REGEXP]
12     ##H [-secs-per-event N] [-max-site-queue N]
13     ##H
14     ##H -state agent state directory, including inbox
15     ##H -next next agent to pass the drops to; can be given several times
16     ##H -wait time to wait in seconds between work scans
17     ##H -url contact string for published datasets
18     ##H -ignore-sites
19     ##H regular expression for sites to ignore; ignore applies before
20     ##H accept, and by default nothing is ignored and everything is
21     ##H accepted. applies to pubdb site names, not the host names of
22     ##H the site.
23     ##H -accept-sites
24     ##H regular expression for sites to accept; ignore applies before
25     ##H accept, and by default nothing is ignored and everything is
26     ##H accepted. applies to pubdb site names, not the host names of
27     ##H the site.
28     ##H -secs-per-event
29     ##H minimum time to allocate for a task, given a number of events
30     ##H in the dataset to process. tasks are not created more often
31     ##H than this.
32     ##H -max-site-queue
33     ##H the high water mark of currently submitted jobs to a site,
34     ##H above which new tasks will not be created.
35    
# Strictures and warnings for the whole file.  Declared inside the BEGIN
# block they would be lexically scoped to that block only, leaving the main
# program below unchecked.
use strict;
use warnings;

# Compile-time setup: remember the program name in $me and put the PHEDEX
# common toolkit directory, located relative to this script, at the front of
# @INC so the "use Utils*" statements below can be resolved.
BEGIN {
    $^W = 1;
    our $me = $0; $me =~ s|.*/||;

    # Directory part of $0, or "." when the script was started without any
    # path component (a plain substitution would leave the script name itself).
    our $home = ($0 =~ m|^(.*)/[^/]+$|) ? $1 : ".";
    $home .= "/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}
42    
43     ######################################################################
44     use UtilsHelp;
# Built-in defaults; command line options below override them.
my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10,
            URL => "http://cmsdoc.cern.ch/cms/production/www/PubDB/GetPublishedCollectionInfoFromRefDB.php");

# Options that take exactly one value, mapped to the agent attribute they set.
# ("-next" is handled separately because it may be given several times.)
my %value_option = (
    '-state'          => 'DROPDIR',
    '-wait'           => 'WAITTIME',
    '-ignore-sites'   => 'IGNORE_REGEXP',
    '-accept-sites'   => 'ACCEPT_REGEXP',
    '-secs-per-event' => 'SECS_PER_EVENT',
    '-max-site-queue' => 'MAX_SITE_QUEUE',
    '-url'            => 'URL',
    '-dataset'        => 'DATASET',
    '-owner'          => 'OWNER',
    '-events'         => 'NEVENT',
    '-mode'           => 'MODE',
    '-scheduler'      => 'SCHEDULER',
    '-jobtype'        => 'JOBTYPE',
    # LOGFILE is not read anywhere in this file; presumably it is consumed by
    # the UtilsAgent base class -- confirm.
    '-log'            => 'LOGFILE',
);

# Consume recognised options from the front of @ARGV.  Parsing stops at the
# first unknown word, or at an option whose value is missing; whatever is
# left over is rejected below.
while (scalar @ARGV)
{
    my $opt = $ARGV[0];
    if ($opt eq '-h')
    { &usage(); }
    elsif ($opt eq '-next' && scalar @ARGV > 1)
    { shift (@ARGV); push (@{$args{NEXTDIR}}, shift(@ARGV)); }
    elsif (exists $value_option{$opt} && scalar @ARGV > 1)
    { shift (@ARGV); $args{$value_option{$opt}} = shift(@ARGV); }
    else
    { last; }
}

if (@ARGV || !$args{DROPDIR} || !$args{URL})
{
    die "Insufficient parameters, use -h for help.\n";
}

# Direct method call instead of the ambiguous indirect object syntax.
TaskSource->new (%args)->process();
95    
96     ######################################################################
97     # Routines specific to this agent.
98     package TaskSource; use strict; use warnings; use base 'UtilsAgent';
99     use File::Path;
100     use UtilsCommand;
101     use UtilsLogging;
102     use UtilsTiming;
103     use UtilsNet;
104     use POSIX;
105    
# Constructor.  Builds the base agent object from the given key/value
# arguments and then fills in the attributes specific to this agent, using
# the default listed below for every attribute the caller left undefined.
sub new
{
    my $invocant = shift;
    my $class = ref($invocant) || $invocant;
    my $self = $class->SUPER::new(@_);
    my %given = (@_);

    my %defaults = (SECS_PER_EVENT => undef,  # secs/event to delay per dataset
                    MAX_SITE_QUEUE => undef,  # max number of jobs per site
                    IGNORE_REGEXP  => undef,  # regexp of sites to ignore
                    ACCEPT_REGEXP  => undef,  # regexp of sites to accept
                    DATASET        => undef,  # specific dataset
                    OWNER          => undef,  # specific owner
                    NEVENT         => 500,    # number of events per job
                    MODE           => 1,      # data discovery mode: (1) PubDB/RefDB, (2) DBS/DLS
                    JOBTYPE        => "orca", # standard jobtype
                    SCHEDULER      => "edg",  # standard scheduler
                    URL            => undef); # published dataset url

    foreach my $key (keys %defaults)
    {
        if (defined $given{$key})
        {
            $self->{$key} = $given{$key};
        }
        else
        {
            $self->{$key} = $defaults{$key};
        }
    }

    return bless $self, $class;
}
127    
# Prepare the agent's working state:
#  - TASKREPO is set to "<DROPDIR>/tasks" and the directory is created if
#    missing; dies if it cannot be created.
#  - LINKS_OPTS is set to the extra options for the "links" browser:
#    "-dump-width 300" when "links -help" mentions that option, otherwise
#    an empty list.
sub init
{
    my ($self) = @_;
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";

    # Low-precedence "or" is required here: with "||" the die becomes part of
    # mkdir's argument expression and can never be reached.
    -d $self->{TASKREPO}
        or mkdir ($self->{TASKREPO})
        or die "$self->{TASKREPO}: cannot create directory: $!\n";

    # Determine if links supports -dump-width option.  If links cannot be
    # started at all the option list simply stays empty.
    $self->{LINKS_OPTS} = [];
    if (open (my $help, "-|", "links -help 2>/dev/null"))
    {
        if (grep (/-dump-width/, <$help>))
        {
            push (@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
        }
        close ($help);
    }
}
145 gutsche 1.1
# Find out how many jobs are pending for each site.  This is insensitive
# to the job type, and the caller only checks once in the beginning to
# avoid favouring one dataset over another -- once we decide to proceed
# for a site, we submit jobs for all datasets.
#
# Walks the task repository as TASKREPO/<dir>/<site>/<dir>/<task> and
# returns a hash: site name => { status code => count }.
#  - A task directory without JOB_CREATE_LOG.txt is counted under "C".
#  - Otherwise every line of the first crab_*/share/db/jobs file found in
#    the task is counted under its second ';'-separated field.
sub getSiteStatus
{
    my ($self) = @_;
    my %result = ();
    my $taskrepo = $self->{TASKREPO};
    foreach my $site (<$taskrepo/*/*>)
    {
        my ($sitename) = ($site =~ m|.*/(.*)|);
        foreach my $task (<$site/*/*>)
        {
            if (! -f "$task/JOB_CREATE_LOG.txt")
            {
                $result{$sitename}{C}++;
                next;
            }

            my $jobsdb = (<$task/crab_*/share/db/jobs>)[0];
            next if ! defined $jobsdb;

            foreach my $line (split(/\n/, &input($jobsdb) || ''))
            {
                # Skip blank or truncated lines: they carry no status field
                # and would otherwise be counted under an undefined key.
                my $state = (split(/;/, $line))[1];
                next if ! defined $state || $state eq '';
                $result{$sitename}{$state}++;
            }
        }
    }

    return %result;
}
182    
# Quote a string so that /bin/sh passes it on as one literal word.
sub _shellQuote
{
    my ($word) = @_;
    $word =~ s/'/'\\''/g;
    return "'$word'";
}

# One work scan: report the current per-site load, discover the published
# datasets and call createTask() for each dataset/site pair that passes the
# DATASET, OWNER, SITE, IGNORE_REGEXP and ACCEPT_REGEXP filters, then sleep
# for WAITTIME seconds.
#
# MODE 2 reads dataset paths from $PYTHONSCRIPT/DBSlistDataset.py and, per
# dataset, "site/events" lines from $PYTHONSCRIPT/DLSInfo.py.  Any other
# MODE dumps the page at URL with "links" and parses its rows as
# "dataset owner events site proto".
#
# Errors raised during the scan are caught and reported through alert(),
# so the agent keeps running.  @pending is accepted but not used.
sub idle
{
    my ($self, @pending) = @_;
    eval {
        # Get status of how busy the sites are.  We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats)
        {
            my @load;
            foreach my $site (sort keys %sitestats)
            {
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                            sort keys %{$sitestats{$site}}));
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        if ( $self->{MODE} == 2 ) {
            &logmsg ("DBS/DLS mode\n");
            my $scripts = $ENV{PYTHONSCRIPT};
            die "PYTHONSCRIPT is not set in the environment\n"
                if ! defined $scripts;

            my $listcmd = "$scripts/DBSlistDataset.py";
            open (my $published, "-|", "$listcmd 2>/dev/null")
                or die "cannot run `$listcmd': $!\n";
            while (my $datapath = <$published>) {
                chomp ($datapath);
                &timeStart($self->{STARTTIME});

                # Find out what was published and what we would like to do
                # with it.  Paths look like /dataset/datatier/owner; lines
                # that do not have that shape are skipped.
                my (undef, $dataset, undef, $owner) = split(/\//, $datapath);
                next if ! defined $dataset || ! defined $owner;
                next if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
                next if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);

                # The path comes from an external tool, so quote it before
                # handing it to the shell.
                my $sitecmd = "$scripts/DLSInfo.py " . &_shellQuote($datapath);
                open (my $sites, "-|", "$sitecmd 2>/dev/null")
                    or die "cannot run `$sitecmd': $!\n";
                while (my $line = <$sites>) {
                    chomp ($line);
                    next if $line !~ /\S/;
                    &logmsg ($line);
                    my ($site, $events) = split(/\//, $line);
                    $site = '' if ! defined $site;
                    next if ($self->{SITE} && $site !~ /$self->{SITE}/);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    my $whitelist = $site || '.';
                    $self->createTask($datapath, $site, $events, $whitelist, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } else {
            # Invoke links to fetch a formatted web page of published datasets.
            &logmsg ("RefDB/PubDB mode");
            my $cmd = "links @{$self->{LINKS_OPTS}} -dump " . &_shellQuote($self->{URL});
            open (my $published, "-|", "$cmd 2>/dev/null")
                or die "cannot run `$cmd': $!\n";
            while (my $row = <$published>)
            {
                &timeStart($self->{STARTTIME});
                chomp ($row);
                next if $row !~ /_/;
                $row =~ s/\|/ /g; $row =~ s/^\s+//; $row =~ s/\s+$//;

                # Find out what was published and what we would like to do
                # with it.  Rows without at least a dataset and an owner are
                # page decoration, not data.
                my ($dataset, $owner, $events, $site, $proto) = split(/\s+/, $row);
                next if ! defined $owner;
                $site = '' if ! defined $site;
                next if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
                next if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
                next if ($self->{SITE} && $site !~ /$self->{SITE}/);
                next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                my $whitelist = $site || '.';

                # Pass the named columns, not a re-split of the row: a row
                # with fewer or more than five columns would otherwise shift
                # the whitelist and the site statistics out of position.
                $self->createTask($dataset, $owner, $events, $site, $proto,
                                  $whitelist, %sitestats);
            }
            close ($published);
        }
    };
    if ($@)
    {
        chomp ($@);
        &alert ($@);
    }

    $self->nap ($self->{WAITTIME});
}
255    
# Create a new task for one dataset at one site, unless the site is too
# busy or the previous task for the same dataset is too recent.
#
# Arguments after $self depend on the discovery mode:
#   MODE 2 (DBS/DLS):  $datapath, $site, $events, $whitelist, %sitestats
#                      where $datapath is "/dataset/datatier/owner"
#   otherwise (PubDB): $dataset, $owner, $events, $site, $proto,
#                      $whitelist, %sitestats
# %sitestats is the per-site status count hash built by getSiteStatus().
#
# Dies if CrabJobs fails or the drop directory cannot be created; returns
# without doing anything when the task is skipped.
sub createTask
{
    my ($self) = shift(@_);
    my ($dataset, $owner, $events, $site, $whitelist, %sitestats);
    if ($self->{MODE} == 2)
    {
        my $datapath;
        ($datapath, $site, $events, $whitelist, %sitestats) = @_;
        (undef, $dataset, undef, $owner) = split(/\//, $datapath)
            if defined $datapath;
    }
    else
    {
        # The protocol column is not used here.
        ($dataset, $owner, $events, $site, undef, $whitelist, %sitestats) = @_;
    }

    if (! defined $dataset || ! defined $owner)
    {
        &alert ("createTask: incomplete dataset description, task skipped");
        return;
    }
    $site = '' if ! defined $site;
    $events = 0 if ! defined $events;

    # Choose the application, its configuration and output by data type.
    # NOTE(review): rand(1) is always below 1, so the "> 1.75" tests are
    # never true and the ExRootAnalysis* variants are never selected;
    # presumably the threshold was raised to disable them -- confirm.
    my ($app, $tiers, $rc, $output);
    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.dst";
            $app = "ExRootAnalysisDST";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.dst";
            $app = "ExDSTStatistics";
            $output = "dststatistics.aida";
        }
        $tiers = "DST,Digi,Hit";
    } else {
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        $tiers = "Digi,Hit";
    }

    # Find out what is already pending for this task.  First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    my $taskbase = "$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # If the site is already too busy (status counts S plus C above the
    # high water mark), do not create anything.
    my $pending = ($sitestats{$site}{S} || 0) + ($sitestats{$site}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    my $prevcfg = "$taskdir/$taskbase.$curgen/crab.cfg";
    return if -f $prevcfg
        && (stat($prevcfg))[9] >= time() - $events * $self->{SECS_PER_EVENT};

    # Directory of this script, "." when started without a path component.
    my $mydir = ($0 =~ m|^(.*)/[^/]+$|) ? $1 : ".";
    my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
    my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                       "-jobevents", $self->{NEVENT},
                       "-orcarc", "$mydir/$rc",
                       "-owner", $owner,
                       "-dataset", $dataset,
                       "-tiers", $tiers,
                       "-whitelist", $whitelist,
                       "-name", "$taskdir/$drop",
                       "-output", $output,
                       "-jobtype", $self->{JOBTYPE},
                       "-scheduler", $self->{SCHEDULER},
                       "-mode", $self->{MODE});
    die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

    &output ("$taskdir/$drop/TASK_INIT.txt",
             &mytimeofday () . "\n");

    # Hand the task over as a drop.  "or" (not "||") so the die really
    # applies to a failed mkdir.
    my $dropdir = "$self->{WORKDIR}/$drop";
    -d $dropdir
        or mkdir ($dropdir)
        or die "$dropdir: cannot create: $!\n";
    if (&output ("$dropdir/task", "$taskdir/$drop"))
    {
        &touch ("$dropdir/done");
        $self->relayDrop ($drop);
        &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
    }
    else
    {
        &alert ("$drop: failed to create drop");
        &rmtree ([ $dropdir ]);
    }
}