ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.38
Committed: Wed May 27 12:41:10 2009 UTC (15 years, 11 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre4
Changes since 1.37: +25 -22 lines
Log Message:
handle the case when the selected server is not compatible

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17     # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18     from xml.dom import minidom
19     import os
20 farinafa 1.2 import commands
21 farinafa 1.1
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the
    server Web Service.

    Every public command method builds a "TaskCommand" XML document (see
    _createXMLcommand) and ships it through the CRAB_Server_Session object
    stored in self.asSession.  A non-zero return code from the server is
    turned into a CrabException by checkServerResponse.
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server
        URL and the port.

        serverName, serverPort -- forwarded to CRAB_Server_Session
        cfg_params             -- user configuration mapping; stored and read
                                  later when the command XML is built
        proxyPath              -- accepted for interface compatibility, not
                                  used by this constructor

        Raises CrabException when the credential subject cannot be read.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token credential, everything else a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF','LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably fills server-side parameters on this object
        # (e.g. self.server_admin, read by checkServerResponse) -- confirm in crab_util
        CliServerParams(self)

        # nice task name "crab_0_..."
        self.crab_task_name = common.work_space.topDir().split('/')[-2]

        configAPI = {'credential' : credentialType }
        CredAPI = CredentialAPI( configAPI )
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # imported here so the handler does not depend on a star-import
            # happening to expose the traceback module
            import traceback
            common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task already serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: value written to the "TotJob" attribute of the command XML

        Returns the server return code (0 on success).
        Raises CrabException on empty input, when the command XML cannot be
        built, or when the server answers with a non-zero code.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        Analyze a server return code.

        Every code handled here (including unknown ones) is an error: the
        stored server choice is reset and a CrabException carrying the
        matching message is raised.  The method therefore never returns
        normally; callers invoke it only for non-zero codes.
        """
        adminHint = '\t For Further infos please contact the server Admin: %s'

        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += adminHint%self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += adminHint%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 12:
            # task refused, server possibly under maintenance
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += adminHint%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s (not %d) so that a non-integer code such as None is still
            # reported as a CrabException instead of failing with TypeError
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)

        # reset server choice before reporting the failure
        opsToBeSaved={'serverName' : '' }
        common._db.updateTask_(opsToBeSaved)
        raise CrabException(logMsg)

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line

        Returns the server return code; see _genericCommand.
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command for the given job range to the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code; see _genericCommand.
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanJobs(self, blTaskName, rng):
        """
        _cleanJobs_
        Send a clean command for the given job range to the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code; see _genericCommand.
        """
        return self._genericCommand('clean', blTaskName, rng)

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of
        status (version, loggingInfos,...) selected through statusFamilyType.

        When statusFile is given the message is written to that path and the
        path is returned; otherwise the message itself is returned.

        Raises CrabException when the task name is empty or when the server
        reply is empty, missing, or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )

        if statusFile is None:
            return statusMsg

        # fill the file content; close the handle even if the write fails
        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command for the given job range to the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line

        Returns the server return code; see _genericCommand.
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        Build the "TaskCommand" XML document sent to the server.

        The single TaskCommand element carries the task name, the user
        subject, the command, the job range, the scheduler, the client
        version and a "CfgParamDict" attribute holding the str() of a small
        dictionary extracted from self.cfg_params.

        newTaskAddIns is accepted for interface compatibility and not read.
        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(common.prog_version_str) )

        ## Only Temporary. it should be at Server level
        removeT1bL = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        # Adjacent literals instead of backslash continuations inside the
        # string, so that source indentation does not end up in the value.
        # NOTE(review): the whitespace after the two internal line breaks is
        # normalized to a single space -- confirm the server strips entries.
        T1_BL = ("fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
                 "lcg00125.grid.sinica.edu.tw, "
                 "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf")
        if removeT1bL == '1':
            T1_BL = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = ""
        if 'GRID.ce_white_list' in self.cfg_params:
            miniCfg['EDG.ce_white_list'] = str( self.cfg_params['GRID.ce_white_list'] )

        # the user black list is appended to the default one
        miniCfg['EDG.ce_black_list'] = T1_BL
        if 'GRID.ce_black_list' in self.cfg_params:
            if len(T1_BL) > 0:
                miniCfg['EDG.ce_black_list'] += ", "
            miniCfg['EDG.ce_black_list'] += str( self.cfg_params['GRID.ce_black_list'] )

        # remaining GRID.* options: empty string unless set by the user
        for edgKey, gridKey in (('EDG.se_white_list', 'GRID.se_white_list'),
                                ('EDG.se_black_list', 'GRID.se_black_list'),
                                ('EDG.group', 'GRID.group'),
                                ('EDG.role', 'GRID.role')):
            miniCfg[edgKey] = ""
            if gridKey in self.cfg_params:
                miniCfg[edgKey] = str( self.cfg_params[gridKey] )

        # checksum of the cfg file, unless one is already stored in cfg_params
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = ''
        if 'CRAB.se_remote_dir' in self.cfg_params:
            miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params['CRAB.se_remote_dir'])

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count',0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count',-1)
        miniCfg['EDG.proxyInfos'] = self.cfg_params.get('GRID.proxyInfos',{}) #TODO activate this when using MyProxy-based delegation

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Build the command XML for cmd over the job range rng and send it to
        the server.

        Returns the server return code (0 on success).
        Raises CrabException when the task name is empty, when the command
        XML cannot be built, or when the server answers with a non-zero code.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message used to be built and then dropped
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
333