ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.73
Committed: Thu Mar 3 12:23:18 2011 UTC (14 years, 2 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_8_dash2
Changes since 1.72: +7 -1 lines
Log Message:
added configurable [USER].tasktype in the crab.cfg, savannah bug 76950

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from xml.dom import minidom
17 farinafa 1.54 import os, sys
18 farinafa 1.2 import commands
19 ewv 1.45 import traceback
20 spiga 1.55 from Downloader import Downloader
21 farinafa 1.1
# Pick the CRAB_Server_Session binding to be used as C_AS_Session.
# The choice depends on the interpreter version and on the second field of
# SCRAM_ARCH; inside each family the bindings are tried in order and the
# first one that can be imported wins.
# NOTE(review): SCRAM_ARCH must be set and must hold at least two '_' separated
# fields, otherwise this fails at import time (KeyError / IndexError).
if sys.version_info[:2] < (2, 6):
    # compare (major, minor) as a tuple: no float arithmetic on version numbers
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
elif os.environ["SCRAM_ARCH"].split("_")[1].find('32') > 1:
    # architecture field holding '32' (presumably the 32 bit builds)
    try:
        from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except Exception:
        # 'except Exception' instead of a bare except: an interrupt or an exit
        # request must not be mistaken for a binding that cannot be loaded
        try:
            from CRAB_Server_API_36X import CRAB_Server_Session as C_AS_Session
        except Exception:
            # last candidate: let its failure reach the caller
            from CRAB_Server_API_311X import CRAB_Server_Session as C_AS_Session
else:
    # any other architecture: bindings shipped under lib64
    try:
        from lib64.CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except Exception:
        try:
            from lib64.CRAB_Server_API_36X_amd64 import CRAB_Server_Session as C_AS_Session
        except Exception:
            # last candidate: let its failure reach the caller
            from lib64.CRAB_Server_API_311X_amd64 import CRAB_Server_Session as C_AS_Session
40 farinafa 1.54
41 farinafa 1.1 class ServerCommunicator:
42     """
43 ewv 1.45 Common interface for the interaction between the Crab client and the server Web Service
44 farinafa 1.1 """
45 farinafa 1.2 def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
46 farinafa 1.1 """
47     Open the communication with an Analysis Server by passing the server URL and the port
48     """
49 spiga 1.27
50 spiga 1.61 self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'
51 ewv 1.45
52 farinafa 1.1 self.asSession = C_AS_Session(serverName, serverPort)
53     self.cfg_params = cfg_params
54     self.userSubj = ''
55     self.serverName = serverName
56 spiga 1.31 credentialType = 'Proxy'
57 ewv 1.45 if common.scheduler.name().upper() in ['CAF','LSF']:
58 spiga 1.31 credentialType = 'Token'
59 spiga 1.27 CliServerParams(self)
60 farinafa 1.11 self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."
61    
62 spiga 1.40 configAPI = {'credential' : credentialType, \
63     'logger' : common.logger() }
64 ewv 1.45
65     CredAPI = CredentialAPI( configAPI )
66 spiga 1.31 try:
67 ewv 1.45 self.userSubj = CredAPI.getSubject()
68 spiga 1.31 except Exception, err:
69 spiga 1.35 common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
70 spiga 1.31 raise CrabException("Error Getting Credential Subject")
71 spiga 1.34
72     self.scram=Scram.Scram(cfg_params)
73 farinafa 1.1 ###################################################
74     # Interactions with the server
75     ###################################################
76    
77 spiga 1.47 # wmbs
78     def submitNewTask(self, blTaskName, blXml, rng, TotJob,taskType='fullySpecified'):
79 farinafa 1.1 """
80     _submitNewTask_
81 ewv 1.45 Send a new task to the server to be submitted.
82    
83     Accepts in input:
84     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
85     - the range of the submission as specified by the user at the command line
86 farinafa 1.1 """
87    
88 spiga 1.47
89 farinafa 1.1 if not blXml:
90     raise CrabException('Error while serializing task object to XML')
91     return -2
92    
93 farinafa 1.2 if not blTaskName:
94 farinafa 1.1 raise CrabException('Error while extracting the Task Unique Name string')
95     return -3
96    
97     cmdXML = None
98 spiga 1.47 cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True,jobs=TotJob,type=taskType)
99    
100 farinafa 1.1 if not cmdXML:
101     raise CrabException('Error while creating the Command XML')
102     return -4
103    
104 ewv 1.45 ret = -1
105 farinafa 1.2 ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)
106 ewv 1.45
107 farinafa 1.1 if ret == 0:
108     # success
109 farinafa 1.11 logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
110 spiga 1.35 common.logger.info(logMsg+'\n')
111 farinafa 1.1 else:
112 ewv 1.45 self.checkServerResponse(ret)
113 spiga 1.27
114     return ret
115    
116 ewv 1.45 def checkServerResponse(self, ret):
117 spiga 1.27 """
118     analyze the server return codes
119     """
120 ewv 1.45
121 spiga 1.27 logMsg = ''
122     if ret == 10:
123 slacapra 1.38 # overlaod
124     logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
125     logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
126 spiga 1.27 elif ret == 14:
127 ewv 1.45 # Draining
128 slacapra 1.38 logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
129     logMsg += '\t remaining jobs due to scheduled maintainence\n'
130     logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
131 spiga 1.27 elif ret == 101:
132 slacapra 1.38 # overlaod
133     logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
134 spiga 1.27 elif ret == 11:
135 slacapra 1.38 # failed to push message in DB
136 farinafa 1.50 logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.'%self.crab_task_name
137 spiga 1.27 elif ret == 12:
138 slacapra 1.38 # failed SOAP communication
139     logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
140     logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
141 spiga 1.27 elif ret == 20:
142 slacapra 1.38 # failed to push message in PA
143 farinafa 1.50 logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.'%self.crab_task_name
144 spiga 1.27 elif ret == 22:
145 slacapra 1.38 # failed SOAP communication
146     logMsg = 'Error during SOAP communication with server %s'%self.serverName
147 spiga 1.27 elif ret == 33:
148 slacapra 1.38 # uncompatible client version
149     logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
150     logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
151 spiga 1.27 else:
152 ewv 1.45 logMsg = 'Unexpected return code from server %s: %d'%(self.serverName, ret)
153 farinafa 1.1
154 spiga 1.27 # print loggings
155     if logMsg != '':
156 slacapra 1.38 # reset server choice
157     opsToBeSaved={'serverName' : '' }
158     common._db.updateTask_(opsToBeSaved)
159 ewv 1.45 common.logger.info(logMsg)
160 farinafa 1.1 return ret
161 ewv 1.45
162 farinafa 1.2 def subsequentJobSubmit(self, blTaskName, rng):
163 farinafa 1.1 """
164     _subsequentJobSubmit_
165     Let the submission of other jobs of a task that has been already sent to a server.
166     This method is used for subsequent submission of ranged sets of jobs.
167    
168     Accepts in input:
169     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
170     - the range of the submission as specified by the user at the command line
171     """
172 farinafa 1.2 return self._genericCommand('submit', blTaskName, rng)
173 farinafa 1.1
174 farinafa 1.2 def killJobs(self, blTaskName, rng):
175 farinafa 1.1 """
176     _killJobs_
177     Send a kill command to one or more jobs running on the server.
178    
179     Accepts in input:
180     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
181     - the range of the submission as specified by the user at the command line
182     """
183 farinafa 1.2 return self._genericCommand('kill', blTaskName, rng)
184 farinafa 1.1
185 riahi 1.52 def StopWorkflow(self, blTaskName):
186     """
187     _StopWorkflow_
188     """
189     return self._genericCommand('StopWorkflow', blTaskName, 'all')
190    
191 farinafa 1.42 def cleanTask(self, blTaskName):
192 farinafa 1.1 """
193 farinafa 1.42 _cleanTask_
194 farinafa 1.1 Force the server to clean the jobs on the server.
195    
196     Accepts in input:
197     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
198     - the range of the submission as specified by the user at the command line
199     """
200 farinafa 1.42 return self._genericCommand('clean', blTaskName, 'all')
201 farinafa 1.1
202 farinafa 1.23 def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
203 farinafa 1.1 """
204     _getStatus_
205 farinafa 1.23 Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)
206     """
207 farinafa 1.1
208     # fill the filename
209     filename = str(statusFile)
210    
211 farinafa 1.2 if not blTaskName:
212 farinafa 1.1 raise CrabException('Exception while getting the task unique name')
213     return ''
214    
215     # get the data and fill the file content
216 farinafa 1.23 statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
217 farinafa 1.2 if 'Error:' in statusMsg[:6] or len(statusMsg)==0:
218 farinafa 1.58 if statusFamilyType != 'isServerDrained':
219     raise CrabException('Error in retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
220     else:
221     statusFile = None
222     statusMMsg = ""
223 farinafa 1.2
224     if statusFile is not None:
225     f = open(statusFile, 'w')
226     f.write(statusMsg)
227     f.close()
228 ewv 1.45 return statusFile
229 farinafa 1.2 return statusMsg
230 farinafa 1.1
231 farinafa 1.8 def outputRetrieved(self, blTaskName, rng):
232 farinafa 1.1 """
233     _getJobsOutput_
234     Get from the server the output file locations to be transfered back.
235    
236     Accepts in input:
237     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
238     - the range of the submission as specified by the user at the command line
239     """
240 farinafa 1.8 return self._genericCommand('outputRetrieved', blTaskName, rng)
241 farinafa 1.1
242 farinafa 1.49 def checkDrainMode(self, blTaskName='null'):
243     return self.getStatus( blTaskName, statusFamilyType='isServerDrained')
244    
245 farinafa 1.2 def postMortemInfos(self, blTaskName, rng):
246 farinafa 1.1 """
247     _postMortemInfos_
248     Retrieve the job postmortem information from the server.
249    
250     Accepts in input:
251     - the bossLite object representing the task (jobs are assumed to be RunningJobs)
252     - the range of the submission as specified by the user at the command line
253     """
254 ewv 1.45 # get the status in
255 farinafa 1.1 raise NotImplementedError
256     return None
257    
258     ###################################################
259     # Auxiliary methods
260     ###################################################
261 ewv 1.45
262 fanzago 1.73 ### FEDE for savannah 76950 ###
263     #def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
264     def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, type='fullySpecified',jobs='-1'):
265    
266     print "create XML command"
267 farinafa 1.1 xmlString = ''
268     cfile = minidom.Document()
269 ewv 1.45
270 spiga 1.27 ver = common.prog_version_str
271 farinafa 1.1 node = cfile.createElement("TaskCommand")
272     node.setAttribute("Task", str(taskUName) )
273     node.setAttribute("Subject", str(self.userSubj) )
274     node.setAttribute("Command", str(cmdSpec) )
275     node.setAttribute("Range", str(rng) )
276 spiga 1.29 node.setAttribute("TotJob", str(jobs) )
277 ewv 1.45 node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
278 fanzago 1.73 #node.setAttribute("Flavour", str(flavour) )
279     node.setAttribute("Flavour", (self.cfg_params.get("USER.tasktype", 'analysis')))
280 farinafa 1.24 node.setAttribute("Flavour", str(flavour) )
281 ewv 1.45 node.setAttribute("Type", str(type) )
282     node.setAttribute("ClientVersion", str(ver) )
283 farinafa 1.10
284 spiga 1.30 ## Only Temporary. it should be at Server level
285 spiga 1.55 removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
286 spiga 1.57 blackAnaOps = None
287 spiga 1.55 if int(removeBList) == 0:
288 spiga 1.56 blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
289 spiga 1.55 result = blacklist.config("site_black_list.conf")
290     if result != None:
291     blackAnaOps = result
292     common.logger.debug("Enforced black list: %s "%str(blacklist))
293     else:
294     common.logger.info("WARNING: Skipping default black list!")
295 spiga 1.30
296 farinafa 1.62 try:
297     myproxyserver = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf")
298 farinafa 1.63 myproxyserver = myproxyserver.strip()
299 farinafa 1.62 if myproxyserver is None:
300     raise CrabException("myproxy_server.conf retrieved but empty")
301     except Exception, e:
302     common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
303     common.logger.debug(e)
304     myproxyserver = 'myproxy.cern.ch'
305    
306 farinafa 1.1 # create a mini-cfg to be transfered to the server
307     miniCfg = {}
308 farinafa 1.3
309     ## migrate CE/SE infos
310 spiga 1.37 miniCfg['EDG.ce_white_list'] = ""
311 spiga 1.36 if 'GRID.ce_white_list' in self.cfg_params:
312 spiga 1.37 miniCfg['EDG.ce_white_list'] = str( self.cfg_params['GRID.ce_white_list'] )
313 farinafa 1.1
314 spiga 1.55 miniCfg['EDG.ce_black_list'] = blackAnaOps
315 spiga 1.36 if 'GRID.ce_black_list' in self.cfg_params:
316 spiga 1.57 if blackAnaOps:
317 spiga 1.37 miniCfg['EDG.ce_black_list'] += ", "
318     miniCfg['EDG.ce_black_list'] += str( self.cfg_params['GRID.ce_black_list'] )
319 farinafa 1.1
320 spiga 1.37 miniCfg['EDG.se_white_list'] = ""
321 spiga 1.36 if 'GRID.se_white_list' in self.cfg_params:
322 spiga 1.37 miniCfg['EDG.se_white_list'] = str( self.cfg_params['GRID.se_white_list'] )
323 spiga 1.36
324 spiga 1.37 miniCfg['EDG.se_black_list'] = ""
325 spiga 1.36 if 'GRID.se_black_list' in self.cfg_params:
326 spiga 1.37 miniCfg['EDG.se_black_list'] = str( self.cfg_params['GRID.se_black_list'] )
327 spiga 1.36
328 spiga 1.37 miniCfg['EDG.group'] = ""
329 spiga 1.36 if 'GRID.group' in self.cfg_params:
330 spiga 1.37 miniCfg['EDG.group'] = str( self.cfg_params['GRID.group'] )
331 spiga 1.36
332 spiga 1.37 miniCfg['EDG.role'] = ""
333 spiga 1.36 if 'GRID.role' in self.cfg_params:
334 spiga 1.37 miniCfg['EDG.role'] = str( self.cfg_params['GRID.role'] )
335 spiga 1.33
336 ewv 1.45 miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
337 farinafa 1.1 if 'cfgFileNameCkSum' in self.cfg_params:
338     miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])
339    
340     miniCfg['CRAB.se_remote_dir'] = ''
341     if 'CRAB.se_remote_dir' in self.cfg_params:
342 ewv 1.45 miniCfg['CRAB.se_remote_dir'] = str(self.cfg_params['CRAB.se_remote_dir'])
343 farinafa 1.1
344 spiga 1.41 miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue','cmscaf1nw')
345     miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')
346 spiga 1.61 miniCfg['CAF.group'] = self.cfg_params.get('CAF.group', None)
347 farinafa 1.3 ## JDL requirements specific data. Scheduler dependant
348 spiga 1.37 miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
349     miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
350 farinafa 1.62 miniCfg['proxyServer'] = myproxyserver
351 spiga 1.36 miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
352 spiga 1.64 miniCfg['EDG_retry_count'] = 0
353 farinafa 1.70 miniCfg['EDG_shallow_retry_count'] = -1
354 farinafa 1.13
355 farinafa 1.14 ## Additional field for DashBoard
356     miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')
357    
358     ## Additional fields for Notification by the server
359 farinafa 1.15 miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
360 farinafa 1.20 miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)
361 farinafa 1.3
362 spiga 1.34 miniCfg['CMSSW_version'] = self.scram.getSWVersion()
363 spiga 1.48 #WMBS
364     miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')
365 riahi 1.68 miniCfg['parent_task'] = self.cfg_params.get('WMBS.parent_task', '')
366 riahi 1.53
367     miniCfg['processing'] = self.cfg_params.get('WMBS.processing', 'bulk')
368 riahi 1.59 miniCfg['startrun'] = self.cfg_params.get('WMBS.startrun', 'None')
369 riahi 1.53
370 spiga 1.48 miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm','FileBased')
371 riahi 1.51 miniCfg['split_per_job'] = self.cfg_params.get('WMBS.split_per_job','files_per_job')
372 riahi 1.68 miniCfg['split_value'] = self.cfg_params.get('WMBS.split_value',1000000)
373 spiga 1.48
374     miniCfg['CMSSW_version'] = self.scram.getSWVersion()
375 ewv 1.45
376 farinafa 1.1 ## put here other fields if needed
377     node.setAttribute("CfgParamDict", str(miniCfg) )
378     cfile.appendChild(node)
379     xmlString += str(cfile.toprettyxml())
380     return xmlString
381    
382 farinafa 1.2 def _genericCommand(self, cmd, blTaskName, rng):
383     if not blTaskName:
384 farinafa 1.1 raise CrabException('Error while extracting the Task Unique Name string')
385     return -2
386    
387     cmdXML = None
388 farinafa 1.2 cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
389 farinafa 1.1 if not cmdXML:
390     raise CrabException('Error while creating the Command XML')
391     return -3
392 ewv 1.45
393 farinafa 1.1 ret = -1
394 farinafa 1.2 ret = self.asSession.sendCommand(cmdXML, blTaskName)
395 farinafa 1.1 logMsg = ''
396 ewv 1.45 debugMsg = ''
397 farinafa 1.1 if ret == 0:
398     # success
399 farinafa 1.22 debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
400 farinafa 1.1 else:
401 spiga 1.27 self.checkServerResponse(ret)
402 farinafa 1.1 return ret
403 ewv 1.45