ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/JOBROBOT/TaskQuery
Revision: 1.10
Committed: Tue May 19 20:07:58 2009 UTC (15 years, 11 months ago) by belforte
Branch: MAIN
Changes since 1.9: +0 -15 lines
Log Message:
sync. to what is running in JR now

File Contents

# User Rev Content
1 gutsche 1.1 #!/usr/bin/env perl
2    
3     ##H This drop box agent collects the output from CRAB tasks after the
4     ##H task jobs have run to completion. Tasks with jobs in them
5     ##H should not be passed into this agent until all jobs have completed.
6     ##H
7     ##H Usage:
8     ##H TaskQuery
9     ##H -state DIRECTORY [-next NEXT] [-wait SECS]
10     ##H
11     ##H -state agent state directory, including inbox
12     ##H -next next agent to pass the drops to; can be given several times
13     ##H -wait time to wait in seconds between work scans
14 gutsche 1.2 ##H -mode mode: EGEE or OSG, prevents kill of aborted jobs for EGEE
15 gutsche 1.1
BEGIN {
    # strict/warnings are deliberately confined to this block: the option
    # parsing below stores its results in the undeclared package hash %args.
    use strict; use warnings; $^W=1;

    # $me: $0 with everything up to and including the last "/" removed.
    (our $me = $0) =~ s|.*/||;

    # $home: $0 with its last path component removed (or "." if that leaves
    # nothing), extended to the toolkit directory that is put first on @INC.
    our $home = $0;
    $home =~ s|/[^/]+$||;
    $home ||= ".";
    $home .= "/../PHEDEX/Toolkit/Common";
    unshift(@INC, $home);
}

######################################################################
use UtilsHelp;

# Options that take exactly one value, and the %args key each one fills.
my %single_value_opt = (
    '-state' => 'DROPDIR',
    '-wait'  => 'WAITTIME',
    '-mode'  => 'MODE',
);

# Consume recognised options from the front of @ARGV.  Parsing stops at the
# first word that is not a known option, or at a value-taking option that has
# no value after it; whatever is left over is rejected below.
while (@ARGV) {
    my $opt       = $ARGV[0];
    my $has_value = @ARGV > 1;

    if ($has_value && exists $single_value_opt{$opt}) {
        shift(@ARGV);
        $args{$single_value_opt{$opt}} = shift(@ARGV);
    }
    elsif ($has_value && $opt eq '-next') {
        # -next may be repeated; the values accumulate in order.
        shift(@ARGV);
        push(@{$args{NEXTDIR}}, shift(@ARGV));
    }
    elsif ($opt eq '-h') {
        &usage();
    }
    else {
        last;
    }
}

# Leftover arguments or a missing -state directory are both fatal.
die "Insufficient parameters, use -h for help.\n"
    if (@ARGV || !$args{DROPDIR});

# Build the agent from the parsed options and hand control to it.
TaskCollect->new(%args)->process();
46    
47     ######################################################################
48     # Routines specific to this agent.
49     package TaskCollect; use strict; use warnings; use base 'UtilsAgent';
50     use UtilsCommand;
51     use UtilsLogging;
52     use UtilsTiming;
53    
# Return the time elapsed since the task was initialised, in (fractional) hours.
#
# Arguments:
#   $self    - the agent object (not used here)
#   $taskdir - task directory containing TASK_INIT.txt
#
# TASK_INIT.txt is expected to hold the start time as seconds since the epoch.
# The last non-blank line of the file is used, so a trailing empty line no
# longer resets the start time to 0 (which would make the task look decades
# old).  If the file has no non-blank line the start time stays 0.
#
# Dies if TASK_INIT.txt cannot be opened.
sub getTimeSinceStart
{
    my ($self, $taskdir) = @_;
    my $now = time();
    my $initfile = "$taskdir/TASK_INIT.txt";

    # Three-argument open with a lexical handle: the path is never parsed for
    # mode characters and no global FILE handle is touched.
    open(my $fh, '<', $initfile) or die "Can't open $initfile : $!";
    my $starttime = 0.0;
    while (my $line = <$fh>) {
        chomp $line;
        next if $line !~ /\S/;   # ignore blank / whitespace-only lines
        $starttime = $line;
    }
    close $fh;

    return ($now - $starttime) / 3600.;
}
73    
# Return the time elapsed since the task's jobs were first submitted, in
# (fractional) hours.
#
# Arguments:
#   $self    - the agent object (not used here)
#   $taskdir - task directory containing JOB_SUBMIT_LOG.txt
#
# The submission time is taken from the first line of JOB_SUBMIT_LOG.txt that
# matches "START ... == <epoch seconds>".  Returns 0 if the log cannot be
# opened or contains no such line.
sub getTimeSinceSubmit
{
    my ($self, $taskdir) = @_;
    my $now = time();
    my $diffhours = 0;

    # Lexical handle + three-argument open: does not disturb a FILE handle the
    # caller may have open, and the path is never parsed for mode characters.
    open(my $fh, '<', "$taskdir/JOB_SUBMIT_LOG.txt") or return $diffhours;
    while (my $line = <$fh>) {
        if ($line =~ /^START.*==\s+(\d+)/) {
            $diffhours = ($now - $1) / 3600.;
            last;   # only the first START stamp counts
        }
    }
    close $fh;

    return $diffhours;
}
94    
# Constructor.  Delegates to the parent class constructor with the caller's
# arguments, then fills in this agent's own parameters: each one takes the
# caller-supplied value when that value is true, otherwise its default.
#
# Parameters handled here:
#   MODE - defaults to 'EGEE'
#
# Works both as a class method and when invoked on an existing object.
sub new
{
    my ($proto, @ctor_args) = @_;
    my $class = ref($proto) || $proto;
    my $self  = $class->SUPER::new(@ctor_args);

    my %defaults = (MODE => 'EGEE');
    my %given    = @ctor_args;
    for my $key (keys %defaults) {
        $self->{$key} = $given{$key} || $defaults{$key};
    }

    return bless $self, $class;
}
108    
# Run a shell snippet inside a task directory and return system()'s status.
#
#   $taskdir  - directory to cd into first (the command exits with cd's
#               status if that fails)
#   $script   - shell command(s) run inside the logged subshell
#   $redirect - stdout redirection for the subshell, e.g. ">> JOB_KILL_LOG.txt"
#
# Each run first prints a UTC 'START <date> == <epoch>' stamp and enables
# "set -x"; stderr is sent to the same place as stdout.  getTimeSinceSubmit
# parses the START stamp, so its format must stay as it is.
sub _runInTaskDir
{
    my ($taskdir, $script, $redirect) = @_;
    my $cmd = qq{
      cd $taskdir || exit \$?;
      TIMEFORMAT="Timing information in seconds: \%R real, \%U user, \%S system, \%P \%\%";
      (date -u +"START \%Y\%m\%dZ\%H\%M\%S == \%s"; set -x;
       $script) $redirect 2>&1};
    return system($cmd);
}

# Ask CRAB to kill one job, then refresh the old CRAB db if the helper script
# CrabStatusAndUpdateOldCrabDb is on the PATH.  Output is appended to
# JOB_KILL_LOG.txt.  Returns system()'s status.
sub _killJob
{
    my ($taskdir, $crabid) = @_;
    return _runInTaskDir(
        $taskdir,
        "time crab -continue -kill '$crabid'; which CrabStatusAndUpdateOldCrabDb >/dev/null 2>&1 && CrabStatusAndUpdateOldCrabDb",
        ">> JOB_KILL_LOG.txt");
}

# Save "crab -postMortem" output for one job into $outfile (relative to the
# task directory), unless that file already exists.
sub _savePostMortem
{
    my ($taskdir, $crabid, $outfile) = @_;
    return if -f "$taskdir/$outfile";
    _runInTaskDir($taskdir, "time crab -continue -postMortem '$crabid'", "> $outfile");
    return;
}

# Give up on a task: move the CRAB jobs db aside so that its jobs are no longer
# counted in statistics for project creation (TaskSource), write
# TASK_FINISHED.txt and mark the drop bad.
#
#   $logfile - file in the task directory receiving the output of the move
#   $note    - optional; when defined, a "stats:" line ending in this text is
#              logged after the move attempt
sub _abandonTask
{
    my ($self, $drop, $taskdir, $logfile, $note) = @_;

    if (_runInTaskDir($taskdir, "mv -v crab_*/share/db/jobs crab.db.jobs.old", ">> $logfile")) {
        &alert ("$drop $taskdir: failed to move crab jobs to crab.db.jobs.old");
    }
    &logmsg("stats: $drop $taskdir @{[&formatElapsedTime($self->{STARTTIME})]} $note")
        if defined $note;

    # mark task as finished
    &output ("$taskdir/TASK_FINISHED.txt", &mytimeofday () . "\n");
    $self->markBad ($drop);
    return;
}

# Actually process the drop.
#
# Refreshes the CRAB job status for the drop's task, then walks the jobs db
# and acts on each job depending on its one-letter status and on how old the
# task is:
#   - jobs with an unexpected status are killed;
#   - created-but-unsubmitted jobs ('C') are submitted, or killed if that fails;
#   - past $cleanuptime hours the whole task is abandoned;
#   - past $killtime hours everything still active is killed and the task held
#     so the kill can be verified on the next pass;
#   - past $stoptime hours running jobs are left alone (task held), other
#     active jobs are killed;
#   - otherwise the task is held while jobs are in status B, S or R, abandoned
#     if a job is still 'B' $stucktime hours after submission, and remaining
#     jobs are killed only in OSG mode.
# Post-mortem information is saved for aborted jobs ('A').  If nothing asked
# for the task to be held, the drop is marked done and relayed.
sub processDrop
{
    my ($self, $drop) = @_;

    # Sanity checking
    return if (! $self->inspectDrop ($drop));
    # Do not check the status again before the scheduled time.
    return if (($$self{NEXT_CHECK}{$drop} || 0) > time());
    $$self{NEXT_CHECK}{$drop} = time() + 0;
    delete $self->{BAD}{$drop};
    &timeStart($self->{STARTTIME});

    # Read CRAB configuration
    my $taskdir = &input ("$self->{WORKDIR}/$drop/task");
    if (! $taskdir || ! -d $taskdir) {
        &alert ("missing task directory in $drop");
        $self->markBad ($drop);
        return;
    }

    # Time to check job status on the task again.  Let CRAB do the hard
    # work, then check what it found.  The subshell exits with crab's own
    # status even though UpdateOldCrabDb may run afterwards.
    my $rc = _runInTaskDir(
        $taskdir,
        'time crab -continue -status; a=$?; which UpdateOldCrabDb >/dev/null 2>&1 && UpdateOldCrabDb; exit $a',
        ">> JOB_STATUS_LOG.txt");
    if ($rc) {
        &alert ("$drop $taskdir: failed to check job status: exit code @{[&runerror($rc)]}, move jobdb out of way");
        $self->_abandonTask ($drop, $taskdir,
                             "JOB_REMOVE_CRABDIR_FAILED_STATUS_CHECK_LOG.txt",
                             "failed checking status");
        return;
    }

    # Parse the jobs db: one job per line, fields separated by '|'.
    # Field 1 is the job status, field 3 the grid id, field 4 the CRAB id.
    my $f = (<$taskdir/crab_*/share/db/jobs>)[0];

    my @gridids = ();
    my @crabids = ();
    my @jobstat = ();
    foreach my $record (split(/\n/, &input($f) || '')) {
        my @fields = split('\|', $record);
        push(@crabids, $fields[4]);
        push(@gridids, $fields[3]);
        push(@jobstat, $fields[1]);
    }

    my $hold = 0;

    # check time between start of project and now
    my $lifetime = $self->getTimeSinceStart("$taskdir");
    # check time from job submission
    my $subtime = $self->getTimeSinceSubmit("$taskdir");
    # set time after which all jobs are cleaned and task marked bad
    my $cleanuptime = 31.;
    # set time after which all jobs (running, pending and others) are killed
    my $killtime = 30.;
    # set time after which pending and other jobs are killed, running jobs are left running
    my $stoptime = 24.;
    # set time from submission after which the task is cleared if there are still jobs in status Submitted
    my $stucktime = 8.;

    for my $i (0 .. $#jobstat) {
        # Compute CRAB index and nice label for this job.  Short or malformed
        # db lines leave fields undefined; treat those as empty strings so
        # they fall into the "unexpected status" path without undef warnings.
        my $crabi    = $i + 1;
        my $crabid   = defined $crabids[$i] ? $crabids[$i] : '';
        my $joblabel = defined $gridids[$i] ? $gridids[$i] : '';
        $joblabel =~ s/([^-A-Za-z0-9_:.])/sprintf("%%%02x", ord($1))/ge;

        # Determine job status.
        my $status = defined $jobstat[$i] ? $jobstat[$i] : '';
        if ($status !~ /^[YHSBCARDKZ]$/) {
            &alert ("$drop $taskdir: unexpected job status `$status' for `$crabi'");
            _killJob ($taskdir, $crabid);
            next;
        }

        # if job is created but not submitted, submit again, check afterwards, otherwise kill and continue
        if ($status eq 'C') {
            &alert ("$drop $taskdir: job $crabid not submitted yet, submit now ");
            my $subrc = _runInTaskDir(
                $taskdir,
                "time crab -continue -submit '$crabid'; a=\$?; which CrabStatusAndUpdateOldCrabDb >/dev/null 2>&1 && CrabStatusAndUpdateOldCrabDb; exit \$a",
                ">> JOB_SUBMIT_LOG.txt");
            if ($subrc) {
                &alert ("$drop $taskdir: failed to submit job $crabid with exit code @{[&runerror($subrc)]}, kill job");
                _killJob ($taskdir, $crabid);
            }
            next;
        }

        # Past the cleanup deadline: the earlier kill evidently did not
        # finish the task off, so abandon it.
        if ($lifetime >= $cleanuptime) {
            &alert ("$drop $taskdir: task kill did not work in time");
            $self->_abandonTask ($drop, $taskdir, "JOB_REMOVE_CRABDIR_KILL_FAILED.txt");
            return;
        }

        # Past the kill deadline: kill everything that is still active.
        if ($lifetime >= $killtime) {
            # Nothing to do for cleared (Y), killed (K) and done (D) jobs;
            # it is useless to try to kill Submitted (B) jobs.
            next if ($status =~ /^[YKDB]$/);

            # Retrieve job logging information if abort
            if ($status eq 'A') {
                _savePostMortem ($taskdir, $crabid, "JOB_HISTORY.$crabi.$status.$joblabel.txt");
                next;
            }

            # kill the rest, and come back to verify it was killed
            _killJob ($taskdir, $crabid);
            $hold = 1;
            next;
        }

        # Past the stop deadline: running jobs may finish, the rest is killed.
        if ($lifetime >= $stoptime) {
            # keep the running
            if ($status eq 'R') {
                $hold = 1;
                next;
            }

            # Nothing to do for cleared (Y), killed (K) and done (D) jobs;
            # it is useless to try to kill Submitted (B) jobs.
            next if ($status =~ /^[YKDB]$/);

            # Retrieve job logging information if abort
            if ($status eq 'A') {
                _savePostMortem ($taskdir, $crabid, "JOB_HISTORY.$crabi.$status.$joblabel.txt");
                next;
            }

            # kill the rest
            _killJob ($taskdir, $crabid);
            next;
        }

        # Normal operation, before any deadline.

        # Invalidate the task if after some time there are still jobs in status Submitted
        if ($subtime >= $stucktime && $status eq 'B') {
            &alert ("$drop $taskdir: jobs stuck in Submitted status");
            $self->_abandonTask ($drop, $taskdir, "JOB_REMOVE_JOB_STATUS_STUCK.txt");
            return;
        }

        # Process the easy cases: jobs still in progress keep the task held.
        if ($status =~ /^[BSR]$/) {
            $hold = 1;
            next;
        }

        # That's all for cleared, killed and done jobs.
        next if ($status =~ /^[YKD]$/);

        # Retrieve job logging information if abort.  No "next" here: an
        # aborted job still reaches the OSG kill below.
        if ($status eq 'A') {
            _savePostMortem ($taskdir, $crabid, "JOB_HISTORY.$crabi.$status.$joblabel.txt");
        }

        # kill the rest if mode OSG
        if ( $self->{MODE} eq "OSG" ) {
            _killJob ($taskdir, $crabid);
            next;
        }
    }

    # Check if we are supposed to hold this task.
    return if $hold;

    &touch ("$self->{WORKDIR}/$drop/done");
    delete $$self{NEXT_CHECK}{$drop};
    $self->relayDrop ($drop);
    &logmsg("stats: $drop $taskdir @{[&formatElapsedTime($self->{STARTTIME})]} success");
}