ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.30
Committed: Mon Nov 10 11:15:43 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3_pre6, CRAB_2_4_3_pre5, CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2
Changes since 1.29: +9 -2 lines
Log Message:
allow to manage default BList also using the server

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 from crab_logger import Logger
14 import common
15
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Every public command method either returns the integer code given back by
    the server session (0 on success) or raises CrabException.
    """

    # CE names black-listed by default (only temporary, it should be handled at
    # server level). Users can drop it with EDG.remove_default_blacklist = 1.
    _DEFAULT_T1_BLACKLIST = (
        "fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
        "lcg00125.grid.sinica.edu.tw, "
        "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf"
    )

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Parameters:
        - serverName, serverPort: forwarded to the server session constructor
        - cfg_params: dictionary-like object holding the crab configuration
        - proxyPath: kept for interface compatibility.
          NOTE(review): as in the previous revisions this value is never consulted:
          the proxy is taken from $X509_USER_PROXY or from /tmp/x509up_u<uid>.

        Raises CrabException if the proxy file cannot be located or if its
        subject cannot be extracted with openssl.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'

        # For lsf/caf the proxy is not inspected: the user is identified by $USER.
        # A missing scheduler entry is treated as "not lsf/caf" instead of crashing.
        scheduler = str(cfg_params.get("CRAB.scheduler") or '').lower()
        self.dontMoveProxy = scheduler in ['lsf', 'caf']

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2]  # nice task name "crab_0_..."

        if self.dontMoveProxy:
            self.userSubj = str(os.environ['USER'])
            return

        x509 = self._locateProxy()
        exitCode, subject = commands.getstatusoutput('openssl x509 -in %s -subject -noout' % x509)
        if exitCode != 0:
            raise CrabException("Error while getting the subject from the user proxy")
        self.userSubj = str(subject).strip()

    def _locateProxy(self):
        """
        Return the path of the user proxy file: $X509_USER_PROXY when set,
        otherwise /tmp/x509up_u<effective uid>.

        Raises CrabException if the fallback file does not exist.
        """
        if 'X509_USER_PROXY' in os.environ:
            return os.environ['X509_USER_PROXY']

        # Same file the former "ls /tmp/x509up_u`id -u`" looked for, without a shell.
        x509 = '/tmp/x509up_u%d' % os.geteuid()
        if not os.path.exists(x509):
            raise CrabException("Error while locating the user proxy file")
        return x509

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized as XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: value stored in the "TotJob" attribute of the command XML

        Returns 0 on success. Raises CrabException on empty inputs or
        (through checkServerResponse) on any non-zero server code.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.message(logMsg + '\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def _serverErrorMessage(self, ret):
        """
        Translate a server return code into the message shown to the user.
        Unknown codes get a generic "Unexpected return code" message.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            # "+=" here: a plain assignment used to discard the line above
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: a non-integer code must not break the error report
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)
        return logMsg

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Writes the message matching the code to the log and always raises
        CrabException with that message: callers are expected to invoke this
        only for non-zero codes.
        """
        logMsg = self._serverErrorMessage(ret)
        common.logger.write(logMsg + '\n')
        raise CrabException(logMsg)

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanJobs(self, blTaskName, rng):
        """
        _cleanJobs_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('clean', blTaskName, rng)

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written there
        - statusFamilyType: kind of status requested to the server

        Returns statusFile when a path was given, otherwise the reply itself.
        Raises CrabException on an empty task name, on an empty reply or on a
        reply starting with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))

        if statusFile is None:
            return statusMsg

        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            # close the file even if the write fails
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented yet: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _ceBlackList(self):
        """
        Return the CE black list sent to the server: the default list (unless
        EDG.remove_default_blacklist is 1) followed by the user EDG.ce_black_list.
        """
        params = self.cfg_params
        entries = []
        if str(params.get("EDG.remove_default_blacklist", 0)).strip() != '1':
            entries.append(self._DEFAULT_T1_BLACKLIST)
        if 'EDG.ce_black_list' in params:
            userList = str(params['EDG.ce_black_list']).strip()
            if userList:
                entries.append(userList)
        # The comma keeps the last default entry and the first user entry apart.
        return ','.join(entries)

    def _buildMiniCfg(self):
        """
        Build the mini-cfg dictionary transferred to the server inside the
        "CfgParamDict" attribute of the command XML.
        """
        params = self.cfg_params
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = str(params.get('EDG.ce_white_list', ''))
        miniCfg['EDG.ce_black_list'] = self._ceBlackList()
        miniCfg['EDG.se_white_list'] = str(params.get('EDG.se_white_list', ''))
        miniCfg['EDG.se_black_list'] = str(params.get('EDG.se_black_list', ''))

        # An explicit value wins; the checksum is computed only when really needed.
        if 'cfgFileNameCkSum' in params:
            miniCfg['cfgFileNameCkSum'] = str(params['cfgFileNameCkSum'])
        else:
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = str(params.get('CRAB.se_remote_dir', ''))

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = params.get('EDG.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = params.get('EDG.max_cpu_time', '130')
        miniCfg['proxyServer'] = params.get('EDG.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = params.get('EDG.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = params.get('EDG.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = params.get('EDG.shallow_retry_count', -1)
        miniCfg['EDG.proxyInfos'] = params.get('EDG.proxyInfos', {})  # TODO activate this when using MyProxy-based delegation

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = params.get('USER.email', None)
        miniCfg['threshold'] = params.get('USER.thresholdlevel', 100)

        ## put here other fields if needed
        return miniCfg

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Serialize a command for the server as a pretty-printed XML document
        made of a single "TaskCommand" element.

        Accepts in input:
        - taskUName: the task unique name ("Task" attribute)
        - cmdSpec: the command ("Command" attribute)
        - rng: the job range ("Range" attribute)
        - newTaskAddIns: currently unused
        - flavour, type, jobs: stored in "Flavour", "Type" and "TotJob"

        Returns the XML as a string. A missing CRAB.scheduler entry in the
        configuration raises KeyError.
        """
        cfile = minidom.Document()
        ver = common.prog_version_str

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(ver))

        # the mini-cfg travels as the string representation of a dictionary
        node.setAttribute("CfgParamDict", str(self._buildMiniCfg()))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send the command cmd for the jobs in rng of task blTaskName.

        Returns 0 on success. Raises CrabException on an empty task name or
        (through checkServerResponse) on any non-zero server code.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret != 0:
            self.checkServerResponse(ret)
        return ret
331