ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.105
Committed: Mon Nov 12 22:01:14 2012 UTC (12 years, 5 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, HEAD
Changes since 1.104: +5 -0 lines
Log Message:
remoteGlidein scheduler requires use_server=0 https://savannah.cern.ch/bugs/?98651

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands, traceback
8 import zlib, base64
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 from ProdCommon.Storage.SEAPI.Exceptions import *
16 from ProdCommon.Credential.CredentialAPI import CredentialAPI
17
18 from Downloader import Downloader
19
class SubmitterServer( Submitter ):
    """
    Submitter used in client-server mode.

    Instead of talking to the scheduler directly it ships the project input
    sandbox to the storage element used by the CRAB server, delegates the
    user credential to the server and asks the server (through
    ServerCommunicator) to submit the jobs.
    """

    def __init__(self, cfg_params, parsed_range, val):
        self.srvCfg = {}
        self.cfg_params = cfg_params
        self.submitRange = []
        self.credentialType = 'Proxy'
        self.copyTout = setLcgTimeout()
        self.extended = int(cfg_params.get('CMSSW.extend', 0))
        # wmbs
        self.type = int(cfg_params.get('WMBS.automation', 0))
        self.taskType = 'fullySpecified'
        if self.type == 1:
            self.taskType = 'partiallySpecified'
        if common.scheduler.name().upper() in ['LSF', 'CAF']:
            self.credentialType = 'Token'
            self.copyTout = ' '

        # remoteGlidein cannot go through the server
        # (https://savannah.cern.ch/bugs/?98651)
        if common.scheduler.name().upper() == 'REMOTEGLIDEIN':
            msg = "FATAL ERROR: remoteGlidein scheduler requires use_server=0"
            raise CrabException(msg)

        Submitter.__init__(self, cfg_params, parsed_range, val)

        # init client server params...
        # NOTE(review): storage_path/storage_name/server_* are presumably set
        # here or by the base class -- not visible in this file.
        CliServerParams(self)

        # path fix: make the storage path absolute (startswith also copes
        # with an empty path, where indexing [0] would raise IndexError)
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        self.taskuuid = str(common._db.queryTask('name'))
        self.limitJobs = False

        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list

        Builds the job list, delegates the credential and hands the task
        over to the server, then logs a summary.
        """
        common.logger.debug("SubmitterServer::run() called")

        start = time.time()
        #wmbs
        self.BuildJobList(self.type)

        self.submitRange = self.nj_list

        ## wmbs
        check = self.checkIfCreate(self.type)

        if check == 0:

            self.remotedir = os.path.join(self.storage_path, self.taskuuid)
            self.manageCredential()

            # check if it is the first submission
            isFirstSubmission = common._db.checkIfNeverSubmittedBefore()

            # standard submission to the server
            self.performSubmission(isFirstSubmission)

            stop = time.time()
            common.logger.debug("Submission Time: " + str(stop - start))
            #wmbs
            if self.type == 0:
                msg = 'Total of %d jobs submitted' % len(self.submitRange)
            else:
                msg = 'Request submitted to the server.'
            common.logger.info(msg)

            if int(self.type) == 1:
                common._db.updateTask_({'jobType': 'Submitted'})
        return

    def _raiseNotSubmitted(self, msg):
        """
        Append the standard "Project <uuid> not Submitted" trailer to msg
        and raise it as a CrabException.
        """
        msg += "Project " + self.taskuuid + " not Submitted \n"
        raise CrabException(msg)

    def moveISB_SEAPI(self, listOffiles=None):
        """
        Copy the input sandbox files into self.remotedir on the storage
        element self.storage_name.

        listOffiles: optional list of local file paths to ship. When it is
            empty or omitted, the task 'globalSandbox' field (a comma
            separated list of paths) is used instead.

        Raises CrabException on any failure.
        """
        ## get task info from BL ##
        common.logger.debug("Task name: " + self.taskuuid)
        if listOffiles:
            isblist = listOffiles
        else:
            isblist = str(common._db.queryTask('globalSandbox')).split(',')
        common.logger.debug("List of ISB files: " + str(isblist))

        # init SE interface
        common.logger.info("Starting sending the project to the storage " + str(self.storage_name) + "...")
        try:
            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
        except Exception as ex:
            common.logger.debug(str(ex))
            self._raiseNotSubmitted("ERROR : Unable to create SE destination interface \n")

        try:
            loc = SElement("localhost", "local")
        except Exception as ex:
            common.logger.debug(str(ex))
            self._raiseNotSubmitted("ERROR : Unable to create SE source interface \n")

        ### it should not be there... To move into SE API. DS
        # these protocols need the remote dir to be created explicitly
        if self.storage_proto in ['gridftp', 'rfio', 'uberftp']:
            self._createRemoteDir(seEl)

        ## copy ISB ##
        sbi = SBinterface(loc, seEl, logger=common.logger.logger)

        if self.storage_proto in ['globus']:
            # can copy a list of files in one call
            self._copyFileList(sbi, isblist)
        else:
            # cannot copy a list of files, need to do each file in turn
            self._copyFileByFile(sbi, isblist)

        ## if here then project submitted ##
        msg = 'Project ' + self.taskuuid + ' files successfully submitted to the supporting storage element.\n'
        common.logger.debug(msg)
        return

    def _createRemoteDir(self, seEl):
        """
        Create self.remotedir on the destination SE; an already existing
        directory is only logged. Other failures raise CrabException.
        """
        try:
            action = SBinterface(seEl, logger=common.logger.logger)
            action.createDir(self.remotedir)
        except AlreadyExistsException as ex:
            msg = "Project %s already exist on the Storage Element \n" % self.taskuuid
            msg += '\t%s' % str(ex)
            common.logger.debug(msg)
        except OperationException as ex:
            common.logger.debug(str(ex.detail))
            self._raiseNotSubmitted("ERROR: Unable to create project destination on the Storage Element %s\n" % str(ex))
        except (AuthorizationException, TransferException) as ex:
            common.logger.debug(str(ex.detail))
            self._raiseNotSubmitted("ERROR: Unable to create project destination on the Storage Element: %s\n" % str(ex))

    def _copyFileList(self, sbi, isblist):
        """
        Copy all files with a single sbi.copy call and check the per-file
        results; any failed (or unreported) transfer raises CrabException.
        """
        # absolute source paths and the destinations inside remotedir
        sourcesList = [os.path.abspath(f) for f in isblist]
        destsList = [os.path.join(self.remotedir, os.path.basename(f)) for f in isblist]

        # construct logging information
        toCopy = "\n".join([src + " to " + dst for src, dst in zip(sourcesList, destsList)]) + "\n"
        common.logger.debug("Sending:\n " + toCopy)

        # try to do the copy
        copy_res = None
        try:
            copy_res = sbi.copy(sourcesList, destsList, opt=self.copyTout)
        except AuthorizationException as ex:
            common.logger.debug(str(ex.detail))
            self._raiseNotSubmitted("ERROR: Unable to create project destination on the Storage Element: %s\n" % str(ex))
        except Exception as ex:
            common.logger.debug(str(ex))
            common.logger.debug(str(traceback.format_exc()))
            self._raiseNotSubmitted("ERROR : Unable to ship the project to the server %s\n" % str(ex))

        if copy_res is None:
            raise CrabException("Unknown Error: Unable to ship the project to the server!")

        ## evaluating copy results: each entry is read as (exitcode, reason)
        copy_res = list(copy_res)
        copy_err_list = []
        for source, result in zip(sourcesList, copy_res):
            if int(result[0]) != 0:
                copy_err_list.append([source, result[1]])
        # a file without a result entry has an unknown outcome: report it
        # instead of crashing on a missing entry
        for source in sourcesList[len(copy_res):]:
            copy_err_list.append([source, 'no copy result returned'])

        ## now raise an exception, but the submission could be retried
        if copy_err_list:
            msg = "ERROR : Unable to ship the project to the server\n"
            for problem in copy_err_list:
                msg += " Problem transferring [%s]: '%s'\n" % (problem[0], problem[1])
            self._raiseNotSubmitted(msg)

    def _copyFileByFile(self, sbi, isblist):
        """
        Copy the files one sbi.copy call at a time; the first failure
        raises CrabException.
        """
        for filetocopy in isblist:
            source = os.path.abspath(filetocopy)
            dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
            common.logger.debug("Sending " + os.path.basename(filetocopy) + " to " + self.storage_name)
            try:
                sbi.copy(source, dest, opt=self.copyTout)
            except AuthorizationException as ex:
                common.logger.debug(str(ex.detail))
                self._raiseNotSubmitted("ERROR: Unable to create project destination on the Storage Element: %s\n" % str(ex))
            except Exception as ex:
                common.logger.debug(str(ex))
                self._raiseNotSubmitted("ERROR : Unable to ship the project to the server %s\n" % str(ex))

    def checkIfDrained(self, csCommunicator):
        """
        Ask the server whether it is in drain mode.

        The answer is an urlsafe-base64, zlib-compressed string. Any problem
        while querying/decoding it is logged and the server is assumed
        available.

        Raises CrabException if the server is drained; otherwise returns
        False.
        """
        isDrained = False

        try:
            drainStatus = csCommunicator.checkDrainMode(self.taskuuid)
            # restore stripped base64 padding: length must be a multiple of 4
            drainStatus += "=" * ((-len(drainStatus)) % 4)
            drainStatus = zlib.decompress(base64.urlsafe_b64decode(drainStatus))
            isDrained = (drainStatus == "true")
        except Exception:
            common.logger.debug("Problem while checking server drain mode. The server will be assumed as available.")
            common.logger.debug(traceback.format_exc())
            isDrained = False

        if isDrained:
            msg = "Server is in Drain mode. Unable to submit new tasks."
            raise CrabException(msg)

        return isDrained

    def manageCredential(self):
        """
        Prepare configuration and Call credential API

        Checks (and if needed asks the user to renew) the proxy/token, then
        registers it with the server. Raises CrabException on failure.
        """
        common.logger.info("Registering credential to the server : %s" % self.server_name)

        # MyProxy endpoint from the central config; fall back to
        # myproxy.cern.ch when it cannot be retrieved or is empty
        myproxyserver = ''
        try:
            myproxyserver = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf").strip()
        except Exception as e:
            common.logger.debug(e)
        else:
            if not myproxyserver:
                common.logger.debug("myproxy_server.conf retrieved but empty")
        if not myproxyserver:
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            myproxyserver = 'myproxy.cern.ch'

        configAPI = {'credential': self.credentialType,
                     'myProxySvr': myproxyserver,
                     'serverDN': self.server_dn,
                     'shareDir': common.work_space.shareDir(),
                     'userName': getUserName(),
                     'serverName': self.server_name,
                     'proxyPath': self.proxy_path,
                     'logger': common.logger()
                     }

        try:
            CredAPI = CredentialAPI(configAPI)
        except Exception as err:
            common.logger.debug("Configuring Credential API: " + str(traceback.format_exc()))
            raise CrabException("ERROR: Unable to configure Credential Client API %s\n" % str(err))

        if self.credentialType == 'Proxy':
            # Proxy delegation through MyProxy, 4 days lifetime minimum
            if not CredAPI.checkMyProxy(Time=4, checkRetrieverRenewer=True):
                common.logger.info("Please renew MyProxy delegated proxy:\n")
                try:
                    CredAPI.credObj.serverDN = self.server_dn
                    CredAPI.ManualRenewMyProxy()
                except Exception as ex:
                    common.logger.debug("Delegating Credentials to MyProxy : " + str(traceback.format_exc()))
                    raise CrabException(str(ex))
        else:
            if not CredAPI.checkMyProxy(Time=100):
                common.logger.info("Please renew the token:\n")
                try:
                    CredAPI.ManualRenewCredential()
                except Exception as ex:
                    raise CrabException(str(ex))

        try:
            CredAPI.registerCredential()
        except Exception:
            common.logger.debug("Registering Credentials : " + str(traceback.format_exc()))
            raise CrabException("ERROR: Unable to register %s delegating server: %s\n" % (self.credentialType, self.server_name))

        common.logger.info("Credential successfully delegated to the server.\n")
        return

    def performSubmission(self, firstSubmission=True):
        """
        Send the submission request for self.submitRange to the server.

        firstSubmission: when it equals True, the sandbox is shipped and the
            whole serialized task is sent. Otherwise either an 'extended'
            task (CMSSW.extend == 1) or a subsequent job submission is
            requested.

        Raises CrabException ('ERROR Jobs NOT submitted.') when the server
        call fails; jobs are put back to the "Created" state in that case.
        """
        # create the communication session with the server frontend
        csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
        subOutcome = 0

        TotJob = common._db.nJobs()
        # transfer remote dir to server
        self.cfg_params['CRAB.se_remote_dir'] = self.remotedir

        # explicit == True kept: callers may pass non-bool values
        if firstSubmission == True:
            # check if the server is in drain mode
            # (checkIfDrained raises when drained, so this return is defensive)
            if self.checkIfDrained(csCommunicator):
                return

            # move the sandbox
            self.moveISB_SEAPI()

            # first time submit
            taskXML = self.serialize()

            # TODO fix not needed first field
            subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange, TotJob, taskType=self.taskType)
        else:
            # subsequent submissions and resubmit
            self.stateChange(self.submitRange, "SubRequested")

            if self.extended == 1:
                # update the Arguments XML file
                argsXML = os.path.join(common.work_space.shareDir(), 'arguments.xml')
                self.moveISB_SEAPI([argsXML])
                taskXML = self.serialize()
                subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange, TotJob, taskType='extended')
            else:
                try:
                    subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
                except Exception as ex:  ##change to specific exception
                    ## clean sub. requested status and report the failure
                    ## (previously swallowed, which reported a success)
                    self.stateChange(self.submitRange, "Created")
                    common.logger.debug(str(ex))
                    common.logger.debug(traceback.format_exc())
                    raise CrabException('ERROR Jobs NOT submitted.')

        if subOutcome != 0:
            msg = "ClientServer ERROR: %d raised during the communication.\n" % subOutcome
            self.stateChange(self.submitRange, "Created")
            common.logger.debug(msg)
            raise CrabException('ERROR Jobs NOT submitted.')

        del csCommunicator

        return

    def serialize(self):
        """
        Mark self.submitRange as "SubRequested" and return the serialized
        task. On failure the jobs go back to "Created" and CrabException is
        raised.
        """
        taskXML = ''
        try:
            self.stateChange(self.submitRange, "SubRequested")
            taskXML += common._db.serializeTask(common._db.getTask())
            common.logger.debug(taskXML)
        except Exception as e:
            self.stateChange(self.submitRange, "Created")
            msg = "BossLite ERROR: Unable to serialize task object\n"
            msg += "Project " + str(self.taskuuid) + " not Submitted \n"
            msg += str(e)
            raise CrabException(msg)
        return taskXML

    def markSubmitting(self):
        """
        _markSubmitting_
        sign local db for jobs sent -submitted- to the server
        (just for the first submission)
        """
        common.logger.debug("Updating submitting jobs %s" % str(self.submitRange))
        # one distinct dict per job (a [d] * n list would alias one dict)
        updlist = [{'statusScheduler': 'Submitting', 'status': 'CS'} for _ in self.submitRange]
        common._db.updateRunJob_(self.submitRange, updlist)
377
378