ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.38
Committed: Wed May 27 12:41:10 2009 UTC (15 years, 11 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre4
Changes since 1.37: +25 -22 lines
Log Message:
handle the case when the selected server is not compatible

File Contents

# Content
#!/usr/bin/env python
"""
_ServerCommunicator_

Client-side helper used by the CRAB client to talk to a CRAB analysis
server: it defines the ServerCommunicator class, which submits tasks,
sends follow-up commands (submit/kill/clean/outputRetrieved) and retrieves
the task status through a CRAB server session object.
"""

# NOTE(review): these look like CVS/RCS keyword strings, but they lack the
# closing '$' -- presumably they are never expanded; confirm.
__version__ = "$Id"
__revision__ = "$Revision"
__author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Instance attributes set by the constructor:
      - ServerTwiki:    URL quoted in the "wrong client version" error message
      - asSession:      the CRAB server session used for every remote call
      - cfg_params:     the client configuration mapping
      - userSubj:       credential subject returned by the CredentialAPI
      - serverName:     name of the selected server (used in messages)
      - crab_task_name: human readable task name ("crab_0_...")
      - scram:          Scram helper, used to get the CMSSW version

    NOTE(review): 'server_admin' is read by checkServerResponse but is not
    assigned in this class; presumably CliServerParams(self) sets it -- confirm.
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port.

        Parameters:
          - serverName, serverPort: forwarded to the server session constructor
          - cfg_params: client configuration mapping, kept for later use
          - proxyPath: accepted for interface compatibility, not used here

        Raises CrabException when the credential subject cannot be obtained.
        """

        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token, every other scheduler a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF','LSF']:
            credentialType = 'Token'
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."

        configAPI = {'credential' : credentialType }

        CredAPI = CredentialAPI( configAPI )
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # imported here so that the handler does not depend on a
            # star-import happening to expose the traceback module
            import traceback
            common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram=Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
          - blTaskName: the task unique name
          - blXml: the task serialized as XML
          - rng: the range of the submission as specified by the user at the command line
          - TotJob: value stored in the "TotJob" attribute of the command XML

        Returns the server return code (0 on success).
        Raises CrabException on invalid input or when the server answers
        with a non-zero code.
        """

        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            # always raises
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Every code handled here is treated as a failure: the stored server
        choice is reset and a CrabException carrying the explanation is
        raised. The method therefore never returns normally.
        """

        logMsg = self._serverErrorMessage(ret)

        # reset server choice
        opsToBeSaved={'serverName' : '' }
        common._db.updateTask_(opsToBeSaved)
        raise CrabException(logMsg)

    def _serverErrorMessage(self, ret):
        """
        Build the user message describing the server return code 'ret'.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 12:
            # server refused the task, possibly under maintenance
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s rather than %d: a non-integer code (e.g. None) must not
            # turn the error report itself into a TypeError
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)
        return logMsg

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
          - blTaskName: the task unique name
          - rng: the range of the submission as specified by the user at the command line

        Returns the server return code; raises CrabException on failure.
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
          - blTaskName: the task unique name
          - rng: the range of jobs as specified by the user at the command line

        Returns the server return code; raises CrabException on failure.
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanJobs(self, blTaskName, rng):
        """
        _cleanJobs_
        Force the server to clean the jobs on the server.

        Accepts in input:
          - blTaskName: the task unique name
          - rng: the range of jobs as specified by the user at the command line

        Returns the server return code; raises CrabException on failure.
        """
        return self._genericCommand('clean', blTaskName, rng)

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
          - blTaskName: the task unique name
          - statusFile: optional path; when given the status is written there
          - statusFamilyType: kind of status requested from the server

        Returns statusFile when a file was requested, the status message otherwise.
        Raises CrabException when the task name is missing or the server
        answer is empty or starts with 'Error:'.
        """

        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)

        # emptiness is checked first so that a missing answer is reported as
        # a CrabException instead of failing while inspecting the message
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )

        if statusFile is None:
            return statusMsg

        # fill the file content; the handle is released even if the write fails
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
          - blTaskName: the task unique name
          - rng: the range of jobs as specified by the user at the command line

        Returns the server return code; raises CrabException on failure.
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _cfgAsStr(self, key, default=''):
        """
        Return cfg_params[key] converted to string, or 'default' when the key is absent.
        """
        if key in self.cfg_params:
            return str( self.cfg_params[key] )
        return default

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        Build the XML command message to be sent to the server.

        The document holds a single "TaskCommand" element whose attributes
        describe the command (task, subject, command, range, ...) plus a
        "CfgParamDict" attribute carrying the string form of a reduced
        configuration dictionary.

        'newTaskAddIns' is accepted for interface compatibility and is not used.
        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        ver = common.prog_version_str
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(ver) )

        ## Only Temporary. it should be at Server level
        removeT1bL = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        T1_BL = ("fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
                 "lcg00125.grid.sinica.edu.tw, "
                 "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf")
        # str() so that both 1 and '1' disable the default black list
        if str(removeT1bL) == '1':
            T1_BL = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = self._cfgAsStr('GRID.ce_white_list')

        # the user black list is appended to the default one
        ceBlackList = T1_BL
        if 'GRID.ce_black_list' in self.cfg_params:
            if len(T1_BL) > 0:
                ceBlackList += ", "
            ceBlackList += str( self.cfg_params['GRID.ce_black_list'] )
        miniCfg['EDG.ce_black_list'] = ceBlackList

        miniCfg['EDG.se_white_list'] = self._cfgAsStr('GRID.se_white_list')
        miniCfg['EDG.se_black_list'] = self._cfgAsStr('GRID.se_black_list')
        miniCfg['EDG.group'] = self._cfgAsStr('GRID.group')
        miniCfg['EDG.role'] = self._cfgAsStr('GRID.role')

        # the checksum is always computed; an explicit value in the
        # configuration takes precedence over it
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = self._cfgAsStr('CRAB.se_remote_dir')

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count',0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count',-1)
        miniCfg['EDG.proxyInfos'] = self.cfg_params.get('GRID.proxyInfos',{}) #TODO activate this when using MyProxy-based delegation

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send the command 'cmd' for the jobs in 'rng' of task 'blTaskName' to the server.

        Returns the server return code (0 on success).
        Raises CrabException when the task name is missing, the command XML
        cannot be built or the server answers with a non-zero code.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            # always raises
            self.checkServerResponse(ret)
        return ret
333