1 |
belforte |
1.30 |
__revision__ = "$Id: CopyData.py,v 1.29 2012/07/11 15:53:30 belforte Exp $"
|
2 |
|
|
__version__ = "$Revision: 1.29 $"
|
3 |
fanzago |
1.26 |
|
4 |
spiga |
1.1 |
from Actor import *
|
5 |
|
|
from crab_util import *
|
6 |
|
|
from crab_exceptions import *
|
7 |
|
|
import common
|
8 |
|
|
import string
|
9 |
|
|
|
10 |
|
|
class CopyData(Actor):
    """
    Actor that copies the output files of finished jobs, either into a
    local directory or to a remote storage element (SE), through cmscp.
    """

    # Transfer protocol handed to cmscp, keyed by upper-cased scheduler name.
    _PROTOCOL_BY_SCHEDULER = {
        'CAF': 'rfio',
        'LSF': 'rfio',
        'CONDOR_G': 'srmv2',
        'GLITE': 'srm-lcg',
        'GLIDEIN': 'srm-lcg',
        'CONDOR': 'srmv2',
        'REMOTEGLIDEIN': 'srm-lcg',
        'SGE': 'srmv2',
        'ARC': 'srmv2',
    }

    # Wrapper return codes for which the output of a 'Cleared' job is copied.
    # NOTE(review): 60308 is accepted next to 0 exactly as before; presumably
    # a stage-out related wrapper code - confirm against the exit-code table.
    _COPYABLE_RETURN_CODES = (0, 60308)

    def __init__(self, cfg_params, nj_list, StatusObj):
        """
        Validate the configuration and select destination and protocol.

        cfg_params -- configuration mapping; 'USER.copy_data', 'CRAB.dest_se'
                      and 'CRAB.dest_endpoint' are read here
        nj_list    -- job selection, later passed to common._db.getTask()
        StatusObj  -- optional object; when truthy, its query(display=False)
                      is called once

        Raises CrabException when USER.copy_data is '0' or unset, or when
        both a destination SE and a destination endpoint are given.
        Raises KeyError when the scheduler name has no entry in the
        protocol table.
        """
        common.logger.debug("CopyData __init__() : \n")

        # Normalise to a string before comparing: the fallback default is the
        # integer 0, which never equals '0', so an unset option used to slip
        # through this guard.
        if str(cfg_params.get('USER.copy_data', 0)) == '0':
            msg = 'Cannot copy output if it has not \n'
            msg += '\tbeen stored to SE via USER.copy_data=1'
            raise CrabException(msg)

        self.cfg_params = cfg_params
        self.nj_list = nj_list

        self.dest_se = cfg_params.get("CRAB.dest_se", 'local')
        self.dest_endpoint = cfg_params.get("CRAB.dest_endpoint", 'local')

        remote_se = self.dest_se != 'local'
        remote_endpoint = self.dest_endpoint != 'local'

        if remote_se and remote_endpoint:
            msg = 'You can specify only a parameter with CopyData option \n'
            msg += '1) The dest_se in the case of official CMS SE (i.e -dest_se=T2_IT_Legnaro)\n'
            msg += '2) The complete endpoint for not official SE (i.e -dest_endpoint=srm://<se_name>:<port>/xxx/yyy/)\n'
            msg += '3) if you do not specify parameters, the output will be copied in your UI under crab_working_dir/res \n'
            raise CrabException(msg)

        # Local copy is the default; any remote destination switches it off.
        self.copy_local = 1
        if remote_se or remote_endpoint:
            self.copy_local = 0

        # update local DB
        if StatusObj:  # this is to avoid a really strange segv
            StatusObj.query(display=False)

        self.protocol = self._PROTOCOL_BY_SCHEDULER[common.scheduler.name().upper()]

        common.logger.debug("Protocol = %s; Destination SE = %s; Destination Endpoint = %s."%(self.protocol,self.dest_se,self.dest_endpoint))

    def run(self):
        """
        Copy the available output files and report the outcome per file.

        By default the files go to the local res directory of the crab
        working area.
        """
        results = self.copy()
        self.parseResults(results)
        return

    def copy(self):
        """
        Copy the output files of the selected jobs and return the results.

        1) local copy: it is the default. The output is copied into
           USER.outputdir, or the crab_working_dir/res dir if that is unset
        2) copy to a remote SE specifying -dest_se (official CMS remote SE)
           or -dest_endpoint (not official, the complete endpoint is needed)

        Returns a dictionary merging what performCopy() returned for every
        source endpoint.
        Raises CrabException (from checkAvailableList) if nothing can be
        copied.
        """
        lfn, to_copy = self.checkAvailableList()

        if self.copy_local == 1:
            outputDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
            common.logger.info("Copy file locally.\n\tOutput dir: %s"%outputDir)
            dest = {"destinationDir": outputDir}
        else:
            if self.dest_se != 'local':
                from PhEDExDatasvcInfo import PhEDExDatasvcInfo
                stageout = PhEDExDatasvcInfo(config={'storage_element': self.dest_se})
                self.endpoint = stageout.getStageoutPFN()

                # Strip the leading /store/temp or /store component before
                # appending the LFN directory to the stage-out PFN.
                if lfn.startswith("/store/temp/"):
                    tmp = lfn.replace("/store/temp/", "/", 1)
                elif lfn.startswith("/store/"):
                    tmp = lfn.replace("/store/", "/", 1)
                else:
                    tmp = lfn

                common.logger.debug("Source LFN = %s"%lfn)
                common.logger.debug("tmp LFN = %s"%tmp)
                self.endpoint = self.endpoint + tmp
            else:
                self.endpoint = self.dest_endpoint

            common.logger.info("Copy file to remote SE.\n\tEndpoint: %s"%self.endpoint)
            dest = {"destination": self.endpoint}

        results = {}
        for source, file_list in to_copy.items():
            cmscpConfig = {
                "source": source,
                "inputFileList": file_list,
                "protocol": self.protocol,
            }
            cmscpConfig.update(dest)

            common.logger.debug("Source = %s"%source)
            common.logger.debug("Files = %s"%file_list)
            common.logger.debug("CmscpConfig = %s"%str(cmscpConfig))

            results.update(self.performCopy(cmscpConfig))
        return results

    def checkAvailableList(self):
        """
        Collect the output files of the requested jobs that can be copied.

        A job qualifies when its state is 'Cleared' and its wrapper return
        code is one of _COPYABLE_RETURN_CODES; every other job is reported
        through the logger and skipped.

        Returns a tuple (lfn, to_copy):
          lfn     -- directory part of the first output LFN of the last
                     qualifying job that contributed files ('' if none)
          to_copy -- dictionary {source endpoint: comma-separated string of
                     output file base names}
        Raises CrabException if no file is available.
        """
        common.logger.debug("CopyData in checkAvailableList() ")

        task = common._db.getTask(self.nj_list)
        files_by_endpoint = {}
        lfn = ''

        # loop over jobs
        for job in task.jobs:
            running = job.runningJob
            if running['state'] == 'Cleared' and running['wrapperReturnCode'] in self._COPYABLE_RETURN_CODES:
                common.logger.log(10-1,"job_id = %s"%str(job['jobId']))
                endpoints = running['storage']
                output_files = running['lfn']
                # Indexing (not zip) on purpose: a length mismatch between
                # the two lists still surfaces as an IndexError.
                for i, endpoint in enumerate(endpoints):
                    files_by_endpoint.setdefault(endpoint, []).append(
                        os.path.basename(output_files[i]))
                ### the lfn should be the same for every file, the only
                ### difference could be the temp dir
                # NOTE(review): hoisted out of the per-file loop; it is only
                # updated when this job contributed at least one file.
                if endpoints:
                    lfn = os.path.dirname(output_files[0])
            elif running['state'] == 'Cleared':
                # Cleared, but with a wrapper code that is not accepted above.
                common.logger.info("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
                                   %(str(job['jobId']),str(running['wrapperReturnCode'])))
            else:
                common.logger.info("Not possible copy outputs of Job # %s : Status is %s" \
                                   %(str(job['jobId']),str(running['statusScheduler'])))

        if not files_by_endpoint:
            raise CrabException("No files to copy")

        # cmscp receives the file names as one comma-separated string.
        to_copy = {}
        for endpoint, names in files_by_endpoint.items():
            to_copy[endpoint] = ','.join(names)

        common.logger.debug(" to_copy = %s"%str(to_copy))
        return lfn, to_copy

    def performCopy(self, dict):
        """
        Run cmscp with the given configuration and return its results.

        dict -- cmscp configuration as built by copy(); the parameter name
                shadows the builtin but is kept so existing callers keep
                working

        The cmscp instance is kept in self.doCopy because parseResults()
        uses it afterwards.
        """
        from cmscp import cmscp
        self.doCopy = cmscp(dict)

        common.logger.info("Starting copy...")

        start = time.time()
        results = self.doCopy.run()
        stop = time.time()

        common.logger.debug("CopyLocal Time: "+str(stop - start))

        return results

    def parseResults(self,results):
        """
        Write the results dictionary to a JSON file via the cmscp instance
        and log one success/failure line per copied file.

        results -- dictionary {file: outcome}; each outcome provides
                   'erCode' and, on failure, 'reason'

        Requires performCopy() to have run before, since it sets self.doCopy.
        """
        self.doCopy.writeJsonFile(results)
        for file_name, outcome in results.items():
            if not file_name:
                continue
            txt = 'success'
            if outcome['erCode'] != '0':
                txt = 'failed'
            msg = 'Copy %s for file: %s '%(txt,file_name)
            if txt == 'failed':
                msg += '\n\tCopy failed because : %s'%outcome['reason']
            common.logger.info( msg )
        return
|