ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.37
Committed: Tue May 26 22:17:51 2009 UTC (15 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.36: +18 -18 lines
Log Message:
fixed problem with server communicaion

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17     # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18     from xml.dom import minidom
19     import os
20 farinafa 1.2 import commands
21 farinafa 1.1
22     class ServerCommunicator:
23     """
24     Common interface for the interaction between the Crab client and the server Web Service
25     """
26 farinafa 1.2 def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
27 farinafa 1.1 """
28     Open the communication with an Analysis Server by passing the server URL and the port
29     """
30 spiga 1.27
31     self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#Server_available_for_users'
32 spiga 1.16
33 farinafa 1.1 self.asSession = C_AS_Session(serverName, serverPort)
34     self.cfg_params = cfg_params
35     self.userSubj = ''
36     self.serverName = serverName
37 spiga 1.31 credentialType = 'Proxy'
38     if common.scheduler.name().upper() in ['CAF','LSF']:
39     credentialType = 'Token'
40 spiga 1.27 CliServerParams(self)
41 farinafa 1.11 self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."
42    
43 spiga 1.31 configAPI = {'credential' : credentialType }
44    
45     CredAPI = CredentialAPI( configAPI )
46     try:
47     self.userSubj = CredAPI.getSubject()
48     except Exception, err:
49 spiga 1.35 common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
50 spiga 1.31 raise CrabException("Error Getting Credential Subject")
51 spiga 1.34
52     self.scram=Scram.Scram(cfg_params)
53 farinafa 1.1 ###################################################
54     # Interactions with the server
55     ###################################################
56    
57 spiga 1.29 def submitNewTask(self, blTaskName, blXml, rng, TotJob):
58 farinafa 1.1 """
59     _submitNewTask_
60     Send a new task to the server to be submitted.
61    
62     Accepts in input:
63     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
64     - the range of the submission as specified by the user at the command line
65     """
66    
67     if not blXml:
68     raise CrabException('Error while serializing task object to XML')
69     return -2
70    
71 farinafa 1.2 if not blTaskName:
72 farinafa 1.1 raise CrabException('Error while extracting the Task Unique Name string')
73     return -3
74    
75     cmdXML = None
76 spiga 1.29 cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,jobs=TotJob)
77 farinafa 1.1 if not cmdXML:
78     raise CrabException('Error while creating the Command XML')
79     return -4
80    
81     ret = -1
82 farinafa 1.2 ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
83 spiga 1.27
84 farinafa 1.1 if ret == 0:
85     # success
86 farinafa 1.11 logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
87 spiga 1.35 common.logger.info(logMsg+'\n')
88 farinafa 1.1 else:
89 spiga 1.27 self.checkServerResponse(ret)
90    
91     return ret
92    
93     def checkServerResponse(self, ret):
94     """
95     analyze the server return codes
96     """
97    
98     logMsg = ''
99     if ret == 10:
100     # overlaod
101     logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
102     logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
103     elif ret == 14:
104     # Draining
105 spiga 1.28 logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
106 spiga 1.31 logMsg += '\t remaining jobs due to scheduled maintainence\n'
107 spiga 1.27 logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
108     elif ret == 101:
109     # overlaod
110     logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
111     elif ret == 11:
112     # failed to push message in DB
113     logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
114     elif ret == 12:
115     # failed SOAP communication
116     logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
117     logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
118     elif ret == 20:
119     # failed to push message in PA
120     logMsg = 'Backend unable to release messages to trigger the computation of task %s'%self.crab_task_name
121     elif ret == 22:
122     # failed SOAP communication
123     logMsg = 'Error during SOAP communication with server %s'%self.serverName
124     elif ret == 33:
125     # uncompatible client version
126     logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
127     logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
128     else:
129     logMsg = 'Unexpected return code from server %s: %d'%(self.serverName, ret)
130 farinafa 1.1
131 spiga 1.27 # print loggings
132     if logMsg != '':
133 spiga 1.19 raise CrabException(logMsg)
134 farinafa 1.1 return ret
135    
136 farinafa 1.2 def subsequentJobSubmit(self, blTaskName, rng):
137 farinafa 1.1 """
138     _subsequentJobSubmit_
139     Let the submission of other jobs of a task that has been already sent to a server.
140     This method is used for subsequent submission of ranged sets of jobs.
141    
142     Accepts in input:
143     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
144     - the range of the submission as specified by the user at the command line
145     """
146 farinafa 1.2 return self._genericCommand('submit', blTaskName, rng)
147 farinafa 1.1
148 farinafa 1.2 def killJobs(self, blTaskName, rng):
149 farinafa 1.1 """
150     _killJobs_
151     Send a kill command to one or more jobs running on the server.
152    
153     Accepts in input:
154     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
155     - the range of the submission as specified by the user at the command line
156     """
157 farinafa 1.2 return self._genericCommand('kill', blTaskName, rng)
158 farinafa 1.1
159 farinafa 1.2 def cleanJobs(self, blTaskName, rng):
160 farinafa 1.1 """
161     _cleanJobs_
162     Force the server to clean the jobs on the server.
163    
164     Accepts in input:
165     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
166     - the range of the submission as specified by the user at the command line
167     """
168 farinafa 1.2 return self._genericCommand('clean', blTaskName, rng)
169 farinafa 1.1
170 farinafa 1.23 def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
171 farinafa 1.1 """
172     _getStatus_
173 farinafa 1.23 Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)
174     """
175 farinafa 1.1
176     # fill the filename
177     filename = str(statusFile)
178    
179 farinafa 1.2 if not blTaskName:
180 farinafa 1.1 raise CrabException('Exception while getting the task unique name')
181     return ''
182    
183     # get the data and fill the file content
184 farinafa 1.23 statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
185 farinafa 1.2 if 'Error:' in statusMsg[:6] or len(statusMsg)==0:
186 farinafa 1.11 raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
187 farinafa 1.2 return
188    
189     if statusFile is not None:
190     f = open(statusFile, 'w')
191     f.write(statusMsg)
192     f.close()
193     return statusFile
194     return statusMsg
195 farinafa 1.1
196 farinafa 1.8 def outputRetrieved(self, blTaskName, rng):
197 farinafa 1.1 """
198     _getJobsOutput_
199     Get from the server the output file locations to be transfered back.
200    
201     Accepts in input:
202     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
203     - the range of the submission as specified by the user at the command line
204     """
205 farinafa 1.8 return self._genericCommand('outputRetrieved', blTaskName, rng)
206 farinafa 1.1
207 farinafa 1.2 def postMortemInfos(self, blTaskName, rng):
208 farinafa 1.1 """
209     _postMortemInfos_
210     Retrieve the job postmortem information from the server.
211    
212     Accepts in input:
213     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
214     - the range of the submission as specified by the user at the command line
215     """
216     # get the status in
217     raise NotImplementedError
218     return None
219    
220     ###################################################
221     # Auxiliary methods
222     ###################################################
223    
224 spiga 1.29 def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
225 farinafa 1.1 xmlString = ''
226     cfile = minidom.Document()
227    
228 spiga 1.27 ver = common.prog_version_str
229 farinafa 1.1 node = cfile.createElement("TaskCommand")
230     node.setAttribute("Task", str(taskUName) )
231     node.setAttribute("Subject", str(self.userSubj) )
232     node.setAttribute("Command", str(cmdSpec) )
233     node.setAttribute("Range", str(rng) )
234 spiga 1.29 node.setAttribute("TotJob", str(jobs) )
235 farinafa 1.10 node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
236 farinafa 1.24 node.setAttribute("Flavour", str(flavour) )
237     node.setAttribute("Type", str(type) )
238 spiga 1.27 node.setAttribute("ClientVersion", str(ver) )
239 farinafa 1.10
240 spiga 1.30 ## Only Temporary. it should be at Server level
241 spiga 1.36 removeT1bL = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
242 spiga 1.30 T1_BL = "fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, \
243     lcg00125.grid.sinica.edu.tw, \
244     gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf"
245     if removeT1bL == '1': T1_BL = ''
246    
247 farinafa 1.1 # create a mini-cfg to be transfered to the server
248     miniCfg = {}
249 farinafa 1.3
250     ## migrate CE/SE infos
251 spiga 1.37 miniCfg['EDG.ce_white_list'] = ""
252 spiga 1.36 if 'GRID.ce_white_list' in self.cfg_params:
253 spiga 1.37 miniCfg['EDG.ce_white_list'] = str( self.cfg_params['GRID.ce_white_list'] )
254 farinafa 1.1
255 spiga 1.37 miniCfg['EDG.ce_black_list'] = T1_BL
256 spiga 1.36 if 'GRID.ce_black_list' in self.cfg_params:
257 mcinquil 1.32 if len(T1_BL) > 0:
258 spiga 1.37 miniCfg['EDG.ce_black_list'] += ", "
259     miniCfg['EDG.ce_black_list'] += str( self.cfg_params['GRID.ce_black_list'] )
260 farinafa 1.1
261 spiga 1.37 miniCfg['EDG.se_white_list'] = ""
262 spiga 1.36 if 'GRID.se_white_list' in self.cfg_params:
263 spiga 1.37 miniCfg['EDG.se_white_list'] = str( self.cfg_params['GRID.se_white_list'] )
264 spiga 1.36
265 spiga 1.37 miniCfg['EDG.se_black_list'] = ""
266 spiga 1.36 if 'GRID.se_black_list' in self.cfg_params:
267 spiga 1.37 miniCfg['EDG.se_black_list'] = str( self.cfg_params['GRID.se_black_list'] )
268 spiga 1.36
269 spiga 1.37 miniCfg['EDG.group'] = ""
270 spiga 1.36 if 'GRID.group' in self.cfg_params:
271 spiga 1.37 miniCfg['EDG.group'] = str( self.cfg_params['GRID.group'] )
272 spiga 1.36
273 spiga 1.37 miniCfg['EDG.role'] = ""
274 spiga 1.36 if 'GRID.role' in self.cfg_params:
275 spiga 1.37 miniCfg['EDG.role'] = str( self.cfg_params['GRID.role'] )
276 spiga 1.33
277 farinafa 1.13 miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
278 farinafa 1.1 if 'cfgFileNameCkSum' in self.cfg_params:
279     miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])
280    
281     miniCfg['CRAB.se_remote_dir'] = ''
282     if 'CRAB.se_remote_dir' in self.cfg_params:
283     miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params['CRAB.se_remote_dir'])
284    
285 farinafa 1.3 ## JDL requirements specific data. Scheduler dependant
286 spiga 1.37 miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
287     miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
288 spiga 1.36 miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
289     miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
290 spiga 1.37 miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count',0)
291     miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count',-1)
292     miniCfg['EDG.proxyInfos'] = self.cfg_params.get('GRID.proxyInfos',{}) #TODO activate this when using MyProxy-based delegation
293 farinafa 1.13
294 farinafa 1.14 ## Additional field for DashBoard
295     miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')
296    
297     ## Additional fields for Notification by the server
298 farinafa 1.15 miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
299 farinafa 1.20 miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)
300 farinafa 1.3
301 spiga 1.34 miniCfg['CMSSW_version'] = self.scram.getSWVersion()
302    
303 farinafa 1.1 ## put here other fields if needed
304     node.setAttribute("CfgParamDict", str(miniCfg) )
305     cfile.appendChild(node)
306     xmlString += str(cfile.toprettyxml())
307     return xmlString
308    
309 farinafa 1.2 def _genericCommand(self, cmd, blTaskName, rng):
310     if not blTaskName:
311 farinafa 1.1 raise CrabException('Error while extracting the Task Unique Name string')
312     return -2
313    
314     cmdXML = None
315 farinafa 1.2 cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
316 farinafa 1.1 if not cmdXML:
317     raise CrabException('Error while creating the Command XML')
318     return -3
319 spiga 1.27
320 farinafa 1.1 ret = -1
321 farinafa 1.2 ret = self.asSession.sendCommand(cmdXML, blTaskName)
322 farinafa 1.1 logMsg = ''
323 spiga 1.12 debugMsg = ''
324 farinafa 1.1 if ret == 0:
325     # success
326 farinafa 1.22 debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
327 farinafa 1.1 else:
328 spiga 1.27 self.checkServerResponse(ret)
329 farinafa 1.1 return ret
330