ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/PhEDExDatasvcInfo.py
Revision: 1.3
Committed: Sun Sep 21 10:53:10 2008 UTC (16 years, 7 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.2: +194 -100 lines
Log Message:
Implemented getEndpoint(), used to compute the stage-out endpoint using the PhEDEx API. Also added support for "private" storage (i.e. a storage element that is not a PhEDEx node).

File Contents

# Content
1 from Actor import *
2 import urllib
3 from xml.dom.minidom import parse
4 from crab_exceptions import *
5 from crab_logger import Logger
6 from WorkSpace import *
7 from urlparse import urlparse
8 from LFNBaseName import *
9
class PhEDExDatasvcInfo:
    """
    Compute the stage-out endpoint (PFN) of the user output.

    For storage elements whose name contains 'T1_' or 'T2_' the PFN is
    obtained from the PhEDEx Data Service ``lfn2pfn`` call.  For any other
    ("private") storage element the endpoint is assembled from the
    ``storage_path`` and ``lfn`` parameters of the [USER] section.
    """

    def __init__(self, cfg_params):
        """
        Read the relevant [USER]/[CMSSW] parameters from cfg_params.

        input: cfg_params, a dict-like object (only .get() is used)
        raises: CrabException if 'USER.storage_element' is missing, or if a
                private storage element is used without both 'USER.lfn' and
                'USER.storage_path'
        """
        ## PhEDEx Data Service URL
        url = "https://cmsweb.cern.ch/phedex/datasvc/xml/prod"
        self.datasvc_url = cfg_params.get("USER.datasvc_url", url)

        self.FacOps_savannah = 'https://savannah.cern.ch/projects/cmscompinfrasup/'

        self.srm_version = cfg_params.get("USER.srm_version", 'srmv2')
        self.node = cfg_params.get('USER.storage_element', None)
        if self.node is None:
            # fail with an explicit message instead of an AttributeError below
            msg = "Error. The [USER] section does not have 'storage_element'"
            raise CrabException(msg)

        self.publish_data = cfg_params.get("USER.publish_data", 0)
        self.usenamespace = cfg_params.get("USER.usenamespace", 0)
        self.user_remote_dir = cfg_params.get("USER.remote_dir", '')
        self.datasetpath = cfg_params.get("CMSSW.datasetpath")
        self.publish_data_name = cfg_params.get('USER.publish_data_name', '')

        self.user_lfn = cfg_params.get("USER.lfn", '')
        self.user_se_path = cfg_params.get("USER.storage_path", '')

        # check if using "private" Storage: only node names containing
        # 'T1_' or 'T2_' are resolved through PhEDEx
        self.usePhedex = ('T1_' in self.node) or ('T2_' in self.node)
        if not self.usePhedex and (self.user_lfn == '' or self.user_se_path == ''):
            msg = 'You are asking to stage out without using CMS Storage Name convention. In this case you \n'
            msg += '      must specify both lfn and storage_path in the crab.cfg section [USER].\n '
            msg += '      For further information please visit: ADD_TWIKI_LINK'
            raise CrabException(msg)

        self.sched = common.scheduler.name().upper()

        # CAF and LSF schedulers access the storage directly, not through SRM
        self.protocol = self.srm_version
        if self.sched in ['CAF', 'LSF']:
            self.protocol = 'direct'

    def getEndpoint(self):
        """
        Return full SE endpoint and related infos

        returns: tuple (endpoint, lfn, SE, SE_PATH, User)
        """
        self.lfn = self.getLFN()

        # extract the PFN for the given node,LFN,protocol
        endpoint = self.getStageoutPFN()

        # extract SE name and SE_PATH (needed for publication)
        SE, SE_PATH, User = self.splitEndpoint(endpoint)

        return endpoint, self.lfn, SE, SE_PATH, User

    def _extractUser(self, text):
        """
        Return the path component that follows the first 'user' marker in
        text, or '' when it cannot be determined.
        """
        if not text:
            return ''
        # maxsplit=1 keeps user names that themselves contain 'user' intact
        around_marker = text.split('user', 1)
        if len(around_marker) < 2:
            return ''
        components = around_marker[1].split('/')
        if len(components) < 2:
            return ''
        return components[1]

    def splitEndpoint(self, endpoint):
        """
        Return relevant infos from endpoint

        input: the endpoint (PFN) string
        returns: tuple (SE, SE_PATH, USER); a piece that cannot be
                 determined is returned as ''
        """
        SE = ''
        SE_PATH = ''
        USER = ''
        if self.usePhedex:
            if self.protocol == 'direct':
                # the endpoint is used as-is for the path; no SE host is extracted
                SE_PATH = endpoint
                USER = self._extractUser(endpoint)
            else:
                # swap the scheme for http so that urlparse splits out the host
                url = 'http://' + endpoint.split('://')[1]
                # python > 2.4
                # SE = urlparse(url).hostname
                scheme, host, path, params, query, fragment = urlparse(url)
                SE = host.split(':')[0]
                # split only once: the host name may appear again in the path
                SE_PATH = endpoint.split(host, 1)[1]
                # the user is looked up in the query string first, then in
                # the path for endpoints that carry no query
                USER = self._extractUser(query) or self._extractUser(path)
        else:
            SE = self.node
            SE_PATH = self.user_se_path + self.user_lfn
            # self.lfn is only set by getEndpoint(); best effort otherwise
            USER = self._extractUser(getattr(self, 'lfn', ''))

        return SE, SE_PATH, USER

    def getLFN(self):
        """
        define the LFN composing the needed pieces

        returns: the LFN string
        raises: CrabException if publication is requested without
                'publish_data_name'
        """
        publish = int(self.publish_data)
        use_namespace = int(self.usenamespace)

        if not self.usePhedex and publish == 0 and use_namespace == 0:
            ### add here check if user is trying to force a wrong LFN using a T2  TODO
            ## check if storage_name is a T2 (siteDB query)
            ## if yes :match self.user_lfn with LFNBaseName...
            ##     if NOT : raise (you are using a T2. It's not allowed stage out into self.user_path+self.user_lfn)
            return self.user_lfn

        if self.publish_data_name == '':
            if publish == 1:
                msg = "Error. The [USER] section does not have 'publish_data_name'"
                raise CrabException(msg)
            if use_namespace == 1:
                self.publish_data_name = "DefaultDataset"

        l_User = False
        if publish == 1 or use_namespace == 1:
            if self.sched in ['CAF']:
                l_User = True
            primaryDataset = self.computePrimaryDataset()
            lfn = LFNBase(primaryDataset, self.publish_data_name, LocalUser=l_User) + '/${PSETHASH}/'
        else:
            if self.sched in ['LSF']:
                l_User = True
            lfn = LFNBase(self.user_remote_dir, LocalUser=l_User)
        return lfn

    def computePrimaryDataset(self):
        """
        compute the last part for the LFN in case of publication

        returns: the second '/'-separated field of datasetpath, or
                 publish_data_name when datasetpath is unset, empty or 'NONE'
        """
        datasetpath = self.datasetpath
        # a missing CMSSW.datasetpath is treated like an explicit 'NONE'
        if datasetpath and datasetpath.upper() != 'NONE':
            return datasetpath.split("/")[1]
        return self.publish_data_name

    def lfn2pfn(self):
        """
        PhEDEx Data Service lfn2pfn call

        input:   LFN,node name,protocol
        returns: DOM object with the content of the PhEDEx Data Service call,
                 or None if the service cannot be contacted or its answer
                 cannot be parsed
        """
        params = {'node': self.node, 'lfn': self.lfn, 'protocol': self.protocol}
        params = urllib.urlencode(params)
        datasvc_lfn2pfn = "%s/lfn2pfn" % self.datasvc_url
        response = None
        # nested try blocks: try/except/finally cannot be combined in python 2.4
        try:
            try:
                response = urllib.urlopen(datasvc_lfn2pfn, params)
                return parse(response)
            except Exception:
                # connection and parsing failures are both reported as None;
                # the caller turns that into a CrabException
                return None
        finally:
            if response is not None:
                response.close()

    def parse_error(self, urlresults):
        """
        look for errors in the DOM object returned by PhEDEx Data Service call

        input:   DOM object
        returns: the text of the last <error> element found, or None when
                 there is no <error> element
        """
        errormsg = None
        for error in urlresults.getElementsByTagName('error'):
            # only nodes carrying character data contribute to the message
            pieces = [node.data for node in error.childNodes if hasattr(node, 'data')]
            # an <error> element without text is still an error: keep the
            # message non-empty so that callers testing its truth see it
            errormsg = ''.join(pieces) or 'unspecified error'
        return errormsg

    def parse_lfn2pfn(self, urlresults):
        """
        Parse the content of the result of lfn2pfn PhEDEx Data Service call

        input:   DOM object with the content of the lfn2pfn call
        returns: the first non-empty PFN; [] when there is no <phedex>
                 element, None when no mapping carries a PFN
        """
        result = urlresults.getElementsByTagName('phedex')
        if not result:
            return []
        for mapping in result[0].getElementsByTagName('mapping'):
            pfn = mapping.getAttribute("pfn")
            if pfn:
                return pfn
        return None

    def getStageoutPFN(self):
        """
        input:   LFN,node name,protocol
        returns: PFN
        raises:  CrabException when the PhEDEx Data Service cannot be
                 queried, reports an error, or returns no PFN
        """
        if not self.usePhedex:
            # private storage: the endpoint is built from the user parameters
            return 'srm://' + self.node + ':8443' + self.user_se_path + self.lfn

        # fullurl is only used in the error messages below
        fullurl = "%s/lfn2pfn?node=%s&lfn=%s&protocol=%s" % (self.datasvc_url, self.node, self.lfn, self.protocol)
        domlfn2pfn = self.lfn2pfn()
        if not domlfn2pfn:
            msg = "Unable to get info from %s" % fullurl
            raise CrabException(msg)

        errormsg = self.parse_error(domlfn2pfn)
        if errormsg:
            msg = "Error extracting info from %s due to: %s" % (fullurl, errormsg)
            raise CrabException(msg)

        stageoutpfn = self.parse_lfn2pfn(domlfn2pfn)
        if not stageoutpfn:
            msg = 'Unable to get stageout path for Site %s. Maybe it does not correctly export its TFC. \n' % self.node
            msg += '      Please alert the FacOps group through their savannah %s' % self.FacOps_savannah
            raise CrabException(msg)

        return stageoutpfn