1 |
< |
#!/usr/bin/env python |
2 |
< |
|
1 |
> |
from Actor import * |
2 |
|
import urllib |
3 |
|
from xml.dom.minidom import parse |
4 |
|
from crab_exceptions import * |
5 |
+ |
from crab_logger import Logger |
6 |
+ |
from WorkSpace import * |
7 |
+ |
from urlparse import urlparse |
8 |
+ |
from LFNBaseName import * |
9 |
|
|
10 |
|
class PhEDExDatasvcInfo: |
11 |
< |
""" |
12 |
< |
provides information from PhEDEx Data Service |
13 |
< |
""" |
14 |
< |
## PhEDEx Data Service URL |
15 |
< |
#datasvc_url="https://cmsweb.cern.ch/phedex/test/datasvc/xml/prod" |
16 |
< |
datasvc_url="https://cmsweb.cern.ch/phedex/datasvc/xml/prod" |
11 |
> |
def __init__( self , cfg_params ):
    """
    Collect the stage-out settings from the CRAB configuration.

    input: cfg_params, mapping queried with "SECTION.name" keys
    raises: CrabException when USER.storage_element is missing, or when the
            storage element name carries no T1_/T2_/T3_ tag and either
            user_remote_dir or storage_path is empty
    """
    def slash_terminated(path):
        # empty settings stay empty, anything else ends with exactly one more '/'
        if path and not path.endswith('/'):
            return path + '/'
        return path

    ## PhEDEx Data Service URL (can be overridden from crab.cfg)
    default_datasvc = "https://cmsweb.cern.ch/phedex/datasvc/xml/prod"
    self.datasvc_url = cfg_params.get("USER.datasvc_url", default_datasvc)

    ## pages quoted in the user messages
    self.FacOps_savannah = 'https://savannah.cern.ch/support/?func=additem&group=cmscompinfrasup'
    faq_stage_out = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabFaq#How_to_store_output_with_CRAB_2'
    self.dataPub_faq = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabForPublication'

    self.srm_version = cfg_params.get("USER.srm_version", 'srmv2')
    self.node = cfg_params.get('USER.storage_element', None)

    self.user_lfn = cfg_params.get("USER.lfn", '')
    self.publish_data = cfg_params.get("USER.publish_data", 0)
    self.usenamespace = cfg_params.get("USER.usenamespace", 0)
    self.user_remote_dir = slash_terminated(cfg_params.get("USER.user_remote_dir", ''))

    if self.user_lfn:
        # NOTE(review): 'common' is not imported by name in the visible
        # header; presumably one of the star imports provides it.
        common.logger.message(
            'Warning: lfn has been deprecated, CRAB will ignore it.\n'
            + '\t Please use only user_remote_dir removing lfn from your crab.cfg\n'
            + '\t For further information please visit : \n\t%s' % faq_stage_out)

    self.datasetpath = cfg_params.get("CMSSW.datasetpath")
    self.publish_data_name = cfg_params.get('USER.publish_data_name', '')

    self.user_port = cfg_params.get("USER.storage_port", '8443')
    self.user_se_path = slash_terminated(cfg_params.get("USER.storage_path", ''))

    #check if using "private" Storage
    self.usePhedex = True
    if not self.node:
        raise CrabException(
            'Please specify the storage_element name in your crab.cfg section [USER].\n'
            + '\tFor further information please visit : %s' % faq_stage_out)
    # a name without any tier tag is handled as a private storage element
    if 'T1_' not in self.node and 'T2_' not in self.node and 'T3_' not in self.node:
        self.usePhedex = False

    if not self.usePhedex:
        if '' in (self.user_remote_dir, self.user_se_path):
            raise CrabException(
                'You are asking to stage out without using CMS Storage Name convention. In this case you \n'
                + '\t must specify both user_remote_dir and storage_path in the crab.cfg section [USER].\n '
                + '\t For further information please visit : \n\t%s' % faq_stage_out)

    self.sched = common.scheduler.name().upper()
    if self.sched in ['CAF','LSF']:
        # these schedulers write straight to the storage, no SRM involved
        self.protocol = 'direct'
        self.SE = {'CAF':'caf.cern.ch', 'LSF':''}
    else:
        self.protocol = self.srm_version

    if self.usePhedex:
        self.forced_path = '/store/user/'
    else:
        self.forced_path = self.user_remote_dir
    return
69 |
> |
|
70 |
> |
def getEndpoint(self): |
71 |
> |
''' |
72 |
> |
Return full SE endpoint and related infos |
73 |
> |
''' |
74 |
> |
self.lfn = self.getLFN() |
75 |
> |
|
76 |
> |
#extract the PFN for the given node,LFN,protocol |
77 |
> |
endpoint = self.getStageoutPFN() |
78 |
|
|
79 |
< |
def lfn2pfn(self,node,lfn,protocol):
    """
    PhEDEx Data Service lfn2pfn call

    input: LFN,node name,protocol
    returns: DOM object with the content of the PhEDEx Data Service call,
             or None when the reply cannot be parsed as XML
    """
    params = urllib.urlencode({'node' : node , 'lfn': lfn , 'protocol': protocol})
    datasvc_lfn2pfn = "%s/lfn2pfn"%self.datasvc_url
    # the encoded parameters are handed over as request data, not appended
    # to the URL
    reply = urllib.urlopen(datasvc_lfn2pfn, params)
    # nested try blocks instead of try/except/finally: that combined form
    # is not available on the oldest interpreters this file supports
    try:
        try:
            dom = parse(reply)
        except Exception:
            # unreadable or malformed reply: the caller treats None as
            # "no info"; interpreter exits are no longer swallowed here
            dom = None
    finally:
        reply.close()
    return dom
96 |
< |
|
97 |
< |
def parse_error(self,urlresults):
    """
    look for errors in the DOM object returned by PhEDEx Data Service call

    input: DOM object with the content of a PhEDEx Data Service call
    returns: None when there is no <error> element, otherwise the text of
             the last <error> element found (its raw XML when it holds no
             text, so that callers still get a non-empty message)
    """
    errormsg = None
    for error in urlresults.getElementsByTagName('error'):
        # only text and CDATA children carry message data; indexing the
        # children blindly fails on an empty element or on nested tags
        pieces = [child.data for child in error.childNodes
                  if child.nodeType in (child.TEXT_NODE, child.CDATA_SECTION_NODE)]
        errormsg = ''.join(pieces) or error.toxml()
    return errormsg
108 |
< |
|
109 |
< |
def parse_lfn2pfn(self,urlresults):
    """
    Parse the content of the result of lfn2pfn PhEDEx Data Service call

    input: DOM object with the content of the lfn2pfn call
    returns: the first non-empty pfn attribute found among the <mapping>
             elements of the first <phedex> element; [] when there is no
             <phedex> element; None when no mapping carries a pfn
    """
    phedex_nodes = urlresults.getElementsByTagName('phedex')
    if len(phedex_nodes) == 0:
        return []
    for mapping in phedex_nodes[0].getElementsByTagName('mapping'):
        candidate = mapping.getAttribute("pfn")
        if candidate:
            return candidate
    return None
126 |
< |
|
127 |
< |
def getStageoutPFN(self,node,lfn,protocol):
    """
    Ask the PhEDEx Data Service for the stage-out PFN.

    input: LFN,node name,protocol
    returns: PFN
    raises: CrabException when the service gives no parsable reply, reports
            an error, or returns no PFN
    """
    # human readable form of the query, only used in the messages below
    query_url = "%s/lfn2pfn?node=%s&lfn=%s&protocol=%s"%(self.datasvc_url,node,lfn,protocol)

    dom = self.lfn2pfn(node,lfn,protocol)
    if not dom:
        raise CrabException("Unable to get info from %s"%query_url)

    failure = self.parse_error(dom)
    if failure:
        raise CrabException("Error extracting info from %s due to: %s"%(query_url,failure))

    pfn = self.parse_lfn2pfn(dom)
    if not pfn:
        raise CrabException("Unable to get stageout path (PFN) from %s"%query_url)
    return pfn
148 |
< |
|
149 |
< |
|
150 |
< |
if __name__ == '__main__' :
    # Manual check of the lfn2pfn lookup against hard-coded test values.
    from crab_logger import Logger
    from WorkSpace import *

    # the logger needs a work space to exist first
    continue_dir = "/home/fanfani/CRAB"
    cfg_params = {'USER.logdir' : continue_dir }
    common.work_space = WorkSpace(continue_dir, cfg_params)
    log = Logger()
    common.logger = log

    from LFNBaseName import *

    # test values
    lfn = LFNBase("datasetstring")
    node = 'T2_IT_Bari'
    protocol = "srmv2"

    #create an instance of the PhEDExDatasvcInfo object
    dsvc = PhEDExDatasvcInfo()
    #extract the PFN for the given node,LFN,protocol
    stageout_pfn = dsvc.getStageoutPFN(node,lfn,protocol)
    print("Stageout to %s"%stageout_pfn)
79 |
> |
#extract SE name an SE_PATH (needed for publication) |
80 |
> |
SE, SE_PATH, User = self.splitEndpoint(endpoint) |
81 |
> |
|
82 |
> |
return endpoint, self.lfn , SE, SE_PATH, User |
83 |
> |
|
84 |
> |
def splitEndpoint(self, endpoint):
    '''
    Return relevant infos from endpoint

    input: endpoint, the stage-out PFN (a "scheme://host[:port]/..." URL,
           or a plain path for direct access)
    returns: (SE, SE_PATH, USER)
             SE      : host name without the port; for plain paths the
                       entry of self.SE for the scheduler, '' if none
             SE_PATH : what follows host[:port] in the endpoint, or the
                       whole endpoint for plain paths
             USER    : path component following the first "user" marker,
                       '' when it cannot be found
    '''
    def user_after_marker(text):
        # Cut at the FIRST marker only: splitting on every occurrence
        # truncates names that contain "user" themselves ("superuser").
        halves = text.split('user', 1)
        if len(halves) < 2:
            return ''
        components = halves[1].split('/')
        if len(components) < 2:
            return ''
        return components[1]

    SE = ''
    SE_PATH = ''
    USER = ''

    # Direct access hands over a plain path. An endpoint without a scheme
    # is handled the same way instead of failing on the URL split.
    if self.usePhedex and self.protocol == 'direct':
        plain_path = True
    else:
        plain_path = endpoint.find('://') == -1

    if plain_path:
        SE = getattr(self, 'SE', {}).get(self.sched, '')
        SE_PATH = endpoint
        user_sources = [endpoint]
    else:
        remainder = endpoint.split('://', 1)[1]
        # urlparse is fed an http URL so that host and query are split out
        # whatever the real scheme is
        # python > 2.4
        # SE = urlparse(url).hostname
        scheme, host, path, params, query, fragment = urlparse('http://' + remainder)
        SE = host.split(':')[0]
        # host[:port] is the leading part of the remainder: slicing it off
        # keeps paths that repeat the host name intact
        SE_PATH = remainder[len(host):]
        # the user normally shows up in the query (SFN=...); fall back to
        # the path for endpoints without a query string
        user_sources = [query, path]

    if self.usePhedex:
        for source in user_sources:
            USER = user_after_marker(source)
            if USER:
                break
    else:
        #### to test #####
        try:
            USER = user_after_marker(self.lfn)
        except AttributeError:
            # lfn missing or not a string: best effort, USER stays empty
            USER = ''

    return SE, SE_PATH, USER
118 |
> |
|
119 |
> |
|
120 |
> |
def getLFN(self):
    """
    define the LFN composing the needed pieces

    returns: user_remote_dir as is for a private storage element with
             neither publication nor namespace requested; otherwise the
             value built by LFNBase (with a '/${PSETHASH}/' suffix when
             publishing or using the namespace)
    raises: CrabException when publication is requested without a
            publish_data_name
    """
    if not self.usePhedex:
        if int(self.publish_data) == 0 and int(self.usenamespace) == 0:
            ### TODO: check here whether the user is forcing a wrong LFN on a T2
            ## (siteDB query on the storage name; if it is a T2, match the
            ##  user lfn with LFNBaseName and refuse anything else)
            return self.user_remote_dir

    if self.publish_data_name == '':
        if int(self.publish_data) == 1:
            raise CrabException(
                "Error. The [USER] section does not have 'publish_data_name'\n"
                + '\tFor further information please visit : \n\t%s' % self.dataPub_faq)
        if int(self.usenamespace) == 1:
            # namespace without an explicit name falls back to a fixed one
            self.publish_data_name = "DefaultDataset"

    if int(self.publish_data) == 1 or int(self.usenamespace) == 1:
        local_user = self.sched in ['CAF']
        dataset = self.computePrimaryDataset()
        return LFNBase(self.forced_path, dataset, self.publish_data_name, LocalUser=local_user) + '/${PSETHASH}/'

    local_user = self.sched in ['CAF','LSF']
    return LFNBase(self.forced_path, self.user_remote_dir, LocalUser=local_user)
147 |
> |
|
148 |
> |
def computePrimaryDataset(self):
    """
    compute the last part for the LFN in case of publication

    returns: the first component of self.datasetpath (the text between the
             first and the second '/'), or self.publish_data_name when no
             dataset is given (datasetpath unset, or 'none' in any
             letter case)
    """
    datasetpath = self.datasetpath
    # datasetpath is read from the configuration without a default, so a
    # missing key arrives here as None: handle it like an explicit 'None'
    if datasetpath is None or datasetpath.upper() == 'NONE':
        return self.publish_data_name
    return datasetpath.split("/")[1]
157 |
> |
|
158 |
> |
def lfn2pfn(self):
    """
    PhEDEx Data Service lfn2pfn call

    input: none; node name, LFN and protocol are taken from self.node,
           self.lfn and self.protocol
    returns: DOM object with the content of the PhEDEx Data Service call,
             or None when the reply cannot be parsed as XML
    """
    params = urllib.urlencode({'node' : self.node , 'lfn': self.lfn , 'protocol': self.protocol})
    datasvc_lfn2pfn = "%s/lfn2pfn"%self.datasvc_url
    # the encoded parameters are handed over as request data, not appended
    # to the URL
    reply = urllib.urlopen(datasvc_lfn2pfn, params)
    # nested try blocks instead of try/except/finally: that combined form
    # is not available on the oldest interpreters this file supports
    try:
        try:
            dom = parse(reply)
        except Exception:
            # unreadable or malformed reply: the caller treats None as
            # "no info"; interpreter exits are no longer swallowed here
            dom = None
    finally:
        reply.close()

    return dom
175 |
> |
|
176 |
> |
def parse_error(self,urlresults):
    """
    look for errors in the DOM object returned by PhEDEx Data Service call

    input: DOM object with the content of a PhEDEx Data Service call
    returns: None when there is no <error> element, otherwise the text of
             the last <error> element found (its raw XML when it holds no
             text, so that callers still get a non-empty message)
    """
    errormsg = None
    for error in urlresults.getElementsByTagName('error'):
        # only text and CDATA children carry message data; indexing the
        # children blindly fails on an empty element or on nested tags
        pieces = [child.data for child in error.childNodes
                  if child.nodeType in (child.TEXT_NODE, child.CDATA_SECTION_NODE)]
        errormsg = ''.join(pieces) or error.toxml()
    return errormsg
187 |
> |
|
188 |
> |
def parse_lfn2pfn(self,urlresults):
    """
    Parse the content of the result of lfn2pfn PhEDEx Data Service call

    input: DOM object with the content of the lfn2pfn call
    returns: the first non-empty pfn attribute found among the <mapping>
             elements of the first <phedex> element; [] when there is no
             <phedex> element; None when no mapping carries a pfn
    """
    phedex_nodes = urlresults.getElementsByTagName('phedex')
    if len(phedex_nodes) == 0:
        return []
    for mapping in phedex_nodes[0].getElementsByTagName('mapping'):
        candidate = mapping.getAttribute("pfn")
        if candidate:
            return candidate
    return None
206 |
+ |
|
207 |
+ |
def getStageoutPFN( self ):
    """
    Work out the stage-out PFN for self.node, self.lfn and self.protocol.

    returns: PFN; taken from the PhEDEx Data Service when the storage
             element follows the CMS naming, otherwise composed from the
             user supplied storage path (plus srm host and port unless the
             scheduler is CAF or LSF)
    raises: CrabException when the Data Service gives no parsable reply,
            reports an error, or returns no PFN
    """
    if not self.usePhedex:
        # private storage element: nothing to look up
        if self.sched in ['CAF','LSF']:
            return self.user_se_path+self.lfn
        return 'srm://'+self.node+':'+self.user_port+self.user_se_path+self.lfn

    # human readable form of the query, only used in the messages below
    query_url = "%s/lfn2pfn?node=%s&lfn=%s&protocol=%s"%(self.datasvc_url,self.node,self.lfn,self.protocol)

    dom = self.lfn2pfn()
    if not dom:
        raise CrabException("Unable to get info from %s"%query_url)

    failure = self.parse_error(dom)
    if failure:
        raise CrabException("Error extracting info from %s due to: %s"%(query_url,failure))

    pfn = self.parse_lfn2pfn(dom)
    if not pfn:
        lines = [
            'Unable to get stageout path from TFC at Site %s \n'%self.node,
            ' Please alert the CompInfraSup group through their savannah %s \n'%self.FacOps_savannah,
            ' reporting: \n',
            ' Summary: Unable to get user stageout from TFC at Site %s \n'%self.node,
            ' OriginalSubmission: stageout path is not retrieved from %s \n'%query_url,
        ]
        raise CrabException(''.join(lines))

    return pfn
239 |
+ |
|
240 |
+ |
|
241 |
+ |
|
242 |
+ |
if __name__ == '__main__':
    # Sort of unit testing to check Phedex API for whatever site and/or lfn.
    # Usage:
    #    python PhEDExDatasvcInfo.py --node T2_IT_Bari --lfn /store/maremma
    import getopt
    import sys
    from crab_util import *
    import common

    # the constructor asks common.scheduler for its name
    klass_name = 'SchedulerGlite'
    klass = importName(klass_name, klass_name)
    common.scheduler = klass()

    # defaults, overridden from the command line
    lfn = "/store/user/"
    node = 'T2_IT_Bari'
    valid = ['node=','lfn=']
    try:
        opts, args = getopt.getopt(sys.argv[1:], "", valid)
    except getopt.GetoptError:
        ex = sys.exc_info()[1]
        print(str(ex))
        sys.exit(1)
    for o, a in opts:
        if o == "--node":
            node = a
        elif o == "--lfn":
            lfn = a

    mycfg_params = { 'USER.storage_element': node }
    dsvc = PhEDExDatasvcInfo(mycfg_params)
    # getStageoutPFN reads the LFN from the instance
    dsvc.lfn = lfn
    print(dsvc.getStageoutPFN())
274 |
+ |
|