ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.58
Committed: Tue Jan 26 12:54:21 2010 UTC (15 years, 3 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.57: +5 -2 lines
Log Message:
fix for bug #61682: better error handling during 'isServerDrained' communications

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from xml.dom import minidom
17 import os, sys
18 import commands
19 import traceback
20 from Downloader import Downloader
21
22 if (sys.version_info[0] + .1 * sys.version_info[1]) < 2.6 :
23 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
24 else:
25 from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
26
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Every command is serialized as a small "TaskCommand" XML document (see
    _createXMLcommand) and sent through a C_AS_Session; non-zero server
    return codes are reported through checkServerResponse.
    """
    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port.

        - serverName, serverPort: endpoint handed to C_AS_Session
        - cfg_params: the CRAB configuration mapping; kept on the instance and
          used when building every command
        - proxyPath: accepted for interface compatibility, not used here

        Raises CrabException when the credential subject cannot be obtained.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF/LSF schedulers authenticate with a token instead of a grid proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably fills further server attributes on self
        # (e.g. server_admin, read by checkServerResponse) -- confirm.
        CliServerParams(self)
        # nice task name "crab_0_..." taken from the work-space directory path
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential' : credentialType,
                     'logger' : common.logger() }
        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # no exception binding: it was unused and the old "except X, err"
            # form is Python-2-only syntax
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: total number of jobs, forwarded as the TotJob attribute
        - taskType: forwarded as the Type attribute of the command

        Returns the server return code (0 on success). A non-zero code is
        reported through checkServerResponse.
        Raises CrabException when an input is missing or the command XML is empty.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a server return code.

        Builds a user message for the code, resets the server choice stored
        for the task (serverName -> '') in the local DB, logs the message at
        info level and returns ret unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 101:
            # task too large for a single submission
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 12:
            # task refused (possible maintenance)
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s rather than %d: a non-integer code must not turn into a TypeError
            # that hides the original server error
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)

        if logMsg:
            # reset server choice
            opsToBeSaved = {'serverName' : ''}
            common._db.updateTask_(opsToBeSaved)
            common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Submit further jobs of a task that has already been sent to a server.
        Used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send a 'StopWorkflow' command for all the jobs of the task.
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean all the jobs of the task.

        Accepts in input:
        - blTaskName: the task unique name (the range is always 'all')
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status
        (version, loggingInfos, ...) selected through statusFamilyType.

        - statusFile: when given, the reply is written to this path and the
          path is returned; otherwise the reply string itself is returned.

        An empty reply or one starting with 'Error:' raises CrabException,
        except for the best-effort 'isServerDrained' probe. In that case no
        file is written and "" is returned.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
            # drain probe failed: hand back an empty answer instead of the raw error text
            statusFile = None
            statusMsg = ""

        if statusFile is not None:
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command for the given jobs to the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Query the 'isServerDrained' status family. Returns the server reply,
        or "" when the server answers with an error or nothing.
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.
        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Serialize a server command as a pretty-printed "TaskCommand" XML document.

        The task name, user subject, command, range, total jobs, scheduler,
        flavour, type and client version are stored as attributes. The
        mini-configuration built by _buildMiniCfg is stored, as its str() form,
        in the CfgParamDict attribute. newTaskAddIns is accepted for interface
        compatibility and is not used here.
        """
        cfile = minidom.Document()

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(common.prog_version_str) )

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(self._buildMiniCfg()) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _defaultCeBlackList(self):
        """
        Return the centrally published CE black list, or None when the user
        disabled it (GRID.remove_default_blacklist != 0) or nothing was downloaded.
        Only Temporary. it should be at Server level
        """
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        if int(removeBList) != 0:
            common.logger.info("WARNING: Skipping default black list!")
            return None

        blackAnaOps = None
        downloader = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
        result = downloader.config("site_black_list.conf")
        if result is not None:
            blackAnaOps = result
        # log the list itself, not the Downloader object
        common.logger.debug("Enforced black list: %s "%str(blackAnaOps))
        return blackAnaOps

    def _buildMiniCfg(self):
        """
        Build the mini-cfg dictionary transferred to the server with each command.
        GRID.* CE/SE/VOMS options are migrated to the EDG.* keys the server expects.
        """
        miniCfg = {}

        ## migrate CE/SE infos: empty string when the option is unset
        for opt in ('ce_white_list', 'se_white_list', 'se_black_list', 'group', 'role'):
            miniCfg['EDG.' + opt] = str( self.cfg_params.get('GRID.' + opt, '') )

        # CE black list = default (central) list, extended with the user's one.
        # The default list may be None, so never apply += to it blindly.
        ceBlackList = self._defaultCeBlackList()
        if 'GRID.ce_black_list' in self.cfg_params:
            userBlackList = str( self.cfg_params['GRID.ce_black_list'] )
            if ceBlackList:
                ceBlackList = ceBlackList + ", " + userBlackList
            else:
                ceBlackList = userBlackList
        miniCfg['EDG.ce_black_list'] = ceBlackList

        # an explicit checksum in the configuration wins; compute one only otherwise
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])
        else:
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = str( self.cfg_params.get('CRAB.se_remote_dir', '') )

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue','cmscaf1nw')
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count',0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count',-1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        #WMBS
        miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')
        miniCfg['processing'] = self.cfg_params.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = self.cfg_params.get('WMBS.startrun', '0')
        miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm','FileBased')
        miniCfg['split_per_job'] = self.cfg_params.get('WMBS.split_per_job','files_per_job')
        miniCfg['split_value'] = self.cfg_params.get('WMBS.split_value',1)

        return miniCfg

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send command cmd for the jobs in rng of task blTaskName.
        Returns the server return code (0 on success). Non-zero codes are
        reported through checkServerResponse.
        Raises CrabException when the task name is missing or the command XML is empty.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success
            common.logger.debug('Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name))
        else:
            self.checkServerResponse(ret)
        return ret
371