1 |
from Actor import *
|
2 |
from crab_util import *
|
3 |
from crab_exceptions import *
|
4 |
from crab_logger import Logger
|
5 |
import common
|
6 |
import string
|
7 |
|
8 |
class CopyLocal(Actor):
|
9 |
def __init__(self, cfg_params, nj_list):
|
10 |
"""
|
11 |
constructor
|
12 |
"""
|
13 |
self.cfg_params = cfg_params
|
14 |
self.nj_list = nj_list
|
15 |
if (cfg_params.get('USER.copy_data',0) == '0') :
|
16 |
raise CrabException("Cannot copy output locally if it has not \
|
17 |
been stored to SE via USER.copy_data=1")
|
18 |
|
19 |
def run(self):
|
20 |
|
21 |
results = self.copyLocal()
|
22 |
|
23 |
self.parseResults( results )
|
24 |
|
25 |
return
|
26 |
|
27 |
def copyLocal(self):
|
28 |
"""
|
29 |
prepare to copy the pre staged output to local
|
30 |
"""
|
31 |
from PhEDExDatasvcInfo import PhEDExDatasvcInfo
|
32 |
|
33 |
stageout = PhEDExDatasvcInfo(self.cfg_params)
|
34 |
endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
|
35 |
|
36 |
# FIXME DS. we'll use the proper DB info once filled..
|
37 |
# output files to be returned via sandbox or copied to SE
|
38 |
self.output_file = []
|
39 |
tmp = self.cfg_params.get('CMSSW.output_file',None)
|
40 |
if tmp.find(',') >= 0:
|
41 |
[self.output_file.append(x.strip()) for x in tmp.split(',')]
|
42 |
else: self.output_file.append( tmp.strip() )
|
43 |
|
44 |
# loop over jobs
|
45 |
task=common._db.getTask(self.nj_list)
|
46 |
allMatch={}
|
47 |
InfileList = ''
|
48 |
for job in task.jobs:
|
49 |
id_job = job['jobId']
|
50 |
if ( job.runningJob['status'] in ['E','UE'] and job.runningJob[ 'wrapperReturnCode'] == 0):
|
51 |
for of in self.output_file:
|
52 |
a,b=of.split('.')
|
53 |
InfileList = '%s_%s.%s%s'%(a,id_job,b,',')
|
54 |
elif ( job.runningJob['status'] in ['E','UE'] and job.runningJob['wrapperReturnCode'] != 0):
|
55 |
common.logger.message("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
|
56 |
%(str(job['jobId']),str(job.runningJob['wrapperReturnCode'])))
|
57 |
else:
|
58 |
common.logger.message("Not possible copy outputs of Job # %s : Status is %s" \
|
59 |
%(str(job['jobId']),str(job.runningJob['statusScheduler'])))
|
60 |
pass
|
61 |
|
62 |
if (InfileList == '') :
|
63 |
raise CrabException("No files to be copyed")
|
64 |
|
65 |
self.outDir = self.cfg_params.get('USER.outputdir' ,common.work_space.resDir())
|
66 |
print InfileList
|
67 |
cmscpConfig = {
|
68 |
"source": endpoint,
|
69 |
"destinationDir": self.outDir,
|
70 |
"inputFileList": InfileList[:-1],
|
71 |
"protocol": 'srm-lcg',
|
72 |
"option": '-b -D srmv2 -t 2400 --verbose'
|
73 |
}
|
74 |
|
75 |
results = self.performCopy(cmscpConfig)
|
76 |
|
77 |
return results
|
78 |
|
79 |
def copyRemote(self):
|
80 |
"""
|
81 |
prepare to copy from local to SE
|
82 |
"""
|
83 |
results = 'still to be implemented'
|
84 |
|
85 |
# to be implemeted
|
86 |
cmscpConfig = {
|
87 |
"source": '',
|
88 |
"inputFileList":'',
|
89 |
"protocol":'',
|
90 |
"option":''
|
91 |
}
|
92 |
|
93 |
# results = self.performCopy(cmscpConfig)
|
94 |
|
95 |
return results
|
96 |
|
97 |
def performCopy(self, dict):
|
98 |
"""
|
99 |
call the cmscp class and do the copy
|
100 |
"""
|
101 |
from cmscp import cmscp
|
102 |
|
103 |
doCopy = cmscp(dict)
|
104 |
|
105 |
start = time.time()
|
106 |
results = doCopy.run()
|
107 |
stop = time.time()
|
108 |
|
109 |
common.logger.debug(1, "CopyLocal Time: "+str(stop - start))
|
110 |
common.logger.write("CopyLocal time :"+str(stop - start))
|
111 |
|
112 |
return results
|
113 |
|
114 |
def parseResults(self,results):
|
115 |
'''
|
116 |
take the results dictionary and
|
117 |
print the results
|
118 |
'''
|
119 |
for file, dict in results :
|
120 |
if file:
|
121 |
txt = 'success'
|
122 |
if dict['erCode'] != 'failed':
|
123 |
msg = 'Copy %s for file: %s \n'%(txt,file)
|
124 |
if txt == 'failed': msg += 'Copy failed because : %s'%dict['reason']
|
125 |
common.logger.message( msg )
|
126 |
return
|