ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskSource
Revision: 1.19
Committed: Thu Jul 20 03:25:55 2006 UTC (18 years, 9 months ago) by gutsche
Branch: MAIN
Changes since 1.18: +29 -7 lines
Log Message:
adapt site limits and read siteconfig dynamically from file

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent initiates tasks on all published datasets.
4     ##H A task is an application to run on a dataset at a particular site.
5     ##H It keeps a record of tasks created and avoids submitting a task too
6     ##H frequently.
7     ##H
8     ##H Usage:
9     ##H TaskSource
10     ##H -state DIRECTORY [-next NEXT] [-wait SECS] [-url URL-PUBLISHED]
11     ##H [-ignore-sites REGEXP] [-accept-sites REGEXP]
12     ##H [-secs-per-event N] [-max-site-queue N]
13     ##H
14     ##H -state agent state directory, including inbox
15     ##H -next next agent to pass the drops to; can be given several times
16     ##H -wait time to wait in seconds between work scans
17     ##H -url contact string for published datasets
18     ##H -ignore-sites
19     ##H regular expression for sites to ignore; ignore applies before
20     ##H accept, and by default nothing is ignored and everything is
21     ##H accepted. applies to pubdb site names, not the host names of
22     ##H the site.
23     ##H -accept-sites
24     ##H regular expression for sites to accept; ignore applies before
25     ##H accept, and by default nothing is ignored and everything is
26     ##H accepted. applies to pubdb site names, not the host names of
27     ##H the site.
28     ##H -secs-per-event
29     ##H minimum time to allocate for a task, given a number of events
30     ##H in the dataset to process. tasks are not created more often
31     ##H than this.
32     ##H -max-site-queue
33     ##H the high water mark of currently submitted jobs to a site,
34 gutsche 1.8 ##H above which new tasks will not be created. Represents the default
35     ##H taken if site specific values cannot be found
36 gutsche 1.1
# Compile-time setup: enable warnings globally, remember the script name
# and make the PhEDEx common toolkit (located relative to this script)
# loadable by the "use" statements that follow.
BEGIN {
    use strict;
    use warnings;
    $^W = 1;

    # Script name with any leading directory part removed.
    our $me = $0;
    $me =~ s|.*/||;

    # Start from the script path with its last component removed; fall
    # back to the current directory when nothing is left.
    our $home = $0;
    $home =~ s|/[^/]+$||;
    $home = "." unless $home;
    $home = $home . "/../PHEDEX/Toolkit/Common";

    unshift(@INC, $home);
}
43    
44     ######################################################################
45     use UtilsHelp;
# Command line handling.  Defaults are set first; every recognised option
# then overrides or adds one entry in %args, which is handed unchanged to
# the TaskSource constructor.
#
# The "use strict" inside the BEGIN block above is scoped to that block
# only, so strictures are enabled here explicitly.
use strict; use warnings;

my %args = (WAITTIME => 600, SECS_PER_EVENT => 1., MAX_SITE_QUEUE => 10,
            URL => "http://cmsdoc.cern.ch/cms/production/www/PubDB/GetPublishedCollectionInfoFromRefDB.php");

# Options that take exactly one value, mapped to the %args key they set.
my %single_value = ('-state'          => 'DROPDIR',
                    '-wait'           => 'WAITTIME',
                    '-ignore-sites'   => 'IGNORE_REGEXP',
                    '-accept-sites'   => 'ACCEPT_REGEXP',
                    '-siteconfig'     => 'SITECONFIG',
                    '-secs-per-event' => 'SECS_PER_EVENT',
                    '-max-site-queue' => 'MAX_SITE_QUEUE',
                    '-url'            => 'URL',
                    '-dataset'        => 'DATASET',
                    '-owner'          => 'OWNER',
                    '-events'         => 'NEVENT',
                    '-mode'           => 'MODE',
                    '-scheduler'      => 'SCHEDULER',
                    '-jobtype'        => 'JOBTYPE',
                    '-filesperjob'    => 'FILESPERJOB',
                    '-eventsperjob'   => 'EVENTSPERJOB',
                    '-log'            => 'LOGFILE');

while (scalar @ARGV)
{
    my $option = $ARGV[0];
    if ($option eq '-next' && scalar @ARGV > 1)
    {
        # -next may be given several times; collect all values.
        shift (@ARGV);
        push (@{$args{NEXTDIR}}, shift(@ARGV));
    }
    elsif (exists $single_value{$option} && scalar @ARGV > 1)
    {
        shift (@ARGV);
        $args{$single_value{$option}} = shift(@ARGV);
    }
    elsif ($option eq '-h')
    {
        &usage();
    }
    else
    {
        # Unknown option, or a known option missing its value: stop and
        # let the check below reject the leftover arguments.
        last;
    }
}

if (@ARGV || !$args{DROPDIR} || !$args{URL})
{
    die "Insufficient parameters, use -h for help.\n";
}

# Direct method call instead of the indirect object form "new TaskSource".
TaskSource->new (%args)->process();
102    
103     ######################################################################
104     # Routines specific to this agent.
105     package TaskSource; use strict; use warnings; use base 'UtilsAgent';
106     use File::Path;
107     use UtilsCommand;
108     use UtilsLogging;
109     use UtilsTiming;
110     use UtilsNet;
111     use POSIX;
112    
# Constructor.  Lets the base class build the object from the caller's
# key/value arguments, then fills in the settings specific to this agent:
# a value supplied by the caller is used when it is defined, otherwise
# the default listed below applies.
sub new
{
    my $proto = shift;
    my $class = ref($proto) || $proto;
    my $self = $class->SUPER::new(@_);

    my %supplied = (@_);
    my %defaults = (SECS_PER_EVENT => undef,  # secs/event to delay per dataset
                    MAX_SITE_QUEUE => undef,  # max number of jobs per site
                    IGNORE_REGEXP  => undef,  # regexp of sites to ignore
                    ACCEPT_REGEXP  => undef,  # regexp of sites to accept
                    DATASET        => undef,  # specific dataset
                    OWNER          => undef,  # specific owner
                    SITECONFIG     => undef,  # path to siteconfig file
                    NEVENT         => 500,    # number of events per job
                    MODE           => 1,      # data discovery mode: (1) PubDB/RefDB, (2) DBS/DLS
                    JOBTYPE        => "orca", # standard jobtype
                    SCHEDULER      => "edg",  # standard scheduler
                    FILESPERJOB    => 1,      # CMSSW: files per job
                    URL            => undef); # published dataset url

    foreach my $key (keys %defaults)
    {
        if (defined $supplied{$key})
        {
            $self->{$key} = $supplied{$key};
        }
        else
        {
            $self->{$key} = $defaults{$key};
        }
    }

    bless $self, $class;
    return $self;
}
136    
# Prepare the agent before the first work scan:
#  - create the task repository directory "<DROPDIR>/tasks" if missing,
#  - probe which formatting options the local "links" browser supports,
#  - install the built-in site whitelist and per-site queue limits.
# Dies if the task repository directory cannot be created.
sub init
{
    my ($self) = @_;

    # "or" is required here: with "||" the die would become part of the
    # argument of mkdir and a failed mkdir would go unnoticed.
    $self->{TASKREPO} = "$self->{DROPDIR}/tasks";
    -d $self->{TASKREPO}
        or mkdir ($self->{TASKREPO})
        or die "$self->{TASKREPO}: cannot create directory: $!\n";

    # Determine which of -no-numbering / -dump-width links supports.  The
    # help text is read once into memory so that both checks see it; a
    # filehandle read in list context is exhausted after the first grep.
    # If links cannot be run no options are used.
    $self->{LINKS_OPTS} = [];
    my @links_help;
    if (open (my $help, '-|', "links -help 2>/dev/null"))
    {
        @links_help = <$help>;
        close ($help);
    }
    if ( grep(/-no-numbering/, @links_help) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300 -no-numbering 1));
    } elsif ( grep(/-dump-width/, @links_help) ) {
        push(@{$self->{LINKS_OPTS}}, qw(-dump-width 300));
    }

    # Precoded whitelist: PubDB site name => domain handed to the job
    # creation script.  Should really read this from somewhere...
    $self->{WHITELIST} = { ASCC   => "sinica.edu.tw",
                           NCU    => "ncu.edu.tw",
                           FNAL   => "fnal.gov",
                           CNAF   => "webserver.infn.it",
                           BA     => "ba.infn.it",
                           IN2P3  => "in2p3.fr",
                           PIC    => "pic.es",
                           T2_SP  => "ciemat.es",
                           RAL    => "ral.ac.uk",
                           CERN   => "cern.ch",
                           FZK    => "fzk.de",
                           DESY   => "desy.de",
                           NEBR   => "unl.edu",
                           WISC   => "wisc.edu",
                           UFL    => "ufl.edu",
                           PURDUE => "purdue.edu",
                           UCSD   => "ucsd.edu",
                           CALT   => "ultralight.org"
                         };

    # Site specific high water marks, keyed by site host name.  Sites not
    # listed here fall back to MAX_SITE_QUEUE.
    $self->{SITEMAXQUEUE} = { "cmslcgce.fnal.gov"             => 500,
                              "ce01.cmsaf.mit.edu"            => 150,
                              "ce04.pic.es"                   => 250,
                              "red.unl.edu"                   => 200,
                              "oberon.hep.kbfi.ee"            => 500,
                              "cit-gatekeeper.ultralight.org" => 50,
                              "lcg02.ciemat.es"               => 200,
                              "ceitep.itep.ru"                => 50,
                              "ufloridapg.phys.ufl.edu"       => 200,
                              "gridba2.ba.infn.it"            => 150,
                              "cclcgceli02.in2p3.fr"          => 200,
                              "cmsgrid02.hep.wisc.edu"        => 150,
                              "lcgce01.jinr.ru"               => 50,
                              "t2-ce-02.lnl.infn.it"          => 200,
                              "grid-ce1.desy.de"              => 500,
                              "grid-ce.physik.rwth-aachen.de" => 200,
                              "ce01-lcg.projects.cscs.ch"     => 50,
                              "gridce.iihe.ac.be"             => 150,
                              "ce03-lcg.cr.cnaf.infn.it"      => 500,
                              "fce01.grid.sinica.edu.tw"      => 50,
                              "gridce.pi.infn.it"             => 200,
                              "lcg00125.grid.sinica.edu.tw"   => 250,
                              "ce101.cern.ch"                 => 200,
                              "lcg06.sinp.msu.ru"             => 200,
                              "osg-gw-2.t2.ucsd.edu"          => 150,
                              "ce-fzk.gridka.de"              => 500,
                              "lcgce01.gridpp.rl.ac.uk"       => 500,
                              "gw39.hep.ph.ic.ac.uk"          => 200,
                              "lepton.rcac.purdue.edu"        => 200 };
}
208 gutsche 1.1
# Count the jobs recorded in the task repository for every site.  The
# count does not distinguish job types, and it is taken only once per
# scan so that no dataset is favoured over another: once a site is
# accepted, jobs are submitted for all datasets.
#
# Returns a hash: site directory name => { status => count }.  Tasks
# that have no JOB_CREATE_LOG.txt yet are counted under the status "C";
# all other statuses are the second ";"-separated field of each line of
# the task's crab job database file.
sub getSiteStatus
{
    my ($self) = @_;
    my %counts = ();
    my $repo = $self->{TASKREPO};

    # Repository layout: <repo>/<date>/<site>/<app>/<task>
    foreach my $sitedir (glob("$repo/*/*"))
    {
        my ($sitename) = ($sitedir =~ m|.*/(.*)|);

        foreach my $taskdir (glob("$sitedir/*/*"))
        {
            # Task exists but its jobs have not been created yet.
            unless (-f "$taskdir/JOB_CREATE_LOG.txt")
            {
                $counts{$sitename}{C} = ($counts{$sitename}{C} || 0) + 1;
                next;
            }

            # Only the first matching job database is consulted.
            my ($jobsdb) = glob("$taskdir/crab_*/share/db/jobs");
            next unless defined $jobsdb;

            my $contents = &input($jobsdb) || '';
            foreach my $record (split(/\n/, $contents))
            {
                my @fields = split("\;", $record);
                my $state = $fields[1];
                $counts{$sitename}{$state} = ($counts{$sitename}{$state} || 0) + 1;
            }
        }
    }

    return %counts;
}
245 gutsche 1.1
# Build the "(a|b|c)" alternation of valid sites from a siteconfig file.
# Lines starting with "#" and empty lines are ignored; every other line
# is used as given, i.e. as a regular expression fragment.  Dies with a
# clear message when the file cannot be read or lists no sites, instead
# of producing the invalid pattern ")".
sub _validSitesPattern
{
    my ($file) = @_;
    open (my $config, '<', $file)
        or die "$file: cannot read siteconfig: $!\n";
    my @sites;
    while (my $line = <$config>)
    {
        chomp ($line);
        next if $line eq '' || $line =~ /^#/;
        push (@sites, $line);
    }
    close ($config);
    die "$file: siteconfig lists no sites\n" if ! @sites;
    return "(" . join ("|", @sites) . ")";
}

# One work scan: log the current site load, discover published datasets
# according to MODE and try to create a task for each dataset/site pair,
# then sleep for WAITTIME seconds.
#
#   MODE 2     DBS/DLS: datasets from DBSlistDataset.py, sites from
#              DLSInfo.py, tasks via createTask
#   MODE 3     DBS/DLS CMSSW: file blocks from DBSInfo_EDM.py, sites from
#              DLSInfo.py, tasks via createTaskCMSSW
#   otherwise  RefDB/PubDB: page at URL dumped with links, tasks via
#              createTaskRefDBPubDB
#
# The helper scripts are looked up in $ENV{PYTHONSCRIPT}.  Any error
# raised during the scan is reported through alert and ends the scan;
# the agent then naps as usual.
sub idle
{
    my ($self, @pending) = @_;
    eval {
        # Get status of how busy the sites are.  We obtain this only once
        # in order to not favour datasets "early on" in the list.
        my %sitestats = $self->getSiteStatus ();
        if (keys %sitestats)
        {
            my @load;
            foreach my $site (sort keys %sitestats)
            {
                push (@load, "$site" . join("", map { " $_=$sitestats{$site}{$_}" }
                                            sort keys %{$sitestats{$site}}));
            }
            &logmsg ("current load: ", join ("; ", @load));
        }

        # NOTE(review): dataset and block names returned by the helper
        # scripts are interpolated into shell command lines below;
        # confirm that these values cannot contain shell metacharacters.
        if ( $self->{MODE} == 2 ) {
            &logmsg ("DBS/DLS mode\n");
            my $cmd = $ENV{PYTHONSCRIPT} . "/DBSlistDataset.py";
            open (my $published, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
            while (my $datapath = <$published>) {
                chomp ($datapath);
                &timeStart($self->{STARTTIME});
                # Find out what was published and what we would like to do with it
                my ($n, $dataset, $datatier, $owner) = split(/\//, $datapath);
                next if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
                next if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
                my $dlsinput = $owner . "/" . $dataset;
                my $cmd = $ENV{PYTHONSCRIPT} . "/DLSInfo.py \"$dlsinput\" ";
                open (my $sites, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
                while (my $line = <$sites>) {
                    &logmsg("$line");
                    # Strip the line end so it does not leak into the
                    # last field of the split below.
                    chomp ($line);
                    my ($site, $events) = split(/\//, $line);
                    next if ($self->{SITE} && $site !~ /$self->{SITE}/);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    my $whitelist = $site || '.';
                    $self->createTask($datapath, $site, $events, $whitelist, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } elsif ( $self->{MODE} == 3 ) {
            &logmsg ("DBS/DLS CMSSW mode\n");
            my $cmd = $ENV{PYTHONSCRIPT} . "/DBSInfo_EDM.py";
            open (my $published, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
            while (my $entry = <$published>) {
                chomp ($entry);
                &timeStart($self->{STARTTIME});
                my ($datapath, $fileblock, $totalevents) = split(/ /, $entry);
                my ($dummy, $tempdataset) = split(/\//, $datapath);
                next if ($self->{DATASET} && $tempdataset !~ /$self->{DATASET}/);
                my $cmd = $ENV{PYTHONSCRIPT} . "/DLSInfo.py \"$fileblock\" ";
                open (my $sites, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
                while (my $site = <$sites>) {
                    chomp ($site);
                    next if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
                    next if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);
                    # The siteconfig file is re-read for every site so
                    # that edits take effect immediately.
                    if ( $self->{SITECONFIG} ) {
                        my $validsites = _validSitesPattern ($self->{SITECONFIG});
                        next if $site !~ /$validsites/;
                    }
                    $self->createTaskCMSSW($datapath, $site, $totalevents, %sitestats);
                }
                close ($sites);
            }
            close ($published);
        } else {
            &logmsg ("RefDB/PubDB mode");
            # Invoke links to fetch a formatted web page of published datasets.
            my $cmd = "links @{$self->{LINKS_OPTS}} -dump '$self->{URL}'";
            open (my $published, '-|', "$cmd 2>/dev/null") or die "cannot run `$cmd': $!\n";
            while (my $row = <$published>)
            {
                &timeStart($self->{STARTTIME});
                chomp ($row);
                # Only lines containing "_" are dataset rows; turn the
                # table separators into blanks and trim the row.
                next if $row !~ /_/;
                $row =~ s/\|/ /g;
                $row =~ s/^\s+//;
                $row =~ s/\s+$//;
                # Find out what was published and what we would like to do with it
                my ($dataset, $owner, $events, $site, $proto) = split(/\s+/, $row);
                $self->createTaskRefDBPubDB($dataset, $owner, $events, $site, 1, %sitestats);
            }
            close ($published);
        }
    };
    do { chomp ($@); &alert ($@); } if $@;

    $self->nap ($self->{WAITTIME});
}
350    
# Create an ORCA task for one dataset at one site (DBS/DLS discovery).
#
# Arguments after $self depend on MODE:
#   MODE == 1: ($dataset, $owner, $events, $site, $proto, $whitelist, %sitestats)
#   otherwise: ($datapath, $site, $events, $whitelist, %sitestats), where
#              $datapath is split on "/" into dataset, data tier and owner
#
# Nothing is created when the dataset name contains "MBforPU", when the
# site has more pending jobs than MAX_SITE_QUEUE, or when the previous
# task of the same kind is younger than $events * SECS_PER_EVENT seconds.
# Dies if the job creation script fails or the drop directory cannot be
# created.
#
# The skip conditions leave with "return"; leaving a subroutine with
# "next" only works while the caller happens to be inside a loop.
sub createTask
{
    my ($self) = shift(@_);
    my ($dataset, $owner, $events, $site, $whitelist, %sitestats);
    if (($self->{MODE}) == 1) {
        my $proto;
        ($dataset, $owner, $events, $site, $proto, $whitelist, %sitestats) = @_;
    }
    else {
        my $datapath;
        ($datapath, $site, $events, $whitelist, %sitestats) = @_;
        (undef, $dataset, undef, $owner) = split(/\//, $datapath);
    }

    # Select application, configuration, tiers and output file from the
    # dataset and owner names.
    my ($app, $tiers, $rc, $output);
    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        # rand(1) is always below 1, so this condition is never true and
        # the statistics application is always chosen.
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.dst";
            $app = "ExRootAnalysisDST";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.dst";
            $app = "ExDSTStatistics";
            $output = "dststatistics.aida";
        }
        $tiers = "DST,Digi,Hit";
    } else {
        # Same as above: the root analysis branch is never taken.
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        if ($owner =~ m/nopu/i) {
            $tiers = "Hit";
        } else {
            $tiers = "Digi,Hit";
        }
    }

    # Find out what is already pending for this task.  First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my ($shortsite) = ( $site =~ /.(\w+).\w+$/ );
    my $taskdir = "$self->{TASKREPO}/$datestamp/$shortsite/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    my $taskbase = "$datestamp.$shortsite.$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # Skip the site if it is too busy already.
    # NOTE(review): the statistics are looked up by $site while the task
    # directories are named after $shortsite -- confirm that the keys of
    # %sitestats really match $site here.
    my $pending = ($sitestats{$site}{S} || 0);
    $pending += ($sitestats{$site}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $events * $self->{SECS_PER_EVENT})))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                           "-jobevents", $self->{NEVENT},
                           "-orcarc", "$mydir/$rc",
                           "-owner", $owner,
                           "-dataset", $dataset,
                           "-tiers", $tiers,
                           "-whitelist", $whitelist,
                           "-name", "$taskdir/$drop",
                           "-output", $output,
                           "-jobtype", $self->{JOBTYPE},
                           "-scheduler", $self->{SCHEDULER},
                           "-mode", $self->{MODE});
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # "or" instead of "||": with "||" the die was parsed as part of
        # the mkdir argument and could never trigger.  An already
        # existing directory is accepted.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d $dropdir
            or mkdir ($dropdir)
            or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop"))
        {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
        }
        else
        {
            &alert ("$drop: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
}
458 gutsche 1.4
# Create a CMSSW task for one dataset path at one site.
#
#   $datasetpath  "/dataset/tier/owner" path of the dataset
#   $site         site name; also used as the whitelist for the job
#   $totalevents  number of events, used for the job and for the minimum
#                 delay between two tasks
#   %sitestats    per-site job counts as returned by getSiteStatus
#
# Nothing is created when the tier is neither SIM nor GEN, when the site
# has more pending jobs than its limit (site specific value from
# SITEMAXQUEUE, else MAX_SITE_QUEUE), or when the previous task is
# younger than $totalevents * SECS_PER_EVENT seconds.  Dies if the job
# creation script fails or the drop directory cannot be created.
#
# The skip conditions leave with "return"; leaving a subroutine with
# "next" only works while the caller happens to be inside a loop.
sub createTaskCMSSW
{
    my ($self) = shift(@_);
    my ($datasetpath, $site, $totalevents, %sitestats) = @_;

    my ($n, $dataset, $tier, $owner) = split(/\//, $datasetpath);

    my ($rc, $output);
    if (defined $tier && $tier =~ /SIM/) {
        $rc = "sim.cfg";
        $output = "FrameworkJobReport.xml";
    } elsif (defined $tier && $tier =~ /GEN/) {
        $rc = "gen.cfg";
        $output = "FrameworkJobReport.xml";
    } else {
        # No configuration exists for other tiers; previously the job
        # creation script was run with an undefined cfg and output.
        &alert ("$datasetpath: no job configuration for this data tier, skipping");
        return;
    }

    # Find out what is already pending for this task.  First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$tier";
    my $taskbase = "$datestamp.$site.$dataset.$tier.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # Skip the site if it is too busy already.
    my $pending = ($sitestats{$site}{S} || 0);
    $pending += ($sitestats{$site}{C} || 0);
    # take site specific or if not found default max site queue
    my $max_queue = $self->{SITEMAXQUEUE}->{$site} || $self->{MAX_SITE_QUEUE};
    return if $pending > $max_queue;

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $totalevents * $self->{SECS_PER_EVENT})))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobsCMSSW",
                           "-cfg", "$mydir/$rc",
                           "-datasetpath", $datasetpath,
                           "-output", $output,
                           "-totalevents", $totalevents,
                           "-whitelist", $site,
                           "-filesperjob", $self->{FILESPERJOB},
                           "-eventsperjob", $self->{NEVENT},
                           "-scheduler", $self->{SCHEDULER},
                           "-jobname", "$taskdir/$drop");
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # "or" instead of "||": with "||" the die was parsed as part of
        # the mkdir argument and could never trigger.  An already
        # existing directory is accepted.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d $dropdir
            or mkdir ($dropdir)
            or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop"))
        {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
        }
        else
        {
            &alert ("$drop: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
}
529    
# Create an ORCA task for one dataset published in RefDB/PubDB.
#
#   $dataset, $owner  dataset and owner names from the published list
#   $events           number of events, used for the minimum delay
#                     between two tasks
#   $site             PubDB site name; selects the whitelist entry
#   $mode             value passed on as "-mode" to the creation script
#   %sitestats        per-site job counts as returned by getSiteStatus
#
# Nothing is created when the dataset or owner does not match the
# DATASET / OWNER selection, when the site is rejected by IGNORE_REGEXP
# or ACCEPT_REGEXP, for "MBforPU" datasets, for DST owners, when the
# site has more pending jobs than MAX_SITE_QUEUE, or when the previous
# task is younger than $events * SECS_PER_EVENT seconds.  Dies if the
# job creation script fails or the drop directory cannot be created.
#
# The skip conditions leave with "return"; leaving a subroutine with
# "next" only works while the caller happens to be inside a loop.
sub createTaskRefDBPubDB
{
    my ($self, $dataset, $owner, $events, $site, $mode, %sitestats) = @_;

    # Marco
    return if ($self->{DATASET} && $dataset !~ /$self->{DATASET}/);
    return if ($self->{OWNER} && $owner !~ /$self->{OWNER}/);
    # Marco
    my ($app, $tiers, $rc, $output);
    my $whitelist = $self->{WHITELIST}->{$site} || '.';
    return if ($self->{IGNORE_REGEXP} && $site =~ /$self->{IGNORE_REGEXP}/);
    return if ($self->{ACCEPT_REGEXP} && $site !~ /$self->{ACCEPT_REGEXP}/);

    if ($dataset =~ /MBforPU/) {
        return;
    } elsif ($owner =~ /Hit/) {
        $rc = "orcarc.read.simhits";
        $app = "ExSimHitStatistics";
        $tiers = "Hit";
        $output = "simhits.aida";
    } elsif ($owner =~ /DST/) {
        # DST owners are skipped in this mode; the DST configuration
        # that used to follow the skip could never be reached.
        return;
    } else {
        # rand(1) is always below 1, so this condition is never true and
        # the statistics application is always chosen.
        if (rand(1) > 1.75) {
            $rc = "orcarc.root.digis";
            $app = "ExRootAnalysisDigi";
            $output = "test.root";
        } else {
            $rc = "orcarc.read.digis";
            $app = "ExDigiStatistics";
            $output = "digistatistics.aida";
        }
        if ($owner =~ m/nopu/i) {
            $tiers = "Hit";
        } else {
            $tiers = "Digi,Hit";
        }
    }

    # Find out what is already pending for this task.  First find all
    # existing tasks in the repository, the latest generation.
    my $datestamp = strftime ("%y%m%d", gmtime(time()));
    my $taskdir = "$self->{TASKREPO}/$datestamp/$site/$app";
    # OLI: shorten path for condor_g (restriction to 256 characters)
    my $taskbase = "$datestamp.$site.$dataset.$owner";
    my @existing = sort { $a <=> $b } map { /.*\.(\d+)$/ } <$taskdir/$taskbase.*>;
    my $curgen = pop(@existing) || 0;
    my $nextgen = $curgen + 1;

    # Skip the site if it is too busy already.
    my $pending = ($sitestats{$site}{S} || 0);
    $pending += ($sitestats{$site}{C} || 0);
    return if $pending > $self->{MAX_SITE_QUEUE};

    # OK to create the task if enough time has passed from previous
    # task creation, or there is no previous task.
    if (! -f "$taskdir/$taskbase.$curgen/crab.cfg"
        || (((stat("$taskdir/$taskbase.$curgen/crab.cfg"))[9]
             < time() - $events * $self->{SECS_PER_EVENT})))
    {
        my $mydir = $0; $mydir =~ s|/[^/]+$||;
        my $drop = sprintf("%s.%03d", $taskbase, $nextgen);
        my $ret = &runcmd ("$mydir/CrabJobs", "-app", $app,
                           "-jobevents", $self->{NEVENT},
                           "-orcarc", "$mydir/$rc",
                           "-owner", $owner,
                           "-dataset", $dataset,
                           "-tiers", $tiers,
                           "-whitelist", $whitelist,
                           "-name", "$taskdir/$drop",
                           "-output", $output,
                           "-jobtype", $self->{JOBTYPE},
                           "-scheduler", $self->{SCHEDULER},
                           "-mode", $mode);
        die "$drop: failed to create task: @{[&runerror($ret)]}\n" if $ret;

        &output ("$taskdir/$drop/TASK_INIT.txt",
                 &mytimeofday () . "\n");

        # "or" instead of "||": with "||" the die was parsed as part of
        # the mkdir argument and could never trigger.  An already
        # existing directory is accepted.
        my $dropdir = "$self->{WORKDIR}/$drop";
        -d $dropdir
            or mkdir ($dropdir)
            or die "$dropdir: cannot create: $!\n";
        if (&output ("$dropdir/task", "$taskdir/$drop"))
        {
            &touch ("$dropdir/done");
            $self->relayDrop ($drop);
            &logmsg("stats: $drop @{[&formatElapsedTime($self->{STARTTIME})]} success");
        }
        else
        {
            &alert ("$drop: failed to create drop");
            &rmtree ([ "$self->{WORKDIR}/$drop" ]);
        }
    }
}