ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.44
Committed: Tue Sep 8 22:07:59 2009 UTC (15 years, 7 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
Changes since 1.43: +1 -1 lines
Log Message:
Changed link for server available to users

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
17     # from CRAB_Server_fastAPI import CRAB_Server_https as C_AS_Session
18     from xml.dom import minidom
19     import os
20 farinafa 1.2 import commands
21 farinafa 1.1
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service.

    Every public method builds an XML command (see _createXMLcommand), hands it
    to the server session object and returns the integer code the session
    reports. A return code of 0 is treated as success; any other code is
    passed to checkServerResponse.
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Accepts in input:
        - serverName, serverPort: forwarded to the server session constructor
        - cfg_params: configuration mapping, read when building the command XML
          and passed to Scram
        - proxyPath: accepted for interface compatibility, not used here

        Raises CrabException when the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers authenticate with a token, the others with a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably this fills in client/server settings on self
        # (e.g. self.server_admin, read by checkServerResponse) -- confirm in crab_util
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2]  # nice task name "crab_0_..."

        configAPI = {'credential': credentialType,
                     'logger': common.logger()}

        CredAPI = CredentialAPI(configAPI)
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # imported here because this module never imports traceback itself
            import traceback
            common.logger.debug("Getting Credential Subject: " + str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    def submitNewTask(self, blTaskName, blXml, rng, TotJob):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: written into the "TotJob" attribute of the command XML

        Returns the code given back by the server session (0 on success).
        Raises CrabException if any of the inputs or the command XML is empty.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s' % (self.crab_task_name, self.serverName)
            common.logger.info(logMsg + '\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Translates a non-zero return code into a message for the user, resets
        the stored server choice (serverName set to '') and logs the message.
        Returns ret unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 14:
            # draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n' % (self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range' % (self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n' % (self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s' % self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Backend unable to release messages to trigger the computation of task %s' % self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s' % self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n' % self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s ' % self.ServerTwiki
        else:
            # %s rather than %d: a non-integer code (e.g. None) must not make
            # the error report itself fail with a TypeError
            logMsg = 'Unexpected return code from server %s: %s' % (self.serverName, ret)

        # every branch above produces a message, so the reset and the log
        # always happen
        # reset server choice
        opsToBeSaved = {'serverName': ''}
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name

        The command is always sent for the range 'all'.
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written there
        - statusFamilyType: kind of status requested from the server session

        Returns statusFile when a path was given, otherwise the reply itself.
        Raises CrabException on an empty task name, or when the reply is empty
        or starts with 'Error:'.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        # emptiness is tested first so that a None reply is reported as a
        # CrabException instead of failing on the slice
        if not statusMsg or statusMsg[:6] == 'Error:':
            raise CrabException('Error occurred while retrieving task %s status from server %s' % (self.crab_task_name, self.serverName))

        if statusFile is None:
            return statusMsg

        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            # close the handle even when the write fails
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified', jobs='-1'):
        """
        Build the XML document describing a command for the server.

        The document holds a single "TaskCommand" element whose attributes carry
        the task name, the user subject, the command, the job range and a
        stringified dictionary ("CfgParamDict") with the subset of the client
        configuration sent along with the command.

        newTaskAddIns is accepted for interface compatibility and is not read.
        Returns the pretty-printed XML as a string.
        """
        cfg = self.cfg_params
        cfile = minidom.Document()

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName))
        node.setAttribute("Subject", str(self.userSubj))
        node.setAttribute("Command", str(cmdSpec))
        node.setAttribute("Range", str(rng))
        node.setAttribute("TotJob", str(jobs))
        node.setAttribute("Scheduler", str(cfg['CRAB.scheduler']))
        node.setAttribute("Flavour", str(flavour))
        node.setAttribute("Type", str(type))
        node.setAttribute("ClientVersion", str(common.prog_version_str))

        ## Only Temporary. it should be at Server level
        # Adjacent literals are used instead of a backslash continuation inside
        # the string, which would embed the source indentation in the value.
        T1_BL = ("fnal.gov, gridka.de ,w-ce01.grid.sinica.edu.tw, w-ce02.grid.sinica.edu.tw, "
                 "lcg00125.grid.sinica.edu.tw, "
                 "gridpp.rl.ac.uk, cclcgceli03.in2p3.fr, cclcgceli04.in2p3.fr, pic.es, cnaf")
        # str() so that both '1' and the integer 1 disable the default blacklist
        if str(cfg.get("GRID.remove_default_blacklist", 0)) == '1':
            T1_BL = ''

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = str(cfg.get('GRID.ce_white_list', ""))

        # the user blacklist is appended to the default one
        ceBlackList = T1_BL
        if 'GRID.ce_black_list' in cfg:
            if len(T1_BL) > 0:
                ceBlackList += ", "
            ceBlackList += str(cfg['GRID.ce_black_list'])
        miniCfg['EDG.ce_black_list'] = ceBlackList

        # plain copies: empty string when the user did not set the option
        for edgKey, gridKey in (('EDG.se_white_list', 'GRID.se_white_list'),
                                ('EDG.se_black_list', 'GRID.se_black_list'),
                                ('EDG.group', 'GRID.group'),
                                ('EDG.role', 'GRID.role')):
            miniCfg[edgKey] = str(cfg.get(gridKey, ""))

        # a checksum given in the configuration overrides the computed one
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = str(cfg.get('CRAB.se_remote_dir', ''))

        miniCfg['CAF.queue'] = cfg.get('CAF.queue', 'cmscaf1nw')
        # NOTE(review): written as 'CAF.resources' but read from 'CAF.resource'
        # -- confirm which spelling the server and the user cfg expect
        miniCfg['CAF.resources'] = cfg.get('CAF.resource', 'cmscaf')

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = cfg.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = cfg.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = cfg.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = cfg.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = cfg.get('GRID.retry_count', 0)
        miniCfg['EDG_shallow_retry_count'] = cfg.get('GRID.shallow_retry_count', -1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg))
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        Build the XML for the command cmd and send it to the server.

        Returns the code given back by the server session (0 on success); a
        non-zero code is also passed to checkServerResponse.
        Raises CrabException if the task name or the command XML is empty.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message used to be built and then dropped
            debugMsg = 'Command successfully sent to server %s for task %s' % (self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret
335