ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/ServerCommunicator.py
Revision: 1.57
Committed: Wed Jan 20 13:21:02 2010 UTC (15 years, 3 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_1_pre4
Changes since 1.56: +2 -2 lines
Log Message:
AnaOps BList must not be a python list...

File Contents

# User Rev Content
1 farinafa 1.1 #!/usr/bin/env python
2     """
3     _ServerCommunicator_
4    
5     """
6    
7     __version__ = "$Id"
8     __revision__ = "$Revision"
9     __author__ = "farinafa@cern.ch"
10    
11     from crab_exceptions import *
12 farinafa 1.13 from crab_util import *
13 farinafa 1.1 import common
14 spiga 1.34 import Scram
15     from ProdCommon.Credential.CredentialAPI import CredentialAPI
16 farinafa 1.1 from xml.dom import minidom
17 farinafa 1.54 import os, sys
18 farinafa 1.2 import commands
19 ewv 1.45 import traceback
20 spiga 1.55 from Downloader import Downloader
21 farinafa 1.1
# Select the server API bindings that match the running interpreter:
# the legacy module for Python older than 2.6, the 1.1 bindings otherwise.
# Version tuples are compared directly; building a float out of
# major/minor numbers is fragile (rounding, two-digit minor versions).
if sys.version_info[:2] < (2, 6):
    from CRAB_Server_API import CRAB_Server_Session as C_AS_Session
else:
    from CRAB_Server_API_1_1 import CRAB_Server_Session as C_AS_Session
26    
class ServerCommunicator:
    """
    Common interface for the interaction between the Crab client and the server Web Service
    """

    def __init__(self, serverName, serverPort, cfg_params, proxyPath=None):
        """
        Open the communication with an Analysis Server by passing the server URL and the port

        Accepts in input:
        - serverName, serverPort: forwarded to the server session object
        - cfg_params: the dictionary-like client configuration
        - proxyPath: accepted for interface compatibility, not used here

        Raises CrabException if the credential subject cannot be read.
        """
        self.ServerTwiki = 'https://twiki.cern.ch/twiki/bin/view/CMS/CrabServerForUsers#Server_available_for_users'

        self.asSession = C_AS_Session(serverName, serverPort)
        self.cfg_params = cfg_params
        self.userSubj = ''
        self.serverName = serverName

        # CAF and LSF schedulers use a token, every other one a proxy
        credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['CAF','LSF']:
            credentialType = 'Token'

        # NOTE(review): presumably fills in server-side attributes such as
        # self.server_admin (read by checkServerResponse) - confirm.
        CliServerParams(self)
        self.crab_task_name = common.work_space.topDir().split('/')[-2] # nice task name "crab_0_..."

        configAPI = {'credential' : credentialType,
                     'logger' : common.logger() }

        CredAPI = CredentialAPI( configAPI )
        try:
            self.userSubj = CredAPI.getSubject()
        except Exception:
            # keep the full traceback in the debug log, show a short error to the user
            common.logger.debug("Getting Credential Subject: " +str(traceback.format_exc()))
            raise CrabException("Error Getting Credential Subject")

        self.scram = Scram.Scram(cfg_params)

    ###################################################
    # Interactions with the server
    ###################################################

    # wmbs
    def submitNewTask(self, blTaskName, blXml, rng, TotJob, taskType='fullySpecified'):
        """
        _submitNewTask_
        Send a new task to the server to be submitted.

        Accepts in input:
        - blTaskName: the task unique name
        - blXml: the task serialized as XML
        - rng: the range of the submission as specified by the user at the command line
        - TotJob: written into the "TotJob" attribute of the command
        - taskType: written into the "Type" attribute of the command

        Returns the code given back by the server session (0 on success).
        Raises CrabException if an input is empty or the command XML cannot be built.
        """
        if not blXml:
            raise CrabException('Error while serializing task object to XML')

        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, 'submit', rng, newTaskAddIns=True, jobs=TotJob, type=taskType)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.transferTaskAndSubmit(blXml, cmdXML, blTaskName)

        if ret == 0:
            # success
            logMsg = 'Task %s successfully submitted to server %s'%(self.crab_task_name, self.serverName)
            common.logger.info(logMsg+'\n')
        else:
            self.checkServerResponse(ret)

        return ret

    def checkServerResponse(self, ret):
        """
        analyze the server return codes

        Translates a non-zero server return code into a message, resets the
        server choice stored for the task and logs the message.
        Returns the code unchanged.
        """
        if ret == 10:
            # overload
            logMsg = 'Error The server %s refused to accept the task %s because it is overloaded\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 14:
            # Draining
            logMsg = 'Error The server %s refused to accept the task %s because it is Draining out\n'%(self.serverName, self.crab_task_name)
            logMsg += '\t remaining jobs due to scheduled maintainence\n'
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 101:
            # task too large
            logMsg = 'Error The server %s refused the submission %s because you asked a too large task. Please submit by range'%(self.serverName, self.crab_task_name)
        elif ret == 11:
            # failed to push message in DB
            logMsg = 'Server unable to release messages into DB. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 12:
            # failed SOAP communication
            logMsg = 'Error The server %s refused to accept the task %s. It could be under maintainance. \n'%(self.serverName, self.crab_task_name)
            logMsg += '\t For Further infos please contact the server Admin: %s'%self.server_admin
        elif ret == 20:
            # failed to push message in PA
            logMsg = 'Server unable to release messages to other components. Task %s won\'t be submitted.'%self.crab_task_name
        elif ret == 22:
            # failed SOAP communication
            logMsg = 'Error during SOAP communication with server %s'%self.serverName
        elif ret == 33:
            # incompatible client version
            logMsg = 'Error You are using a wrong client version for server: %s\n'%self.serverName
            logMsg += '\t For further informations about "Servers available for users" please check here:\n \t%s '%self.ServerTwiki
        else:
            # %s rather than %d: an unexpected code may not even be an integer
            logMsg = 'Unexpected return code from server %s: %s'%(self.serverName, ret)

        # every branch above produces a message: reset server choice, then log
        opsToBeSaved = {'serverName' : '' }
        common._db.updateTask_(opsToBeSaved)
        common.logger.info(logMsg)
        return ret

    def subsequentJobSubmit(self, blTaskName, rng):
        """
        _subsequentJobSubmit_
        Let the submission of other jobs of a task that has been already sent to a server.
        This method is used for subsequent submission of ranged sets of jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of the submission as specified by the user at the command line
        """
        return self._genericCommand('submit', blTaskName, rng)

    def killJobs(self, blTaskName, rng):
        """
        _killJobs_
        Send a kill command to one or more jobs running on the server.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('kill', blTaskName, rng)

    def StopWorkflow(self, blTaskName):
        """
        _StopWorkflow_
        Send the 'StopWorkflow' command for the whole task (range 'all').
        """
        return self._genericCommand('StopWorkflow', blTaskName, 'all')

    def cleanTask(self, blTaskName):
        """
        _cleanTask_
        Force the server to clean the jobs on the server.
        The command is always sent for the whole task (range 'all').

        Accepts in input:
        - blTaskName: the task unique name
        """
        return self._genericCommand('clean', blTaskName, 'all')

    def getStatus(self, blTaskName, statusFile=None, statusFamilyType='status'):
        """
        _getStatus_
        Retrieve the task status from the server. It can recover any kind of status (version, loggingInfos,...)

        Accepts in input:
        - blTaskName: the task unique name
        - statusFile: optional path; when given the reply is written there
        - statusFamilyType: the kind of status requested from the server

        Returns statusFile when a file was requested, the reply itself otherwise.
        Raises CrabException on an empty task name or an empty/'Error:' reply.
        """
        if not blTaskName:
            raise CrabException('Exception while getting the task unique name')

        # get the data and fill the file content
        statusMsg = self.asSession.getTaskStatus(statusFamilyType, blTaskName)
        # emptiness is tested first so that a missing reply is reported as a
        # CrabException instead of failing on the slice
        if not statusMsg or 'Error:' in statusMsg[:6]:
            raise CrabException('Error occurred while retrieving task %s status from server %s'%(self.crab_task_name, self.serverName) )

        if statusFile is None:
            return statusMsg

        f = open(statusFile, 'w')
        try:
            # try/finally instead of "with": the module still supports Python < 2.6
            f.write(statusMsg)
        finally:
            f.close()
        return statusFile

    def outputRetrieved(self, blTaskName, rng):
        """
        _outputRetrieved_
        Send the 'outputRetrieved' command to the server for the given jobs.

        Accepts in input:
        - blTaskName: the task unique name
        - rng: the range of jobs as specified by the user at the command line
        """
        return self._genericCommand('outputRetrieved', blTaskName, rng)

    def checkDrainMode(self, blTaskName='null'):
        """
        _checkDrainMode_
        Query the server with the 'isServerDrained' status family.
        """
        return self.getStatus( blTaskName, statusFamilyType='isServerDrained')

    def postMortemInfos(self, blTaskName, rng):
        """
        _postMortemInfos_
        Retrieve the job postmortem information from the server.

        Not implemented: always raises NotImplementedError.
        """
        raise NotImplementedError

    ###################################################
    # Auxiliary methods
    ###################################################

    @staticmethod
    def _joinBlackLists(defaultList, userList):
        """
        Append the user CE black list to the default one.

        An empty or missing default list contributes nothing, so the result
        is then just the user list; otherwise both are joined with ", ".
        """
        if not defaultList:
            return str(userList)
        return ", ".join([str(defaultList), str(userList)])

    def _createXMLcommand(self, taskUName, cmdSpec='status', rng='all', newTaskAddIns=False, flavour='analysis', type='fullySpecified',jobs='-1'):
        """
        _createXMLcommand_
        Build the XML document describing a command for the server.

        The document has a single "TaskCommand" element whose attributes
        carry the command parameters plus "CfgParamDict", the string form of
        a reduced configuration dictionary taken from cfg_params.
        newTaskAddIns is accepted for interface compatibility and not used.

        Returns the pretty-printed XML as a string.
        """
        cfile = minidom.Document()

        node = cfile.createElement("TaskCommand")
        node.setAttribute("Task", str(taskUName) )
        node.setAttribute("Subject", str(self.userSubj) )
        node.setAttribute("Command", str(cmdSpec) )
        node.setAttribute("Range", str(rng) )
        node.setAttribute("TotJob", str(jobs) )
        node.setAttribute("Scheduler", str(self.cfg_params['CRAB.scheduler']) )
        node.setAttribute("Flavour", str(flavour) )
        node.setAttribute("Type", str(type) )
        node.setAttribute("ClientVersion", str(common.prog_version_str) )

        ## Only Temporary. it should be at Server level
        blackAnaOps = None
        if int(self.cfg_params.get("GRID.remove_default_blacklist", 0)) == 0:
            blacklist = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/")
            result = blacklist.config("site_black_list.conf")
            if result is not None:
                blackAnaOps = result
            # report the list actually enforced, not the downloader object
            common.logger.debug("Enforced black list: %s "%str(blackAnaOps))
        else:
            common.logger.info("WARNING: Skipping default black list!")

        def asString(key):
            # configuration value as a string, empty when the key is not set
            if key in self.cfg_params:
                return str(self.cfg_params[key])
            return ""

        # create a mini-cfg to be transfered to the server
        miniCfg = {}

        ## migrate CE/SE infos
        miniCfg['EDG.ce_white_list'] = asString('GRID.ce_white_list')

        # the default list may be None (skipped or not downloadable): the
        # user list must then be used alone instead of being appended to it
        miniCfg['EDG.ce_black_list'] = blackAnaOps
        if 'GRID.ce_black_list' in self.cfg_params:
            miniCfg['EDG.ce_black_list'] = self._joinBlackLists(blackAnaOps, self.cfg_params['GRID.ce_black_list'])

        miniCfg['EDG.se_white_list'] = asString('GRID.se_white_list')
        miniCfg['EDG.se_black_list'] = asString('GRID.se_black_list')
        miniCfg['EDG.group'] = asString('GRID.group')
        miniCfg['EDG.role'] = asString('GRID.role')

        # the checksum is always computed; an explicit configuration value wins
        miniCfg['cfgFileNameCkSum'] = makeCksum(common.work_space.cfgFileName())
        if 'cfgFileNameCkSum' in self.cfg_params:
            miniCfg['cfgFileNameCkSum'] = str(self.cfg_params['cfgFileNameCkSum'])

        miniCfg['CRAB.se_remote_dir'] = asString('CRAB.se_remote_dir')

        miniCfg['CAF.queue'] = self.cfg_params.get('CAF.queue','cmscaf1nw')
        miniCfg['CAF.resources'] = self.cfg_params.get('CAF.resource', 'cmscaf')

        ## JDL requirements specific data. Scheduler dependant
        miniCfg['EDG.max_wall_time'] = self.cfg_params.get('GRID.max_wall_clock_time', None)
        miniCfg['EDG.max_cpu_time'] = self.cfg_params.get('GRID.max_cpu_time', '130')
        miniCfg['proxyServer'] = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
        miniCfg['VO'] = self.cfg_params.get('GRID.virtual_organization', 'cms')
        miniCfg['EDG_retry_count'] = self.cfg_params.get('GRID.retry_count',0)
        miniCfg['EDG_shallow_retry_count'] = self.cfg_params.get('GRID.shallow_retry_count',-1)

        ## Additional field for DashBoard
        miniCfg['CMSSW.datasetpath'] = self.cfg_params.get('CMSSW.datasetpath', 'None')

        ## Additional fields for Notification by the server
        miniCfg['eMail'] = self.cfg_params.get('USER.email', None)
        miniCfg['threshold'] = self.cfg_params.get('USER.thresholdlevel', 100)

        miniCfg['CMSSW_version'] = self.scram.getSWVersion()

        #WMBS
        miniCfg['feeder'] = self.cfg_params.get('WMBS.feeder', 'Feeder')
        miniCfg['processing'] = self.cfg_params.get('WMBS.processing', 'bulk')
        miniCfg['startrun'] = self.cfg_params.get('WMBS.startrun', '0')
        miniCfg['splitting_algorithm'] = self.cfg_params.get('WMBS.splitting_algorithm','FileBased')
        miniCfg['split_per_job'] = self.cfg_params.get('WMBS.split_per_job','files_per_job')
        miniCfg['split_value'] = self.cfg_params.get('WMBS.split_value',1)

        ## put here other fields if needed
        node.setAttribute("CfgParamDict", str(miniCfg) )
        cfile.appendChild(node)
        return str(cfile.toprettyxml())

    def _genericCommand(self, cmd, blTaskName, rng):
        """
        _genericCommand_
        Build the XML for command cmd on the given task and range and send it to the server.

        Returns the code given back by the server session (0 on success);
        any other code is also passed through checkServerResponse.
        Raises CrabException on an empty task name or empty command XML.
        """
        if not blTaskName:
            raise CrabException('Error while extracting the Task Unique Name string')

        cmdXML = self._createXMLcommand(blTaskName, cmd, rng)
        if not cmdXML:
            raise CrabException('Error while creating the Command XML')

        ret = self.asSession.sendCommand(cmdXML, blTaskName)
        if ret == 0:
            # success: the message used to be built and then dropped
            debugMsg = 'Command successfully sent to server %s for task %s'%(self.serverName, self.crab_task_name)
            common.logger.debug(debugMsg)
        else:
            self.checkServerResponse(ret)
        return ret