ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.20
Committed: Tue Jun 10 09:08:28 2008 UTC (16 years, 10 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_2_pre5
Changes since 1.19: +1 -1 lines
Log Message:
Fix on thresholdLevel name

File Contents

# Content
1 #!/usr/bin/env python
2 """
3 _ServerCommunicator_
4
5 """
6
7 __version__ = "$Id"
8 __revision__ = "$Revision"
9 __author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 from crab_logger import Logger
14 import common
15
16 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18 from xml.dom import minidom
19 import os
20 import commands
21
22 class ServerCommunicator:
23 """
24 Common interface for the interaction between the Crab client and the server Web Service
25 """
26 def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
27 """
28 Open the communication with an Analysis Server by passing the server URL and the port
29 """
30
31 self.dontMoveProxy = False
32 if string.lower(cfg_params.get("CRAB.scheduler")) in ['lsf','caf']:
33 self.dontMoveProxy = True
34
35 self.asSession = C_AS_Session(serverName, serverPort)
36 self.cfg_params = cfg_params
37 self.userSubj = ''
38 self.serverName = serverName
39
40 self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."
41
42 x509 = proxyPath
43 if self.dontMoveProxy == False:
44 if 'X509_USER_PROXY' in os.environ:
45 x509 = os.environ['X509_USER_PROXY']
46 else:
47 exitCode, x509 = commands.getstatusoutput('ls /tmp/x509up_u`id -u`').strip()
48 if exitCode != 0:
49 raise CrabException("Error while locating the user proxy file")
50 return
51
52 exitCode, self.userSubj = commands.getstatusoutput('openssl x509 -in %s -subject -noout'%x509)
53 if exitCode != 0:
54 raise CrabException("Error while getting the subject from the user proxy")
55 return
56 self.userSubj = str(self.userSubj).strip()
57 else:
58 x509 = ''
59 self.userSubj = str(os.environ['USER'])
60 pass
61
62 ###################################################
63 # Interactions with the server
64 ###################################################
65
66 def submitNewTask(self, blTaskName, blXml, rng):
67 """
68 _submitNewTask_
69 Send a new task to the server to be submitted.
70
71 Accepts in input:
72 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
73 - the range of the submission as specified by the user at the command line
74 """
75
76 if not blXml:
77 raise CrabException('Error while serializing task object to XML')
78 return -2
79
80 if not blTaskName:
81 raise CrabException('Error while extracting the Task Unique Name string')
82 return -3
83
84 cmdXML = None
85 cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True)
86 if not cmdXML:
87 raise CrabException('Error while creating the Command XML')
88 return -4
89
90 ret = -1
91 ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
92 logMsg = ''
93 if ret == 0:
94 # success
95 logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
96 common.logger.message(logMsg+'\n')
97 else:
98 if ret == 10:
99 # overlaod
100 logMsg = 'The server %s refused to accept the task %s because it is overloaded'%(self.serverName, self.crab_task_name)
101 elif ret == 101:
102 # overlaod
103 logMsg = 'The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
104 elif ret == 11:
105 # failed to push message in DB
106 logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
107 elif ret == 12:
108 # failed SOAP communication
109 logMsg = 'Error during SOAP communication with server %s.\n'%self.serverName
110 logMsg +='\t ----- The server could be under maintainance. ----- '
111 else:
112 logMsg = 'Unexpected return code from server %s: %d'%(self.serverName, ret)
113
114 common.logger.write(logMsg+'\n')
115 raise CrabException(logMsg)
116 return ret
117
118 def subsequentJobSubmit(self, blTaskName, rng):
119 """
120 _subsequentJobSubmit_
121 Let the submission of other jobs of a task that has been already sent to a server.
122 This method is used for subsequent submission of ranged sets of jobs.
123
124 Accepts in input:
125 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
126 - the range of the submission as specified by the user at the command line
127 """
128 return self._genericCommand('submit', blTaskName, rng)
129
130 def killJobs(self, blTaskName, rng):
131 """
132 _killJobs_
133 Send a kill command to one or more jobs running on the server.
134
135 Accepts in input:
136 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
137 - the range of the submission as specified by the user at the command line
138 """
139 return self._genericCommand('kill', blTaskName, rng)
140
141 def cleanJobs(self, blTaskName, rng):
142 """
143 _cleanJobs_
144 Force the server to clean the jobs on the server.
145
146 Accepts in input:
147 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
148 - the range of the submission as specified by the user at the command line
149 """
150 return self._genericCommand('clean', blTaskName, rng)
151
152 def getStatus(self, blTaskName, statusFile=None):
153 """
154 _getStatus_
155 Retrieve the task status from the server.
156
157 Accepts in input:
158 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
159 - the range of the submission as specified by the user at the command line
160 """
161 # fill the filename
162 filename = str(statusFile)
163
164 if not blTaskName:
165 raise CrabException('Exception while getting the task unique name')
166 return ''
167
168 # get the data and fill the file content
169 statusMsg = self.asSession.getTaskStatus(blTaskName)
170 if 'Error:' in statusMsg[:6] or len(statusMsg)==0:
171 raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
172 return
173
174 if statusFile is not None:
175 f = open(statusFile, 'w')
176 f.write(statusMsg)
177 f.close()
178 return statusFile
179 return statusMsg
180
181 def outputRetrieved(self, blTaskName, rng):
182 """
183 _getJobsOutput_
184 Get from the server the output file locations to be transfered back.
185
186 Accepts in input:
187 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
188 - the range of the submission as specified by the user at the command line
189 """
190 return self._genericCommand('outputRetrieved', blTaskName, rng)
191
192 def postMortemInfos(self, blTaskName, rng):
193 """
194 _postMortemInfos_
195 Retrieve the job postmortem information from the server.
196
197 Accepts in input:
198 - the bossLite object representing the task (jobs are assumed to be RunningJobs)
199 - the range of the submission as specified by the user at the command line
200 """
201 # get the status in
202 raise NotImplementedError
203 return None
204
205 ###################################################
206 # Auxiliary methods
207 ###################################################
208
209 def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False):
210 xmlString = ''
211 cfile = minidom.Document()
212
213 node = cfile.createElement("TaskCommand")
214 node.setAttribute("Task", str(taskUName) )
215 node.setAttribute("Subject", str(self.userSubj) )
216 node.setAttribute("Command", str(cmdSpec) )
217 node.setAttribute("Range", str(rng) )
218 node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
219
220 # first submission specific attributes: not available or not considered for the other kind of messages
221 if (newTaskAddIns == True):
222 # add here one time fields if needed
223 pass
224 # node.setAttribute("Service", self.cfg_params[''])
225
226 # create a mini-cfg to be transfered to the server
227 miniCfg = {}
228
229 ## migrate CE/SE infos
230 miniCfg['EDG.ce_white_list'] = ""
231 if 'EDG.ce_white_list' in self.cfg_params:
232 miniCfg['EDG.ce_white_list'] = str( self.cfg_params['EDG.ce_white_list'] )
233
234 miniCfg['EDG.ce_black_list'] = ""
235 if 'EDG.ce_black_list' in self.cfg_params:
236 miniCfg['EDG.ce_black_list'] = str( self.cfg_params['EDG.ce_black_list'] )
237
238 miniCfg['EDG.se_white_list'] = ""
239 if 'EDG.se_white_list' in self.cfg_params:
240 miniCfg['EDG.se_white_list'] = str( self.cfg_params['EDG.se_white_list'] )
241
242 miniCfg['EDG.se_black_list'] = ""
243 if 'EDG.se_black_list' in self.cfg_params:
244 miniCfg['EDG.se_black_list'] = str( self.cfg_params['EDG.se_black_list'] )
245
246 miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
247 if 'cfgFileNameCkSum' in self.cfg_params:
248 miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])
249
250 miniCfg['CRAB.se_remote_dir'] = ''
251 if 'CRAB.se_remote_dir' in self.cfg_params:
252 miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params['CRAB.se_remote_dir'])
253
254 ## JDL requirements specific data. Scheduler dependant
255 miniCfg['EDG.max_wall_time'] = self.cfg_params.get('EDG.max_wall_clock_time', None)
256 miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('EDG.max_cpu_time', '130')
257 miniCfg['proxyServer'] = self.cfg_params.get('EDG.proxy_server', 'myproxy.cern.ch')
258 miniCfg['VO'] = self.cfg_params.get('EDG.virtual_organization', 'cms')
259 miniCfg['EDG_retry_count'] = self.cfg_params.get('EDG.retry_count',0)
260 miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('EDG.shallow_retry_count',-1)
261
262 ## Additional field for DashBoard
263 miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')
264
265 ## Additional fields for Notification by the server
266 miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
267 miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)
268
269 ## put here other fields if needed
270 node.setAttribute("CfgParamDict", str(miniCfg) )
271 cfile.appendChild(node)
272 xmlString += str(cfile.toprettyxml())
273 return xmlString
274
275 def _genericCommand(self, cmd, blTaskName, rng):
276 if not blTaskName:
277 raise CrabException('Error while extracting the Task Unique Name string')
278 return -2
279
280 cmdXML = None
281 cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
282 if not cmdXML:
283 raise CrabException('Error while creating the Command XML')
284 return -3
285
286 ret = -1
287 ret = self.asSession.sendCommand(cmdXML, blTaskName)
288 logMsg = ''
289 debugMsg = ''
290 if ret == 0:
291 # success
292 debugMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
293 elif ret == 101:
294 # overlaod
295 logMsg = 'The server %s refused the submission %s because you asked to handle a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
296 elif ret == 20:
297 # failed to push message in PA
298 logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
299 elif ret == 22:
300 # failed SOAP communication
301 logMsg = 'Error during SOAP communication with server %s'%self.serverName
302 else:
303 logMsg = 'Unexpected return code from server %s: %d'%(self.serverName, ret)
304
305 # print loggings
306 if logMsg != '':
307 common.logger.message(logMsg+'\n')
308 else:
309 common.logger.debug(3,debugMsg+'\n')
310 return ret
311