ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.80
Committed: Wed Jul 11 15:13:22 2012 UTC (12 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, HEAD
Changes since 1.79: +5 -2 lines
Log Message:
CMSSW6x compatibility

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from xml.dom import minidom
17 farinafa 1.54 import os, sys
18 farinafa 1.2 import commands
19 ewv 1.45 import traceback
20 spiga 1.55 from Downloader import Downloader
21 farinafa 1.1
# Select the CRAB_Server_Session binding to use as C_AS_Session.
#
# The choice depends on the running interpreter version and on the SCRAM_ARCH
# environment variable (a KeyError/IndexError is raised if it is missing or has
# no '_'-separated second field, exactly as before).  Inside each architecture
# branch the candidate modules are tried in order and the first one that can be
# imported wins; a failure of the last candidate propagates to the caller.
#
# The version test compares tuples instead of doing float arithmetic on
# major/minor numbers, and the fallbacks catch Exception rather than using a
# bare except so that KeyboardInterrupt / SystemExit are not swallowed.
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
elif os.environ["SCRAM_ARCH"].split("_")[1].find('32') > 1:
    # second SCRAM_ARCH field mentions '32' (after at least two leading characters)
    try:
        from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
    except Exception:
        try:
            from CRAB_Server_API_36X import CRAB_Server_Session as C_AS_Session
        except Exception:
            from CRAB_Server_API_311X import CRAB_Server_Session as C_AS_Session
else:
    try:
        from lib64.CRAB_Server_API_311X_amd64 import CRAB_Server_Session as C_AS_Session
    except Exception:
        try:
            from lib64.CRAB_Server_API_5X_amd64 import CRAB_Server_Session as C_AS_Session
        except Exception:
            from lib64.CRAB_Server_API_6X_amd64 import CRAB_Server_Session as C_AS_Session
40    
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service
    """

    # Base URL of the configuration files downloaded while building a command
    # (default site black list and myproxy server endpoint).
    _CONFIG_URL = "http://cmsdoc.cern.ch/cms/LCG/crab/config/"

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Accepts in input:
        - serverName, serverPort: handed to the CRAB_Server_Session constructor
        - cfg_params: configuration mapping, read later while building the XML commands
        - proxyPath: accepted but not used by this method

        Raises CrabException if the credential subject cannot be retrieved.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token, everything else a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF', 'LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably this fills server-side attributes on self
        # (checkServerResponse reads self.server_admin, which is not set here);
        # confirm against crab_util.CliServerParams
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."

        configAPI = {'credential' : credentialType,
                     'logger' : common.logger() }

        CredAPI = CredentialAPI( configAPI )
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized to XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: written to the "TotJob" attribute of the command XML
        - taskType: written to the "Type" attribute of the command XML

        Returns the code given back by the server session (0 means success).
        Raises CrabException if the XML, the task name or the command XML is empty.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)
            # reset server choice
            # NOTE(review): the listing this was restored from had lost its
            # indentation; the reset is kept in the failure branch, next to the
            # statement it follows -- confirm against the repository copy
            opsToBeSaved = {'serverName' : '' }
            common._db.updateTask_(opsToBeSaved)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Logs (info level) a message describing the code and returns the code unchanged.
        """
        adminHint = '\t For Further infos please contact the server Admin: %s'

        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += adminHint%self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += adminHint%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += adminHint%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s rather than %d: a non-numeric code must still be reported
            # instead of failing inside the string formatting
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)

        # every branch above produces a message, so it is always logged
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for the whole task (range 'all').
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.

        Accepts in input:
        - blTaskName: the task unique name (the command is always sent for range 'all')
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written there
        - statusFamilyType: kind of status requested to the server

        Returns statusFile if the reply has been written to a file, the reply itself otherwise.
        An empty reply or one starting with 'Error:' raises CrabException, except for the
        'isServerDrained' family: in that case nothing is written and '' is returned.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        if not statusMsg or statusMsg.startswith('Error:'):
            if statusFamilyType != 'isServerDrained':
                raise CrabException('Error in retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )
            # drain check is best effort: drop the failed reply instead of raising
            statusFile = None
            statusMsg = ""

        if statusFile is None:
            return statusMsg

        f = open(statusFile, 'w')
        try:
            f.write(statusMsg)
        finally:
            # close the handle even if the write fails
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        _checkDrainMode_
        Ask the server for the 'isServerDrained' status and return its reply
        ('' when the reply is empty or an error, see getStatus).
        """
        return self.getStatus( blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        _createXMLcommand_
        Build the XML document describing a command for the server and return it as a string.

        The document holds a single "TaskCommand" element whose attributes carry the
        arguments of this method, the credential subject, the scheduler, the client
        version and "CfgParamDict", the string form of the mini-cfg dictionary.
        newTaskAddIns is accepted but not used.
        """
        cfile = minidom.Document()
        node = cfile.createElement("TaskCommand")

        attributes = (
            ("Task", taskUName),
            ("Subject", self.userSubj),
            ("Command", cmdSpec),
            ("Range", rng),
            ("TotJob", jobs),
            ("Scheduler", self.cfg_params['CRAB.scheduler']),
            ("Flavour", flavour),
            ("Type", type),
            ("ClientVersion", common.prog_version_str),
        )
        for attrName, attrValue in attributes:
            node.setAttribute(attrName, str(attrValue))

        ## Only Temporary. it should be at Server level
        blackAnaOps = self._defaultBlackList()
        myproxyserver = self._myProxyServer()

        # create a mini-cfg to be transfered to the server
        miniCfg = self._buildMiniCfg(blackAnaOps, myproxyserver)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _defaultBlackList(self):
        """
        Return the default site black list, or "None.Nowhere" when the user disabled
        it (GRID.remove_default_blacklist != 0) or the download returned None.
        """
        removeBList = self.cfg_params.get("GRID.remove_default_blacklist", 0 )
        blackAnaOps = "None.Nowhere"
        if int(removeBList) == 0:
            result = Downloader(self._CONFIG_URL).config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            # log the list actually enforced, not the downloader object
            common.logger.debug("Enforced black list: %s "%str(blackAnaOps))
        else:
            common.logger.info("WARNING: Skipping default black list!")
        return blackAnaOps

    def _myProxyServer(self):
        """
        Return the myproxy server endpoint read from myproxy_server.conf, falling back
        to 'myproxy.cern.ch' when the download fails or gives an empty value.
        """
        try:
            fetched = Downloader(self._CONFIG_URL).config("myproxy_server.conf")
            # test for None before stripping, and treat a blank file as empty too
            if fetched is not None:
                fetched = fetched.strip()
            if not fetched:
                raise CrabException("myproxy_server.conf retrieved but empty")
            return fetched
        except Exception:
            # sys.exc_info() instead of a bound name: this form is accepted by
            # every interpreter version the module is loaded with
            err = sys.exc_info()[1]
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            common.logger.debug(err)
            return 'myproxy.cern.ch'

    def _buildMiniCfg(self, blackAnaOps, myproxyserver):
        """
        Return the dictionary of configuration values transferred to the server.

        blackAnaOps is the default black list, extended with the user GRID.ce_black_list
        if present; myproxyserver is stored under 'proxyServer'.
        """
        cfg = self.cfg_params
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = str( cfg.get('GRID.ce_white_list', "") )

        ceBlackList = blackAnaOps
        if 'GRID.ce_black_list' in cfg:
            if ceBlackList:
                ceBlackList += ", "
            ceBlackList += str( cfg['GRID.ce_black_list'] )
        miniCfg['EDG.ce_black_list'] = ceBlackList

        miniCfg['EDG.se_white_list'] = str( cfg.get('GRID.se_white_list', "") )
        miniCfg['EDG.se_black_list'] = str( cfg.get('GRID.se_black_list', "") )
        miniCfg['EDG.group'] = str( cfg.get('GRID.group', "") )
        miniCfg['EDG.role'] = str( cfg.get('GRID.role', "") )

        # the checksum is always computed; a value in cfg_params overrides it
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in cfg:
            miniCfg['cfgFileNameCkSum'] = str(cfg['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = str( cfg.get('CRAB.se_remote_dir', '') )

        miniCfg['CAF.queue'] = cfg.get('CAF.queue','cmscaf1nw')
        # NOTE(review): read from 'CAF.resource' but stored as 'CAF.resources';
        # kept as is, confirm which spelling the server expects
        miniCfg['CAF.resources'] = cfg.get('CAF.resource', 'cmscaf')
        miniCfg['CAF.group'] = cfg.get('CAF.group', None)

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = cfg.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = cfg.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = myproxyserver
        miniCfg['VO'] = cfg.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = 0
        miniCfg['EDG_shallow_retry_count'] = -1

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = cfg.get('CMSSW.datasetpath', 'None')

        ### FEDE for tasktype, savannah 76950
        miniCfg['USER.tasktype'] = cfg.get("USER.tasktype", 'analysis')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = cfg.get('USER.email', None)
        miniCfg['threshold'] = cfg.get('USER.thresholdlevel', 100)

        # asked to scram once only
        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        #WMBS
        miniCfg['feeder'] = cfg.get('WMBS.feeder', 'Feeder')
        miniCfg['parent_task'] = cfg.get('WMBS.parent_task', '')
        miniCfg['processing'] = cfg.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = cfg.get('WMBS.startrun', 'None')
        miniCfg['splitting_algorithm'] = cfg.get('WMBS.splitting_algorithm','FileBased')
        miniCfg['split_per_job'] = cfg.get('WMBS.split_per_job','files_per_job')
        miniCfg['split_value'] = cfg.get('WMBS.split_value',1000000)

        return miniCfg

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        _genericCommand_
        Build the XML for command cmd on range rng of task blTaskName and send it to the server.

        Returns the code given back by the server session (0 means success); any other
        code is reported through checkServerResponse.
        Raises CrabException if the task name or the command XML is empty.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message used to be built and then dropped
            common.logger.debug('Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name))
        else:
            self.checkServerResponse(ret)
        return ret
403 ewv 1.45