ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.74
Committed: Mon Mar 28 10:02:26 2011 UTC (14 years, 1 month ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_8_pre2, CRAB_2_7_8_dash3
Changes since 1.73: +1 -6 lines
Log Message:
Fix for #76950 does not apply; rolling it back.

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from xml.dom import minidom
17 import os, sys
18 import commands
19 import traceback
20 from Downloader import Downloader
21
# Pick the CRAB server SOAP binding matching the running interpreter and
# architecture, and expose its session class under the single name C_AS_Session.
#  - interpreters older than 2.6 use the legacy CRAB_Server_API module;
#  - otherwise SCRAM_ARCH (second "_"-separated field) selects the 32 bit
#    modules or the ones shipped under lib64.
# NOTE(review): SCRAM_ARCH must be defined for interpreters >= 2.6, a missing
# variable raises KeyError at import time (unchanged behaviour).
# Within each family the candidates are tried in order; the last import is
# deliberately unguarded so that a total failure is reported to the caller.
# "except Exception" is used instead of a bare except so that
# KeyboardInterrupt/SystemExit are not mistaken for a missing module.
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
elif os.environ["SCRAM_ARCH"].split("_")[1].find('32')>1:
    try:
        from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except Exception:
        try:
            from CRAB_Server_API_36X import CRAB_Server_Session as C_AS_Session
        except Exception:
            from CRAB_Server_API_311X import CRAB_Server_Session as C_AS_Session
else:
    try:
        from lib64.CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except Exception:
        try:
            from lib64.CRAB_Server_API_36X_amd64 import CRAB_Server_Session as C_AS_Session
        except Exception:
            from lib64.CRAB_Server_API_311X_amd64 import CRAB_Server_Session as C_AS_Session
40
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    The class wraps a C_AS_Session object and translates the client operations
    (submit, kill, clean, status, ...) into the XML command messages that are
    handed to that session.
    """

    # base URL of the configuration snippets downloaded while building a command
    _configUrl = "http://cmsdoc.cern.ch/cms/LCG/crab/config/"

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Accepts in input:
        - serverName, serverPort: forwarded to the C_AS_Session constructor
        - cfg_params: the client configuration dictionary
        - proxyPath: accepted for interface compatibility, not used here

        Raises CrabException if the credential subject cannot be retrieved.
        """

        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers authenticate with a token, all the others with a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF','LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably fills server-side attributes such as
        # self.server_admin (read by checkServerResponse) -- confirm in crab_util
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."

        configAPI = {'credential' : credentialType,
                     'logger' : common.logger() }

        CredAPI = CredentialAPI( configAPI )
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram=Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob,taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task object serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: written in the "TotJob" attribute of the command
        - taskType: written in the "Type" attribute of the command

        Returns the code given back by the server session (0 means success).
        Raises CrabException when one of the inputs or the command XML is empty.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,jobs=TotJob,type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Builds the message matching the (non zero) return code, resets the
        server choice stored in the task DB and logs the message at info level.
        Returns ret unchanged.
        """

        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s instead of %d: a non numeric code must be reported, not turned into a TypeError
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)

        # every branch above produces a message: reset server choice and print it
        opsToBeSaved={'serverName' : '' }
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for the whole task (range 'all').
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name (the command is sent for range 'all')
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the status is written there
        - statusFamilyType: the kind of status requested to the server

        Returns statusFile when the status has been written to file, the status
        message otherwise. An empty or 'Error:' reply raises CrabException,
        except for the 'isServerDrained' family where an empty string is returned.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if 'Error:' in statusMsg[:6] or len(statusMsg)==0:
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
            # the drain check is best effort: hand back an empty answer, not the error text
            statusFile = None
            statusMsg = ""

        if statusFile is not None:
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _getJobsOutput_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Ask the server for the 'isServerDrained' status.
        Returns the server reply, or an empty string if the request failed.
        """
        return self.getStatus( blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _defaultBlackList(self):
        """
        Return the default site black list downloaded from the configuration
        area, or None when the user disabled it with a non zero
        GRID.remove_default_blacklist or when nothing has been retrieved.
        """
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        if int(removeBList) != 0:
            common.logger.info("WARNING: Skipping default black list!")
            return None

        blackAnaOps = Downloader(self._configUrl).config("site_black_list.conf")
        # log the retrieved list (not the Downloader object)
        common.logger.debug("Enforced black list: %s "%str(blackAnaOps))
        return blackAnaOps

    def _myproxyServer(self):
        """
        Return the myproxy server endpoint downloaded from the configuration
        area; fall back to 'myproxy.cern.ch' when it is missing, empty or
        cannot be retrieved.
        """
        try:
            myproxyserver = Downloader(self._configUrl).config("myproxy_server.conf")
            if myproxyserver is not None:
                myproxyserver = myproxyserver.strip()
            if not myproxyserver:
                raise CrabException("myproxy_server.conf retrieved but empty")
        except Exception:
            # sys.exc_info keeps this valid on every interpreter the client supports
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            common.logger.debug(sys.exc_info()[1])
            myproxyserver = 'myproxy.cern.ch'
        return myproxyserver

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        Build the XML message describing a command for the server.

        The message is a single "TaskCommand" element whose attributes carry
        the task name, the user subject, the command, the job range and a
        "CfgParamDict" attribute holding the string form of a reduced
        configuration dictionary.

        Accepts in input:
        - taskUName: the task unique name
        - cmdSpec: the command to be executed
        - rng: the job range
        - newTaskAddIns: accepted for interface compatibility, not used here
        - flavour, type, jobs: written as "Flavour", "Type" and "TotJob" attributes

        Returns the pretty printed XML document as a string.
        """
        cfile = minidom.Document()

        ver = common.prog_version_str
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(ver) )

        ## Only Temporary. it should be at Server level
        blackAnaOps = self._defaultBlackList()
        myproxyserver = self._myproxyServer()

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = ""
        if 'GRID.ce_white_list' in self.cfg_params:
            miniCfg['EDG.ce_white_list'] = str( self.cfg_params['GRID.ce_white_list'] )

        # default black list first, then the user one; the default list may be
        # None (disabled or not retrieved) and must not be concatenated then
        ceBlackList = blackAnaOps
        if 'GRID.ce_black_list' in self.cfg_params:
            userBlackList = str( self.cfg_params['GRID.ce_black_list'] )
            if blackAnaOps:
                ceBlackList = blackAnaOps + ", " + userBlackList
            else:
                ceBlackList = userBlackList
        miniCfg['EDG.ce_black_list'] = ceBlackList

        miniCfg['EDG.se_white_list'] = ""
        if 'GRID.se_white_list' in self.cfg_params:
            miniCfg['EDG.se_white_list'] = str( self.cfg_params['GRID.se_white_list'] )

        miniCfg['EDG.se_black_list'] = ""
        if 'GRID.se_black_list' in self.cfg_params:
            miniCfg['EDG.se_black_list'] = str( self.cfg_params['GRID.se_black_list'] )

        miniCfg['EDG.group'] = ""
        if 'GRID.group' in self.cfg_params:
            miniCfg['EDG.group'] = str( self.cfg_params['GRID.group'] )

        miniCfg['EDG.role'] = ""
        if 'GRID.role' in self.cfg_params:
            miniCfg['EDG.role'] = str( self.cfg_params['GRID.role'] )

        # a checksum already stored in the configuration wins over the computed one
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = ''
        if 'CRAB.se_remote_dir' in self.cfg_params:
            miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params['CRAB.se_remote_dir'])

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue','cmscaf1nw')
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')
        miniCfg['CAF.group'] = self.cfg_params.get('CAF.group', None)
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = myproxyserver
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = 0
        miniCfg['EDG_shallow_retry_count'] = -1

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()
        #WMBS
        miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')
        miniCfg['parent_task'] = self.cfg_params.get('WMBS.parent_task', '')

        miniCfg['processing'] = self.cfg_params.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = self.cfg_params.get('WMBS.startrun', 'None')

        miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm','FileBased')
        miniCfg['split_per_job'] = self.cfg_params.get('WMBS.split_per_job','files_per_job')
        miniCfg['split_value'] = self.cfg_params.get('WMBS.split_value',1000000)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Build the XML message for cmd and send it to the server.

        Returns the code given back by the server session (0 means success);
        any other code is also handed to checkServerResponse.
        Raises CrabException when the task name or the command XML is empty.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
398