ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.80
Committed: Wed Jul 11 15:13:22 2012 UTC (12 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, HEAD
Changes since 1.79: +5 -2 lines
Log Message:
CMSSW6x compatibility

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from xml.dom import minidom
17 import os, sys
18 import commands
19 import traceback
20 from Downloader import Downloader
21
def _session_class_from(module_name):
    """Import *module_name* and return its CRAB_Server_Session class."""
    # A non-empty fromlist makes __import__ return the (sub)module itself
    # instead of the top-level package, mirroring "from X import Y".
    module = __import__(module_name, globals(), locals(), ['CRAB_Server_Session'])
    return module.CRAB_Server_Session


def _load_session_class(module_names):
    """
    Return CRAB_Server_Session from the first module in *module_names* that loads.

    Every candidate but the last is tried on a best-effort basis; the last
    one is imported unprotected, so its failure propagates to the caller
    when no binding at all can be loaded.
    """
    for name in module_names[:-1]:
        try:
            return _session_class_from(name)
        except Exception:
            # Deliberate fallback: this binding does not load in the current
            # environment, move on to the next candidate.  "except Exception"
            # (not a bare except) lets KeyboardInterrupt/SystemExit through
            # on Python >= 2.5.
            continue
    return _session_class_from(module_names[-1])


# Pick the compiled server API binding matching the interpreter and the
# architecture.  The version is compared as a tuple: encoding it as a float
# (major + .1 * minor) is fragile.
# NOTE: SCRAM_ARCH must be set in the environment on Python >= 2.6; a missing
# variable raises KeyError at import time.
if sys.version_info[:2] < (2, 6):
    C_AS_Session = _load_session_class(['CRAB_Server_API'])
elif os.environ["SCRAM_ARCH"].split("_")[1].find('32') > 1:
    # 32-bit architecture token (the second "_"-separated field contains
    # '32' past its first character).
    C_AS_Session = _load_session_class(['CRAB_Server_API_1_1',
                                        'CRAB_Server_API_36X',
                                        'CRAB_Server_API_311X'])
else:
    C_AS_Session = _load_session_class(['lib64.CRAB_Server_API_311X_amd64',
                                        'lib64.CRAB_Server_API_5X_amd64',
                                        'lib64.CRAB_Server_API_6X_amd64'])
40
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Commands are serialized as a small XML document (see _createXMLcommand)
    and sent through the session object stored in self.asSession.
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port.

        serverName, serverPort -- forwarded to the server session constructor
        cfg_params             -- CRAB configuration mapping, read when commands are built
        proxyPath              -- accepted for interface compatibility; not used here

        Raises CrabException when the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a 'Token' credential, everything else a 'Proxy'
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably fills in server-side attributes such as
        # self.server_admin (read by checkServerResponse) -- defined outside this file, confirm.
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}

        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # keep the full traceback in the debug log, show a short message to the user
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: total number of jobs, written in the command XML
        - taskType: task type, written in the command XML

        Returns the server return code (0 means success).
        Raises CrabException if blXml or blTaskName is empty or the command XML
        cannot be created.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)
            # reset server choice
            opsToBeSaved = {'serverName' : '' }
            common._db.updateTask_(opsToBeSaved)

        return ret

    def _serverErrorMessage(self, ret):
        """
        Build the user message matching a server return code.

        Has no side effects; codes that are not known get a generic
        'Unexpected return code' message.
        """
        adminHint = '\t For Further infos please contact the server Admin: %s'

        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += adminHint%self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += adminHint%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += adminHint%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s rather than %d: a non-integer code must not raise while reporting it
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)
        return logMsg

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Logs (info level) the message matching the code and returns the code unchanged.
        """
        logMsg = self._serverErrorMessage(ret)

        # print loggings
        if logMsg != '':
            common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line

        Returns the server return code (see _genericCommand).
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command for the given job range to the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code (see _genericCommand).
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for all the jobs of the task.

        Returns the server return code (see _genericCommand).
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name (the command always applies to 'all' jobs)

        Returns the server return code (see _genericCommand).
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the answer is written there
        - statusFamilyType: kind of status requested from the server

        Returns statusFile when the answer has been written to it, the answer
        string otherwise.  A failed 'isServerDrained' query is not an error:
        it returns an empty string and writes no file.

        Raises CrabException if blTaskName is empty, or if the server answer
        is empty / starts with 'Error:' for any other status family.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if statusMsg[:6] == 'Error:' or len(statusMsg) == 0:
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
            # drain-mode check failed: report "no answer" instead of the error text
            statusFile = None
            statusMsg = ""

        if statusFile is None:
            return statusMsg

        # fill the file content; try/finally closes the handle even if write fails
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command for the given job range to the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code (see _genericCommand).
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Query the 'isServerDrained' status family and return the server answer
        (an empty string when the query fails, see getStatus).
        """
        return self.getStatus( blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _defaultBlackList(self):
        """Return the default site black list, or "None.Nowhere" when it is disabled or unavailable."""
        ## Only Temporary. it should be at Server level
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        blackAnaOps = "None.Nowhere"
        if int(removeBList) == 0:
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            # log the list actually enforced, not the Downloader object
            common.logger.debug("Enforced black list: %s "%str(blackAnaOps))
        else:
            common.logger.info("WARNING: Skipping default black list!")
        return blackAnaOps

    def _myproxyServer(self):
        """Return the configured myproxy server endpoint, falling back to myproxy.cern.ch on any problem."""
        try:
            myproxyserver = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf")
            if myproxyserver is not None:
                myproxyserver = myproxyserver.strip()
            # covers both "nothing retrieved" and "retrieved but blank"
            if not myproxyserver:
                raise CrabException("myproxy_server.conf retrieved but empty")
        except Exception:
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            # sys.exc_info() instead of "except Exception, e": parses on every Python version
            common.logger.debug(sys.exc_info()[1])
            myproxyserver = 'myproxy.cern.ch'
        return myproxyserver

    def _buildMiniCfg(self, blackAnaOps, myproxyserver):
        """Create the mini-cfg dictionary to be transfered to the server inside the command XML."""
        cfg = self.cfg_params
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = ""
        if 'GRID.ce_white_list' in cfg:
            miniCfg['EDG.ce_white_list'] = str( cfg['GRID.ce_white_list'] )

        # the user black list is appended to the default one
        miniCfg['EDG.ce_black_list'] = blackAnaOps
        if 'GRID.ce_black_list' in cfg:
            if blackAnaOps:
                miniCfg['EDG.ce_black_list'] += ", "
            miniCfg['EDG.ce_black_list'] += str( cfg['GRID.ce_black_list'] )

        # GRID.* options copied under the EDG.* name, empty string when missing
        for target, source in (('EDG.se_white_list', 'GRID.se_white_list'),
                               ('EDG.se_black_list', 'GRID.se_black_list'),
                               ('EDG.group', 'GRID.group'),
                               ('EDG.role', 'GRID.role')):
            miniCfg[target] = ""
            if source in cfg:
                miniCfg[target] = str( cfg[source] )

        # checksum computed from the work-space cfg file unless one is given explicitly
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = ''
        if 'CRAB.se_remote_dir' in cfg:
            miniCfg['CRAB.se_remote_dir'] = str(cfg['CRAB.se_remote_dir'])

        miniCfg['CAF.queue'] = cfg.get('CAF.queue','cmscaf1nw')
        # NOTE(review): reads 'CAF.resource' but stores 'CAF.resources' -- kept as is, confirm the key names
        miniCfg['CAF.resources'] = cfg.get('CAF.resource', 'cmscaf')
        miniCfg['CAF.group'] = cfg.get('CAF.group', None)
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = cfg.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = cfg.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = myproxyserver
        miniCfg['VO'] = cfg.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = 0
        miniCfg['EDG_shallow_retry_count'] = -1

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ### FEDE for tasktype, savannah 76950
        miniCfg['USER.tasktype'] = cfg.get("USER.tasktype", 'analysis')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        #WMBS
        miniCfg['feeder'] = cfg.get('WMBS.feeder', 'Feeder')
        miniCfg['parent_task'] = cfg.get('WMBS.parent_task', '')

        miniCfg['processing'] = cfg.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = cfg.get('WMBS.startrun', 'None')

        miniCfg['splitting_algorithm'] = cfg.get('WMBS.splitting_algorithm','FileBased')
        miniCfg['split_per_job'] = cfg.get('WMBS.split_per_job','files_per_job')
        miniCfg['split_value'] = cfg.get('WMBS.split_value',1000000)

        ## put here other fields if needed
        return miniCfg

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        Serialize a command for the server as an XML string.

        The document holds a single TaskCommand element whose attributes
        carry the task name, the user subject, the command, the job range
        and a stringified mini-cfg dictionary (CfgParamDict).

        newTaskAddIns is accepted for interface compatibility and not used.
        'type' shadows the builtin but is part of the interface callers use.
        """
        cfile = minidom.Document()

        ver = common.prog_version_str
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(ver) )

        blackAnaOps = self._defaultBlackList()
        myproxyserver = self._myproxyServer()
        miniCfg = self._buildMiniCfg(blackAnaOps, myproxyserver)

        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Build the XML for 'cmd' and send it to the server for the given task and job range.

        Returns the server return code (0 means success); any other code is
        reported to the user through checkServerResponse.
        Raises CrabException if blTaskName is empty or the command XML cannot
        be created.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: actually emit the message (it used to be built and dropped)
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
403