ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.31
Committed: Fri Dec 5 16:56:16 2008 UTC (16 years, 4 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3_pre7
Changes since 1.30: +14 -26 lines
Log Message:
use CredentialApi to get subject

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 from crab_logger import Logger
14 import common
15
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21
class ServerCommunicator:
    """
    _ServerCommunicator_

    Common interface for the interaction between the Crab client and the
    server Web Service.

    All the commands are serialized as a small XML document (see
    _createXMLcommand) and shipped through the session object stored in
    self.asSession. Server-side failures are reported by raising
    CrabException (see checkServerResponse).
    """

    # CEs black-listed by default for every command sent to the server.
    # "Only Temporary. it should be at Server level" (original author's note).
    # Written as adjacent literals so that no source indentation ends up
    # embedded in the value.
    _DEFAULT_CE_BLACK_LIST = (
        "fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
        "lcg00125.grid.sinica.edu.tw, "
        "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf"
    )

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server
        URL and the port.

        Arguments:
        - serverName: name of the server, also used in the user messages
        - serverPort: port handed to the session object together with serverName
        - cfg_params: dictionary-like object holding the user configuration
        - proxyPath: accepted for backward compatibility, not used here

        Raises CrabException if the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token, everything else a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): CliServerParams is defined outside this file; it
        # presumably fills server related attributes such as
        # self.server_admin (read by checkServerResponse) -- to be confirmed.
        CliServerParams(self)

        # nice task name "crab_0_..."
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        from ProdCommon.Credential.CredentialAPI import CredentialAPI
        CredAPI = CredentialAPI({'credential': credentialType})
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # imported here explicitly instead of relying on a star-import
            import traceback
            common.logger.debug(3, "Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name string
        - blXml: the task object already serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: value stored in the "TotJob" attribute of the command XML

        Returns 0 on success. Raises CrabException when an input is missing,
        when the command XML cannot be built or when the server answers
        with a non-zero code.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.message(logMsg + '\n')
        else:
            # always raises CrabException
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a (non successful) server return code.

        The code is translated into a user message, the message is written
        through common.logger and then CrabException is raised with it.
        This method never returns normally: unknown codes get a generic
        "Unexpected return code" message.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret in (11, 20):
            # 11: failed to push message in DB, 20: failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 12:
            # task refused, server possibly under maintenance
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s instead of %d: the code may not be an integer at all
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        # every branch above produces a message, so this always raises
        common.logger.write(logMsg + '\n')
        raise CrabException(logMsg)

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name string
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name string
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanJobs(self, blTaskName, rng):
        """
        _cleanJobs_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name string
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('clean', blTaskName, rng)

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of
        status (version, loggingInfos,...) by means of statusFamilyType.

        Returns the status message, or, when statusFile is given, writes the
        message into that file and returns the file name.

        Raises CrabException if the task name is missing or if the server
        reply is empty or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))

        if statusFile is None:
            return statusMsg

        # fill the file content; the handle is released even if write fails
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name string
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    @staticmethod
    def _joinSiteLists(*siteLists):
        """
        Merge comma separated site lists into a single one.

        None and blank entries are skipped, the remaining ones are joined
        with ', ' so that the last site of a list is never glued to the
        first site of the following one.
        """
        kept = []
        for sites in siteLists:
            if sites is None:
                continue
            sites = str(sites)
            if sites.strip():
                kept.append(sites)
        return ", ".join(kept)

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the XML document describing a command for the server.

        The document holds a single "TaskCommand" element whose attributes
        carry the task name, the user subject, the command, the job range
        and a stringified mini configuration ("CfgParamDict") extracted
        from self.cfg_params.

        newTaskAddIns is accepted for backward compatibility but it is not
        used. Returns the pretty-printed XML as a string.
        """
        cfg = self.cfg_params
        cfile = minidom.Document()
        node = cfile.createElement("TaskCommand")

        attributes = [
            ("Task", taskUName),
            ("Subject", self.userSubj),
            ("Command", cmdSpec),
            ("Range", rng),
            ("TotJob", jobs),
            ("Scheduler", cfg['CRAB.scheduler']),
            ("Flavour", flavour),
            ("Type", type),
            ("ClientVersion", common.prog_version_str),
        ]
        for attrName, attrValue in attributes:
            node.setAttribute(attrName, str(attrValue))

        ## Only Temporary. it should be at Server level
        defaultBlackList = self._DEFAULT_CE_BLACK_LIST
        # str() so that both 1 and '1' switch the default black list off
        if str(cfg.get("EDG.remove_default_blacklist", 0)) == '1':
            defaultBlackList = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = str(cfg.get('EDG.ce_white_list', ""))
        # user black list is appended to the default one, comma separated
        miniCfg['EDG.ce_black_list'] = self._joinSiteLists(defaultBlackList, cfg.get('EDG.ce_black_list', ""))
        miniCfg['EDG.se_white_list'] = str(cfg.get('EDG.se_white_list', ""))
        miniCfg['EDG.se_black_list'] = str(cfg.get('EDG.se_black_list', ""))

        # the checksum is computed only when not overridden in the configuration
        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])
        else:
            miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())

        miniCfg['CRAB.se_remote_dir'] = str(cfg.get('CRAB.se_remote_dir', ''))

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = cfg.get('EDG.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = cfg.get('EDG.max_cpu_time', '130')
        miniCfg['proxyServer'] = cfg.get('EDG.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = cfg.get('EDG.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = cfg.get('EDG.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = cfg.get('EDG.shallow_retry_count', -1)
        # TODO activate this when using MyProxy-based delegation
        miniCfg['EDG.proxyInfos'] = cfg.get('EDG.proxyInfos', {})

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send the command cmd for the jobs in rng of task blTaskName.

        Returns 0 on success. Raises CrabException when the task name is
        missing, when the command XML cannot be built or when the server
        answers with a non-zero code.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success
            debugMsg = 'Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name)
            common.logger.debug(3, debugMsg)
        else:
            # always raises CrabException
            self.checkServerResponse(ret)
        return ret
319