ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.5
Committed: Sun Jun 4 17:13:30 2006 UTC (18 years, 10 months ago) by gutsche
Branch: MAIN
CVS Tags: JOBROBOT_1_0004_for_CRAB_1_2_0_cmssw_pre6, JOBROBOT_1_0003_for_CRAB_1_2_0_cmssw_pre6
Changes since 1.4: +135 -35 lines
Log Message:
adapt to new DBS/DLS api from CRAB_1_2_0_cmssw_pre6, added CMSSW mode (config: Config.rb.cmssw)

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent initiates tasks on all published datasets.
4     ##H A task is an application to run on a dataset at a particular site.
5     ##H It keeps a record of tasks created and avoids submitting a task too
6     ##H frequently.
7     ##H
8     ##H Usage:
9     ##H TaskSource
10     ##H -state DIRECTORY [-next NEXT] [-wait SECS] [-url URL-PUBLISHED]
11     ##H [-ignore-sites REGEXP] [-accept-sites REGEXP]
12     ##H [-secs-per-event N] [-max-site-queue N]
13     ##H
14     ##H -state agent state directory, including inbox
15     ##H -next next agent to pass the drops to; can be given several times
16     ##H -wait time to wait in seconds between work scans
17     ##H -url contact string for published datasets
18     ##H -ignore-sites
19     ##H regular expression for sites to ignore; ignore applies before
20     ##H accept, and by default nothing is ignored and everything is
21     ##H accepted. applies to pubdb site names, not the host names of
22     ##H the site.
23     ##H -accept-sites
24     ##H regular expression for sites to accept; ignore applies before
25     ##H accept, and by default nothing is ignored and everything is
26     ##H accepted. applies to pubdb site names, not the host names of
27     ##H the site.
28     ##H -secs-per-event
29     ##H minimum time to allocate for a task, given a number of events
30     ##H in the dataset to process. tasks are not created more often
31     ##H than this.
32     ##H -max-site-queue
33     ##H the high water mark of currently submitted jobs to a site,
34     ##H above which new tasks will not be created.
35    
# Strictures must be enabled at file scope: when they are written inside
# BEGIN { } they are lexically confined to that block and the main script
# below silently runs without them.
use strict; use warnings;

# Compile-time setup: record the program name and make the PhEDEx toolkit
# modules (located relative to this script) loadable by the "use" lines below.
BEGIN {
    $^W = 1;
    # Program base name.
    our $me = $0; $me =~ s|.*/||;
    # Directory of this script.  If $0 carries no directory part (e.g. the
    # script was started as "perl TaskSource") fall back to "." instead of
    # treating the script name itself as a directory.
    our $home = ($0 =~ m|^(.*)/[^/]+$|) ? $1 : "";
    $home ||= ".";
    $home .= "/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}
42    
43     ######################################################################
44     use UtilsHelp;
# Built-in defaults; any of them can be overridden from the command line.
my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10,
            URL => "http://cmsdoc.cern.ch/cms/production/www/PubDB/GetPublishedCollectionInfoFromRefDB.php");

# Options that take exactly one value, mapped to the %args key that
# receives that value.
my %value_option = (
    '-state'          => 'DROPDIR',
    '-wait'           => 'WAITTIME',
    '-ignore-sites'   => 'IGNORE_REGEXP',
    '-accept-sites'   => 'ACCEPT_REGEXP',
    '-secs-per-event' => 'SECS_PER_EVENT',
    '-max-site-queue' => 'MAX_SITE_QUEUE',
    '-url'            => 'URL',
    '-dataset'        => 'DATASET',
    '-owner'          => 'OWNER',
    '-events'         => 'NEVENT',
    '-mode'           => 'MODE',
    '-scheduler'      => 'SCHEDULER',
    '-jobtype'        => 'JOBTYPE',
    '-filesperjob'    => 'FILESPERJOB',
    '-eventsperjob'   => 'EVENTSPERJOB',
    '-log'            => 'LOGFILE',
);

# Consume options from the front of @ARGV.  Parsing stops at the first
# word that is not a recognised option, or at an option whose value is
# missing; whatever is then left in @ARGV is reported as an error below.
while (scalar @ARGV)
{
    my $option    = $ARGV[0];
    my $has_value = scalar @ARGV > 1;

    if ($has_value && exists $value_option{$option})
    {
        shift (@ARGV);
        $args{$value_option{$option}} = shift(@ARGV);
    }
    elsif ($has_value && $option eq '-next')
    {
        # -next may be repeated; every occurrence is collected.
        shift (@ARGV);
        push (@{$args{NEXTDIR}}, shift(@ARGV));
    }
    elsif ($option eq '-h')
    {
        &usage();
    }
    else
    {
        last;
    }
}

die "Insufficient parameters, use -h for help.\n"
    if (@ARGV || !$args{DROPDIR} || !$args{URL});

# Build the agent and hand control to its processing loop.
TaskSource->new (%args)->process();
99    
100     ######################################################################
101     # Routines specific to this agent.
102     package TaskSource; use strict; use warnings; use base 'UtilsAgent';
103     use File::Path;
104     use UtilsCommand;
105     use UtilsLogging;
106     use UtilsTiming;
107     use UtilsNet;
108     use POSIX;
109    
# Constructor.  Builds the base agent object via the parent class and then
# fills in the TaskSource specific settings: for every known setting the
# caller's value is used when it is defined, otherwise the default below.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);
    my %supplied = (@_);

    my %defaults = (
        SECS_PER_EVENT => undef,   # secs/event to delay per dataset
        MAX_SITE_QUEUE => undef,   # max number of jobs per site
        IGNORE_REGEXP  => undef,   # regexp of sites to ignore
        ACCEPT_REGEXP  => undef,   # regexp of sites to accept
        DATASET        => undef,   # specific dataset
        OWNER          => undef,   # specific owner
        NEVENT         => 500,     # number of events per job
        MODE           => 1,       # data discovery mode: (1) PubDB/RefDB, (2) DBS/DLS
        JOBTYPE        => "orca",  # standard jobtype
        SCHEDULER      => "edg",   # standard scheduler
        FILESPERJOB    => 1,       # CMSSW: files per job
        URL            => undef,   # published dataset url
    );

    foreach my $setting (keys %defaults)
    {
        if (defined $supplied{$setting})
        {
            $self->{$setting} = $supplied{$setting};
        }
        else
        {
            $self->{$setting} = $defaults{$setting};
        }
    }

    bless $self, $class;
    return $self;
}
132    
# One-time agent initialisation.
#  - Sets TASKREPO to "<DROPDIR>/tasks" and creates that directory if it
#    does not exist yet; dies if it cannot be created.
#  - Probes "links -help" to decide which dump options to pass to links
#    later on (stored in LINKS_OPTS).
#  - Installs the hard-coded site name -> domain whitelist (WHITELIST).
sub init
{
    my ($self) = @_;
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";
    # Low-precedence "or" is required here: "mkdir X || die" parses as
    # "mkdir(X || die)" and would never report a failure.
    -d "$self->{TASKREPO}"
        or mkdir("$self->{TASKREPO}")
        or die "$self->{TASKREPO}: cannot create directory: $!\n";

    # Determine which dump options links supports.  The help text is read
    # once into memory: a second read from the same handle would start at
    # end-of-file and could never match.  If links cannot be run at all no
    # extra options are used.
    $self->{LINKS_OPTS} = [];
    my @links_help;
    if (open (my $help, "links -help 2>/dev/null |"))
    {
        @links_help = <$help>;
        close ($help);
    }
    if ( grep(/-no-numbering/, @links_help) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300 -no-numbering 1));
    } elsif ( grep(/-dump-width/, @links_help) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
    }

    # Precoded whitelist. Should really read this from somewhere...
    $self->{WHITELIST} = { ASCC => "sinica.edu.tw",
                           NCU => "ncu.edu.tw",
                           FNAL => "fnal.gov",
                           CNAF => "webserver.infn.it",
                           BA => "ba.infn.it",
                           IN2P3=> "in2p3.fr",
                           PIC => "pic.es",
                           T2_SP=> "ciemat.es",
                           RAL => "ral.ac.uk",
                           CERN => "cern.ch",
                           FZK => "fzk.de",
                           DESY => "desy.de",
                           NEBR => "unl.edu",
                           WISC => "wisc.edu",
                           UFL => "ufl.edu",
                           PURDUE => "purdue.edu",
                           UCSD => "ucsd.edu",
                           CALT => "ultralight.org"
                         };
}
173 gutsche 1.1
174     # Find out how many jobs are pending for each site. This is
175     # insensitive to the job type, and we only check once in the
176     # beginning to avoid favouring one dataset over another --
177     # once we decide to proceed for a site, we submit jobs for
178     # all datasets.
# Tally the state of all known tasks per site.
#
# Walks TASKREPO, where the second directory level is taken as the site
# name and task directories sit two levels further down.  Returns a hash
#   site name => { state => count }
# where state "C" counts task directories that have no JOB_CREATE_LOG.txt
# yet, and every other state is the second ";"-separated field of a line
# in the task's crab_*/share/db/jobs file.
sub getSiteStatus
{
    my ($self) = @_;
    my %result = ();
    my $repository = $self->{TASKREPO};

    foreach my $sitedir (glob("$repository/*/*"))
    {
        my ($sitename) = ($sitedir =~ m|.*/(.*)|);

        foreach my $task (glob("$sitedir/*/*"))
        {
            if (-f "$task/JOB_CREATE_LOG.txt")
            {
                # Jobs exist for this task: count them by their state.
                # Tasks without a job database are skipped.
                my ($jobdb) = glob("$task/crab_*/share/db/jobs");
                next if ! defined $jobdb;

                foreach my $record (split(/\n/, &input($jobdb) || ''))
                {
                    my @fields = split("\;", $record);
                    $result{$sitename}{$fields[1]} ||= 0;
                    $result{$sitename}{$fields[1]}++;
                }
            }
            else
            {
                # No job creation log yet: count the task itself as "C".
                $result{$sitename}{C} ||= 0;
                $result{$sitename}{C}++;
            }
        }
    }

    return %result;
}
210    
# One work cycle of the agent: discover published data and create tasks.
#
# The current per-site load is obtained once and logged.  Depending on
# $self->{MODE}, the published data is then listed by
#   2     : $PYTHONSCRIPT/DBSlistDataset.py, sites from DLSInfo.py,
#           tasks created with createTask;
#   3     : $PYTHONSCRIPT/DBSInfo_EDM.py, sites from DLSInfo.py,
#           tasks created with createTaskCMSSW;
#   other : a "links -dump" of $self->{URL},
#           tasks created with createTaskRefDBPubDB.
# Any error raised during the cycle is caught and reported with alert();
# the agent then naps for WAITTIME.  @pending is accepted but not used.
sub idle
{
    my ($self, @pending) = @_;
    eval {
        # Get status of how busy the sites are. We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats)
        {
            my @load;
            foreach my $site (sort keys %sitestats)
            {
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                            sort keys %{$sitestats{$site}}));
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        # Lexical file handles are used throughout so that the pipes are
        # closed (and the child processes reaped) even when something dies
        # in the middle of a loop.
        # NOTE(review): dataset paths and file blocks are interpolated into
        # shell command lines below; they come from the DBS helper scripts
        # and are assumed to be trustworthy -- confirm.
        if ( $self->{MODE} == 2 ) {
            &logmsg ("DBS/DLS mode\n");
            my $cmd = $ENV{PYTHONSCRIPT} . "/DBSlistDataset.py";
            open (my $published, "$cmd 2>/dev/null |") or die "cannot run `$cmd': $!\n";
            while (my $datapath = <$published>) {
                chomp ($datapath);
                &timeStart($self->{STARTTIME});
                # Find out what was published and what we would like to do with it
                my (undef, $dataset, undef, $owner) = split(/\//, $datapath);
                next if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
                next if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
                my $sitecmd = $ENV{PYTHONSCRIPT} . "/DLSInfo.py \"$datapath\" ";
                open (my $sites, "$sitecmd 2>/dev/null |") or die "cannot run `$sitecmd': $!\n";
                while (my $line = <$sites>) {
                    &logmsg("$line");
                    # Strip the line terminator so it does not end up in
                    # the site name or the event count.
                    chomp ($line);
                    my ($site, $events) = split(/\//, $line);
                    next if ($self->{SITE} && $site !~ /$self->{SITE}/);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    my $whitelist = $site || '.';
                    $self->createTask($datapath, $site, $events, $whitelist, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } elsif ( $self->{MODE} == 3 ) {
            &logmsg ("DBS/DLS CMSSW mode\n");
            my $cmd = $ENV{PYTHONSCRIPT} . "/DBSInfo_EDM.py";
            open (my $published, "$cmd 2>/dev/null |") or die "cannot run `$cmd': $!\n";
            while (my $entry = <$published>) {
                chomp ($entry);
                &timeStart($self->{STARTTIME});
                my ($datapath, $fileblock, $totalevents) = split(/ /, $entry);
                my $sitecmd = $ENV{PYTHONSCRIPT} . "/DLSInfo.py \"$fileblock\" ";
                open (my $sites, "$sitecmd 2>/dev/null |") or die "cannot run `$sitecmd': $!\n";
                while (my $site = <$sites>) {
                    chomp ($site);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    $self->createTaskCMSSW($datapath, $site, $totalevents, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } else {
            &logmsg ("RefDB/PubDB mode");
            # Invoke links to fetch a formatted web page of published datasets.
            my $cmd = "links @{$self->{LINKS_OPTS}} -dump '$self->{URL}'";
            open (my $published, "$cmd 2>/dev/null |") or die "cannot run `$cmd': $!\n";
            while (my $row = <$published>)
            {
                &timeStart($self->{STARTTIME});
                chomp ($row);
                # Only rows containing an underscore are considered; table
                # separators are turned into blanks before splitting.
                next if $row !~ /_/;
                $row =~ s/\|/ /g; $row =~ s/^\s+//; $row =~ s/\s+$//;
                # Find out what was published and what we would like to do with it
                my ($dataset, $owner, $events, $site) = split(/\s+/, $row);
                $self->createTaskRefDBPubDB($dataset, $owner, $events, $site, 1, %sitestats);
            }
            close ($published);
        }
    };
    do { chomp ($@); &alert ($@); } if $@;

    $self->nap ($self->{WAITTIME});
}
293    
# Create a CRAB task for one dataset at one site by running CrabJobs, and
# pass the new task on as a drop.
#
# Arguments after $self:
#   MODE == 1 : ($dataset, $owner, $events, $site, $proto, $whitelist, %sitestats)
#   otherwise : ($datapath, $site, $events, $whitelist, %sitestats), where
#               $datapath is "/dataset/datatier/owner".
# %sitestats is the per-site load as returned by getSiteStatus.
#
# Nothing is created when the dataset name matches MBforPU, when the site
# already has more than MAX_SITE_QUEUE tasks/jobs in state S or C, or when
# the previous generation of the task is younger than
# $events * SECS_PER_EVENT seconds.  Dies if CrabJobs fails or the drop
# directory cannot be created.
#
# The sub leaves with "return"; leaving with "next" only works by accident
# when the caller happens to be inside a loop.
sub createTask
{
    my ($self) = shift(@_);
    my ($n, $dataset, $owner, $events, $site, $proto, %sitestats, $datapath, $datatier, $whitelist);
    if (($self->{MODE}) == 1) {
        ($dataset, $owner, $events, $site, $proto, $whitelist, %sitestats) = @_;
    }
    else {
        ($datapath, $site, $events, $whitelist, %sitestats) = @_;
        ($n, $dataset, $datatier, $owner) = split(/\//, $datapath);
    }

    # Select application, configuration, data tiers and output file from
    # the owner name.  rand(1) is always below 1, so the "rand(1) > 1.75"
    # branches (the ROOT analysis variants) are never taken.
    my ($app, $tiers, $rc, $output);
    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.dst";
            $app = "ExRootAnalysisDST";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.dst";
            $app = "ExDSTStatistics";
            $output = "dststatistics.aida";
        }
        $tiers = "DST,Digi,Hit";
    } else {
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        if ($owner =~ m/nopu/i) {
            $tiers = "Hit";
        } else {
            $tiers = "Digi,Hit";
        }
    }

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    # NOTE(review): the dots in this pattern are unescaped and match any
    # character; presumably the second-to-last component of a host name
    # is wanted -- confirm before tightening.
    my ($shortsite) = ( $site =~ /.(\w+).\w+$/ );
    # Fall back to the full site name rather than building paths from an
    # undefined value.
    $shortsite = $site if ! defined $shortsite;
    my $taskdir = "$self->{TASKREPO}/$datestamp/$shortsite/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    my $taskbase = "$datestamp.$shortsite.$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # If the site is too busy already, skip it.  The statistics are keyed
    # by the site directory name in the task repository, which for tasks
    # created here is the short site name.
    my $pending = ($sitestats{$shortsite}{S} || 0);
    $pending += ($sitestats{$shortsite}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $events * $self->{SECS_PER_EVENT})))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                           "-jobevents", $self->{NEVENT},
                           "-orcarc", "$mydir/$rc",
                           "-owner", $owner,
                           "-dataset", $dataset,
                           "-tiers", $tiers,
                           "-whitelist", $whitelist,
                           "-name", "$taskdir/$drop",
                           "-output", $output,
                           "-jobtype", $self->{JOBTYPE},
                           "-scheduler", $self->{SCHEDULER},
                           "-mode", $self->{MODE});
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # "or" instead of "||": with "||" the die was part of mkdir's
        # argument and could never fire.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d "$dropdir" or mkdir("$dropdir") or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop"))
        {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
        }
        else
        {
            &alert ("$drop: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
    return;
}
401 gutsche 1.4
# Create a CMSSW CRAB task for one dataset at one site by running
# CrabJobsCMSSW, and pass the new task on as a drop.
#
# Arguments after $self:
#   $datasetpath  "/dataset/tier/owner"
#   $site         site name, also used as the whitelist
#   $totalevents  number of events in the dataset
#   %sitestats    per-site load as returned by getSiteStatus
#
# Nothing is created when the tier is neither SIM nor GEN (no configuration
# is known for it), when the site already has more than MAX_SITE_QUEUE
# tasks/jobs in state S or C, or when the previous generation of the task
# is younger than $totalevents * SECS_PER_EVENT seconds.  Dies if
# CrabJobsCMSSW fails or the drop directory cannot be created.
#
# The sub leaves with "return"; leaving with "next" only works by accident
# when the caller happens to be inside a loop.
sub createTaskCMSSW
{
    my ($self) = shift(@_);
    my ($datasetpath, $site, $totalevents, %sitestats) = @_;

    my ($n, $dataset, $tier, $owner) = split(/\//, $datasetpath);

    my ($rc, $output);
    if ($tier =~ /SIM/) {
        $rc = "sim.cfg";
        $output = "FrameworkJobReport.xml";
    } elsif ($tier =~ /GEN/) {
        $rc = "gen.cfg";
        $output = "FrameworkJobReport.xml";
    } else {
        # Without a configuration the job creation command would be called
        # with undefined arguments.
        &alert ("$datasetpath: no job configuration for tier $tier, skipped");
        return;
    }

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$tier";
    my $taskbase = "$datestamp.$site.$dataset.$tier.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # If the site is too busy already, skip it.
    my $pending = ($sitestats{$site}{S} || 0);
    $pending += ($sitestats{$site}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $totalevents * $self->{SECS_PER_EVENT})))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobsCMSSW",
                           "-cfg", "$mydir/$rc",
                           "-datasetpath", $datasetpath,
                           "-output", $output,
                           "-totalevents", $totalevents,
                           "-whitelist", $site,
                           "-filesperjob", $self->{FILESPERJOB},
                           "-eventsperjob", $self->{NEVENT},
                           "-scheduler", $self->{SCHEDULER},
                           "-jobname", "$taskdir/$drop");
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # "or" instead of "||": with "||" the die was part of mkdir's
        # argument and could never fire.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d "$dropdir" or mkdir("$dropdir") or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop"))
        {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
        }
        else
        {
            &alert ("$drop: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
    return;
}
470    
# Create a CRAB task for one dataset/owner published at one PubDB site by
# running CrabJobs, and pass the new task on as a drop.
#
# Arguments after $self:
#   $dataset, $owner  published dataset and owner names
#   $events           number of events in the dataset
#   $site             PubDB site name
#   $mode             value passed to CrabJobs as -mode
#   %sitestats        per-site load as returned by getSiteStatus
#
# Nothing is created when
#   - the dataset or owner does not match the DATASET / OWNER selection,
#   - the site matches IGNORE_REGEXP or does not match ACCEPT_REGEXP,
#   - the dataset name matches MBforPU,
#   - the owner name matches DST (and not Hit),
#   - the site already has more than MAX_SITE_QUEUE tasks/jobs in state S
#     or C, or
#   - the previous generation of the task is younger than
#     $events * SECS_PER_EVENT seconds.
# Dies if CrabJobs fails or the drop directory cannot be created.
#
# The sub leaves with "return"; leaving with "next" only works by accident
# when the caller happens to be inside a loop.
sub createTaskRefDBPubDB
{
    my ($self, $dataset, $owner, $events, $site, $mode, %sitestats) = @_;

    # Marco
    return if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
    return if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
    # Marco
    my ($app, $tiers, $rc, $output);
    # Sites without a whitelist entry get '.' as whitelist.
    my $whitelist = $self->{WHITELIST}->{$site} || '.';
    return if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
    return if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);

    # Select application, configuration, data tiers and output file from
    # the owner name.  rand(1) is always below 1, so the "rand(1) > 1.75"
    # branch (the ROOT analysis variant) is never taken.
    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        # DST owners are skipped in this mode; the DST application setup
        # that used to follow here was unreachable and has been removed.
        return;
    } else {
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        if ($owner =~ m/nopu/i) {
            $tiers = "Hit";
        } else {
            $tiers = "Digi,Hit";
        }
    }

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    my $taskbase = "$datestamp.$site.$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # If the site is too busy already, skip it.
    my $pending = ($sitestats{$site}{S} || 0);
    $pending += ($sitestats{$site}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $events * $self->{SECS_PER_EVENT})))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                           "-jobevents", $self->{NEVENT},
                           "-orcarc", "$mydir/$rc",
                           "-owner", $owner,
                           "-dataset", $dataset,
                           "-tiers", $tiers,
                           "-whitelist", $whitelist,
                           "-name", "$taskdir/$drop",
                           "-output", $output,
                           "-jobtype", $self->{JOBTYPE},
                           "-scheduler", $self->{SCHEDULER},
                           "-mode", $mode);
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # "or" instead of "||": with "||" the die was part of mkdir's
        # argument and could never fire.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d "$dropdir" or mkdir("$dropdir") or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop"))
        {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
        }
        else
        {
            &alert ("$drop: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
    return;
}