ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.105
Committed: Mon Nov 12 22:01:14 2012 UTC (12 years, 5 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, HEAD
Changes since 1.104: +5 -0 lines
Log Message:
remoteGlidein scheduler requires use_server=0 https://savannah.cern.ch/bugs/?98651

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     from crab_util import *
3 spiga 1.3 import common
4 spiga 1.1 from ApmonIf import ApmonIf
5 corvo 1.2
6 ewv 1.83 import os, errno, time, sys, re
7 farinafa 1.48 import commands, traceback
8 farinafa 1.90 import zlib, base64
9 farinafa 1.10
10 spiga 1.34 from Submitter import Submitter
11 ewv 1.83 from ServerCommunicator import ServerCommunicator
12 farinafa 1.32
13     from ProdCommon.Storage.SEAPI.SElement import SElement
14     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 spiga 1.61 from ProdCommon.Storage.SEAPI.Exceptions import *
16 farinafa 1.84 from ProdCommon.Credential.CredentialAPI import CredentialAPI
17 ewv 1.83
18 farinafa 1.95 from Downloader import Downloader
19    
class SubmitterServer( Submitter ):
    """
    Submitter that hands a CRAB task to a CRAB server instead of submitting
    the jobs directly.

    Workflow (see run()): build the job list, delegate the user credential
    to the server, ship the input sandbox to the storage element configured
    for the server and send the serialized task (or the job range for later
    submissions) through a ServerCommunicator.
    """

    def __init__(self, cfg_params, parsed_range, val):
        """
        Set up the server-submission parameters.

        cfg_params   -- CRAB configuration dictionary
        parsed_range -- job range requested by the user (forwarded to Submitter)
        val          -- forwarded unchanged to Submitter.__init__

        Raises CrabException when the configured scheduler cannot be used
        together with the server (remoteGlidein requires use_server=0).
        """
        self.srvCfg = {}
        self.cfg_params = cfg_params
        self.submitRange = []
        self.credentialType = 'Proxy'
        self.copyTout = setLcgTimeout()
        self.extended = int(cfg_params.get('CMSSW.extend', 0))

        # wmbs: WMBS.automation=1 submits a partially specified task
        self.type = int(cfg_params.get('WMBS.automation', 0))
        self.taskType = 'fullySpecified'
        if self.type == 1:
            self.taskType = 'partiallySpecified'

        schedulerName = common.scheduler.name().upper()
        if schedulerName in ['LSF', 'CAF']:
            # these schedulers use a token credential and no copy timeout option
            self.credentialType = 'Token'
            self.copyTout = ' '

        if schedulerName == 'REMOTEGLIDEIN':
            msg = "FATAL ERROR: remoteGlidein scheduler requires use_server=0"
            raise CrabException(msg)

        Submitter.__init__(self, cfg_params, parsed_range, val)

        # init client server params...
        # NOTE(review): presumably this (or Submitter.__init__) sets the
        # server_* / storage_* / proxy_path attributes used below — confirm.
        CliServerParams(self)

        # path fix: the remote dir is os.path.join()ed onto this path, so it
        # must be absolute. startswith() also copes with an empty path, where
        # indexing [0] raised IndexError.
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        self.taskuuid = str(common._db.queryTask('name'))
        self.limitJobs = False
        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug("SubmitterServer::run() called")

        start = time.time()
        # wmbs
        self.BuildJobList(self.type)

        self.submitRange = self.nj_list

        # wmbs: a non-zero result means there is nothing to submit
        check = self.checkIfCreate(self.type)
        if check != 0:
            return

        self.remotedir = os.path.join(self.storage_path, self.taskuuid)
        self.manageCredential()

        # check if it is the first submission
        isFirstSubmission = common._db.checkIfNeverSubmittedBefore()

        # standard submission to the server
        self.performSubmission(isFirstSubmission)

        stop = time.time()
        common.logger.debug("Submission Time: " + str(stop - start))

        if self.type == 0:
            msg = 'Total of %d jobs submitted' % len(self.submitRange)
        else:
            msg = 'Request submitted to the server.'
        common.logger.info(msg)

        if self.type == 1:
            common._db.updateTask_({'jobType': 'Submitted'})
        return

    def _notSubmitted(self, msg):
        """Build a CrabException from *msg* plus the standard 'not Submitted' trailer."""
        msg += "Project " + self.taskuuid + " not Submitted \n"
        return CrabException(msg)

    def moveISB_SEAPI(self, listOffiles=None):
        """
        Copy input sandbox files to self.remotedir on the storage element
        described by self.storage_name / storage_proto / storage_port.

        listOffiles -- optional list of local files to ship. When None or
                       empty, the task's comma-separated 'globalSandbox'
                       entry from the local DB is used.

        Raises CrabException if the SE interfaces cannot be created, the
        remote directory cannot be created, or any file fails to copy.
        """
        ## get task info from BL ##
        common.logger.debug("Task name: " + self.taskuuid)
        if listOffiles:
            isblist = listOffiles
        else:
            isblist = str(common._db.queryTask('globalSandbox')).split(',')
        common.logger.debug("List of ISB files: " + str(isblist))

        # init SE interface
        common.logger.info("Starting sending the project to the storage " + str(self.storage_name) + "...")
        try:
            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
        except Exception as ex:
            common.logger.debug(str(ex))
            raise self._notSubmitted("ERROR : Unable to create SE destination interface \n")

        try:
            loc = SElement("localhost", "local")
        except Exception as ex:
            common.logger.debug(str(ex))
            raise self._notSubmitted("ERROR : Unable to create SE source interface \n")

        ### it should not be there... To move into SE API. DS
        # create remote dir for gsiftp
        if self.storage_proto in ['gridftp', 'rfio', 'uberftp']:
            self._createRemoteDir(seEl)

        ## copy ISB ##
        sbi = SBinterface(loc, seEl, logger=common.logger.logger)

        if self.storage_proto in ['globus']:
            # this protocol can copy a list of files in a single call
            self._copyFileList(sbi, isblist)
        else:
            # cannot copy a list of files, need to do each file in turn
            self._copyFilesOneByOne(sbi, isblist)

        ## if here then project submitted ##
        msg = 'Project ' + self.taskuuid + ' files successfully submitted to the supporting storage element.\n'
        common.logger.debug(msg)
        return

    def _createRemoteDir(self, seEl):
        """Create self.remotedir on the SE; an already existing dir is only logged."""
        try:
            action = SBinterface(seEl, logger=common.logger.logger)
            action.createDir(self.remotedir)
        except AlreadyExistsException as ex:
            msg = "Project %s already exists on the Storage Element \n" % self.taskuuid
            msg += '\t%s' % str(ex)
            common.logger.debug(msg)
        except OperationException as ex:
            common.logger.debug(str(ex.detail))
            raise self._notSubmitted("ERROR: Unable to create project destination on the Storage Element %s\n" % str(ex))
        except (AuthorizationException, TransferException) as ex:
            common.logger.debug(str(ex.detail))
            raise self._notSubmitted("ERROR: Unable to create project destination on the Storage Element: %s\n" % str(ex))

    def _copyFileList(self, sbi, isblist):
        """Copy all of isblist with one sbi.copy() call and check every per-file result."""
        # absolute source paths and their destinations inside remotedir
        sourcesList = [os.path.abspath(f) for f in isblist]
        destsList = [os.path.join(self.remotedir, os.path.basename(f)) for f in isblist]

        # construct logging information
        toCopy = "\n".join([src + " to " + dst for src, dst in zip(sourcesList, destsList)]) + "\n"
        common.logger.debug("Sending:\n " + toCopy)

        # try to do the copy
        copy_res = None
        try:
            copy_res = sbi.copy(sourcesList, destsList, opt=self.copyTout)
        except AuthorizationException as ex:
            common.logger.debug(str(ex.detail))
            raise self._notSubmitted("ERROR: Unable to create project destination on the Storage Element: %s\n" % str(ex))
        except Exception as ex:
            common.logger.debug(str(ex))
            common.logger.debug(str(traceback.format_exc()))
            raise self._notSubmitted("ERROR : Unable to ship the project to the server %s\n" % str(ex))

        if copy_res is None:
            raise CrabException("Unknown Error: Unable to ship the project to the server!")

        ## evaluating copy results
        # each result is indexed as [0] = exit code, [1] = error description
        copy_res = list(copy_res)
        copy_err_list = []
        for source, result in zip(sourcesList, copy_res):
            if int(result[0]) != 0:
                copy_err_list.append([source, result[1]])
        # a file without any result cannot be assumed to have been copied
        for source in sourcesList[len(copy_res):]:
            copy_err_list.append([source, 'no copy result returned'])

        ## now raise an exception, but the submission could be retried
        if copy_err_list:
            msg = "ERROR : Unable to ship the project to the server\n"
            for source, reason in copy_err_list:
                msg += " Problem transferring [%s]: '%s'\n" % (source, reason)
            raise self._notSubmitted(msg)

    def _copyFilesOneByOne(self, sbi, isblist):
        """Copy each file of isblist into self.remotedir with a separate sbi.copy() call."""
        for filetocopy in isblist:
            source = os.path.abspath(filetocopy)
            dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
            common.logger.debug("Sending " + os.path.basename(filetocopy) + " to " + self.storage_name)
            try:
                sbi.copy(source, dest, opt=self.copyTout)
            except AuthorizationException as ex:
                common.logger.debug(str(ex.detail))
                raise self._notSubmitted("ERROR: Unable to create project destination on the Storage Element: %s\n" % str(ex))
            except Exception as ex:
                common.logger.debug(str(ex))
                raise self._notSubmitted("ERROR : Unable to ship the project to the server %s\n" % str(ex))

    def checkIfDrained(self, csCommunicator):
        """
        Ask the server whether it is in drain mode.

        Returns False when new tasks are accepted. If the drain status cannot
        be obtained or decoded, the server is assumed to be available.
        Raises CrabException when the server reports drain mode.
        """
        isDrained = False
        try:
            drainStatus = csCommunicator.checkDrainMode(self.taskuuid)
            # The reply is url-safe base64 of zlib data that may arrive without
            # its '=' padding. Restore exactly enough to reach a multiple of 4.
            # The previous len % 8 formula only worked because the decoder
            # ignores excess padding.
            drainStatus += "=" * (-len(drainStatus) % 4)
            drainStatus = zlib.decompress(base64.urlsafe_b64decode(drainStatus))
            isDrained = (drainStatus == "true")
        except Exception:
            common.logger.debug("Problem while checking server drain mode. The server will be assumed as available.")
            common.logger.debug(traceback.format_exc())
            isDrained = False

        if isDrained:
            msg = "Server is in Drain mode. Unable to submit new tasks."
            raise CrabException(msg)

        return isDrained

    def manageCredential(self):
        """
        Prepare configuration and call the Credential API.

        Ensures a valid credential exists (asking the user to renew it when
        the check fails) and registers it with the server. Raises
        CrabException on any configuration, renewal or registration failure.
        """
        common.logger.info("Registering credential to the server : %s" % self.server_name)

        try:
            myproxyserver = Downloader("http://cmsdoc.cern.ch/cms/LCG/crab/config/").config("myproxy_server.conf")
            myproxyserver = (myproxyserver or '').strip()
            # strip() never yields None: test for an empty endpoint instead,
            # otherwise an empty config file bypasses the fallback below
            if not myproxyserver:
                raise CrabException("myproxy_server.conf retrieved but empty")
        except Exception as e:
            common.logger.info("Problem setting myproxy server endpoint: using myproxy.cern.ch")
            common.logger.debug(e)
            myproxyserver = 'myproxy.cern.ch'

        configAPI = {'credential' : self.credentialType,
                     'myProxySvr' : myproxyserver,
                     'serverDN'   : self.server_dn,
                     'shareDir'   : common.work_space.shareDir(),
                     'userName'   : getUserName(),
                     'serverName' : self.server_name,
                     'proxyPath'  : self.proxy_path,
                     'logger'     : common.logger(),
                     }

        try:
            CredAPI = CredentialAPI(configAPI)
        except Exception as err:
            common.logger.debug("Configuring Credential API: " + str(traceback.format_exc()))
            raise CrabException("ERROR: Unable to configure Credential Client API %s\n" % str(err))

        if self.credentialType == 'Proxy':
            # Proxy delegation through MyProxy, 4 days lifetime minimum
            if not CredAPI.checkMyProxy(Time=4, checkRetrieverRenewer=True):
                common.logger.info("Please renew MyProxy delegated proxy:\n")
                try:
                    CredAPI.credObj.serverDN = self.server_dn
                    CredAPI.ManualRenewMyProxy()
                except Exception as ex:
                    common.logger.debug("Delegating Credentials to MyProxy : " + str(traceback.format_exc()))
                    raise CrabException(str(ex))
        else:
            if not CredAPI.checkMyProxy(Time=100):
                common.logger.info("Please renew the token:\n")
                try:
                    CredAPI.ManualRenewCredential()
                except Exception as ex:
                    raise CrabException(str(ex))

        try:
            CredAPI.registerCredential()
        except Exception:
            common.logger.debug("Registering Credentials : " + str(traceback.format_exc()))
            raise CrabException("ERROR: Unable to register %s delegating server: %s\n" % (self.credentialType, self.server_name))

        common.logger.info("Credential successfully delegated to the server.\n")
        return

    def performSubmission(self, firstSubmission=True):
        """
        Send the submission request to the server frontend.

        firstSubmission -- True when no job of the task was submitted before.
                           In that case: check drain mode, ship the input
                           sandbox and send the whole serialized task.
                           Otherwise the jobs in self.submitRange are
                           (re)submitted; with CMSSW.extend=1 the updated
                           arguments.xml is shipped and the task re-sent.

        Raises CrabException if the server communication fails. The jobs
        are reset to 'Created' locally first.
        """
        # create the communication session with the server frontend
        csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
        subOutcome = 0

        TotJob = common._db.nJobs()
        # transfer remote dir to server
        self.cfg_params['CRAB.se_remote_dir'] = self.remotedir

        if firstSubmission == True:
            # check if the server is in drain mode (raises when drained)
            if self.checkIfDrained(csCommunicator):
                return

            # move the sandbox
            self.moveISB_SEAPI()

            # first time submit
            taskXML = self.serialize()

            # TODO fix not needed first field
            subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange, TotJob, taskType=self.taskType)
        else:
            # subsequent submissions and resubmit
            self.stateChange(self.submitRange, "SubRequested")

            if self.extended == 1:
                # update the Arguments XML file
                argsXML = common.work_space.shareDir() + 'arguments.xml'
                self.moveISB_SEAPI([argsXML])
                taskXML = self.serialize()
                subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange, TotJob, taskType='extended')
            else:
                try:
                    subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
                except Exception:  ## change to specific exception
                    # Previously swallowed: subOutcome stayed 0 and the failed
                    # request was reported as a successful submission. Flag it
                    # so the error path below resets the jobs and raises.
                    common.logger.debug("Subsequent submission : " + str(traceback.format_exc()))
                    subOutcome = -1

        if subOutcome != 0:
            msg = "ClientServer ERROR: %d raised during the communication.\n" % subOutcome
            self.stateChange(self.submitRange, "Created")
            common.logger.debug(msg)
            raise CrabException('ERROR Jobs NOT submitted.')

        del csCommunicator
        return

    def serialize(self):
        """
        Mark the jobs in self.submitRange as 'SubRequested' and return the
        task from the local DB serialized by common._db.serializeTask().

        Raises CrabException (after resetting the jobs to 'Created') if the
        state change or the serialization fails.
        """
        taskXML = ''
        try:
            self.stateChange(self.submitRange, "SubRequested")
            taskXML += common._db.serializeTask(common._db.getTask())
            common.logger.debug(taskXML)
        except Exception as e:
            self.stateChange(self.submitRange, "Created")
            msg = "BossLite ERROR: Unable to serialize task object\n"
            msg += "Project " + str(self.taskuuid) + " not Submitted \n"
            msg += str(e)
            raise CrabException(msg)
        return taskXML

    def markSubmitting(self):
        """
        _markSubmitting_

        Mark in the local DB the jobs sent (submitted) to the server as
        statusScheduler='Submitting', status='CS' (first submission only).
        """
        common.logger.debug("Updating submitting jobs %s" % str(self.submitRange))
        # one independent dict per job: [d] * n would alias a single dict
        updlist = [{'statusScheduler': 'Submitting', 'status': 'CS'} for _ in self.submitRange]
        common._db.updateRunJob_(self.submitRange, updlist)
362     msg = "BossLite ERROR: Unable to serialize task object\n"
363     msg +="Project "+str(self.taskuuid)+" not Submitted \n"
364     msg += str(e)
365     raise CrabException(msg)
366 ewv 1.104 return taskXML
367 mcinquil 1.56
368 mcinquil 1.71 def markSubmitting(self):
369 mcinquil 1.56 """
370     _markSubmitting_
371     sign local db for jobs sent -submitted- to the server
372     (just for the first submission)
373     """
374 spiga 1.73 common.logger.debug("Updating submitting jobs %s"%str(self.submitRange))
375 mcinquil 1.56 updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
376     common._db.updateRunJob_(self.submitRange, updlist)
377    
378 mcinquil 1.71