1 |
#!/usr/bin/env perl
|
2 |
|
3 |
##H This drop box agent collects the output from CRAB tasks after the
|
4 |
##H the task jobs have run to completion. Tasks with jobs in them
|
5 |
##H should not be passed into this agent until all jobs have completed.
|
6 |
##H
|
7 |
##H Usage:
|
8 |
##H TaskCollect
|
9 |
##H -state DIRECTORY [-next NEXT] [-wait SECS]
|
10 |
##H
|
11 |
##H -state agent state directory, including inbox
|
12 |
##H -next next agent to pass the drops to; can be given several times
|
13 |
##H -wait time to wait in seconds between work scans
|
14 |
|
15 |
BEGIN {
|
16 |
use strict; use warnings; $^W=1;
|
17 |
our $me = $0; $me =~ s|.*/||;
|
18 |
our $home = $0; $home =~ s|/[^/]+$||; $home ||= "."; $home .= "/../PHEDEX/Toolkit/Common";
|
19 |
unshift(@INC, $home);
|
20 |
}
|
21 |
|
22 |
######################################################################
|
23 |
use UtilsHelp;
|
24 |
while (scalar @ARGV) {
|
25 |
if ($ARGV[0] eq '-state' && scalar @ARGV > 1) {
|
26 |
shift (@ARGV); $args{DROPDIR}= shift(@ARGV);
|
27 |
} elsif ($ARGV[0] eq '-next' && scalar @ARGV > 1) {
|
28 |
shift (@ARGV); push (@{$args{NEXTDIR}}, shift(@ARGV));
|
29 |
} elsif ($ARGV[0] eq '-wait' && scalar @ARGV > 1) {
|
30 |
shift (@ARGV); $args{WAITTIME} = shift(@ARGV);
|
31 |
} elsif ($ARGV[0] eq '-h') {
|
32 |
&usage();
|
33 |
} else {
|
34 |
last;
|
35 |
}
|
36 |
}
|
37 |
|
38 |
if (@ARGV || !$args{DROPDIR}) {
|
39 |
die "Insufficient parameters, use -h for help.\n";
|
40 |
}
|
41 |
|
42 |
(new TaskCollect (%args))->process();
|
43 |
|
44 |
######################################################################
|
45 |
# Routines specific to this agent.
|
46 |
package TaskCollect; use strict; use warnings; use base 'UtilsAgent';
|
47 |
use UtilsCommand;
|
48 |
use UtilsLogging;
|
49 |
use UtilsTiming;
|
50 |
|
51 |
sub getTimeSinceStart
|
52 |
{
|
53 |
# extract time since start of project in hours
|
54 |
my ($self, $taskdir) = @_;
|
55 |
my $time=time();
|
56 |
|
57 |
# read start time from file in taskdir
|
58 |
open( FILE, "< $taskdir/TASK_INIT.txt" ) or die "Can't open $taskdir/TASK_INIT.txt : $!";
|
59 |
my $starttime = 0.0;
|
60 |
while ( <FILE> ) {
|
61 |
chomp;
|
62 |
$starttime = scalar ($_);
|
63 |
}
|
64 |
close FILE;
|
65 |
|
66 |
my $diffsec = $time - $starttime;
|
67 |
my $diffhours = $diffsec/3600.;
|
68 |
return $diffhours;
|
69 |
}
|
70 |
|
71 |
sub new
|
72 |
{
|
73 |
my $proto = shift;
|
74 |
my $class = ref($proto) || $proto;
|
75 |
my $self = $class->SUPER::new(@_);
|
76 |
# OLI 060702
|
77 |
# deactivate resubmission.
|
78 |
# my %params = (RESUBMIT => 3); # Number of times to (re)submit
|
79 |
my %params = ();
|
80 |
my %args = (@_);
|
81 |
map { $self->{$_} = $args{$_} || $params{$_} } keys %params;
|
82 |
bless $self, $class;
|
83 |
return $self;
|
84 |
}
|
85 |
|
86 |
# Actually process the drop.
|
87 |
sub processDrop {
|
88 |
my ($self, $drop) = @_;
|
89 |
|
90 |
|
91 |
# cmd variable
|
92 |
my $cmd = "";
|
93 |
|
94 |
# Sanity checking
|
95 |
return if (! $self->inspectDrop ($drop));
|
96 |
# waiting time for too many status checks
|
97 |
return if (($$self{NEXT_CHECK}{$drop} || 0) > time());
|
98 |
$$self{NEXT_CHECK}{$drop} = time() + 0;
|
99 |
delete $self->{BAD}{$drop};
|
100 |
&timeStart($self->{STARTTIME});
|
101 |
|
102 |
# Read CRAB confiugration
|
103 |
my $taskdir = &input ("$self->{WORKDIR}/$drop/task");
|
104 |
if (! $taskdir || ! -d $taskdir) {
|
105 |
&alert ("missing task directory in $drop");
|
106 |
$self->markBad ($drop);
|
107 |
return;
|
108 |
}
|
109 |
|
110 |
$cmd = qq{
|
111 |
cd $taskdir || exit \$?;
|
112 |
TIMEFORMAT="Timing information in seconds: \%R real, \%U user, \%S system, \%P \%\%";
|
113 |
(date -u +"START \%Y\%m\%dZ\%H\%M\%S == \%s"; set -x;
|
114 |
time crab -continue -getoutput; which CrabStatusAndUpdateOldCrabDb >/dev/null 2>&1 && CrabStatusAndUpdateOldCrabDb) >> JOB_COLLECT_LOG.txt 2>&1;
|
115 |
chmod 755 crab_0_*/res/*};
|
116 |
system($cmd);
|
117 |
|
118 |
# delete lock
|
119 |
$cmd = qq{rm -f $taskdir/lock};
|
120 |
system($cmd);
|
121 |
|
122 |
&output ("$taskdir/TASK_FINISHED.txt", &mytimeofday () . "\n");
|
123 |
|
124 |
# add to list of tasks finished today
|
125 |
|
126 |
$cmd = qq{
|
127 |
cd $taskdir || exit \$?;
|
128 |
(which AddToTasksFinishedToday >/dev/null 2>&1 && AddToTasksFinishedToday ) >> JOB_COLLECT_LOG.txt 2>&1};
|
129 |
system($cmd);
|
130 |
|
131 |
|
132 |
# Success, relay onwards
|
133 |
&touch ("$self->{WORKDIR}/$drop/done");
|
134 |
delete $$self{NEXT_CHECK}{$drop};
|
135 |
$self->relayDrop ($drop);
|
136 |
&logmsg("stats: $drop $taskdir @{[&formatElapsedTime($self->{STARTTIME})]} success");
|
137 |
}
|