1 |
#!/usr/bin/env python
"""
_ServerCommunicator_

Client-side interface used by CRAB to talk to a CRAB Analysis Server through
a CRAB_Server_Session: it submits new tasks, sends job-level commands
(submit, kill, clean, StopWorkflow, outputRetrieved) and retrieves the task
status.
"""

__version__ = "$Id"
__revision__ = "$Revision"
__author__ = "farinafa@cern.ch"
|
10 |
|
11 |
from crab_exceptions import *
|
12 |
from crab_util import *
|
13 |
import common
|
14 |
import Scram
|
15 |
from ProdCommon.Credential.CredentialAPI import CredentialAPI
|
16 |
from xml.dom import minidom
|
17 |
import os, sys
|
18 |
import commands
|
19 |
import traceback
|
20 |
from Downloader import Downloader
|
21 |
|
22 |
# Select the CRAB server API binding (exported here as C_AS_Session) that
# matches the running Python version and the SCRAM architecture. Within each
# family the candidate modules are tried in order until one imports.
# Only ImportError triggers the fallback: any other failure inside an
# existing module is a real error and must not be masked.
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
elif os.environ["SCRAM_ARCH"].split("_")[1].find('32') > 1:
    # The second "_"-separated field of SCRAM_ARCH contains "32" at offset >= 2
    # (e.g. "ia32"): use the 32-bit bindings.
    try:
        from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except ImportError:
        try:
            from CRAB_Server_API_36X import CRAB_Server_Session as C_AS_Session
        except ImportError:
            from CRAB_Server_API_311X import CRAB_Server_Session as C_AS_Session
else:
    # Any other architecture: use the 64-bit bindings shipped under lib64.
    try:
        from lib64.CRAB_Server_API_311X_amd64 import CRAB_Server_Session as C_AS_Session
    except ImportError:
        try:
            from lib64.CRAB_Server_API_5X_amd64 import CRAB_Server_Session as C_AS_Session
        except ImportError:
            from lib64.CRAB_Server_API_6X_amd64 import CRAB_Server_Session as C_AS_Session
|
40 |
|
41 |
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service
    """

    # Base URL of the central CRAB configuration area from which the default
    # site black list and the myproxy server endpoint are downloaded.
    _configBaseUrl = "http://cmsdoc.cern.ch/cms/LCG/crab/config/"

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        - serverName, serverPort: server address, passed to the server session
        - cfg_params: the CRAB configuration (dict-like)
        - proxyPath: accepted for backward compatibility; not used here

        Raises CrabException if the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF/LSF schedulers authenticate with a token instead of a grid proxy.
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably sets server-related attributes on self
        # (e.g. server_admin, read by checkServerResponse) - confirm in crab_util.
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2]  # nice task name "crab_0_..."

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}
        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the unique name of the task
        - blXml: the task object serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: total number of jobs (TotJob attribute of the command)
        - taskType: task type (Type attribute of the command)

        Returns the server return code (0 on success). On failure the code is
        reported through checkServerResponse and the stored server choice is reset.
        Raises CrabException if an input is missing or the command XML cannot be built.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,
                                        jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)
            # reset server choice
            common._db.updateTask_({'serverName': ''})
        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a server return code and log a human-readable explanation.

        Returns ret unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 12:
            # refused, possibly under maintenance
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s (not %d) so a non-integer code cannot raise while reporting the error.
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, str(ret))

        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of the submission as specified by the user at the command line

        Returns the server return code (0 on success).
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code (0 on success).
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send a StopWorkflow command for all the jobs of the task.

        Returns the server return code (0 on success).
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the unique name of the task (the command always applies to all jobs)

        Returns the server return code (0 on success).
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        - blTaskName: the unique name of the task
        - statusFile: optional path; when given, the status text is written there
          and the path is returned instead of the text
        - statusFamilyType: the kind of status requested from the server

        An empty reply, or one starting with 'Error:', raises CrabException,
        except for 'isServerDrained' requests. For those it is treated as
        "no information": nothing is written and "" is returned.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        # Test emptiness first so a None reply cannot raise on slicing/startswith.
        if not statusMsg or statusMsg.startswith('Error:'):
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))
            statusFile = None
            statusMsg = ""

        if statusFile is not None:
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send an 'outputRetrieved' command for the given jobs of the task.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code (0 on success).
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Ask the server whether it is draining ('isServerDrained' status).

        Returns the server reply, or "" if the server gave no usable answer.
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the XML command document sent to the server.

        The document holds one TaskCommand element. Its attributes carry:
        - the task unique name and the user credential subject;
        - the command, the job range and the total number of jobs;
        - the scheduler, flavour, type and client version;
        - CfgParamDict, the str() of the dict built by _buildMiniCfg.

        newTaskAddIns is accepted for backward compatibility but is not used.
        `type` shadows the builtin; the name is kept because callers pass it by keyword.

        Returns the pretty-printed XML document as a string.
        """
        cfile = minidom.Document()
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        ## Only Temporary. it should be at Server level
        blackAnaOps = self._defaultBlackList()
        myproxyserver = self._myProxyServer()

        # create a mini-cfg to be transfered to the server
        miniCfg = self._buildMiniCfg(blackAnaOps, myproxyserver)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _defaultBlackList(self):
        """
        Return the CE black list enforced by default.

        Unless GRID.remove_default_blacklist is non-zero, the list is downloaded
        from site_black_list.conf. "None.Nowhere" is returned when the download
        yields None or the default list is disabled.
        """
        blackAnaOps = "None.Nowhere"
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0)
        if int(removeBList) == 0:
            result = Downloader(self._configBaseUrl).config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            # Log the list actually enforced, not the Downloader object.
            common.logger.debug("Enforced black list: %s " % str(blackAnaOps))
        else:
            common.logger.info("WARNING: Skipping default black list!")
        return blackAnaOps

    def _myProxyServer(self):
        """
        Return the myproxy server endpoint from myproxy_server.conf (stripped).

        Falls back to 'myproxy.cern.ch' when the download fails or yields
        None or only whitespace.
        """
        try:
            server = Downloader(self._configBaseUrl).config("myproxy_server.conf")
            # Check for None/empty *before* using the value.
            if server is None or not server.strip():
                raise CrabException("myproxy_server.conf retrieved but empty")
            return server.strip()
        except Exception:
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            common.logger.debug(sys.exc_info()[1])
            return 'myproxy.cern.ch'

    def _buildMiniCfg(self, blackAnaOps, myproxyserver):
        """
        Collect the subset of the configuration forwarded to the server.

        - blackAnaOps: default CE black list, placed before GRID.ce_black_list
        - myproxyserver: myproxy endpoint, stored under 'proxyServer'

        Returns a plain dict. Missing GRID list/group/role entries map to "".
        """
        cfg = self.cfg_params
        miniCfg = {}

        ## migrate CE/SE infos: GRID.<name> in the user config -> EDG.<name> for the server
        for name in ('ce_white_list', 'se_white_list', 'se_black_list', 'group', 'role'):
            miniCfg['EDG.' + name] = str(cfg.get('GRID.' + name, ''))

        ceBlackList = blackAnaOps
        if 'GRID.ce_black_list' in cfg:
            if ceBlackList:
                ceBlackList += ", "
            ceBlackList += str(cfg['GRID.ce_black_list'])
        miniCfg['EDG.ce_black_list'] = ceBlackList

        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])
        else:
            # Only checksum the configuration file when no value was supplied.
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = str(cfg.get('CRAB.se_remote_dir', ''))

        # NOTE(review): reads 'CAF.resource' but stores 'CAF.resources' - confirm both names are intended.
        miniCfg['CAF.queue'] = cfg.get('CAF.queue', 'cmscaf1nw')
        miniCfg['CAF.resources'] = cfg.get('CAF.resource', 'cmscaf')
        miniCfg['CAF.group'] = cfg.get('CAF.group', None)

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = cfg.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = cfg.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = myproxyserver
        miniCfg['VO'] = cfg.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = 0
        miniCfg['EDG_shallow_retry_count'] = -1

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ### FEDE for tasktype, savannah 76950
        miniCfg['USER.tasktype'] = cfg.get("USER.tasktype", 'analysis')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        # WMBS
        miniCfg['feeder'] = cfg.get('WMBS.feeder', 'Feeder')
        miniCfg['parent_task'] = cfg.get('WMBS.parent_task', '')
        miniCfg['processing'] = cfg.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = cfg.get('WMBS.startrun', 'None')
        miniCfg['splitting_algorithm'] = cfg.get('WMBS.splitting_algorithm', 'FileBased')
        miniCfg['split_per_job'] = cfg.get('WMBS.split_per_job', 'files_per_job')
        miniCfg['split_value'] = cfg.get('WMBS.split_value', 1000000)
        return miniCfg

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send command `cmd` for the jobs in `rng` of task `blTaskName` to the server.

        Returns the server return code (0 on success). Non-zero codes are
        reported through checkServerResponse.
        Raises CrabException if the task name is missing or the command XML cannot be built.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success
            debugMsg = 'Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
|
403 |
|