ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.44
Committed: Tue Sep 8 22:07:59 2009 UTC (15 years, 7 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
Changes since 1.43: +1 -1 lines
Log Message:
Changed link for server available to users

File Contents

# Content
#!/usr/bin/env python
"""
_ServerCommunicator_

Defines ServerCommunicator, the client-side class used to send tasks and
commands (submit, kill, clean, status, outputRetrieved) to an Analysis Server
through a CRAB_Server_Session object.
"""

# NOTE(review): CVS expands keywords only when written as "$Id$" / "$Revision$";
# the two literals below lack the closing '$' and so would stay unexpanded.
# Confirm whether that is intentional before changing them.
__version__ = "$Id"
__revision__ = "$Revision"
__author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Every request goes through self.asSession (a CRAB_Server_Session). Commands
    are described by a small XML document built by _createXMLcommand().
    Non-zero return codes from the server are translated into user messages by
    checkServerResponse().
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port.

        Parameters:
         - serverName: name of the server, passed to the session object and
           used in user messages
         - serverPort: port passed to the session object
         - cfg_params: dictionary of the CRAB configuration parameters
         - proxyPath: accepted for interface compatibility; not used here

        Raises CrabException when the credential subject cannot be retrieved.
        """
        # 'traceback' is not imported explicitly at module level (it may only
        # arrive through the star imports); importing it here guarantees the
        # error path below cannot itself fail with a NameError.
        import traceback

        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a 'Token' credential, everything else a 'Proxy'
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably fills in server-side attributes such as
        # self.server_admin (read by checkServerResponse) -- confirm in the helper.
        CliServerParams(self)

        # nice task name "crab_0_..."; taken as the second-to-last '/' separated
        # component of the working directory path
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential' : credentialType,
                     'logger' : common.logger() }

        CredAPI = CredentialAPI( configAPI )
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # keep the full traceback in the debug log, show a short message to the user
            common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
         - blTaskName: the unique name of the task
         - blXml: the task serialized as XML
         - rng: the range of the submission as specified by the user at the command line
         - TotJob: value stored in the "TotJob" attribute of the command XML

        Returns the code given back by the server (0 means success).
        Raises CrabException if blXml or blTaskName is empty, or if the
        command XML cannot be created.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        Analyze the server return codes.

        Builds the message matching the code 'ret', resets the stored server
        choice (serverName set to '' through common._db.updateTask_) and logs
        the message at info level. Codes without a dedicated message are
        reported as unexpected.

        Returns 'ret' unchanged.
        """
        logMsg = ''
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 12:
            # server refused the task
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # '%s' rather than '%d': a non-numeric code (e.g. None) must still
            # produce a message instead of a TypeError while formatting it
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)

        # Every branch above sets a message, so the server choice is always reset.
        opsToBeSaved = {'serverName' : '' }
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
         - blTaskName: the unique name of the task
         - rng: the range of the submission as specified by the user at the command line

        Returns the code given back by _genericCommand.
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
         - blTaskName: the unique name of the task
         - rng: the range of jobs as specified by the user at the command line

        Returns the code given back by _genericCommand.
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Accepts in input:
         - blTaskName: the unique name of the task

        The command is always sent with the range 'all'.
        Returns the code given back by _genericCommand.
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
         - blTaskName: the unique name of the task
         - statusFile: optional path; when given the status message is written there
         - statusFamilyType: the kind of status requested to the server

        Returns statusFile when it is given, the status message otherwise.
        Raises CrabException if blTaskName is empty, or if the server answer
        is empty or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)

        # Test emptiness first so that a missing answer (None) is reported as a
        # CrabException rather than failing on the slice below.
        if not statusMsg or statusMsg[:6] == 'Error:':
            raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )

        if statusFile is not None:
            # fill the file content; try/finally closes the handle even if write fails
            f = open(statusFile, 'w')
            try:
                f.write(statusMsg)
            finally:
                f.close()
            return statusFile
        return statusMsg

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
         - blTaskName: the unique name of the task
         - rng: the range of jobs as specified by the user at the command line

        Returns the code given back by _genericCommand.
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        Build the XML document describing a command for the server.

        The document holds a single "TaskCommand" element whose attributes
        carry the task name, the user subject, the command, the job range and
        a "CfgParamDict" attribute: the string form of a small dictionary of
        configuration values taken from self.cfg_params.

        Accepts in input:
         - taskUName: the unique name of the task
         - cmdSpec: the command name
         - rng: the job range
         - newTaskAddIns: accepted for interface compatibility; not used here
         - flavour, type, jobs: stored as "Flavour", "Type", "TotJob" attributes

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        ver = common.prog_version_str
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(ver) )

        ## Only Temporary. it should be at Server level
        # Adjacent literals are used instead of a backslash continuation inside
        # the string, so source indentation cannot leak into the value.
        T1_BL = ("fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
                 "lcg00125.grid.sinica.edu.tw, "
                 "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf")
        removeT1bL = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        # compare as string so that both '1' and the integer 1 disable the default list
        if str(removeT1bL) == '1':
            T1_BL = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = ""
        if 'GRID.ce_white_list' in self.cfg_params:
            miniCfg['EDG.ce_white_list'] = str( self.cfg_params['GRID.ce_white_list'] )

        # the user black list is appended to the default one
        miniCfg['EDG.ce_black_list'] = T1_BL
        if 'GRID.ce_black_list' in self.cfg_params:
            if len(T1_BL) > 0:
                miniCfg['EDG.ce_black_list'] += ", "
            miniCfg['EDG.ce_black_list'] += str( self.cfg_params['GRID.ce_black_list'] )

        miniCfg['EDG.se_white_list'] = ""
        if 'GRID.se_white_list' in self.cfg_params:
            miniCfg['EDG.se_white_list'] = str( self.cfg_params['GRID.se_white_list'] )

        miniCfg['EDG.se_black_list'] = ""
        if 'GRID.se_black_list' in self.cfg_params:
            miniCfg['EDG.se_black_list'] = str( self.cfg_params['GRID.se_black_list'] )

        miniCfg['EDG.group'] = ""
        if 'GRID.group' in self.cfg_params:
            miniCfg['EDG.group'] = str( self.cfg_params['GRID.group'] )

        miniCfg['EDG.role'] = ""
        if 'GRID.role' in self.cfg_params:
            miniCfg['EDG.role'] = str( self.cfg_params['GRID.role'] )

        # checksum computed from the cfg file, unless one is already stored in cfg_params
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = ''
        if 'CRAB.se_remote_dir' in self.cfg_params:
            miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params['CRAB.se_remote_dir'])

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue','cmscaf1nw')
        # NOTE(review): written as 'CAF.resources' but read from 'CAF.resource' -- confirm intended
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count',0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count',-1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Build the XML for command 'cmd' and send it to the server.

        Accepts in input:
         - cmd: the command name
         - blTaskName: the unique name of the task
         - rng: the job range the command applies to

        Returns the code given back by the server (0 means success); non-zero
        codes are also passed to checkServerResponse.
        Raises CrabException if blTaskName is empty or the command XML cannot
        be created.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: this message used to be built and then discarded, log it
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
335