2 |
|
import urllib |
3 |
|
from xml.dom.minidom import parse |
4 |
|
from crab_exceptions import * |
5 |
– |
from crab_logger import Logger |
5 |
|
from WorkSpace import * |
6 |
|
from urlparse import urlparse |
7 |
|
from LFNBaseName import * |
8 |
+ |
from crab_util import getUserName |
9 |
|
|
10 |
|
class PhEDExDatasvcInfo: |
11 |
< |
def __init__(self, cfg_params=None, config=None):
    """
    Set the service URLs and the defaults, then load the stage-out settings.

    input: cfg_params - crab.cfg-style mapping ('USER.xxx' / 'CMSSW.xxx'
                        keys); handed to checkCfgConfig when config is None
           config     - mapping with plain keys; when it is not None it is
                        handed to checkConfig and cfg_params is not read
    """
    ## PhEDEx Data Service URL (checkCfgConfig may replace it with USER.datasvc_url)
    self.datasvc_url = "https://cmsweb.cern.ch/phedex/datasvc/xml/prod"

    self.FacOps_savannah = 'https://savannah.cern.ch/support/?func=additem&group=cmscompinfrasup'
    ## help pages quoted in the error messages built by this class
    self.stage_out_faq = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabHowTo#Stageout_and_publication'
    self.dataPub_faq = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabForPublication'

    ## defaults that checkCfgConfig may change
    self.usePhedex = True
    self.sched = common.scheduler.name().upper()

    ## identity test instead of '!= None', so an empty config mapping
    ## still selects checkConfig
    if config is not None:
        self.checkConfig(config)
    else:
        self.checkCfgConfig(cfg_params)

    ## both check methods set srm_version; it is the protocol used afterwards
    self.protocol = self.srm_version
29 |
+ |
|
30 |
+ |
|
31 |
+ |
def checkConfig(self, config):
    """
    Load the settings from a mapping with plain (section-less) keys.

    input: config - object with a dict-like get(key, default)
    sets:  self.srm_version - config['srm_version'], 'srmv2' when missing
           self.node        - config['storage_element'], None when missing
           self.lfn         - always '/store/'
    """
    ## NOTE(review): unlike checkCfgConfig, nothing here sets publish_data,
    ## publish_data_name, forced_path or user_se_path, which other methods
    ## of this class read - confirm callers on this path never need them.
    self.srm_version = config.get("srm_version", 'srmv2')
    self.node = config.get('storage_element', None)
    self.lfn = '/store/'
37 |
+ |
|
38 |
+ |
def checkCfgConfig(self, cfg_params):
    """
    Load and validate the stage-out settings from the crab.cfg parameters.

    input:  cfg_params - dict-like object with 'USER.xxx' / 'CMSSW.xxx' keys
    sets:   datasvc_url, srm_version, node, publish_data, usenamespace,
            user_remote_dir, datasetpath, publish_data_name, pset,
            user_port, user_se_path, forced_path; usePhedex is switched
            off for a non T1_/T2_/T3_ storage element; SE is set only for
            the LSF, PBS and CAF schedulers
    raises: CrabException when USER.storage_element is missing, or when a
            non T1_/T2_/T3_ storage element is used without both
            USER.user_remote_dir and USER.storage_path
    """
    self.datasvc_url = cfg_params.get("USER.datasvc_url", self.datasvc_url)
    self.srm_version = cfg_params.get("USER.srm_version", 'srmv2')
    self.node = cfg_params.get('USER.storage_element', None)

    self.publish_data = cfg_params.get("USER.publish_data", 0)
    self.usenamespace = cfg_params.get("USER.usenamespace", 0)

    ## a non-empty remote dir always ends with '/'
    self.user_remote_dir = cfg_params.get("USER.user_remote_dir", '')
    if self.user_remote_dir and not self.user_remote_dir.endswith('/'):
        self.user_remote_dir += '/'

    self.datasetpath = cfg_params.get("CMSSW.datasetpath")
    self.publish_data_name = cfg_params.get('USER.publish_data_name', '')

    self.pset = cfg_params.get('CMSSW.pset', None)

    self.user_port = cfg_params.get("USER.storage_port", '8443')

    ## a non-empty storage path always ends with '/'
    self.user_se_path = cfg_params.get("USER.storage_path", '')
    if self.user_se_path and not self.user_se_path.endswith('/'):
        self.user_se_path += '/'

    #check if using "private" Storage
    if not self.node:
        msg = 'Please specify the storage_element name in your crab.cfg section [USER].\n'
        msg += '\tFor further information please visit : %s' % self.stage_out_faq
        raise CrabException(msg)

    ## a storage element whose name holds none of the T1_/T2_/T3_ tags is
    ## handled without PhEDEx (usePhedex is never switched back on here)
    if 'T1_' not in self.node and 'T2_' not in self.node and 'T3_' not in self.node:
        self.usePhedex = False

    if not self.usePhedex and (self.user_remote_dir == '' or self.user_se_path == ''):
        ## bug 73010: also tell the user that the task directory is unusable
        msg = 'Error: task ' + common.work_space._top_dir + ' not correctly created. Please remove it. \n'
        msg += ' You are asking to stage out without using CMS Storage Name convention. In this case you \n'
        msg += ' must specify both user_remote_dir and storage_path in the crab.cfg section [USER].\n'
        msg += ' For further information please visit : \n\t%s' % self.stage_out_faq
        ## result unused; the call is kept because its side effects (if any)
        ## are not visible from this file
        common._db.getTask()
        raise CrabException(msg)

    ## first part of the LFN
    self.forced_path = '/store/user/'
    if self.sched in ['LSF', 'PBS']:
        self.srm_version = 'direct'
        self.SE = {'LSF': '', 'PBS': ''}

    if self.sched == 'CAF':
        ## the LFN prefix on CAF can be chosen with USER.caf_lfn
        self.forced_path = cfg_params.get("USER.caf_lfn", '/store/caf/user')
        self.SE = {'CAF': 'caf.cern.ch'}
        self.srm_version = 'stageout'

    ## without PhEDEx the user's remote dir replaces the forced prefix,
    ## whatever the scheduler is
    if not self.usePhedex:
        self.forced_path = self.user_remote_dir
112 |
|
|
113 |
|
def getEndpoint(self): |
118 |
|
|
119 |
|
#extract the PFN for the given node,LFN,protocol |
120 |
|
endpoint = self.getStageoutPFN() |
121 |
+ |
if ( endpoint[-1] != '/' ) : endpoint = endpoint + '/' |
122 |
|
|
123 |
|
#extract SE name an SE_PATH (needed for publication) |
124 |
|
SE, SE_PATH, User = self.splitEndpoint(endpoint) |
125 |
|
|
126 |
+ |
#### FEDE FOR XROOTD ##### |
127 |
+ |
#print "in getEndpoint di PhEDExDatasvcInfo.py: " |
128 |
+ |
#print " SE = ", SE |
129 |
+ |
#print " SE_PATH = ", SE_PATH |
130 |
+ |
#print " User = ", User |
131 |
+ |
#print " endpoint = ", endpoint |
132 |
+ |
############################## |
133 |
+ |
|
134 |
|
return endpoint, self.lfn , SE, SE_PATH, User |
135 |
|
|
136 |
|
def splitEndpoint(self, endpoint): |
139 |
|
''' |
140 |
|
SE = '' |
141 |
|
SE_PATH = '' |
142 |
< |
USER = '' |
142 |
> |
USER = getUserName() |
143 |
|
if self.usePhedex: |
144 |
< |
if self.protocol == 'direct': |
145 |
< |
query=endpoint |
146 |
< |
SE_PATH = endpoint |
147 |
< |
### FEDE added SE ### |
148 |
< |
SE = self.sched |
144 |
> |
### FEDE PER TEST WITH XROOTD |
145 |
> |
if (self.protocol == 'direct' or self.protocol == 'stageout'): |
146 |
> |
SE = self.SE[self.sched] |
147 |
> |
SE_PATH = endpoint |
148 |
> |
############################# |
149 |
> |
#print " SE_PATH = ", SE_PATH |
150 |
|
else: |
151 |
|
url = 'http://'+endpoint.split('://')[1] |
90 |
– |
# python > 2.4 |
91 |
– |
# SE = urlparse(url).hostname |
152 |
|
scheme, host, path, params, query, fragment = urlparse(url) |
153 |
< |
SE = host.split(':')[0] |
153 |
> |
SE = self.getAuthoritativeSE() |
154 |
|
SE_PATH = endpoint.split(host)[1] |
95 |
– |
USER = (query.split('user')[1]).split('/')[1] |
155 |
|
else: |
156 |
|
SE = self.node |
157 |
< |
SE_PATH = self.user_se_path + self.user_lfn |
158 |
< |
try: |
159 |
< |
USER = (self.lfn.split('user')[1]).split('/')[1] |
160 |
< |
except: |
161 |
< |
pass |
162 |
< |
|
157 |
> |
SE_PATH = self.user_se_path + self.user_remote_dir |
158 |
> |
if self.lfn.find('group') != -1: |
159 |
> |
try: |
160 |
> |
USER = (self.lfn.split('group')[1]).split('/')[1] |
161 |
> |
except: |
162 |
> |
pass |
163 |
|
return SE, SE_PATH, USER |
105 |
– |
|
164 |
|
|
165 |
|
def getLFN(self): |
166 |
|
""" |
173 |
|
## check if storage_name is a T2 (siteDB query) |
174 |
|
## if yes :match self.user_lfn with LFNBaseName... |
175 |
|
## if NOT : raise (you are using a T2. It's not allowed stage out into self.user_path+self.user_lfn) |
176 |
< |
lfn = self.user_lfn |
176 |
> |
lfn = self.user_remote_dir |
177 |
|
return lfn |
178 |
|
if self.publish_data_name == '' and int(self.publish_data) == 1: |
179 |
< |
msg = "Eeror. The [USER] section does not have 'publish_data_name'" |
179 |
> |
msg = "Error. The [USER] section does not have 'publish_data_name'\n" |
180 |
> |
msg += '\tFor further information please visit : \n\t%s'%self.dataPub_faq |
181 |
|
raise CrabException(msg) |
182 |
|
if self.publish_data_name == '' and int(self.usenamespace) == 1: |
183 |
|
self.publish_data_name = "DefaultDataset" |
184 |
< |
if int(self.publish_data) == 1 or int(self.usenamespace) == 1: |
184 |
> |
if int(self.publish_data) == 1: |
185 |
|
if self.sched in ['CAF']: l_User=True |
186 |
|
primaryDataset = self.computePrimaryDataset() |
187 |
< |
#lfn = LFNBase(primaryDataset,self.publish_data_name,LocalUser=l_User) + '/${PSETHASH}/' |
188 |
< |
if self.usePhedex: |
189 |
< |
lfn = LFNBase('/store/user', primaryDataset, self.publish_data_name, LocalUser=l_User) + '/${PSETHASH}/' |
190 |
< |
else: |
191 |
< |
lfn = LFNBase(self.user_lfn, primaryDataset, self.publish_data_name, LocalUser=l_User) + '/${PSETHASH}/' |
187 |
> |
### added the case lfn = LFNBase(self.forced_path, primaryDataset, self.publish_data_name, publish=True) |
188 |
> |
### for the publication in order to be able to check the lfn length |
189 |
> |
lfn = LFNBase(self.forced_path, primaryDataset, self.publish_data_name, publish=True) + '/${PSETHASH}/' |
190 |
> |
elif int(self.usenamespace) == 1: |
191 |
> |
if self.sched in ['CAF']: l_User=True |
192 |
> |
primaryDataset = self.computePrimaryDataset() |
193 |
> |
lfn = LFNBase(self.forced_path, primaryDataset, self.publish_data_name) + '/${PSETHASH}/' |
194 |
|
else: |
195 |
|
if self.sched in ['CAF','LSF']: l_User=True |
196 |
< |
lfn = LFNBase(self.user_remote_dir,LocalUser=l_User) |
196 |
> |
lfn = LFNBase(self.forced_path,self.user_remote_dir) |
197 |
> |
|
198 |
> |
if lfn.find('${PSETHASH}')>1: |
199 |
> |
psethash = runCommand('edmConfigHash < %s| tail -1'%self.pset) |
200 |
> |
lfn = string.replace(lfn,'${PSETHASH}',string.strip(psethash)) |
201 |
> |
if ( lfn[-1] != '/' ) : lfn = lfn + '/' |
202 |
> |
|
203 |
|
return lfn |
204 |
|
|
205 |
|
def computePrimaryDataset(self): |
212 |
|
primarydataset = self.publish_data_name |
213 |
|
return primarydataset |
214 |
|
|
215 |
< |
def lfn2pfn(self): |
215 |
> |
def domPhedex(self,params,datasvc_baseUrl): |
216 |
|
""" |
217 |
|
PhEDEx Data Service lfn2pfn call |
218 |
|
|
219 |
< |
input: LFN,node name,protocol |
219 |
> |
input: params,datasvc_baseUrl |
220 |
|
returns: DOM object with the content of the PhEDEx Data Service call |
221 |
|
""" |
155 |
– |
params = {'node' : self.node , 'lfn': self.lfn , 'protocol': self.protocol} |
222 |
|
params = urllib.urlencode(params) |
157 |
– |
datasvc_lfn2pfn="%s/lfn2pfn"%self.datasvc_url |
158 |
– |
urlresults = urllib.urlopen(datasvc_lfn2pfn, params) |
223 |
|
try: |
224 |
+ |
urlresults = urllib.urlopen(datasvc_baseUrl, params) |
225 |
|
urlresults = parse(urlresults) |
226 |
+ |
except IOError: |
227 |
+ |
msg="Unable to access PhEDEx Data Service at %s"%datasvc_baseUrl |
228 |
+ |
raise CrabException(msg) |
229 |
|
except: |
230 |
|
urlresults = None |
231 |
|
|
268 |
|
returns: PFN |
269 |
|
""" |
270 |
|
if self.usePhedex: |
271 |
+ |
params = {'node' : self.node , 'lfn': self.lfn , 'protocol': self.protocol} |
272 |
+ |
datasvc_lfn2pfn="%s/lfn2pfn"%self.datasvc_url |
273 |
|
fullurl="%s/lfn2pfn?node=%s&lfn=%s&protocol=%s"%(self.datasvc_url,self.node,self.lfn,self.protocol) |
274 |
< |
domlfn2pfn = self.lfn2pfn() |
274 |
> |
#print "--->>> fullurl = ", fullurl |
275 |
> |
domlfn2pfn = self.domPhedex(params,datasvc_lfn2pfn) |
276 |
|
if not domlfn2pfn : |
277 |
|
msg="Unable to get info from %s"%fullurl |
278 |
|
raise CrabException(msg) |
291 |
|
msg+=' OriginalSubmission: stageout path is not retrieved from %s \n'%fullurl |
292 |
|
raise CrabException(msg) |
293 |
|
else: |
294 |
< |
if self.sched in ['CAF','LSF'] : |
294 |
> |
if self.sched in ['CAF','LSF','PBS'] : |
295 |
|
stageoutpfn = self.user_se_path+self.lfn |
296 |
|
else: |
297 |
|
stageoutpfn = 'srm://'+self.node+':'+self.user_port+self.user_se_path+self.lfn |
298 |
|
|
299 |
+ |
if ( stageoutpfn[-1] != '/' ) : stageoutpfn = stageoutpfn + '/' |
300 |
|
return stageoutpfn |
301 |
|
|
302 |
+ |
def getAuthoritativeSE(self):
    """
    Ask the PhEDEx Data Service 'nodes' call for the SE of self.node.

    input:   node name (self.node)
    returns: AuthoritativeSE, i.e. the "se" attribute of the first <node>
             element that carries a non-empty one; None when no <node>
             element does; [] when the reply has no <phedex> element
    raises:  CrabException when domPhedex gives nothing back or when
             parse_error reports an error in the reply
    """
    datasvc_nodes = "%s/nodes" % self.datasvc_url
    ## only used to build the error messages
    fullurl = "%s/nodes/?node=%s" % (self.datasvc_url, self.node)

    domnodes = self.domPhedex({'node': self.node}, datasvc_nodes)
    if not domnodes:
        raise CrabException("Unable to get info from %s" % fullurl)

    errormsg = self.parse_error(domnodes)
    if errormsg:
        raise CrabException("Error extracting info from %s due to: %s" % (fullurl, errormsg))

    phedex = domnodes.getElementsByTagName('phedex')
    if not phedex:
        ## NOTE(review): an empty list is returned here although every other
        ## path gives a string or None; kept as it is for existing callers.
        return []

    for node in phedex[0].getElementsByTagName('node'):
        se = node.getAttribute("se")
        if se:
            return se
    return None
330 |
|
|
331 |
|
|
332 |
|
if __name__ == '__main__': |