ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.90
Committed: Thu Oct 29 18:01:48 2009 UTC (15 years, 6 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
Changes since 1.89: +23 -1 lines
Log Message:
Fix for bug #55671. The client now checks that the server is not in drain mode before sending the ISB.

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands, traceback
8 import zlib, base64
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 from ProdCommon.Storage.SEAPI.Exceptions import *
16 from ProdCommon.Credential.CredentialAPI import CredentialAPI
17
18 class SubmitterServer( Submitter ):
19 def __init__(self, cfg_params, parsed_range, val):
20 self.srvCfg = {}
21 self.cfg_params = cfg_params
22 self.submitRange = []
23 self.credentialType = 'Proxy'
24 self.copyTout= setLcgTimeout()
25 #wmbs
26 self.type = int(cfg_params.get('WMBS.automation',0))
27 self.taskType = 'fullySpecified'
28 if self.type==1: self.taskType='partiallySpecified'
29 print self.type
30 if common.scheduler.name().upper() in ['LSF', 'CAF']:
31 self.credentialType = 'Token'
32 self.copyTout= ' '
33
34 Submitter.__init__(self, cfg_params, parsed_range, val)
35
36 # init client server params...
37 CliServerParams(self)
38
39 # path fix
40 if self.storage_path[0]!='/':
41 self.storage_path = '/'+self.storage_path
42
43 self.taskuuid = str(common._db.queryTask('name'))
44 self.limitJobs = False
45
46 return
47
48 def run(self):
49 """
50 The main method of the class: submit jobs in range self.nj_list
51 """
52 common.logger.debug("SubmitterServer::run() called")
53
54 start = time.time()
55 #wmbs
56 self.BuildJobList(self.type)
57
58 self.submitRange = self.nj_list
59
60 ## wmbs
61 check = self.checkIfCreate(self.type)
62
63 if check == 0 :
64
65 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
66 self.manageCredential()
67
68 # check if it is the first submission
69 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
70
71 # standard submission to the server
72 self.performSubmission(isFirstSubmission)
73
74 stop = time.time()
75 common.logger.debug("Submission Time: "+str(stop - start))
76 #wmbs
77 if self.type ==0 :msg = 'Total of %d jobs submitted'%len(self.submitRange)
78 else: msg='Request submitted to the server.'
79 common.logger.info(msg)
80
81 if int(self.type)==1:
82 common._db.updateTask_({'jobType':'Submitted'})
83 return
84
85 def moveISB_SEAPI(self):
86 ## get task info from BL ##
87 common.logger.debug("Task name: " + self.taskuuid)
88 isblist = common._db.queryTask('globalSandbox').split(',')
89 common.logger.debug("List of ISB files: " +str(isblist) )
90
91 # init SE interface
92 common.logger.info("Starting sending the project to the storage "+str(self.storage_name)+"...")
93 try:
94 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
95 except Exception, ex:
96 common.logger.debug(str(ex))
97 msg = "ERROR : Unable to create SE destination interface \n"
98 msg +="Project "+ self.taskuuid +" not Submitted \n"
99 raise CrabException(msg)
100
101 try:
102 loc = SElement("localhost", "local")
103 except Exception, ex:
104 common.logger.debug(str(ex))
105 msg = "ERROR : Unable to create SE source interface \n"
106 msg +="Project "+ self.taskuuid +" not Submitted \n"
107 raise CrabException(msg)
108
109
110 ### it should not be there... To move into SE API. DS
111
112 # create remote dir for gsiftp
113 if self.storage_proto in ['gridftp','rfio','uberftp']:
114 try:
115 action = SBinterface( seEl )
116 action.createDir( self.remotedir )
117 except AlreadyExistsException, ex:
118 msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
119 msg +='\t%s'%str(ex)
120 common.logger.debug(msg)
121 except OperationException, ex:
122 common.logger.debug(str(ex.detail))
123 msg = "ERROR: Unable to create project destination on the Storage Element %s\n"%str(ex)
124 msg +="Project "+ self.taskuuid +" not Submitted \n"
125 raise CrabException(msg)
126 except AuthorizationException, ex:
127 common.logger.debug(str(ex.detail))
128 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
129 msg +="Project "+ self.taskuuid +" not Submitted \n"
130 raise CrabException(msg)
131 except TransferException, ex:
132 common.logger.debug(str(ex.detail))
133 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
134 msg +="Project "+ self.taskuuid +" not Submitted \n"
135 raise CrabException(msg)
136
137 ## copy ISB ##
138 sbi = SBinterface( loc, seEl )
139
140 # can copy a list of files
141 if self.storage_proto in ['globus']:
142 #print "[moveISB_SEAPI] doing globus-url-copy"
143 # construct a list of absolute paths of input files
144 # and the destinations to copy them to
145 sourcesList = []
146 destsList = []
147 for filetocopy in isblist:
148 sourcesList.append(os.path.abspath(filetocopy))
149 destsList.append(os.path.join(self.remotedir, os.path.basename(filetocopy)))
150
151 # construct logging information
152 toCopy = "\n".join([t[0] + " to " + t[1] for t in map(None, sourcesList, destsList)]) + "\n"
153 common.logger.debug("Sending:\n " + toCopy)
154
155 # try to do the copy
156 copy_res = None
157 try:
158 copy_res = sbi.copy( sourcesList, destsList, opt=self.copyTout)
159 except AuthorizationException, ex:
160 common.logger.debug(str(ex.detail))
161 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
162 msg +="Project "+ self.taskuuid +" not Submitted \n"
163 raise CrabException(msg)
164 except Exception, ex:
165 common.logger.debug(str(ex))
166 import traceback
167 common.logger.debug(str(traceback.format_exc()))
168 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
169 msg +="Project "+ self.taskuuid +" not Submitted \n"
170 raise CrabException(msg)
171 if copy_res is None:
172 raise CrabException("Unkown Error: Unable to ship the project to the server!")
173 else:
174 ## evaluating copy results
175 copy_err_list = []
176 for ll in map(None, copy_res, sourcesList):
177 exitcode = int(ll[0][0])
178 if exitcode == 0:
179 pass
180 else:
181 copy_err_list.append( [ ll[1], ll[0][1] ] )
182 ## now raise an exception, but the submission could be retried
183 if len(copy_err_list) > 0:
184 msg = "ERROR : Unable to ship the project to the server\n"
185 for problem in copy_err_list:
186 msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
187 msg += "Project "+ self.taskuuid +" not Submitted \n"
188 raise CrabException(msg)
189
190 # cannot copy a list of files, need to do
191 # each file in turn
192 else:
193 for filetocopy in isblist:
194 source = os.path.abspath(filetocopy)
195 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
196 common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
197 try:
198 sbi.copy( source, dest, opt=self.copyTout)
199 except AuthorizationException, ex:
200 common.logger.debug(str(ex.detail))
201 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
202 msg +="Project "+ self.taskuuid +" not Submitted \n"
203 raise CrabException(msg)
204 except Exception, ex:
205 common.logger.debug(str(ex))
206 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
207 msg +="Project "+ self.taskuuid +" not Submitted \n"
208 raise CrabException(msg)
209 """
210 for filetocopy in isblist:
211 source = os.path.abspath(filetocopy)
212 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
213 common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
214 try:
215 sbi.copy( source, dest, opt=self.copyTout)
216 except AuthorizationException, ex:
217 common.logger.debug(str(ex.detail))
218 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
219 msg +="Project "+ self.taskuuid +" not Submitted \n"
220 raise CrabException(msg)
221 except Exception, ex:
222 common.logger.debug(str(ex))
223 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
224 msg +="Project "+ self.taskuuid +" not Submitted \n"
225 raise CrabException(msg)
226 """
227 ## if here then project submitted ##
228 msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
229 common.logger.debug(msg)
230 return
231
232
233 def checkIfDrained(self, csCommunicator):
234 isDrained = False
235
236 try:
237 drainStatus = csCommunicator.checkDrainMode(self.taskuuid)
238 drainStatus += "="*( len(drainStatus)%8 )
239 drainStatus = zlib.decompress( base64.urlsafe_b64decode(drainStatus) )
240 isDrained = ( drainStatus == "true")
241 except Exception, e:
242 common.logger.debug("Problem while checking server drain mode. The server will be assumed as available.")
243 common.logger.debug( traceback.format_exc() )
244 isDrained = False
245
246 if isDrained == True:
247 msg = "Server is in Drain mode. Unable to submit new tasks."
248 raise CrabException(msg)
249
250 return isDrained
251
252 def manageCredential(self):
253 """
254 Prepare configuration and Call credential API
255 """
256 common.logger.info("Registering credential to the server : %s"%self.server_name)
257
258 myproxyserver = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
259 configAPI = {'credential' : self.credentialType, \
260 'myProxySvr' : myproxyserver,\
261 'serverDN' : self.server_dn,\
262 'shareDir' : common.work_space.shareDir() ,\
263 'userName' : getUserName(),\
264 'serverName' : self.server_name, \
265 'logger' : common.logger() \
266 }
267
268 try:
269 CredAPI = CredentialAPI( configAPI )
270 except Exception, err :
271 common.logger.debug("Configuring Credential API: " +str(traceback.format_exc()))
272 raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
273
274
275 if self.credentialType == 'Proxy':
276 # Proxy delegation through MyProxy, 4 days lifetime minimum
277 if not CredAPI.checkMyProxy(Time=4, checkRetrieverRenewer=True) :
278 common.logger.info("Please renew MyProxy delegated proxy:\n")
279 try:
280 CredAPI.credObj.serverDN = self.server_dn
281 CredAPI.ManualRenewMyProxy()
282 except Exception, ex:
283 common.logger.debug("Delegating Credentials to MyProxy : " +str(traceback.format_exc()))
284 raise CrabException(str(ex))
285 else:
286 # Kerberos token movement
287 if not CredAPI.checkCredential(Time=12) :
288 common.logger.info("Please renew the token:\n")
289 try:
290 CredAPI.ManualRenewCredential()
291 except Exception, ex:
292 raise CrabException(str(ex))
293
294 try:
295 dict = CredAPI.registerCredential()
296 except Exception, err:
297 common.logger.debug("Registering Credentials : " +str(traceback.format_exc()))
298 raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
299
300 common.logger.info("Credential successfully delegated to the server.\n")
301 return
302
303 def performSubmission(self, firstSubmission=True):
304 # create the communication session with the server frontend
305 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
306 taskXML = ''
307 subOutcome = 0
308
309 # transfer remote dir to server
310 self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
311
312 if firstSubmission==True:
313 # check if the server is in drain mode
314 if self.checkIfDrained(csCommunicator)==True:
315 return
316
317 TotJob = common._db.nJobs()
318 # move the sandbox
319 self.moveISB_SEAPI()
320
321 # first time submit
322 try:
323 self.stateChange( self.submitRange, "SubRequested" )
324 taskXML += common._db.serializeTask( common._db.getTask() )
325 common.logger.debug(taskXML)
326 except Exception, e:
327 self.stateChange( self.submitRange, "Created" )
328 msg = "BossLite ERROR: Unable to serialize task object\n"
329 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
330 msg += str(e)
331 raise CrabException(msg)
332
333 # TODO fix not needed first field
334 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob,taskType=self.taskType)
335 else:
336 # subsequent submissions and resubmit
337 self.stateChange( self.submitRange, "SubRequested" )
338 try:
339 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
340 except Exception, ex: ##change to specific exception
341 ## clean sub. requested status
342 self.stateChange( self.submitRange, "Created" )
343
344
345 if subOutcome != 0:
346 msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
347 self.stateChange( self.submitRange, "Created" )
348 common.logger.debug(msg)
349 raise CrabException('ERROR Jobs NOT submitted.')
350
351 del csCommunicator
352
353 return
354
355
356 def markSubmitting(self):
357 """
358 _markSubmitting_
359 sign local db for jobs sent -submitted- to the server
360 (just for the first submission)
361 """
362 common.logger.debug("Updating submitting jobs %s"%str(self.submitRange))
363 updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
364 common._db.updateRunJob_(self.submitRange, updlist)
365
366