ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.16
Committed: Mon Jul 17 03:53:36 2006 UTC (18 years, 9 months ago) by gutsche
Branch: MAIN
Changes since 1.15: +6 -6 lines
Log Message:
 reduced site limits for pic, caltech, itep, bari, asgc

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent initiates tasks on all published datasets.
4     ##H A task is an application to run on a dataset at a particular site.
5     ##H It keeps a record of tasks created and avoids submitting a task too
6     ##H frequently.
7     ##H
8     ##H Usage:
9     ##H TaskSource
10     ##H -state DIRECTORY [-next NEXT] [-wait SECS] [-url URL-PUBLISHED]
11     ##H [-ignore-sites REGEXP] [-accept-sites REGEXP]
12     ##H [-secs-per-event N] [-max-site-queue N]
13     ##H
14     ##H -state agent state directory, including inbox
15     ##H -next next agent to pass the drops to; can be given several times
16     ##H -wait time to wait in seconds between work scans
17     ##H -url contact string for published datasets
18     ##H -ignore-sites
19     ##H regular expression for sites to ignore; ignore applies before
20     ##H accept, and by default nothing is ignored and everything is
21     ##H accepted. applies to pubdb site names, not the host names of
22     ##H the site.
23     ##H -accept-sites
24     ##H regular expression for sites to accept; ignore applies before
25     ##H accept, and by default nothing is ignored and everything is
26     ##H accepted. applies to pubdb site names, not the host names of
27     ##H the site.
28     ##H -secs-per-event
29     ##H minimum time to allocate for a task, given a number of events
30     ##H in the dataset to process. tasks are not created more often
31     ##H than this.
32     ##H -max-site-queue
33     ##H the high water mark of currently submitted jobs to a site,
34 gutsche 1.8 ##H above which new tasks will not be created. Represents the default
35     ##H taken if site specific values cannot be found
36 gutsche 1.1
# Strictures for the whole script. Declared inside the BEGIN block below they
# would only apply to that block and leave the top-level code unchecked.
use strict;
use warnings;

BEGIN {
    # Turn on the global (dynamic) warning switch, as "perl -w" would.
    $^W = 1;

    # Script name without its directory part.
    our $me = $0; $me =~ s|.*/||;

    # Directory holding this script; "." when it was started without a path
    # (e.g. "perl TaskSource"). The PhEDEx toolkit modules are looked up
    # relative to it and put first on @INC.
    my $scriptdir = ($0 =~ m|^(.*)/|) ? $1 : ".";
    our $home = "$scriptdir/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}
43    
44     ######################################################################
45     use UtilsHelp;
# Default settings; the command-line options parsed below override them.
my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10,
            URL => "http://cmsdoc.cern.ch/cms/production/www/PubDB/GetPublishedCollectionInfoFromRefDB.php");

# Every option that takes a value, mapped to the %args key it fills in.
my %key_for_option = (
    '-state'          => 'DROPDIR',
    '-next'           => 'NEXTDIR',       # repeatable: values accumulate in a list
    '-wait'           => 'WAITTIME',
    '-ignore-sites'   => 'IGNORE_REGEXP',
    '-accept-sites'   => 'ACCEPT_REGEXP',
    '-secs-per-event' => 'SECS_PER_EVENT',
    '-max-site-queue' => 'MAX_SITE_QUEUE',
    '-url'            => 'URL',
    '-dataset'        => 'DATASET',
    '-owner'          => 'OWNER',
    '-events'         => 'NEVENT',
    '-mode'           => 'MODE',
    '-scheduler'      => 'SCHEDULER',
    '-jobtype'        => 'JOBTYPE',
    '-filesperjob'    => 'FILESPERJOB',
    '-eventsperjob'   => 'EVENTSPERJOB',
    '-log'            => 'LOGFILE',
);

ARG:
while (@ARGV)
{
    my $flag = $ARGV[0];

    if ($flag eq '-h')
    {
        # -h is not consumed; usage() presumably exits (TODO confirm).
        &usage();
        next ARG;
    }

    # Stop at the first unknown option, or at an option missing its value;
    # anything still left on @ARGV is rejected below.
    last ARG unless exists $key_for_option{$flag} && @ARGV > 1;

    my (undef, $value) = splice(@ARGV, 0, 2);
    my $key = $key_for_option{$flag};
    if ($flag eq '-next')
    {
        push(@{$args{$key}}, $value);
    }
    else
    {
        $args{$key} = $value;
    }
}

die "Insufficient parameters, use -h for help.\n"
    if @ARGV || !$args{DROPDIR} || !$args{URL};

TaskSource->new(%args)->process();
100    
101     ######################################################################
102     # Routines specific to this agent.
103     package TaskSource; use strict; use warnings; use base 'UtilsAgent';
104     use File::Path;
105     use UtilsCommand;
106     use UtilsLogging;
107     use UtilsTiming;
108     use UtilsNet;
109     use POSIX;
110    
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);

    # Agent-specific parameters with their defaults. A value supplied by the
    # caller wins whenever it is defined.
    my %params = (SECS_PER_EVENT => undef, # secs/event to delay per dataset
                  MAX_SITE_QUEUE => undef, # default max number of jobs per site
                  IGNORE_REGEXP  => undef, # regexp of sites to ignore
                  ACCEPT_REGEXP  => undef, # regexp of sites to accept
                  DATASET        => undef, # specific dataset
                  OWNER          => undef, # specific owner
                  NEVENT         => 500,   # number of events per job
                  MODE           => 1,     # data discovery mode: (1) PubDB/RefDB, (2) DBS/DLS, (3) DBS/DLS CMSSW
                  JOBTYPE        => "orca", # standard jobtype
                  SCHEDULER      => "edg", # standard scheduler
                  FILESPERJOB    => 1,     # CMSSW: files per job
                  EVENTSPERJOB   => undef, # CMSSW: events per job (-eventsperjob)
                  URL            => undef); # published dataset url
    my %args = (@_);
    foreach my $key (keys %params)
    {
        $self->{$key} = defined $args{$key} ? $args{$key} : $params{$key};
    }

    bless $self, $class;
    return $self;
}
133    
sub init
{
    my ($self) = @_;

    # Make sure the task repository exists inside the agent state directory.
    # The low-precedence "or" matters: mkdir is a list operator, so with "||"
    # the die would become part of mkdir's argument and never fire.
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";
    -d $self->{TASKREPO}
        or mkdir($self->{TASKREPO})
        or die "$self->{TASKREPO}: cannot create directory: $!\n";

    # Find out which dump options the installed "links" browser supports.
    # The help text is captured once. Grepping the pipe twice would give the
    # second test an exhausted handle, so a links that only knows
    # -dump-width would never get it.
    my @links_help = ();
    if (open(my $links, '-|', 'links -help 2>/dev/null'))
    {
        @links_help = <$links>;
        close($links);  # exit status of "links -help" is deliberately ignored
    }
    $self->{LINKS_OPTS} = [];
    if (grep { /-no-numbering/ } @links_help) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300 -no-numbering 1));
    } elsif (grep { /-dump-width/ } @links_help) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
    }

    # Precoded whitelist: PubDB site name => host name suffix, passed to
    # CrabJobs as -whitelist. Should really read this from somewhere...
    $self->{WHITELIST} = { ASCC => "sinica.edu.tw",
                           NCU => "ncu.edu.tw",
                           FNAL => "fnal.gov",
                           CNAF => "webserver.infn.it",
                           BA => "ba.infn.it",
                           IN2P3=> "in2p3.fr",
                           PIC => "pic.es",
                           T2_SP=> "ciemat.es",
                           RAL => "ral.ac.uk",
                           CERN => "cern.ch",
                           FZK => "fzk.de",
                           DESY => "desy.de",
                           NEBR => "unl.edu",
                           WISC => "wisc.edu",
                           UFL => "ufl.edu",
                           PURDUE => "purdue.edu",
                           UCSD => "ucsd.edu",
                           CALT => "ultralight.org"
                         };

    # Per-CE limit on pending jobs, keyed by CE host name. CEs not listed
    # here fall back to MAX_SITE_QUEUE.
    $self->{SITEMAXQUEUE} = { "cmslcgce.fnal.gov" => 500,
                              "ce01.cmsaf.mit.edu" => 200,
                              "ce04.pic.es" => 250,
                              "red.unl.edu" => 200,
                              "oberon.hep.kbfi.ee" => 500,
                              "cit-gatekeeper.ultralight.org" => 100,
                              "lcg02.ciemat.es" => 200,
                              "ceitep.itep.ru" => 100,
                              "ufloridapg.phys.ufl.edu" => 200,
                              "gridba2.ba.infn.it" => 200,
                              "cclcgceli02.in2p3.fr" => 200,
                              "cmsgrid02.hep.wisc.edu" => 200,
                              "lcgce01.jinr.ru" => 50,
                              "t2-ce-02.lnl.infn.it" => 200,
                              "grid-ce1.desy.de" => 500,
                              "grid-ce.physik.rwth-aachen.de" => 200,
                              "ce01-lcg.projects.cscs.ch" => 50,
                              "gridce.iihe.ac.be" => 200,
                              "ce03-lcg.cr.cnaf.infn.it" => 500,
                              "fce01.grid.sinica.edu.tw" => 50,
                              "gridce.pi.infn.it" => 200,
                              "lcg00125.grid.sinica.edu.tw" => 250,
                              "ce101.cern.ch" => 200,
                              "lcg06.sinp.msu.ru" => 200,
                              "osg-gw-2.t2.ucsd.edu" => 200,
                              "ce-fzk.gridka.de" => 500,
                              "lcgce01.gridpp.rl.ac.uk" => 500,
                              "gw39.hep.ph.ic.ac.uk" => 200,
                              "lepton.rcac.purdue.edu" => 200 }
}
205 gutsche 1.1
# Find out how many jobs are pending for each site. This is
# insensitive to the job type, and we only check once in the
# beginning to avoid favouring one dataset over another --
# once we decide to proceed for a site, we submit jobs for
# all datasets.
#
# Returns a hash: site directory name => { status code => count }.
# A task directory without JOB_CREATE_LOG.txt counts once under "C".
# Otherwise every line of its crab_*/share/db/jobs file counts under
# that line's second ';'-separated field (assumed to be the job status
# -- TODO confirm against CRAB's jobs db format).
sub getSiteStatus
{
    my ($self) = @_;
    my %result = ();
    my $taskrepo = $self->{TASKREPO};

    # Repository layout: <repo>/<datestamp>/<site>/<app-or-tier>/<task>.
    foreach my $site (<$taskrepo/*/*>)
    {
        my ($sitename) = ($site =~ m|.*/(.*)|);
        foreach my $d (<$site/*/*>)
        {
            if (! -f "$d/JOB_CREATE_LOG.txt")
            {
                $result{$sitename}{C}++;
                next;
            }

            my $f = (<$d/crab_*/share/db/jobs>)[0];
            next if ! defined $f;

            foreach my $status (split(/\n/, &input($f) || ''))
            {
                # Blank or malformed lines have no status field; skip them
                # instead of counting them under an undefined key.
                my $state = (split(/;/, $status))[1];
                next if ! defined $state;
                $result{$sitename}{$state}++;
            }
        }
    }

    return %result;
}
242    
# One agent cycle: log the current per-site load, discover published
# datasets according to MODE (2: DBS/DLS, 3: DBS/DLS CMSSW, otherwise
# RefDB/PubDB via a links dump of URL), hand each dataset/site pair to
# the matching createTask* method, then sleep WAITTIME seconds.
# Errors raised during the scan are reported through alert().
sub idle
{
    my ($self, @pending) = @_;
    eval {
        # Get status of how busy the sites are. We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats)
        {
            my @load;
            foreach my $site (sort keys %sitestats)
            {
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                                sort keys %{$sitestats{$site}}));
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        # Single-quote a value for /bin/sh, so that names read from DBS/DLS
        # output or the configured URL cannot be reinterpreted by the shell.
        my $shellquote = sub {
            my ($text) = @_;
            $text = '' if ! defined $text;
            $text =~ s/'/'\\''/g;
            return "'$text'";
        };

        if ( $self->{MODE} == 2 ) {
            # List published datasets from DBS, then ask DLS where each is.
            &logmsg ("DBS/DLS mode\n");
            die "PYTHONSCRIPT is not set; cannot locate the DBS/DLS scripts\n"
                if ! defined $ENV{PYTHONSCRIPT};
            my $cmd = "$ENV{PYTHONSCRIPT}/DBSlistDataset.py";
            open (my $published, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
            while (my $datapath = <$published>) {
                chomp ($datapath);
                &timeStart($self->{STARTTIME});
                # Find out what was published and what we would like to do with it
                my (undef, $dataset, undef, $owner) = split(/\//, $datapath);
                next if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
                next if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
                my $dlscmd = "$ENV{PYTHONSCRIPT}/DLSInfo.py " . $shellquote->("$owner/$dataset");
                open (my $sites, '-|', "$dlscmd 2>/dev/null") or die "cannot run `$dlscmd': $!\n";
                while (my $line = <$sites>) {
                    &logmsg ("$line");
                    my ($site, $events) = split(/\//, $line);
                    next if ($self->{SITE} && $site !~ /$self->{SITE}/);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    my $whitelist = $site || '.';
                    $self->createTask($datapath, $site, $events, $whitelist, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } elsif ( $self->{MODE} == 3 ) {
            # DBS lists "datapath fileblock totalevents"; DLS maps the
            # fileblock to the sites that host it, one per line.
            &logmsg ("DBS/DLS CMSSW mode\n");
            die "PYTHONSCRIPT is not set; cannot locate the DBS/DLS scripts\n"
                if ! defined $ENV{PYTHONSCRIPT};
            my $cmd = "$ENV{PYTHONSCRIPT}/DBSInfo_EDM.py";
            open (my $published, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
            while (my $entry = <$published>) {
                chomp ($entry);
                &timeStart($self->{STARTTIME});
                my ($datapath, $fileblock, $totalevents) = split(/ /, $entry);
                my $tempdataset = (split(/\//, $datapath))[1];
                next if ($self->{DATASET} && $tempdataset !~ /$self->{DATASET}/);
                my $dlscmd = "$ENV{PYTHONSCRIPT}/DLSInfo.py " . $shellquote->($fileblock);
                open (my $sites, '-|', "$dlscmd 2>/dev/null") or die "cannot run `$dlscmd': $!\n";
                while (my $site = <$sites>) {
                    chomp ($site);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    $self->createTaskCMSSW($datapath, $site, $totalevents, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } else {
            # Invoke links to fetch a formatted web page of published datasets.
            &logmsg ("RefDB/PubDB mode");
            my $cmd = "links @{$self->{LINKS_OPTS}} -dump " . $shellquote->($self->{URL});
            open (my $published, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
            while (my $row = <$published>)
            {
                &timeStart($self->{STARTTIME});
                chomp ($row);
                next if $row !~ /_/;
                $row =~ s/\|/ /g; $row =~ s/^\s+//; $row =~ s/\s+$//;
                # Find out what was published and what we would like to do with it
                my ($dataset, $owner, $events, $site, $proto) = split(/\s+/, $row);
                $self->createTaskRefDBPubDB($dataset, $owner, $events, $site, 1, %sitestats);
            }
            close ($published);
        }
    };
    do { chomp ($@); &alert ($@); } if $@;

    $self->nap ($self->{WAITTIME});
}
328    
# Create (and relay as a drop) a new ORCA task generation for one
# dataset at one site, unless the site is too busy or the previous
# generation is still recent.
#
# Two calling conventions, selected by the agent MODE:
#   MODE 1: ($dataset, $owner, $events, $site, $proto, $whitelist, %sitestats)
#   other:  ($datapath, $site, $events, $whitelist, %sitestats), where
#           $datapath is split on "/" into (dataset, datatier, owner).
# Dies if CrabJobs fails or the drop directory cannot be created.
sub createTask
{
    my $self = shift(@_);

    my ($dataset, $owner, $events, $site, $whitelist, %sitestats);
    if ($self->{MODE} == 1) {
        ($dataset, $owner, $events, $site, undef, $whitelist, %sitestats) = @_;
    }
    else {
        my $datapath;
        ($datapath, $site, $events, $whitelist, %sitestats) = @_;
        (undef, $dataset, undef, $owner) = split(/\//, $datapath);
    }

    # Choose the analysis application from the dataset and owner names.
    my ($app, $tiers, $rc, $output);
    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        # rand(1) never exceeds 1.75: the ROOT-analysis variant is disabled.
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.dst";
            $app = "ExRootAnalysisDST";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.dst";
            $app = "ExDSTStatistics";
            $output = "dststatistics.aida";
        }
        $tiers = "DST,Digi,Hit";
    } else {
        # rand(1) never exceeds 1.75: the ROOT-analysis variant is disabled.
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        $tiers = ($owner =~ m/nopu/i) ? "Hit" : "Digi,Hit";
    }

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my ($shortsite) = ( $site =~ /.(\w+).\w+$/ );
    $shortsite = $site if ! defined $shortsite;
    my $taskdir = "$self->{TASKREPO}/$datestamp/$shortsite/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    # my $taskbase = "$datestamp.$site.$app.$dataset.$owner";
    my $taskbase = "$datestamp.$shortsite.$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # Skip the site if it is already too busy. getSiteStatus() keys its
    # counts by the task-repository site directory, which here is
    # $shortsite rather than the full host name.
    my $pending = ($sitestats{$shortsite}{S} || 0) + ($sitestats{$shortsite}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # Only create a new generation if there is no previous one, or it is
    # older than $events * SECS_PER_EVENT seconds.
    my $lastcfg = "$taskdir/$taskbase.$curgen/crab.cfg";
    return if -f $lastcfg
              && (stat($lastcfg))[9] >= time() - $events * $self->{SECS_PER_EVENT};

    # Helper scripts live next to this one ("." when started without a path).
    my $mydir = ($0 =~ m|^(.*)/|) ? $1 : '.';
    my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
    my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                       "-jobevents", $self->{NEVENT},
                       "-orcarc", "$mydir/$rc",
                       "-owner", $owner,
                       "-dataset", $dataset,
                       "-tiers", $tiers,
                       "-whitelist", $whitelist,
                       "-name", "$taskdir/$drop",
                       "-output", $output,
                       "-jobtype", $self->{JOBTYPE},
                       "-scheduler", $self->{SCHEDULER},
                       "-mode", $self->{MODE});
    die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

    &output ("$taskdir/$drop/TASK_INIT.txt",
             &mytimeofday () . "\n");

    # Pass the new task to the next agent through a drop in the work area.
    my $dropdir = "$self->{WORKDIR}/$drop";
    -d $dropdir or mkdir($dropdir) or die "$dropdir: cannot create: $!\n";
    if (&output ("$dropdir/task", "$taskdir/$drop"))
    {
        &touch ("$dropdir/done");
        $self->relayDrop ($drop);
        &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
    }
    else
    {
        &alert ("$drop: failed to create drop");
        &rmtree ([ "$self->{WORKDIR}/$drop" ]);
    }
}
436 gutsche 1.4
# Create (and relay as a drop) a new CMSSW task generation for one
# dataset at one site (CE host name). Skipped when the data tier has no
# known configuration, when the site exceeds its queue limit, or when
# the previous generation is still recent.
#
# Arguments: ($datasetpath, $site, $totalevents, %sitestats), where
# $datasetpath is split on "/" into (dataset, tier, owner).
# Dies if CrabJobsCMSSW fails or the drop directory cannot be created.
sub createTaskCMSSW
{
    my ($self, $datasetpath, $site, $totalevents, %sitestats) = @_;

    my (undef, $dataset, $tier, $owner) = split(/\//, $datasetpath);
    $tier = '' if ! defined $tier;

    # Pick the configuration for the data tier. Tiers without one are
    # skipped; running CrabJobsCMSSW for them would pass an incomplete
    # -cfg path and no -output file.
    my ($rc, $output);
    if ($tier =~ /SIM/) {
        $rc = "sim.cfg";
        $output = "FrameworkJobReport.xml";
    } elsif ($tier =~ /GEN/) {
        $rc = "gen.cfg";
        $output = "FrameworkJobReport.xml";
    } else {
        &logmsg ("$datasetpath: no configuration for data tier '$tier', skipping site $site");
        return;
    }

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$tier";
    my $taskbase = "$datestamp.$site.$dataset.$tier.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # Skip the site if it is already too busy: compare its "S" plus "C"
    # counts with the site-specific limit, or the default if none is set.
    my $pending = ($sitestats{$site}{S} || 0) + ($sitestats{$site}{C} || 0);
    my $max_queue = $self->{SITEMAXQUEUE}->{$site} || $self->{MAX_SITE_QUEUE};
    return if $pending > $max_queue;

    # Only create a new generation if there is no previous one, or it is
    # older than $totalevents * SECS_PER_EVENT seconds.
    my $lastcfg = "$taskdir/$taskbase.$curgen/crab.cfg";
    return if -f $lastcfg
              && (stat($lastcfg))[9] >= time() - $totalevents * $self->{SECS_PER_EVENT};

    # Helper scripts live next to this one ("." when started without a path).
    my $mydir = ($0 =~ m|^(.*)/|) ? $1 : '.';
    my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
    # -eventsperjob takes precedence over the generic -events setting.
    my $eventsperjob = defined $self->{EVENTSPERJOB} ? $self->{EVENTSPERJOB} : $self->{NEVENT};
    my $ret = &runcmd ("$mydir/CrabJobsCMSSW",
                       "-cfg", "$mydir/$rc",
                       "-datasetpath", $datasetpath,
                       "-output", $output,
                       "-totalevents", $totalevents,
                       "-whitelist", $site,
                       "-filesperjob", $self->{FILESPERJOB},
                       "-eventsperjob", $eventsperjob,
                       "-scheduler", $self->{SCHEDULER},
                       "-jobname", "$taskdir/$drop");
    die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

    &output ("$taskdir/$drop/TASK_INIT.txt",
             &mytimeofday () . "\n");

    # Pass the new task to the next agent through a drop in the work area.
    my $dropdir = "$self->{WORKDIR}/$drop";
    -d $dropdir or mkdir($dropdir) or die "$dropdir: cannot create: $!\n";
    if (&output ("$dropdir/task", "$taskdir/$drop"))
    {
        &touch ("$dropdir/done");
        $self->relayDrop ($drop);
        &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
    }
    else
    {
        &alert ("$drop: failed to create drop");
        &rmtree ([ "$self->{WORKDIR}/$drop" ]);
    }
}
507    
# Create (and relay as a drop) a new ORCA task generation for one dataset
# published in RefDB/PubDB at one PubDB site. Applies the dataset, owner
# and site filters, and skips busy sites and recently created tasks.
#
# Arguments: ($dataset, $owner, $events, $site, $mode, %sitestats);
# $mode is passed through to CrabJobs as -mode.
# Dies if CrabJobs fails or the drop directory cannot be created.
sub createTaskRefDBPubDB
{
    my ($self, $dataset, $owner, $events, $site, $mode, %sitestats) = @_;

    # Honour the -dataset / -owner / -ignore-sites / -accept-sites filters.
    return if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
    return if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
    my $whitelist = $self->{WHITELIST}->{$site} || '.';
    return if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
    return if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);

    # Choose the analysis application from the dataset and owner names.
    my ($app, $tiers, $rc, $output);
    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        # DST owners are not submitted in RefDB/PubDB mode.
        return;
    } else {
        # rand(1) never exceeds 1.75: the ROOT-analysis variant is disabled.
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        $tiers = ($owner =~ m/nopu/i) ? "Hit" : "Digi,Hit";
    }

    # Find out what is already pending for this task. First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    # my $taskbase = "$datestamp.$site.$app.$dataset.$owner";
    my $taskbase = "$datestamp.$site.$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # Skip the site if it is already too busy.
    my $pending = ($sitestats{$site}{S} || 0) + ($sitestats{$site}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # Only create a new generation if there is no previous one, or it is
    # older than $events * SECS_PER_EVENT seconds.
    my $lastcfg = "$taskdir/$taskbase.$curgen/crab.cfg";
    return if -f $lastcfg
              && (stat($lastcfg))[9] >= time() - $events * $self->{SECS_PER_EVENT};

    # Helper scripts live next to this one ("." when started without a path).
    my $mydir = ($0 =~ m|^(.*)/|) ? $1 : '.';
    my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
    my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                       "-jobevents", $self->{NEVENT},
                       "-orcarc", "$mydir/$rc",
                       "-owner", $owner,
                       "-dataset", $dataset,
                       "-tiers", $tiers,
                       "-whitelist", $whitelist,
                       "-name", "$taskdir/$drop",
                       "-output", $output,
                       "-jobtype", $self->{JOBTYPE},
                       "-scheduler", $self->{SCHEDULER},
                       "-mode", $mode);
    die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

    &output ("$taskdir/$drop/TASK_INIT.txt",
             &mytimeofday () . "\n");

    # Pass the new task to the next agent through a drop in the work area.
    my $dropdir = "$self->{WORKDIR}/$drop";
    -d $dropdir or mkdir($dropdir) or die "$dropdir: cannot create: $!\n";
    if (&output ("$dropdir/task", "$taskdir/$drop"))
    {
        &touch ("$dropdir/done");
        $self->relayDrop ($drop);
        &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
    }
    else
    {
        &alert ("$drop: failed to create drop");
        &rmtree ([ "$self->{WORKDIR}/$drop" ]);
    }
}
612     }