ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.83
Committed: Mon Aug 17 18:42:22 2009 UTC (15 years, 8 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: test_1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1
Branch point for: CRAB_2_6_X_br
Changes since 1.82: +42 -42 lines
Log Message:
Move from limiting # of created jobs to # of submitted jobs

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands, traceback
8 import zlib
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 from ProdCommon.Storage.SEAPI.Exceptions import *
16
17
18 class SubmitterServer( Submitter ):
19 def __init__(self, cfg_params, parsed_range, val):
20 self.srvCfg = {}
21 self.cfg_params = cfg_params
22 self.submitRange = []
23 self.credentialType = 'Proxy'
24 self.copyTout= ' -t 600 '
25 if common.scheduler.name().upper() in ['LSF', 'CAF']:
26 self.credentialType = 'Token'
27 self.copyTout= ' '
28
29 Submitter.__init__(self, cfg_params, parsed_range, val)
30
31 # init client server params...
32 CliServerParams(self)
33
34 # path fix
35 if self.storage_path[0]!='/':
36 self.storage_path = '/'+self.storage_path
37
38 self.taskuuid = str(common._db.queryTask('name'))
39 self.limitJobs = False
40
41 return
42
43 def run(self):
44 """
45 The main method of the class: submit jobs in range self.nj_list
46 """
47 common.logger.debug("SubmitterServer::run() called")
48
49 start = time.time()
50
51 self.BuildJobList()
52
53 self.submitRange = self.nj_list
54
55 check = self.checkIfCreate()
56
57 if check == 0 :
58
59 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
60 self.manageCredential()
61
62 # check if it is the first submission
63 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
64
65 # standard submission to the server
66 self.performSubmission(isFirstSubmission)
67
68 stop = time.time()
69 common.logger.debug("Submission Time: "+str(stop - start))
70
71 msg = 'Total of %d jobs submitted'%len(self.submitRange)
72 common.logger.info(msg)
73
74 return
75
76 def moveISB_SEAPI(self):
77 ## get task info from BL ##
78 common.logger.debug("Task name: " + self.taskuuid)
79 isblist = common._db.queryTask('globalSandbox').split(',')
80 common.logger.debug("List of ISB files: " +str(isblist) )
81
82 # init SE interface
83 common.logger.info("Starting sending the project to the storage "+str(self.storage_name)+"...")
84 try:
85 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
86 except Exception, ex:
87 common.logger.debug(str(ex))
88 msg = "ERROR : Unable to create SE destination interface \n"
89 msg +="Project "+ self.taskuuid +" not Submitted \n"
90 raise CrabException(msg)
91
92 try:
93 loc = SElement("localhost", "local")
94 except Exception, ex:
95 common.logger.debug(str(ex))
96 msg = "ERROR : Unable to create SE source interface \n"
97 msg +="Project "+ self.taskuuid +" not Submitted \n"
98 raise CrabException(msg)
99
100
101 ### it should not be there... To move into SE API. DS
102
103 # create remote dir for gsiftp
104 if self.storage_proto in ['gridftp','rfio']:
105 try:
106 action = SBinterface( seEl )
107 action.createDir( self.remotedir )
108 except AlreadyExistsException, ex:
109 msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
110 msg +='\t%s'%str(ex)
111 common.logger.debug(msg)
112 except OperationException, ex:
113 common.logger.debug(str(ex.detail))
114 msg = "ERROR: Unable to create project destination on the Storage Element %s\n"%str(ex)
115 msg +="Project "+ self.taskuuid +" not Submitted \n"
116 raise CrabException(msg)
117 except AuthorizationException, ex:
118 common.logger.debug(str(ex.detail))
119 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
120 msg +="Project "+ self.taskuuid +" not Submitted \n"
121 raise CrabException(msg)
122
123 ## copy ISB ##
124 sbi = SBinterface( loc, seEl )
125
126 for filetocopy in isblist:
127 source = os.path.abspath(filetocopy)
128 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
129 common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
130 try:
131 sbi.copy( source, dest, opt=self.copyTout)
132 except AuthorizationException, ex:
133 common.logger.debug(str(ex.detail))
134 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
135 msg +="Project "+ self.taskuuid +" not Submitted \n"
136 raise CrabException(msg)
137 except Exception, ex:
138 common.logger.debug(str(ex))
139 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
140 msg +="Project "+ self.taskuuid +" not Submitted \n"
141 raise CrabException(msg)
142
143 ## if here then project submitted ##
144 msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
145 common.logger.debug(msg)
146 return
147
148
149 def manageCredential(self):
150 """
151 Prepare configuration and Call credential API
152 """
153 common.logger.info("Registering credential to the server : %s"%self.server_name)
154 # only for temporary back-comp.
155 if self.credentialType == 'Proxy':
156 # for proxy all works as before....
157 self.moveProxy()
158 # myProxyMoveProxy() # check within the API ( Proxy.py )
159 else:
160 from ProdCommon.Credential.CredentialAPI import CredentialAPI
161 myproxyserver = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
162 configAPI = {'credential' : self.credentialType, \
163 'myProxySvr' : myproxyserver,\
164 'serverDN' : self.server_dn,\
165 'shareDir' : common.work_space.shareDir() ,\
166 'userName' : getUserName(),\
167 'serverName' : self.server_name, \
168 'logger' : common.logger() \
169 }
170 try:
171 CredAPI = CredentialAPI( configAPI )
172 except Exception, err :
173 common.logger.debug("Configuring Credential API: " +str(traceback.format_exc()))
174 raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
175 if not CredAPI.checkCredential(Time=12) :
176 common.logger.info("Please renew the token:\n")
177 try:
178 CredAPI.ManualRenewCredential()
179 except Exception, ex:
180 raise CrabException(str(ex))
181
182 try:
183 dict = CredAPI.registerCredential()
184 except Exception, err:
185 common.logger.debug("Registering Credentials : " +str(traceback.format_exc()))
186 raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
187 self.cfg_params['GRID.proxyInfos'] = dict
188
189 common.logger.info("Credential successfully delegated to the server.\n")
190 return
191 # TO REMOVE
192 def moveProxy( self ):
193 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
194 ## Temporary... to remove soon
195 common.scheduler.checkProxy(minTime=100)
196 try:
197 common.logger.debug("Registering a valid proxy to the server:")
198 flag = " --myproxy"
199 cmd = 'asap-user-register --server '+str(self.server_name) + flag
200 attempt = 3
201 while attempt:
202 common.logger.debug(" executing:\n " + cmd)
203 status, outp = commands.getstatusoutput(cmd)
204 common.logger.debug(outp)
205 if status == 0:
206 break
207 else:
208 attempt = attempt - 1
209 if (attempt == 0):
210 raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
211 except:
212 msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
213 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
214 raise CrabException(msg)
215 return
216
217 def performSubmission(self, firstSubmission=True):
218 # create the communication session with the server frontend
219 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
220 taskXML = ''
221 subOutcome = 0
222
223 # transfer remote dir to server
224 self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
225
226 if firstSubmission==True:
227
228 TotJob = common._db.nJobs()
229 # move the sandbox
230 self.moveISB_SEAPI()
231
232 # first time submit
233 try:
234 self.stateChange( self.submitRange, "SubRequested" )
235 taskXML += common._db.serializeTask( common._db.getTask() )
236 common.logger.debug(taskXML)
237 except Exception, e:
238 self.stateChange( self.submitRange, "Created" )
239 msg = "BossLite ERROR: Unable to serialize task object\n"
240 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
241 msg += str(e)
242 raise CrabException(msg)
243
244 # TODO fix not needed first field
245 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
246 else:
247 # subsequent submissions and resubmit
248 self.stateChange( self.submitRange, "SubRequested" )
249 try:
250 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
251 except Exception, ex: ##change to specific exception
252 ## clean sub. requested status
253 self.stateChange( self.submitRange, "Created" )
254
255
256 if subOutcome != 0:
257 msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
258 self.stateChange( self.submitRange, "Created" )
259 common.logger.debug(msg)
260 raise CrabException('ERROR Jobs NOT submitted.')
261
262 del csCommunicator
263
264 return
265
266
267 def markSubmitting(self):
268 """
269 _markSubmitting_
270 sign local db for jobs sent -submitted- to the server
271 (just for the first submission)
272 """
273 common.logger.debug("Updating submitting jobs %s"%str(self.submitRange))
274 updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
275 common._db.updateRunJob_(self.submitRange, updlist)
276
277