ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.58
Committed: Tue Jan 26 12:54:21 2010 UTC (15 years, 3 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.57: +5 -2 lines
Log Message:
fix for bug #61682: better error handling during 'isServerDrained' communications

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from xml.dom import minidom
17 farinafa 1.54 import os, sys
18 farinafa 1.2 import commands
19 ewv 1.45 import traceback
20 spiga 1.55 from Downloader import Downloader
21 farinafa 1.1
# Select the CRAB server API binding that matches the running interpreter:
# the legacy module for Python older than 2.6, the 1.1 module otherwise.
# A tuple comparison is used instead of "major + .1 * minor" so the check
# does not depend on floating point arithmetic.
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
else:
    from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
26    
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Accepts in input:
        - serverName, serverPort: forwarded to the C_AS_Session constructor
        - cfg_params: the user configuration mapping, read later to build the command XML
        - proxyPath: kept in the signature for compatibility; not read by this constructor

        Raises CrabException if the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # the 'Token' credential type is selected for the CAF and LSF schedulers, 'Proxy' otherwise
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): CliServerParams is defined outside this file; presumably it fills
        # server related attributes on self (self.server_admin is read later) - confirm
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2]  # nice task name "crab_0_..."

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}

        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # keep the full traceback in the debug log, show the user a short message
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: value written in the "TotJob" attribute of the command XML
        - taskType: value written in the "Type" attribute of the command XML

        Returns the code given back by the server session (0 on success).
        Raises CrabException if an input is empty or the command XML cannot be built.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Translates a non-zero server return code into a message, resets the
        server name stored for the task and logs the message.
        Returns ret unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 12:
            # task refused, server possibly under maintenance
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: an unexpected code may not even be an integer,
            # and the error report itself must not fail
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        # every branch above produces a message: reset the server choice, then log
        opsToBeSaved = {'serverName': ''}
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for the whole task (range 'all').
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.
        The command is always sent for the whole task (range 'all').

        Accepts in input:
        - blTaskName: the task unique name
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written there
        - statusFamilyType: the kind of status asked to the server

        Returns statusFile when the reply has been written to it, the reply
        string otherwise. A failed 'isServerDrained' request is not fatal: it
        returns an empty string and writes no file.
        Raises CrabException on an empty task name or on a failed request of
        any other kind.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))
            # the drain check is best effort: hide the error text from the caller
            statusFile = None
            statusMsg = ""

        if statusFile is not None:
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                # try/finally (not "with") keeps the code valid on Python < 2.6
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        _checkDrainMode_
        Ask the server for its 'isServerDrained' status.
        Returns the server reply, or an empty string if the request failed.
        """
        return self.getStatus(blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        _createXMLcommand_
        Build the XML document describing a command for the server.

        The document holds a single "TaskCommand" element whose attributes
        carry the command description plus "CfgParamDict", the string form of
        a small dictionary extracted from the user configuration.

        Accepts in input:
        - taskUName: the task unique name
        - cmdSpec: the command name
        - rng: the job range
        - newTaskAddIns: accepted for compatibility; not read by this method
        - flavour, type, jobs: written in the "Flavour", "Type" and "TotJob" attributes

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        ver = common.prog_version_str
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(ver))

        ## Only Temporary. it should be at Server level
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0)
        blackAnaOps = None
        if int(removeBList) == 0:
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            # log the list itself, not the Downloader object used to fetch it
            common.logger.debug("Enforced black list: %s " % str(blackAnaOps))
        else:
            common.logger.info("WARNING: Skipping default black list!")

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = str(self.cfg_params.get('GRID.ce_white_list', ""))

        # default black list first, then the user one; the default list may be
        # missing (None), in which case the user list is used alone
        ceBlackList = blackAnaOps
        if 'GRID.ce_black_list' in self.cfg_params:
            userBlackList = str(self.cfg_params['GRID.ce_black_list'])
            if blackAnaOps:
                ceBlackList = blackAnaOps + ", " + userBlackList
            else:
                ceBlackList = userBlackList
        miniCfg['EDG.ce_black_list'] = ceBlackList

        # plain copies: empty string when the user did not set the parameter
        for cfgKey, userKey in (('EDG.se_white_list', 'GRID.se_white_list'),
                                ('EDG.se_black_list', 'GRID.se_black_list'),
                                ('EDG.group', 'GRID.group'),
                                ('EDG.role', 'GRID.role')):
            miniCfg[cfgKey] = str(self.cfg_params.get(userKey, ""))

        # checksum of the configuration file, unless one is already in the parameters
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params.get('CRAB.se_remote_dir', ''))

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue', 'cmscaf1nw')
        # NOTE(review): the key read is 'CAF.resource' (singular) while the one written is
        # 'CAF.resources' - confirm which spelling the configuration really uses
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count', -1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        # asked once: the value was previously fetched twice for the same key
        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        #WMBS
        miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')

        miniCfg['processing'] = self.cfg_params.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = self.cfg_params.get('WMBS.startrun', '0')

        miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm', 'FileBased')
        miniCfg['split_per_job'] = self.cfg_params.get('WMBS.split_per_job', 'files_per_job')
        miniCfg['split_value'] = self.cfg_params.get('WMBS.split_value', 1)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        _genericCommand_
        Build the XML for the command cmd and send it to the server.

        Accepts in input:
        - cmd: the command name
        - blTaskName: the task unique name
        - rng: the job range the command applies to

        Returns the code given back by the server session (0 on success).
        Raises CrabException on an empty task name or if the command XML
        cannot be built.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message used to be built and then dropped, now it reaches the debug log
            debugMsg = 'Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
371 ewv 1.45