ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.61
Committed: Tue May 4 16:48:12 2010 UTC (14 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.60: +4 -1 lines
Log Message:
merge 2.7.1_branch

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from xml.dom import minidom
17 import os, sys
18 import commands
19 import traceback
20 from Downloader import Downloader
21
# Select the CRAB server client API matching the running Python version.
# Tuple comparison avoids the float arithmetic previously used on version_info.
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
else:
    # Try the available API flavours in order of preference; each fallback is
    # attempted only when the previous module cannot be imported. (Two bare
    # `except:` clauses on one `try` are a SyntaxError, hence the nesting.)
    try:
        from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except ImportError:
        try:
            from CRAB_Server_API_36X import CRAB_Server_Session as C_AS_Session
        except ImportError:
            from lib26x.CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
31
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service
    """
    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port.

        serverName -- name of the CRAB server to talk to
        serverPort -- port of the server Web Service
        cfg_params -- CRAB configuration parameters (dict-like)
        proxyPath  -- not used by this class; kept for backward compatibility

        Raises CrabException if the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF/LSF schedulers use a 'Token' credential, every other scheduler a 'Proxy'
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): CliServerParams (crab_util) presumably sets server-related
        # attributes such as self.server_admin used in checkServerResponse -- confirm
        CliServerParams(self)
        # nice task name "crab_0_..." taken from the working directory path
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}

        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the unique name of the task
        - blXml: the XML serialization of the task object (jobs are assumed to be RunningJobs)
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: the total number of jobs, forwarded as the TotJob attribute
        - taskType: the task type, forwarded as the Type attribute

        Returns the server return code (0 on success).
        Raises CrabException if the XML, the task name or the command XML is missing.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)
        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a server return code: log an explanatory message and reset the
        server choice stored in the task DB. Called for non-zero return codes.

        Returns ret unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 12:
            # task refused, server possibly under maintenance
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: the code is not guaranteed to be an int
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        # every branch above yields a message, so the server choice is always reset
        opsToBeSaved = {'serverName': ''}
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for all the jobs of the task.
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean all the jobs of the task.

        Accepts in input:
        - blTaskName: the unique name of the task
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        If statusFile is given, the status message is written to that file and
        its path is returned; otherwise the status message itself is returned.
        For statusFamilyType 'isServerDrained' an empty or 'Error:' answer is not
        fatal: an empty string is returned instead.

        Raises CrabException if the task name is missing or, for any other
        status family, if the server answers with an empty or 'Error:' message.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))
            # drain-mode probe: report an empty status instead of failing
            statusFile = None
            statusMsg = ""

        if statusFile is not None:
            # try/finally instead of `with` to stay compatible with Python < 2.6
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command for the given jobs to the server.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Query the 'isServerDrained' status family.
        Returns the raw server answer, or '' if the server answered with an error.
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the TaskCommand XML document sent to the server.

        taskUName -- unique task name
        cmdSpec   -- command to execute (e.g. 'submit', 'kill', 'clean')
        rng       -- job range the command applies to
        newTaskAddIns -- currently unused; kept for backward compatibility
        flavour, type, jobs -- forwarded as the Flavour, Type and TotJob attributes

        Returns the pretty-printed XML document as a string.
        """
        cfile = minidom.Document()

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        # create a mini-cfg to be transfered to the server
        miniCfg = self._buildMiniCfg(self._getDefaultCeBlackList())

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _getDefaultCeBlackList(self):
        """
        Download the default CE black list, unless GRID.remove_default_blacklist
        is set to a non-zero value.

        Returns what the Downloader returns for "site_black_list.conf", or None
        when the default list is skipped or unavailable.
        """
        ## Only Temporary. it should be at Server level
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0)
        if int(removeBList) != 0:
            common.logger.info("WARNING: Skipping default black list!")
            return None

        blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
        result = blacklist.config("site_black_list.conf")
        if result is None:
            return None
        # log the downloaded list itself, not the Downloader object
        common.logger.debug("Enforced black list: %s " % str(result))
        return result

    @staticmethod
    def _mergeCeBlackList(defaultList, userList):
        """
        Append the user CE black list to the default one.

        A missing/empty default list (e.g. None when the download was skipped)
        yields just the user list, instead of failing on None concatenation.
        """
        userList = str(userList)
        if defaultList:
            return defaultList + ", " + userList
        return userList

    def _buildMiniCfg(self, blackAnaOps):
        """
        Build the mini configuration dictionary forwarded to the server in the
        CfgParamDict attribute of the TaskCommand XML.

        blackAnaOps -- default CE black list (or None) merged with the user one
        """
        params = self.cfg_params

        def asStr(key):
            # configured value as a string, '' when the parameter is not set
            if key in params:
                return str(params[key])
            return ''

        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = asStr('GRID.ce_white_list')
        if 'GRID.ce_black_list' in params:
            miniCfg['EDG.ce_black_list'] = self._mergeCeBlackList(blackAnaOps, params['GRID.ce_black_list'])
        else:
            miniCfg['EDG.ce_black_list'] = blackAnaOps
        miniCfg['EDG.se_white_list'] = asStr('GRID.se_white_list')
        miniCfg['EDG.se_black_list'] = asStr('GRID.se_black_list')
        miniCfg['EDG.group'] = asStr('GRID.group')
        miniCfg['EDG.role'] = asStr('GRID.role')

        # an explicit checksum wins; only compute it from the cfg file otherwise
        if 'cfgFileNameCkSum' in params:
            miniCfg['cfgFileNameCkSum'] = str(params['cfgFileNameCkSum'])
        else:
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = asStr('CRAB.se_remote_dir')

        miniCfg['CAF.queue'] = params.get('CAF.queue', 'cmscaf1nw')
        # NOTE(review): reads 'CAF.resource' (singular) into 'CAF.resources' -- confirm the key name
        miniCfg['CAF.resources'] = params.get('CAF.resource', 'cmscaf')
        miniCfg['CAF.group'] = params.get('CAF.group', None)
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = params.get('GRID.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = params.get('GRID.shallow_retry_count', -1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = params.get('USER.email', None)
        miniCfg['threshold'] = params.get('USER.thresholdlevel', 100)

        # computed once (it used to be computed twice)
        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        # WMBS
        miniCfg['feeder'] = params.get('WMBS.feeder', 'Feeder')
        miniCfg['processing'] = params.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = params.get('WMBS.startrun', 'None')
        miniCfg['splitting_algorithm'] = params.get('WMBS.splitting_algorithm', 'FileBased')
        miniCfg['split_per_job'] = params.get('WMBS.split_per_job', 'files_per_job')
        miniCfg['split_value'] = params.get('WMBS.split_value', 1)

        return miniCfg

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send a generic command (submit, kill, clean, ...) for the given job
        range of a task to the server.

        Returns the server return code (0 on success).
        Raises CrabException if the task name or the command XML is missing.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message was previously built but never logged
            common.logger.debug('Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name))
        else:
            self.checkServerResponse(ret)
        return ret
377