ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.34
Committed: Mon Feb 16 17:45:05 2009 UTC (16 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre2, CRAB_2_6_0_pre1, CRAB_2_5_1, CRAB_2_5_1_pre4, CRAB_2_5_1_pre3, CRAB_2_5_1_pre2, CRAB_2_5_1_pre1, CRAB_2_5_0, CRAB_2_5_0_pre7, CRAB_2_5_0_pre6, CRAB_2_5_0_pre5, CRAB_2_5_0_pre4
Changes since 1.33: +6 -3 lines
Log Message:
ship to the server the cmssw version used

File Contents

# Content
#!/usr/bin/env python
"""
_ServerCommunicator_

Defines the ServerCommunicator class, the interface used by the Crab client
to interact with the server Web Service: it builds XML task commands and
sends them through a CRAB_Server_Session.
"""

__version__ = "$Id"
__revision__ = "$Revision"
__author__ = "farinafa@cern.ch"
10
11 from crab_exceptions import *
12 from crab_util import *
13 from crab_logger import Logger
14 import common
15 import Scram
16 from ProdCommon.Credential.CredentialAPI import CredentialAPI
17 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
18 # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
19 from xml.dom import minidom
20 import os
21 import commands
22
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Every request is described by a small XML "TaskCommand" document (see
    _createXMLcommand) and delivered through the CRAB_Server_Session stored in
    self.asSession. A return code of 0 from the session means success; any
    other code is turned into a CrabException by checkServerResponse.
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Accepts in input:
        - serverName, serverPort: forwarded to the server session constructor
        - cfg_params: the client configuration (used as a dictionary)
        - proxyPath: not used by this class; kept so existing callers keep working

        Raises CrabException if the credential subject cannot be retrieved.
        """

        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token, every other scheduler a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably sets client/server parameters on this object
        # (self.server_admin is read later but never assigned here) -- confirm
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2]  # nice task name "crab_0_..."

        configAPI = {'credential': credentialType}

        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # imported here so that the error path does not depend on a
            # star-import happening to expose the traceback module
            import traceback
            common.logger.debug(3, "Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: written to the "TotJob" attribute of the command

        Returns the server return code (0, since any other code raises).
        Raises CrabException on missing input or on a non-zero server return code.
        """

        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.message(logMsg + '\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Writes the message describing the return code to the logger and then
        always raises CrabException with that message; callers invoke it only
        for non-zero return codes.
        """

        logMsg = self._serverErrorMessage(ret)
        common.logger.write(logMsg + '\n')
        raise CrabException(logMsg)

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanJobs(self, blTaskName, rng):
        """
        _cleanJobs_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('clean', blTaskName, rng)

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written to it
        - statusFamilyType: the kind of status asked to the server

        Returns statusFile when a file was requested, the reply itself otherwise.
        Raises CrabException if the task name is missing, or if the reply is
        empty or starts with 'Error:'.
        """

        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)

        # "not statusMsg" also covers a None reply, which cannot be sliced
        if not statusMsg or 'Error:' in statusMsg[:6]:
            raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )

        if statusFile is None:
            return statusMsg

        # fill the file content; the handle is released even if the write fails
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.
        NOTE(review): presumably tells the server that the output of these jobs
        has already been retrieved by the client -- confirm on the server side.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _serverErrorMessage(self, ret):
        """
        Build the user message describing the server return code ret.
        Codes that are not known produce an 'Unexpected return code' message.
        """
        adminFmt = '\t For Further infos please contact the server Admin: %s'

        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += adminFmt % self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += adminFmt % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret in (11, 20):
            # 11: failed to push message in DB, 20: failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += adminFmt % self.server_admin
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s rather than %d: a non-numeric code must still be reported
            # instead of failing with a TypeError while building the message
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)
        return logMsg

    def _cfgString(self, key, default=''):
        """
        Return str() of the configuration value stored under key,
        or default when the key is not in the configuration.
        """
        if key in self.cfg_params:
            return str(self.cfg_params[key])
        return default

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        Build the XML document describing a command for the server.

        The document holds a single "TaskCommand" element whose attributes carry
        the task name, the user subject, the command, the job range, the total
        number of jobs, the scheduler, flavour, type and client version, plus
        "CfgParamDict": the str() of a dictionary with the subset of the client
        configuration shipped to the server.

        newTaskAddIns is accepted but not used.

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        ## Only Temporary. it should be at Server level
        # NOTE(review): whitespace around the separators is assumed not to
        # matter to the server -- confirm
        T1_BL = ("fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
                 "lcg00125.grid.sinica.edu.tw, "
                 "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf")
        # compared through str() because the default is the integer 0 while a
        # configured value is a string
        removeT1bL = self.cfg_params.get("EDG.remove_default_blacklist", 0)
        if str(removeT1bL) == '1':
            T1_BL = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = self._cfgString('EDG.ce_white_list')

        # the user black list is appended to the default one
        ceBlackList = T1_BL
        if 'EDG.ce_black_list' in self.cfg_params:
            if len(ceBlackList) > 0:
                ceBlackList += ", "
            ceBlackList += str(self.cfg_params['EDG.ce_black_list'])
        miniCfg['EDG.ce_black_list'] = ceBlackList

        miniCfg['EDG.se_white_list'] = self._cfgString('EDG.se_white_list')
        miniCfg['EDG.se_black_list'] = self._cfgString('EDG.se_black_list')
        miniCfg['EDG.group'] = self._cfgString('EDG.group')
        miniCfg['EDG.role'] = self._cfgString('EDG.role')

        # the checksum of the cfg file is computed in any case; a value found
        # in the configuration takes precedence over it
        miniCfg['cfgFileNameCkSum'] = self._cfgString('cfgFileNameCkSum', makeCksum(common.work_space.cfgFileName()))

        miniCfg['CRAB.se_remote_dir'] = self._cfgString('CRAB.se_remote_dir')

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('EDG.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('EDG.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('EDG.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('EDG.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('EDG.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('EDG.shallow_retry_count', -1)
        miniCfg['EDG.proxyInfos'] = self.cfg_params.get('EDG.proxyInfos', {})  #TODO activate this when using MyProxy-based delegation

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Send the command cmd for the jobs in rng of task blTaskName to the server.

        Returns the server return code (0, since any other code raises).
        Raises CrabException on a missing task name, on a failure building the
        command XML, or on a non-zero server return code.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(3, debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
332