ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PhEDExDatasvcInfo.py
Revision: 1.46
Committed: Fri Mar 9 01:12:48 2012 UTC (13 years, 1 month ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_2_pre1, CRAB_2_8_1
Changes since 1.45: +6 -5 lines
Log Message:
fix for https://savannah.cern.ch/bugs/?91893

File Contents

# User Rev Content
1 spiga 1.3 from Actor import *
2 afanfani 1.1 import urllib
3     from xml.dom.minidom import parse
4     from crab_exceptions import *
5 spiga 1.3 from WorkSpace import *
6     from urlparse import urlparse
7     from LFNBaseName import *
8 fanzago 1.36 from crab_util import getUserName
9 afanfani 1.1
class PhEDExDatasvcInfo:
    """
    Work out where job output has to be staged out.

    For storage elements whose name contains 'T1_', 'T2_' or 'T3_' the
    PFN is obtained from the PhEDEx Data Service (lfn2pfn call); for any
    other storage element it is composed from the user supplied
    storage_path / user_remote_dir / storage_port settings.
    """

    def __init__( self , cfg_params=None, config=None ):
        """
        input: cfg_params (crab.cfg style mapping with 'SECTION.key' keys)
               or config (plain mapping). config wins when it is given.
        """
        ## PhEDEx Data Service URL
        self.datasvc_url="https://cmsweb.cern.ch/phedex/datasvc/xml/prod"

        self.FacOps_savannah = 'https://savannah.cern.ch/support/?func=additem&group=cmscompinfrasup'
        self.stage_out_faq='https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabHowTo#Stageout_and_publication'
        self.dataPub_faq = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabForPublication'

        self.usePhedex = True
        self.sched = common.scheduler.name().upper()

        if config is not None:
            self.checkConfig(config)
        else:
            self.checkCfgConfig(cfg_params)

        # the check* methods may override srm_version ('direct', 'stageout'),
        # so the protocol is fixed only after they have run
        self.protocol = self.srm_version

    def checkConfig(self,config):
        """
        Read the settings from a plain mapping.

        Only srm_version, node and lfn are set here; the other attributes
        read by getEndpoint/getLFN are set by checkCfgConfig only.
        """
        self.srm_version = config.get("srm_version",'srmv2')
        self.node = config.get('storage_element',None)
        self.lfn='/store/'

    def checkCfgConfig(self,cfg_params):
        """
        Read the settings from the crab.cfg parameters and validate them.

        raises: CrabException if storage_element is missing, or if a non
                T1_/T2_/T3_ storage element is used without both
                user_remote_dir and storage_path.
        """
        # the constructor default is None: treat it as "nothing configured"
        # so the user gets the storage_element message instead of an
        # AttributeError
        if cfg_params is None:
            cfg_params = {}

        self.datasvc_url = cfg_params.get("USER.datasvc_url",self.datasvc_url)
        self.srm_version = cfg_params.get("USER.srm_version",'srmv2')
        self.node = cfg_params.get('USER.storage_element',None)

        self.publish_data = cfg_params.get("USER.publish_data",0)
        self.usenamespace = cfg_params.get("USER.usenamespace",0)
        self.user_remote_dir = cfg_params.get("USER.user_remote_dir",'')
        if self.user_remote_dir and not self.user_remote_dir.endswith('/'):
            self.user_remote_dir = self.user_remote_dir + '/'

        self.datasetpath = cfg_params.get("CMSSW.datasetpath")
        self.publish_data_name = cfg_params.get('USER.publish_data_name','')

        self.pset = cfg_params.get('CMSSW.pset',None)

        self.user_port = cfg_params.get("USER.storage_port",'8443')
        self.user_se_path = cfg_params.get("USER.storage_path",'')
        if self.user_se_path and not self.user_se_path.endswith('/'):
            self.user_se_path = self.user_se_path + '/'

        #check if using "private" Storage
        if not self.node :
            msg = 'Please specify the storage_element name in your crab.cfg section [USER].\n'
            msg +='\tFor further information please visit : %s'%self.stage_out_faq
            raise CrabException(msg)
        if 'T1_' not in self.node and 'T2_' not in self.node and 'T3_' not in self.node:
            self.usePhedex = False

        if not self.usePhedex and ( self.user_remote_dir == '' or self.user_se_path == '' ):
            ####### FEDE FOR BUG 73010 ############
            msg = 'Error: task ' + common.work_space._top_dir + ' not correctly created. Please remove it. \n'
            msg += ' You are asking to stage out without using CMS Storage Name convention. In this case you \n'
            msg += ' must specify both user_remote_dir and storage_path in the crab.cfg section [USER].\n'
            msg += ' For further information please visit : \n\t%s'%self.stage_out_faq
            # NOTE(review): the returned task was never used; the call is
            # kept in case it has side effects - confirm and drop if not.
            common._db.getTask()
            raise CrabException(msg)

        self.forced_path = '/store/user/'
        if self.sched in ['LSF','PBS']:
            self.srm_version = 'direct'
            self.SE = {'LSF':'', 'PBS':''}

        if self.sched == 'CAF':
            # first part of the LFN on CAF can be overridden via USER.caf_lfn
            self.forced_path = cfg_params.get("USER.caf_lfn", '/store/caf/user')
            self.SE = {'CAF':'caf.cern.ch'}
            self.srm_version = 'stageout'

        if not self.usePhedex:
            self.forced_path = self.user_remote_dir
        return

    def getEndpoint(self):
        '''
        Return full SE endpoint and related infos

        returns: (endpoint, lfn, SE, SE_PATH, User)
        '''
        self.lfn = self.getLFN()

        #extract the PFN for the given node,LFN,protocol
        endpoint = self.getStageoutPFN()
        if not endpoint.endswith('/'):
            endpoint = endpoint + '/'

        if int(self.publish_data) == 1 or int(self.usenamespace) == 1:
            # NOTE(review): lfn and endpoint already end with '/', so this
            # produces '//${PSETHASH}/'. Left unchanged on purpose - confirm
            # whether the consumers of these strings rely on it.
            self.lfn = self.lfn + '/${PSETHASH}/'
            endpoint = endpoint + '/${PSETHASH}/'

        #extract SE name an SE_PATH (needed for publication)
        SE, SE_PATH, User = self.splitEndpoint(endpoint)

        return endpoint, self.lfn , SE, SE_PATH, User

    def splitEndpoint(self, endpoint):
        '''
        Return relevant infos from endpoint

        input: endpoint as built by getEndpoint
        returns: (SE, SE_PATH, USER)
        '''
        SE = ''
        SE_PATH = ''
        USER = getUserName()
        if self.usePhedex:
            if self.protocol in ('direct', 'stageout'):
                # local schedulers: SE name comes from the per-scheduler map
                SE = self.SE[self.sched]
                SE_PATH = endpoint
            else:
                # swap the scheme for http so that urlparse splits out the
                # host[:port] part whatever the original scheme was
                url = 'http://'+endpoint.split('://')[1]
                scheme, host, path, params, query, fragment = urlparse(url)
                SE = self.getAuthoritativeSE()
                SE_PATH = endpoint.split(host)[1]
        else:
            SE = self.node
            SE_PATH = self.user_se_path + self.user_remote_dir
        # NOTE(review): applied to both branches above; the annotated source
        # listing lost its indentation, confirm against the repository.
        if self.lfn.find('group') != -1:
            # for .../group/<name>/... LFNs the name after 'group' is used
            # instead of the user name
            try:
                USER = (self.lfn.split('group')[1]).split('/')[1]
            except IndexError:
                # nothing after 'group': keep the user name
                pass
        return SE, SE_PATH, USER

    def getLFN(self):
        """
        define the LFN composing the needed pieces

        returns: LFN; it ends with '/' whenever it is built through LFNBase
        raises: CrabException if publication is requested without
                publish_data_name
        """
        publishing = int(self.publish_data) == 1
        namespace = int(self.usenamespace) == 1

        if not self.usePhedex and int(self.publish_data) == 0 and int(self.usenamespace) == 0:
            ### add here check if user is trying to force a wrong LFN using a T2  TODO
            ## check if storage_name is a T2 (siteDB query)
            ## if yes :match self.user_lfn with LFNBaseName...
            ## if NOT : raise (you are using a T2. It's not allowed stage out into self.user_path+self.user_lfn)
            return self.user_remote_dir
        if self.publish_data_name == '' and publishing:
            msg = "Error. The [USER] section does not have 'publish_data_name'\n"
            msg += '\tFor further information please visit : \n\t%s'%self.dataPub_faq
            raise CrabException(msg)
        if self.publish_data_name == '' and namespace:
            self.publish_data_name = "DefaultDataset"

        if publishing:
            primaryDataset = self.computePrimaryDataset()
            ### publish=True is passed for the publication in order to be
            ### able to check the lfn length
            lfn = LFNBase(self.forced_path, primaryDataset, self.publish_data_name, publish=True)
        elif namespace:
            primaryDataset = self.computePrimaryDataset()
            lfn = LFNBase(self.forced_path, primaryDataset, self.publish_data_name)
        else:
            lfn = LFNBase(self.forced_path,self.user_remote_dir)

        if not lfn.endswith('/'):
            lfn = lfn + '/'

        return lfn

    def computePrimaryDataset(self):
        """
        compute the last part for the LFN in case of publication

        returns: the first component of datasetpath, or publish_data_name
                 when datasetpath is 'None' (any case) or not set at all
        """
        # datasetpath is None when CMSSW.datasetpath is absent from the cfg
        if self.datasetpath and self.datasetpath.upper() != 'NONE':
            primarydataset = self.datasetpath.split("/")[1]
        else:
            primarydataset = self.publish_data_name
        return primarydataset

    def domPhedex(self,params,datasvc_baseUrl):
        """
        PhEDEx Data Service call

        input: params,datasvc_baseUrl
        returns: DOM object with the content of the PhEDEx Data Service call,
                 or None if the reply could not be parsed
        raises: CrabException if the service cannot be reached (IOError)
        """
        params = urllib.urlencode(params)
        try:
            reply = urllib.urlopen(datasvc_baseUrl, params)
            try:
                urlresults = parse(reply)
            finally:
                # do not leak the connection, whatever the parser does
                reply.close()
        except IOError:
            msg="Unable to access PhEDEx Data Service at %s"%datasvc_baseUrl
            raise CrabException(msg)
        except Exception:
            # unparsable reply: callers report it through the None result.
            # Exception (not a bare except) lets Ctrl-C / sys.exit through.
            urlresults = None

        return urlresults

    def parse_error(self,urlresults):
        """
        look for errors in the DOM object returned by PhEDEx Data Service call

        returns: text of the last 'error' element, None if there is none
        """
        errormsg = None
        for error in urlresults.getElementsByTagName('error'):
            # at most the first two child nodes carry the message; children
            # without character data (nested elements) are skipped
            pieces = [getattr(child, 'data', '') for child in error.childNodes[:2]]
            # an empty <error/> is still an error: never report it as falsy
            errormsg = ''.join(pieces) or 'unspecified error'
        return errormsg

    def parse_lfn2pfn(self,urlresults):
        """
        Parse the content of the result of lfn2pfn PhEDEx Data Service call

        input: DOM object with the content of the lfn2pfn call
        returns: PFN of the first 'mapping' with a non empty pfn attribute;
                 [] if there is no 'phedex' element, None if no PFN is found
        """
        result = urlresults.getElementsByTagName('phedex')

        if not result:
            return []
        for mapping in result[0].getElementsByTagName('mapping'):
            pfn = mapping.getAttribute("pfn")
            if pfn:
                return pfn
        return None

    def _queryDatasvc(self, api, params, fullurl):
        """
        Call one PhEDEx Data Service API and check the reply for errors

        input: api name ('lfn2pfn', 'nodes'), its params, and the
               equivalent GET url (used in the error messages only)
        returns: DOM object with the reply
        raises: CrabException if there is no usable reply or it holds an error
        """
        dom = self.domPhedex(params, "%s/%s"%(self.datasvc_url, api))
        if not dom :
            msg="Unable to get info from %s"%fullurl
            raise CrabException(msg)

        errormsg = self.parse_error(dom)
        if errormsg:
            msg="Error extracting info from %s due to: %s"%(fullurl,errormsg)
            raise CrabException(msg)
        return dom

    def getStageoutPFN( self ):
        """
        input: LFN,node name,protocol
        returns: PFN, always ending with '/'
        raises: CrabException if the PhEDEx Data Service gives no PFN
        """
        if self.usePhedex:
            params = {'node' : self.node , 'lfn': self.lfn , 'protocol': self.protocol}
            fullurl="%s/lfn2pfn?node=%s&lfn=%s&protocol=%s"%(self.datasvc_url,self.node,self.lfn,self.protocol)
            domlfn2pfn = self._queryDatasvc('lfn2pfn', params, fullurl)

            stageoutpfn = self.parse_lfn2pfn(domlfn2pfn)
            if not stageoutpfn:
                msg ='Unable to get stageout path from TFC at Site %s \n'%self.node
                msg+=' Please alert the CompInfraSup group through their savannah %s \n'%self.FacOps_savannah
                msg+=' reporting: \n'
                msg+=' Summary: Unable to get user stageout from TFC at Site %s \n'%self.node
                msg+=' OriginalSubmission: stageout path is not retrieved from %s \n'%fullurl
                raise CrabException(msg)
        else:
            if self.sched in ['CAF','LSF','PBS'] :
                stageoutpfn = self.user_se_path+self.lfn
            else:
                stageoutpfn = 'srm://'+self.node+':'+str(self.user_port)+self.user_se_path+self.lfn

        if not stageoutpfn.endswith('/'):
            stageoutpfn = stageoutpfn + '/'
        return stageoutpfn

    def getAuthoritativeSE(self):
        """
        input: node name
        returns: se attribute of the first 'node' element having one;
                 [] if there is no 'phedex' element, None if no se is found
        raises: CrabException if the PhEDEx Data Service call fails
        """
        params = {'node' : self.node }
        fullurl="%s/nodes/?node=%s"%(self.datasvc_url,self.node)
        domnodes = self._queryDatasvc('nodes', params, fullurl)

        result = domnodes.getElementsByTagName('phedex')
        if not result:
            return []
        for entry in result[0].getElementsByTagName('node'):
            se = entry.getAttribute("se")
            if se:
                return se
        return None
331 afanfani 1.6
332    
if __name__ == '__main__':
    # Sort of unit testing to check Phedex API for whatever site and/or lfn.
    # Usage:
    #    python PhEDExDatasvcInfo.py --node T2_IT_Bari --lfn /store/maremma
    import getopt,sys
    from crab_util import *
    import common

    # the class asks common.scheduler for its name, so one has to be there
    scheduler_class = 'SchedulerGlite'
    common.scheduler = importName(scheduler_class, scheduler_class)()

    try:
        opts, args = getopt.getopt(sys.argv[1:], "", ['node=','lfn='])
    except getopt.GetoptError:
        # report the option error and give up
        print(str(sys.exc_info()[1]))
        sys.exit(1)

    # when an option is repeated the last occurrence wins
    chosen = dict(opts)
    node = chosen.get('--node', 'T2_IT_Bari')
    lfn = chosen.get('--lfn', "/store/user/")

    dsvc = PhEDExDatasvcInfo({ 'USER.storage_element': node })
    dsvc.lfn = lfn
    print(dsvc.getStageoutPFN())
365