2 |
|
import common |
3 |
|
from crab_util import * |
4 |
|
from crab_exceptions import * |
5 |
+ |
from PhEDExDatasvcInfo import PhEDExDatasvcInfo |
6 |
+ |
|
7 |
|
|
8 |
|
class Resubmitter(Submitter): |
9 |
|
def __init__(self, cfg_params, jobs): |
10 |
|
self.cfg_params = cfg_params |
11 |
< |
|
11 |
> |
|
12 |
|
nj_list = [] |
13 |
|
|
14 |
< |
nj_list = self.checkAlowedJob(jobs,nj_list) |
15 |
< |
|
16 |
< |
|
17 |
< |
common.logger.message('Jobs '+str(nj_list)+' will be resubmitted') |
14 |
> |
self.copy_data = int(cfg_params.get('USER.copy_data',0)) |
15 |
> |
self.check_RemoteDir = int(cfg_params.get('USER.check_user_remote_dir',0)) |
16 |
> |
if (jobs=='bad'): |
17 |
> |
nj_list = self.checkBadJob(nj_list) |
18 |
> |
else: |
19 |
> |
nj_list = self.checkAllowedJob(jobs,nj_list) |
20 |
> |
common.logger.info('Jobs '+str(nj_list)+' will be resubmitted') |
21 |
|
Submitter.__init__(self, cfg_params, nj_list, 'range') |
22 |
|
|
23 |
|
return |
24 |
|
|
25 |
< |
def checkAlowedJob(self,jobs,nj_list): |
26 |
< |
listRunField=[] |
25 |
> |
def checkRemoteDir(self,task): |
26 |
> |
|
27 |
> |
if self.copy_data==1: |
28 |
> |
stageout = PhEDExDatasvcInfo(self.cfg_params) |
29 |
> |
endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint() |
30 |
> |
common.scheduler.checkRemoteDir(endpoint,eval(task['outfileBasename'])) |
31 |
> |
|
32 |
|
|
33 |
+ |
def checkAllowedJob(self,jobs,nj_list): |
34 |
+ |
listRunField=[] |
35 |
|
task=common._db.getTask(jobs) |
36 |
+ |
if self.check_RemoteDir == 1 : self.checkRemoteDir(task) |
37 |
|
for job in task.jobs: |
38 |
< |
st = job.runningJob['status'] |
39 |
< |
nj = job['jobId'] |
40 |
< |
if st in ['K','A','SE','E','UE','DA']: |
41 |
< |
nj_list.append(int(nj)) |
42 |
< |
elif st == 'C': |
43 |
< |
common.logger.message('Job #'+`int(nj)`+' has status '+str(job.runningJob['statusScheduler'])+' not yet submitted!!!') |
44 |
< |
elif st in ['SD','D']: |
45 |
< |
common.logger.message('Job #'+`int(nj)`+' has status '+str(job.runningJob['statusScheduler'])+' must be retrieved before resubmission') |
38 |
> |
st = job.runningJob['state'] |
39 |
> |
nj = int(job['jobId']) |
40 |
> |
if st in ['KillSuccess','SubFailed','Cleared','Aborted']: |
41 |
> |
#['K','A','SE','E','DA','NS']: |
42 |
> |
nj_list.append(nj) |
43 |
> |
elif st == 'Created': |
44 |
> |
common.logger.info('Job #'+`nj`+' last action was '+str(job.runningJob['state'])+' not yet submitted: use -submit') |
45 |
> |
elif st in ['Terminated']: |
46 |
> |
common.logger.info('Job #'+`nj`+' last action was '+str(job.runningJob['state'])+' must be retrieved (-get) before resubmission') |
47 |
|
else: |
48 |
< |
common.logger.message('Job #'+`nj`+' has status '+str(job.runningJob['statusScheduler'])+' must be "killed" before resubmission') |
48 |
> |
common.logger.info('Job #'+`nj`+' last action was '+str(job.runningJob['state'])+' actual status is '\ |
49 |
> |
+str(job.runningJob['statusScheduler'])+' must be killed (-kill) before resubmission') |
50 |
> |
if (job.runningJob['state']=='KillRequested'): common.logger.info('\t\tthe previous Kill request is being processed') |
51 |
> |
|
52 |
|
|
53 |
|
if len(nj_list) == 0 : |
54 |
|
msg='No jobs to resubmit' |
55 |
|
raise CrabException(msg) |
56 |
< |
self.manageNewRunJobs(nj_list) |
56 |
> |
|
57 |
> |
common._db.updateJob_(nj_list, [{'closed':'N'}]*len(nj_list)) |
58 |
> |
# Get new running instances |
59 |
> |
common._db.newRunJobs(nj_list) |
60 |
> |
|
61 |
|
return nj_list |
62 |
|
|
63 |
< |
def manageNewRunJobs(self,nj_list): |
64 |
< |
""" |
65 |
< |
Get new running instances |
66 |
< |
""" |
63 |
> |
def checkBadJob(self,nj_list): |
64 |
> |
listRunField=[] |
65 |
> |
jobs = common._db.nJobs('list') |
66 |
> |
task=common._db.getTask(jobs) |
67 |
> |
if self.check_RemoteDir == 1 : self.checkRemoteDir(task) |
68 |
> |
for job in task.jobs: |
69 |
> |
st = job.runningJob['state'] |
70 |
> |
nj = int(job['jobId']) |
71 |
> |
if st in ['KillSuccess','SubFailed','Aborted','Cancelled']: |
72 |
> |
nj_list.append(nj) |
73 |
> |
elif st in ['Cleared']: |
74 |
> |
if (job.runningJob['applicationReturnCode']!=0 or (job.runningJob['wrapperReturnCode']!=0 and job.runningJob['wrapperReturnCode']!=60308 ) ) : |
75 |
> |
nj_list.append(nj) |
76 |
> |
elif st == 'Created': |
77 |
> |
common.logger.info('Job #'+`nj`+' last action was '+str(job.runningJob['state'])+' not yet submitted: use -submit') |
78 |
> |
elif st in ['Terminated']: |
79 |
> |
common.logger.info('Job #'+`nj`+' last action was '+str(job.runningJob['state'])+' must be retrieved (-get) before resubmission') |
80 |
> |
else: |
81 |
> |
common.logger.info('Job #'+`nj`+' last action was '+str(job.runningJob['state'])+' actual status is '\ |
82 |
> |
+str(job.runningJob['statusScheduler'])+' must be killed (-kill) before resubmission') |
83 |
> |
if (job.runningJob['state']=='KillRequested'): common.logger.info('\t\tthe previous Kill request is being processed') |
84 |
> |
|
85 |
> |
|
86 |
> |
if len(nj_list) == 0 : |
87 |
> |
msg='No jobs to resubmit' |
88 |
> |
raise CrabException(msg) |
89 |
> |
|
90 |
> |
common._db.updateJob_(nj_list, [{'closed':'N'}]*len(nj_list)) |
91 |
> |
# Get new running instances |
92 |
|
common._db.newRunJobs(nj_list) |
93 |
< |
return |
93 |
> |
|
94 |
> |
return nj_list |