ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.49
Committed: Thu Oct 29 18:01:48 2009 UTC (15 years, 6 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.48: +3 -0 lines
Log Message:
Fix for bug #55671. Now client check if the server is not in drain mode before sending ISB

File Contents

# Content
#!/usr/bin/env python
"""
_ServerCommunicator_

Client-side interface used by CRAB to talk to an Analysis (CRAB) Server:
task submission, job commands (submit/kill/clean/outputRetrieved) and
status queries, all routed through a CRAB_Server_Session.
"""

__version__, __revision__ = "$Id", "$Revision"
__author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21 import traceback
22
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Requests go through a CRAB_Server_Session (``self.asSession``). Command
    style requests are wrapped in a small XML ``TaskCommand`` document built
    by ``_createXMLcommand``. Non-zero server return codes are turned into
    user-readable log messages by ``checkServerResponse``.
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server.

        serverName, serverPort -- server address handed to CRAB_Server_Session
        cfg_params             -- client configuration dictionary
        proxyPath              -- accepted for interface compatibility; it is
                                  not used by this constructor

        Raises CrabException if the credential subject cannot be obtained.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a 'Token' credential instead of a 'Proxy'.
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): CliServerParams (from crab_util) presumably sets the
        # server-related attributes such as self.server_admin, which
        # checkServerResponse reads -- confirm against crab_util.
        CliServerParams(self)
        # nice task name "crab_0_...", taken from the working-directory path
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}
        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        blTaskName -- unique name of the task on the server
        blXml      -- the serialized (XML) task description
        rng        -- submission range as specified by the user on the command line
        TotJob     -- total number of jobs, forwarded as the TotJob attribute
        taskType   -- forwarded as the Type attribute of the command

        Returns the server return code (0 on success; other codes are
        reported through checkServerResponse).
        Raises CrabException if the XML, the task name or the command XML is missing.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,
                                        jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
        if ret == 0:
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)
        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a server return code and report it to the user.

        Every code handled here produces a message. The message is logged at
        info level, and the server choice stored in the task DB is reset
        (serverName set to ''). Returns ``ret`` unchanged.
        """
        if ret == 10:
            # server overloaded
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # server draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large for a single submission
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 12:
            # task refused (server possibly under maintenance)
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: the session layer may hand back a non-integer
            # value, which must be reported, not crash the formatting.
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        # reset server choice, then report
        opsToBeSaved = {'serverName': ''}
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Submit further jobs of a task that has already been sent to a server.
        Used for subsequent submission of ranged sets of jobs.

        blTaskName -- unique name of the task on the server
        rng        -- job range as specified by the user on the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        blTaskName -- unique name of the task on the server
        rng        -- job range as specified by the user on the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean all the jobs of the task.

        blTaskName -- unique name of the task on the server
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve a status document from the server. Any status family known to
        the server can be requested (status, version, loggingInfos, ...).

        Returns the path ``statusFile`` after writing the reply to it, or the
        reply string itself when ``statusFile`` is None.
        Raises CrabException if the task name is missing, or if the server
        answer is empty/missing or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))

        if statusFile is None:
            return statusMsg

        # try/finally (not 'with') keeps this runnable on old Python 2 releases.
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Notify the server that the output of the given jobs has been retrieved.

        blTaskName -- unique name of the task on the server
        rng        -- job range as specified by the user on the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Ask the server whether it is drained.

        Returns the raw reply of the 'isServerDrained' status family (see getStatus).
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job post-mortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False,
                          flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the XML ``TaskCommand`` document sent along with every request.

        The command attributes (task, user subject, command, range, ...) are
        set on a single element. A "mini-cfg" dictionary with the subset of
        client configuration the server needs is attached, as its str() form,
        in the CfgParamDict attribute.
        ``newTaskAddIns`` is accepted for interface compatibility but not used here.

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        ## Only Temporary. it should be at Server level
        # Default CE blacklist, dropped when the user sets
        # GRID.remove_default_blacklist = 1. Compared as a string so that an
        # integer 1 is honoured as well as the string '1'.
        T1_BL = "T0, T1"
        if str(self.cfg_params.get("GRID.remove_default_blacklist", 0)).strip() == '1':
            T1_BL = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos: GRID.<key> client settings become EDG.<key>
        for key in ('ce_white_list', 'se_white_list', 'se_black_list', 'group', 'role'):
            miniCfg['EDG.' + key] = str(self.cfg_params.get('GRID.' + key, ''))

        # The CE blacklist is the default one plus the user's own, comma-joined.
        miniCfg['EDG.ce_black_list'] = T1_BL
        if 'GRID.ce_black_list' in self.cfg_params:
            if T1_BL:
                miniCfg['EDG.ce_black_list'] += ", "
            miniCfg['EDG.ce_black_list'] += str(self.cfg_params['GRID.ce_black_list'])

        # Only checksum the cfg file when no checksum was supplied.
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])
        else:
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params.get('CRAB.se_remote_dir', ''))

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue', 'cmscaf1nw')
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count', -1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        # computed once (the original queried Scram twice for the same value)
        miniCfg['CMSSW_version'] = self.scram.getSWVersion()
        # WMBS
        miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')
        miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm', 'FileBased')

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send the command ``cmd`` for the jobs in ``rng`` of task ``blTaskName``.

        Returns the server return code. On success a debug message is logged;
        otherwise the code is reported through checkServerResponse.
        Raises CrabException if the task name or the command XML is missing.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: this message was previously built and then discarded
            common.logger.debug('Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name))
        else:
            self.checkServerResponse(ret)
        return ret
348