1 |
#!/usr/bin/python
|
2 |
|
3 |
"""
|
4 |
This is the Dashboard API Module for the Worker Node
|
5 |
"""
|
6 |
|
7 |
import apmon
|
8 |
import time, sys, os
|
9 |
from types import DictType, StringType, ListType
|
10 |
|
11 |
#
|
12 |
# Methods for manipulating the apmon instance
|
13 |
#
|
14 |
|
15 |
# Config attributes
|
16 |
apmonUseUrl = False
|
17 |
|
18 |
# Internal attributes
|
19 |
apmonInstance = None
|
20 |
apmonInit = False
|
21 |
|
22 |
# Monalisa configuration
|
23 |
#apmonUrlList = ["http://lxgate35.cern.ch:40808/ApMonConf?app=dashboard", \
|
24 |
# "http://monalisa.cacr.caltech.edu:40808/ApMonConf?app=dashboard"]
|
25 |
#apmonConf = {'dashboard08.cern.ch:8884': {'sys_monitoring' : 0, \
|
26 |
# 'general_info' : 0, \
|
27 |
# 'job_monitoring' : 0} }
|
28 |
apmonConf = {'cms-jobmon.cern.ch:8884': {'sys_monitoring' : 0, \
|
29 |
'general_info' : 0, \
|
30 |
'job_monitoring' : 0} }
|
31 |
|
32 |
|
33 |
apmonLoggingLevel = apmon.Logger.ERROR
|
34 |
|
35 |
#
|
36 |
# Method to create a single apmon instance at a time
|
37 |
#
|
38 |
def getApmonInstance():
|
39 |
global apmonInstance
|
40 |
global apmonInit
|
41 |
if apmonInstance is None and not apmonInit :
|
42 |
apmonInit = True
|
43 |
if apmonUseUrl :
|
44 |
apm = None
|
45 |
#print "Creating ApMon with dynamic configuration/url"
|
46 |
try :
|
47 |
apm = apmon.ApMon(apmonUrlList, apmonLoggingLevel);
|
48 |
except Exception, e :
|
49 |
pass
|
50 |
if apm is not None and not apm.initializedOK():
|
51 |
#print "Setting ApMon to static configuration"
|
52 |
try :
|
53 |
apm.setDestinations(apmonConf)
|
54 |
except Exception, e :
|
55 |
apm = None
|
56 |
apmonInstance = apm
|
57 |
if apmonInstance is None :
|
58 |
#print "Creating ApMon with static configuration"
|
59 |
try :
|
60 |
apmonInstance = apmon.ApMon(apmonConf, apmonLoggingLevel)
|
61 |
except Exception, e :
|
62 |
pass
|
63 |
return apmonInstance
|
64 |
|
65 |
#
|
66 |
# Method to free the apmon instance
|
67 |
#
|
68 |
def apmonFree() :
|
69 |
global apmonInstance
|
70 |
global apmonInit
|
71 |
if apmonInstance is not None :
|
72 |
time.sleep(1)
|
73 |
try :
|
74 |
apmonInstance.free()
|
75 |
except Exception, e :
|
76 |
pass
|
77 |
apmonInstance = None
|
78 |
apmonInit = False
|
79 |
|
80 |
#
|
81 |
# Method to send params to Monalisa service
|
82 |
#
|
83 |
def apmonSend(taskid, jobid, params) :
|
84 |
apm = getApmonInstance()
|
85 |
if apm is not None :
|
86 |
if not isinstance(params, DictType) and not isinstance(params, ListType) :
|
87 |
params = {'unknown' : '0'}
|
88 |
if not isinstance(taskid, StringType) :
|
89 |
taskid = 'unknown'
|
90 |
if not isinstance(jobid, StringType) :
|
91 |
jobid = 'unknown'
|
92 |
try :
|
93 |
apm.sendParameters(taskid, jobid, params)
|
94 |
except Exception, e:
|
95 |
pass
|
96 |
|
97 |
#
|
98 |
# Common method for writing debug information in a file
|
99 |
#
|
100 |
def logger(msg) :
|
101 |
msg = str(msg)
|
102 |
if not msg.endswith('\n') :
|
103 |
msg += '\n'
|
104 |
try :
|
105 |
fh = open('report.log','a')
|
106 |
fh.write(msg)
|
107 |
fh.close
|
108 |
except Exception, e :
|
109 |
pass
|
110 |
|
111 |
#
|
112 |
# Context handling for CLI
|
113 |
#
|
114 |
|
115 |
# Format envvar, context var name, context var default value
|
116 |
contextConf = {'MonitorID' : ('MonitorID', 'unknown'),
|
117 |
'MonitorJobID' : ('MonitorJobID', 'unknown') }
|
118 |
|
119 |
#
|
120 |
# Method to return the context
|
121 |
#
|
122 |
def getContext(overload={}) :
|
123 |
if not isinstance(overload, DictType) :
|
124 |
overload = {}
|
125 |
context = {}
|
126 |
for paramName in contextConf.keys() :
|
127 |
paramValue = None
|
128 |
if overload.has_key(paramName) :
|
129 |
paramValue = overload[paramName]
|
130 |
if paramValue is None :
|
131 |
envVar = contextConf[paramName][0]
|
132 |
paramValue = os.getenv(envVar)
|
133 |
if paramValue is None :
|
134 |
defaultValue = contextConf[paramName][1]
|
135 |
paramValue = defaultValue
|
136 |
context[paramName] = paramValue
|
137 |
return context
|
138 |
|
139 |
#
|
140 |
# Methods to read in the CLI arguments
|
141 |
#
|
142 |
def readArgs(lines) :
|
143 |
argValues = {}
|
144 |
for line in lines :
|
145 |
paramName = 'unknown'
|
146 |
paramValue = 'unknown'
|
147 |
line = line.strip()
|
148 |
if line.find('=') != -1 :
|
149 |
split = line.split('=')
|
150 |
paramName = split[0]
|
151 |
paramValue = '='.join(split[1:])
|
152 |
else :
|
153 |
paramName = line
|
154 |
if paramName != '' :
|
155 |
argValues[paramName] = paramValue
|
156 |
return argValues
|
157 |
|
158 |
def filterArgs(argValues) :
|
159 |
|
160 |
contextValues = {}
|
161 |
paramValues = {}
|
162 |
|
163 |
for paramName in argValues.keys() :
|
164 |
paramValue = argValues[paramName]
|
165 |
if paramValue is not None :
|
166 |
if paramName in contextConf.keys() :
|
167 |
contextValues[paramName] = paramValue
|
168 |
else :
|
169 |
paramValues[paramName] = paramValue
|
170 |
else :
|
171 |
logger('Bad value for parameter :' + paramName)
|
172 |
|
173 |
return contextValues, paramValues
|
174 |
|
175 |
#
|
176 |
# SHELL SCRIPT BASED JOB WRAPPER
|
177 |
# Main method for the usage of the report.py script
|
178 |
#
|
179 |
def report(args) :
|
180 |
argValues = readArgs(args)
|
181 |
contextArgs, paramArgs = filterArgs(argValues)
|
182 |
context = getContext(contextArgs)
|
183 |
taskId = context['MonitorID']
|
184 |
jobId = context['MonitorJobID']
|
185 |
logger('SENDING with Task:%s Job:%s' % (taskId, jobId))
|
186 |
logger('params : ' + `paramArgs`)
|
187 |
apmonSend(taskId, jobId, paramArgs)
|
188 |
apmonFree()
|
189 |
print "Parameters sent to Dashboard."
|
190 |
|
191 |
#
|
192 |
# PYTHON BASED JOB WRAPPER
|
193 |
# Main class for Dashboard reporting
|
194 |
#
|
195 |
class DashboardAPI :
|
196 |
def __init__(self, monitorId = None, jobMonitorId = None, lookupUrl = None) :
|
197 |
self.defaultContext = {}
|
198 |
self.defaultContext['MonitorID'] = monitorId
|
199 |
self.defaultContext['MonitorJobID'] = jobMonitorId
|
200 |
# cannot be set from outside
|
201 |
self.defaultContext['MonitorLookupURL'] = lookupUrl
|
202 |
|
203 |
def publish(self,**message) :
|
204 |
publishValues(self, None, None, message)
|
205 |
|
206 |
def publishValues(self, taskId, jobId, message) :
|
207 |
contextArgs, paramArgs = filterArgs(message)
|
208 |
if taskId is not None :
|
209 |
contextArgs['MonitorID'] = taskId
|
210 |
if jobId is not None :
|
211 |
contextArgs['MonitorJobID'] = jobId
|
212 |
for key in contextConf.keys() :
|
213 |
if not contextArgs.has_key(key) and self.defaultContext[key] is not None :
|
214 |
contextArgs[key] = self.defaultContext[key]
|
215 |
context = getContext(contextArgs)
|
216 |
taskId = context['MonitorID']
|
217 |
jobId = context['MonitorJobID']
|
218 |
apmonSend(taskId, jobId, paramArgs)
|
219 |
|
220 |
def sendValues(self, message, jobId=None, taskId=None) :
|
221 |
self.publishValues(taskId, jobId, message)
|
222 |
|
223 |
def free(self) :
|
224 |
apmonFree()
|
225 |
|
226 |
##
|
227 |
## MAIN PROGRAM (FOR TEST)
|
228 |
##
|
229 |
if __name__ == '__main__' :
|
230 |
dashboard = DashboardAPI('julia_test_1111','1_https://sbgrb1.in2p3.fr:9000/xxxx')
|
231 |
dashboard.sendValues({'SubmissionType':'Direct','application':'CMSSW_1_3_6','taskType':'analysis'})
|
232 |
dashboard.free()
|
233 |
sys.exit(0)
|
234 |
# args = sys.argv[1:]
|
235 |
# if len(args) > 0 and args[0] == 'TEST' :
|
236 |
# dashboard = DashboardAPI('Test')
|
237 |
# for i in range(100) :
|
238 |
# #print 'Test', 'testjob_' + `i`, {'testparam':i}
|
239 |
# dashboard.sendValues({'testparam':i}, 'testjob_' + `i`)
|
240 |
# dashboard.free()
|
241 |
# sys.exit(0)
|
242 |
# report(args)
|
243 |
# sys.exit(0)
|