ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.36
Committed: Tue May 26 16:53:23 2009 UTC (15 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre3
Changes since 1.35: +30 -30 lines
Log Message:
adapting code to EDG --> GRID migration

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
# CVS keywords need the closing '$' to be expanded on commit/checkout.
__version__ = "$Id$"
__revision__ = "$Revision$"
__author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 import common
14 import Scram
15 from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Commands are serialized to XML by _createXMLcommand and sent through a
    C_AS_Session; any non-zero server return code is turned into a
    CrabException by checkServerResponse.
    """

    ## Only Temporary. it should be at Server level.
    # CEs put in GRID.ce_black_list by default unless the user sets
    # GRID.remove_default_blacklist = 1.
    # NOTE(review): entries are now joined with ', ' (the old literal had
    # irregular spacing); confirm the server strips whitespace around entries.
    _DEFAULT_T1_BLACKLIST = (
        'fnal.gov', 'gridka.de',
        'w-ce01.grid.sinica.edu.tw', 'w-ce02.grid.sinica.edu.tw',
        'lcg00125.grid.sinica.edu.tw',
        'gridpp.rl.ac.uk', 'cclcgceli03.in2p3.fr', 'cclcgceli04.in2p3.fr',
        'pic.es', 'cnaf',
    )

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port.

        serverName, serverPort -- forwarded to C_AS_Session to open the session
        cfg_params             -- the CRAB configuration dictionary
        proxyPath              -- accepted for interface compatibility; unused here

        Raises CrabException if the credential subject cannot be obtained.
        """
        # Imported locally: the module only gets it (if at all) via star imports.
        import traceback

        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token credential instead of a proxy.
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): checkServerResponse reads self.server_admin, which this
        # class never assigns; presumably CliServerParams sets it — confirm.
        CliServerParams(self)
        # nice task name "crab_0_..." taken from the working directory path
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        CredAPI = CredentialAPI({'credential': credentialType})
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the unique name string of the task
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: total number of jobs, forwarded as the TotJob attribute

        Returns 0 on success. Any other server code is handed to
        checkServerResponse, which raises CrabException.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
        if ret == 0:
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)
        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a server return code.

        Returns ret unchanged when it is 0 (success). Every other code is
        translated into a human readable message and raised as CrabException.
        """
        if ret == 0:
            return ret

        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large for a single submission
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: a non-integer answer must not become a TypeError
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        raise CrabException(logMsg)

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the unique name string of the task
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the unique name string of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanJobs(self, blTaskName, rng):
        """
        _cleanJobs_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the unique name string of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('clean', blTaskName, rng)

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        statusFamilyType selects which kind of status is requested.
        If statusFile is given, the answer is written to that path and the path
        is returned; otherwise the answer string itself is returned.

        Raises CrabException when the task name is empty or when the server
        answer is empty/None or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        # `not statusMsg` also covers a None answer, which slicing used to crash on
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))

        if statusFile is None:
            return statusMsg

        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs
        (presumably telling it the output has been fetched — confirm server side).

        Accepts in input:
        - blTaskName: the unique name string of the task
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the XML command document sent to the server.

        The document holds one <TaskCommand> element whose attributes carry the
        task name, user subject, command, range, total jobs, scheduler, flavour,
        type and client version; CfgParamDict holds str() of the dictionary
        returned by _buildMiniCfg.

        newTaskAddIns is accepted for interface compatibility but unused.
        `type` shadows the builtin; kept because callers may pass it by keyword.

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()
        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(self._buildMiniCfg()))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _buildMiniCfg(self):
        """Return the mini configuration dictionary transferred to the server."""
        cfg = self.cfg_params
        miniCfg = {}

        # str() so that both '1' and 1 disable the default blacklist
        if str(cfg.get("GRID.remove_default_blacklist", 0)) == '1':
            T1_BL = ''
        else:
            T1_BL = ', '.join(self._DEFAULT_T1_BLACKLIST)

        ## migrate CE/SE infos
        miniCfg['GRID.ce_white_list'] = str(cfg.get('GRID.ce_white_list', ''))

        ceBlackList = T1_BL
        if 'GRID.ce_black_list' in cfg:
            if ceBlackList:
                ceBlackList += ", "
            ceBlackList += str(cfg['GRID.ce_black_list'])
        miniCfg['GRID.ce_black_list'] = ceBlackList

        for key in ('GRID.se_white_list', 'GRID.se_black_list', 'GRID.group', 'GRID.role'):
            miniCfg[key] = str(cfg.get(key, ''))

        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])
        else:
            # only checksum the config file when no value was supplied
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = str(cfg.get('CRAB.se_remote_dir', ''))

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['GRID.max_wall_time'] = cfg.get('GRID.max_wall_clock_time', None)
        miniCfg['GRID.max_cpu_time'] = cfg.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = cfg.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = cfg.get('GRID.virtual_organization', 'cms')
        miniCfg['GRID_retry_count'] = cfg.get('GRID.retry_count', 0)
        miniCfg['GRID_shallow_retry_count'] = cfg.get('GRID.shallow_retry_count', -1)
        miniCfg['GRID.proxyInfos'] = cfg.get('GRID.proxyInfos', {})  # TODO activate this when using MyProxy-based delegation

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()
        return miniCfg

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send command `cmd` for the jobs in `rng` of task `blTaskName` to the server.

        Returns 0 on success. Any other server code is handed to
        checkServerResponse, which raises CrabException.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # this success message used to be built and then silently discarded
            common.logger.debug('Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name))
        else:
            self.checkServerResponse(ret)
        return ret
330