ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.57
Committed: Wed Jan 20 13:21:02 2010 UTC (15 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1_pre4
Changes since 1.56: +2 -2 lines
Log Message:
AnaOps BList must not be a python list...

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from xml.dom import minidom
17 import os, sys
18 import commands
19 import traceback
20 from Downloader import Downloader
21
# Pick the server API binding that matches the running interpreter.
# Comparing the version tuple directly avoids encoding the version as a float
# (major + 0.1 * minor), which depends on float rounding and would misorder
# any minor version >= 10.
if sys.version_info < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
else:
    from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
26
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Arguments:
        - serverName, serverPort: forwarded to the C_AS_Session constructor
        - cfg_params: client configuration mapping, kept for building commands
        - proxyPath: accepted for interface compatibility; not read here

        Raises CrabException if the credential subject cannot be retrieved.
        """

        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers authenticate with a token instead of a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        CliServerParams(self)
        # nice task name "crab_0_...": second-to-last component of the top dir
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}

        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # keep the full traceback in the debug log, show a short error to the user
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized as XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: value written in the TotJob attribute of the command
        - taskType: value written in the Type attribute of the command

        Returns the code given back by the server session (0 on success).
        Raises CrabException if one of the inputs or the command XML is empty.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def _serverErrorMessage(self, ret):
        """
        Return the user message describing the server return code `ret`.

        Codes that are not known produce a generic 'Unexpected return code'
        message; the code is formatted with %s so that a non-integer value
        is reported instead of raising a TypeError.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 12:
            # server refused the task
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)
        return logMsg

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Resets the server choice stored for the task, logs the message
        matching `ret` at info level and returns `ret` unchanged.
        """
        logMsg = self._serverErrorMessage(ret)

        # reset server choice
        opsToBeSaved = {'serverName': ''}
        common._db.updateTask_(opsToBeSaved)
        # print loggings
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for the whole task (range 'all').
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Sends the 'clean' command for the whole task (range 'all').
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written there
        - statusFamilyType: kind of status requested to the server

        Returns the reply text, or `statusFile` when the reply was saved to file.
        Raises CrabException if the task name is empty or if the reply is
        empty or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        # test emptiness first, so that a missing reply is reported as a
        # CrabException instead of failing while inspecting it
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))

        if statusFile is None:
            return statusMsg

        # fill the file content; try/finally closes the handle even if the
        # write fails (the 'with' statement is not available before python 2.6)
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _getJobsOutput_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Ask the server for the 'isServerDrained' status and return its reply.
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _cfgStr(self, key, default=''):
        """
        Return cfg_params[key] converted to string, or `default` if the key is absent.
        """
        if key in self.cfg_params:
            return str(self.cfg_params[key])
        return default

    @staticmethod
    def _mergeBlackLists(defaultList, userList):
        """
        Return the user black list appended to the default one.

        `defaultList` may be None or empty, in which case only the user list
        (converted to string) is returned; otherwise the two are joined by ', '.
        """
        userList = str(userList)
        if not defaultList:
            return userList
        return defaultList + ", " + userList

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the XML document describing a command for the server.

        The document holds a single TaskCommand element whose attributes carry
        the task name, the user subject, the command, the job range and a
        'CfgParamDict' attribute: the string representation of a dictionary
        with a selection of the client configuration parameters.

        `newTaskAddIns` is accepted but not read by this method.

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        ver = common.prog_version_str
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(ver))

        ## Only Temporary. it should be at Server level
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0)
        blackAnaOps = None
        if int(removeBList) == 0:
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            # log the list that is enforced, not the downloader object
            common.logger.debug("Enforced black list: %s " % str(blackAnaOps))
        else:
            common.logger.info("WARNING: Skipping default black list!")

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = self._cfgStr('GRID.ce_white_list')

        # The default black list can be None (disabled or not available):
        # the user list must then be used alone instead of being appended.
        miniCfg['EDG.ce_black_list'] = blackAnaOps
        if 'GRID.ce_black_list' in self.cfg_params:
            miniCfg['EDG.ce_black_list'] = self._mergeBlackLists(blackAnaOps, self.cfg_params['GRID.ce_black_list'])

        miniCfg['EDG.se_white_list'] = self._cfgStr('GRID.se_white_list')
        miniCfg['EDG.se_black_list'] = self._cfgStr('GRID.se_black_list')
        miniCfg['EDG.group'] = self._cfgStr('GRID.group')
        miniCfg['EDG.role'] = self._cfgStr('GRID.role')

        # the checksum of the cfg file is the fallback when none is configured
        miniCfg['cfgFileNameCkSum'] = self._cfgStr('cfgFileNameCkSum', makeCksum(common.work_space.cfgFileName()))

        miniCfg['CRAB.se_remote_dir'] = self._cfgStr('CRAB.se_remote_dir')

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue', 'cmscaf1nw')
        # NOTE(review): read from 'CAF.resource' but stored as 'CAF.resources' - confirm the key names
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count', -1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        # queried once: the value was previously fetched twice for the same key
        miniCfg['CMSSW_version'] = self.scram.getSWVersion()
        #WMBS
        miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')

        miniCfg['processing'] = self.cfg_params.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = self.cfg_params.get('WMBS.startrun', '0')

        miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm', 'FileBased')
        miniCfg['split_per_job'] = self.cfg_params.get('WMBS.split_per_job', 'files_per_job')
        miniCfg['split_value'] = self.cfg_params.get('WMBS.split_value', 1)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Build the XML for command `cmd` on the jobs `rng` of task `blTaskName`
        and send it to the server.

        Returns the code given back by the server session (0 on success).
        Raises CrabException if the task name or the command XML is empty.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message was built but never emitted before
            debugMsg = 'Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
368