ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.68
Committed: Mon May 31 20:25:45 2010 UTC (14 years, 11 months ago) by riahi
Content type: text/x-python
Branch: MAIN
Changes since 1.67: +2 -1 lines
Log Message:
Add parent_task parameter in WMBS section. Set the default split_value to the maximum number of files

File Contents

# Content
#!/usr/bin/env python
"""
_ServerCommunicator_

Client side of the CRAB client/server protocol: wraps a CRAB_Server_API
session object and turns client requests (submit, kill, clean, status, ...)
into XML task commands addressed to a CRAB Analysis Server.
"""

__author__ = "farinafa@cern.ch"
__revision__ = "$Revision"
__version__ = "$Id"
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from xml.dom import minidom
17 import os, sys
18 import commands
19 import traceback
20 from Downloader import Downloader
21
# Select the CRAB_Server_API binding that matches the running interpreter and
# the SCRAM architecture:
#   - Python older than 2.6     -> legacy CRAB_Server_API module
#   - 32-bit SCRAM_ARCH         -> CRAB_Server_API_1_1, else CRAB_Server_API_36X
#   - anything else (64-bit)    -> lib64.CRAB_Server_API_1_1, else
#                                  lib64.CRAB_Server_API_36X_amd64
# Tuple comparison replaces the former float arithmetic on the version numbers.
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
elif os.environ["SCRAM_ARCH"].split("_")[1].find('32') > 1:
    # Assumes SCRAM_ARCH looks like "<os>_<arch>_<compiler>" (e.g. an arch
    # field such as "ia32") -- TODO confirm for all supported platforms.
    try:
        from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except ImportError:
        # the 1_1 build is not available: fall back to the 36X build
        from CRAB_Server_API_36X import CRAB_Server_Session as C_AS_Session
else:
    try:
        from lib64.CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except ImportError:
        # the 1_1 build is not available: fall back to the 36X amd64 build
        from lib64.CRAB_Server_API_36X_amd64 import CRAB_Server_Session as C_AS_Session
34
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the
    server Web Service.

    Every command is serialized by _createXMLcommand into a <TaskCommand>
    XML document and sent to the server through a C_AS_Session
    (CRAB_Server_Session) object.
    """

    # Base URL of the centrally published CRAB configuration files
    # (default site black list, myproxy server endpoint).
    CONFIG_URL = "http://cmsdoc.cern.ch/cms/LCG/crab/config/"

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server.

        serverName, serverPort: endpoint handed to C_AS_Session.
        cfg_params: the CRAB configuration dictionary.
        proxyPath: accepted for interface compatibility; not used here.

        Raises CrabException if the credential subject cannot be obtained.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF/LSF schedulers use a token credential, all others a proxy.
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # Presumably sets server-related attributes on self (e.g. the
        # server_admin read by checkServerResponse) -- TODO confirm in crab_util.
        CliServerParams(self)

        # nice task name "crab_0_..." taken from the working directory path
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}
        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the unique name of the task
        - blXml: the bossLite task serialized to XML (jobs are assumed to be RunningJobs)
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: total number of jobs, forwarded as the TotJob attribute
        - taskType: forwarded as the Type attribute of the command

        Returns the server return code (0 on success); non-zero codes are
        reported through checkServerResponse.
        Raises CrabException when an input or the command XML is missing.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,
                                        jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)
        return ret

    def checkServerResponse(self, ret):
        """
        Translate a (non-zero) server return code into a user message.

        The message is logged and the task's serverName is cleared in the
        local DB ("reset server choice"). Returns ret unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 12:
            # server refused the task (possibly under maintenance)
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: an unexpected code is not guaranteed to be an int
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        # reset server choice
        common._db.updateTask_({'serverName': ''})
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Submit further jobs of a task that has already been sent to a server.
        Used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send a 'StopWorkflow' command for all the jobs of the task.
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean all the jobs of the task.

        Accepts in input:
        - blTaskName: the unique name of the task
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of
        status (version, loggingInfos, ...) selected by statusFamilyType.

        If statusFile is given, the answer is written there and the file name
        is returned; otherwise the answer itself is returned.
        An empty or 'Error:' answer raises CrabException, except for the
        'isServerDrained' family, for which "" is returned instead.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))
            # drain-mode queries tolerate a failure: report an empty answer
            statusFile = None
            statusMsg = ""

        if statusFile is not None:
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send an 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the unique name of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        Query the 'isServerDrained' status family of the server.
        Returns the raw server answer, or "" if the server replied with an
        error or an empty answer.
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.
        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _getDefaultBlackList(self):
        """
        Return the centrally published CE black list (site_black_list.conf),
        or None when it is disabled via GRID.remove_default_blacklist or the
        download returned nothing.
        """
        ## Only Temporary. it should be at Server level
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0)
        if int(removeBList) != 0:
            common.logger.info("WARNING: Skipping default black list!")
            return None

        result = Downloader(self.CONFIG_URL).config("site_black_list.conf")
        if result is not None:
            # log the list itself (previously the Downloader object was logged)
            common.logger.debug("Enforced black list: %s " % str(result))
        return result

    def _getMyProxyServer(self):
        """
        Return the myproxy server endpoint published in myproxy_server.conf,
        falling back to 'myproxy.cern.ch' when it cannot be retrieved or is
        empty.
        """
        try:
            myproxyserver = Downloader(self.CONFIG_URL).config("myproxy_server.conf")
            if myproxyserver is not None:
                myproxyserver = myproxyserver.strip()
            # check after stripping, so a blank file also triggers the fallback
            if not myproxyserver:
                raise CrabException("myproxy_server.conf retrieved but empty")
            return myproxyserver
        except Exception:
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            common.logger.debug(sys.exc_info()[1])
            return 'myproxy.cern.ch'

    def _buildMiniCfg(self, blackAnaOps, myproxyserver):
        """
        Build the reduced configuration dictionary ("mini-cfg") that is sent
        to the server in the CfgParamDict attribute of the command.

        blackAnaOps: default CE black list (may be None).
        myproxyserver: myproxy endpoint to forward as 'proxyServer'.
        """
        cfg = self.cfg_params
        miniCfg = {}

        ## migrate CE/SE infos and other optional string settings ("" if unset)
        for srcKey, dstKey in (('GRID.ce_white_list', 'EDG.ce_white_list'),
                               ('GRID.se_white_list', 'EDG.se_white_list'),
                               ('GRID.se_black_list', 'EDG.se_black_list'),
                               ('GRID.group', 'EDG.group'),
                               ('GRID.role', 'EDG.role'),
                               ('CRAB.se_remote_dir', 'CRAB.se_remote_dir')):
            miniCfg[dstKey] = ''
            if srcKey in cfg:
                miniCfg[dstKey] = str(cfg[srcKey])

        # CE black list = default list plus the user's one. Stays None when
        # neither is available; previously a user list with no default list
        # raised TypeError (None += str).
        ceBlackList = blackAnaOps
        if 'GRID.ce_black_list' in cfg:
            userBlackList = str(cfg['GRID.ce_black_list'])
            if ceBlackList:
                ceBlackList = ceBlackList + ", " + userBlackList
            else:
                ceBlackList = userBlackList
        miniCfg['EDG.ce_black_list'] = ceBlackList

        # an explicit checksum wins; otherwise compute it from the cfg file
        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])
        else:
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        # NOTE(review): source key 'CAF.resource' vs destination 'CAF.resources' -- confirm intended
        miniCfg['CAF.queue'] = cfg.get('CAF.queue', 'cmscaf1nw')
        miniCfg['CAF.resources'] = cfg.get('CAF.resource', 'cmscaf')
        miniCfg['CAF.group'] = cfg.get('CAF.group', None)
        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = cfg.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = cfg.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = myproxyserver
        miniCfg['VO'] = cfg.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = 0
        miniCfg['EDG_shallow_retry_count'] = -1

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        ## WMBS
        miniCfg['feeder'] = cfg.get('WMBS.feeder', 'Feeder')
        miniCfg['parent_task'] = cfg.get('WMBS.parent_task', '')
        miniCfg['processing'] = cfg.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = cfg.get('WMBS.startrun', 'None')
        miniCfg['splitting_algorithm'] = cfg.get('WMBS.splitting_algorithm', 'FileBased')
        miniCfg['split_per_job'] = cfg.get('WMBS.split_per_job', 'files_per_job')
        # large default, meant as "maximum number of files" per the commit log
        miniCfg['split_value'] = cfg.get('WMBS.split_value', 1000000)

        ## put here other fields if needed
        return miniCfg

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the <TaskCommand> XML document describing a command for the server.

        taskUName: task unique name; cmdSpec: command ('submit', 'kill', ...);
        rng: job range; flavour/type/jobs: forwarded as attributes.
        newTaskAddIns: accepted for interface compatibility; not used here.
        Returns the pretty-printed XML document as a string.
        """
        cfile = minidom.Document()
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        blackAnaOps = self._getDefaultBlackList()
        myproxyserver = self._getMyProxyServer()
        miniCfg = self._buildMiniCfg(blackAnaOps, myproxyserver)

        # the mini-cfg travels as the str() of a dict
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send command cmd for the jobs in rng of task blTaskName.

        Returns the server return code (0 on success); non-zero codes are
        reported through checkServerResponse.
        Raises CrabException when the task name or the command XML is missing.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success (this message was previously built but never emitted)
            common.logger.debug('Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name))
        else:
            self.checkServerResponse(ret)
        return ret
391