ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.78
Committed: Fri May 29 12:53:36 2009 UTC (15 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0_pre8, CRAB_2_6_0_pre7, CRAB_2_6_0_pre6
Changes since 1.77: +1 -1 lines
Log Message:
print format

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands, traceback
8 import zlib
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 from ProdCommon.Storage.SEAPI.Exceptions import *
16
17
18 class SubmitterServer( Submitter ):
19 def __init__(self, cfg_params, parsed_range, val):
20 self.srvCfg = {}
21 self.cfg_params = cfg_params
22 self.submitRange = []
23 self.credentialType = 'Proxy'
24 if common.scheduler.name().upper() in ['LSF', 'CAF']:
25 self.credentialType = 'Token'
26
27 Submitter.__init__(self, cfg_params, parsed_range, val)
28
29 # init client server params...
30 CliServerParams(self)
31
32 # path fix
33 if self.storage_path[0]!='/':
34 self.storage_path = '/'+self.storage_path
35
36 self.taskuuid = str(common._db.queryTask('name'))
37
38 return
39
40 def run(self):
41 """
42 The main method of the class: submit jobs in range self.nj_list
43 """
44 common.logger.debug("SubmitterServer::run() called")
45
46 self.submitRange = self.nj_list
47
48 check = self.checkIfCreate()
49
50 if check == 0 :
51
52 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
53 self.manageCredential()
54
55 # check if it is the first submission
56 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
57
58 # standard submission to the server
59 self.performSubmission(isFirstSubmission)
60
61 msg = 'Total of %d jobs submitted'%len(self.submitRange)
62 common.logger.info(msg)
63
64 return
65
66 def moveISB_SEAPI(self):
67 ## get task info from BL ##
68 common.logger.debug("Task name: " + self.taskuuid)
69 isblist = common._db.queryTask('globalSandbox').split(',')
70 common.logger.debug("List of ISB files: " +str(isblist) )
71
72 # init SE interface
73 common.logger.info("Starting sending the project to the storage "+str(self.storage_name)+"...")
74 try:
75 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
76 except Exception, ex:
77 common.logger.debug(str(ex))
78 msg = "ERROR : Unable to create SE destination interface \n"
79 msg +="Project "+ self.taskuuid +" not Submitted \n"
80 raise CrabException(msg)
81
82 try:
83 loc = SElement("localhost", "local")
84 except Exception, ex:
85 common.logger.debug(str(ex))
86 msg = "ERROR : Unable to create SE source interface \n"
87 msg +="Project "+ self.taskuuid +" not Submitted \n"
88 raise CrabException(msg)
89
90
91 ### it should not be there... To move into SE API. DS
92
93 # create remote dir for gsiftp
94 if self.storage_proto in ['gridftp','rfio']:
95 try:
96 action = SBinterface( seEl )
97 action.createDir( self.remotedir )
98 except AlreadyExistsException, ex:
99 msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
100 msg +='\t%s'%str(ex)
101 common.logger.debug(msg)
102 except OperationException, ex:
103 common.logger.debug(str(ex.detail))
104 msg = "ERROR: Unable to create project destination on the Storage Element %s\n"%str(ex)
105 msg +="Project "+ self.taskuuid +" not Submitted \n"
106 raise CrabException(msg)
107 except AuthorizationException, ex:
108 common.logger.debug(str(ex.detail))
109 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
110 msg +="Project "+ self.taskuuid +" not Submitted \n"
111 raise CrabException(msg)
112
113 ## copy ISB ##
114 sbi = SBinterface( loc, seEl )
115
116 for filetocopy in isblist:
117 source = os.path.abspath(filetocopy)
118 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
119 common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
120 try:
121 sbi.copy( source, dest)
122 except AuthorizationException, ex:
123 common.logger.debug(str(ex.detail))
124 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
125 msg +="Project "+ self.taskuuid +" not Submitted \n"
126 raise CrabException(msg)
127 except Exception, ex:
128 common.logger.debug(str(ex))
129 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
130 msg +="Project "+ self.taskuuid +" not Submitted \n"
131 raise CrabException(msg)
132
133 ## if here then project submitted ##
134 msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
135 common.logger.debug(msg)
136 return
137
138
139 def manageCredential(self):
140 """
141 Prepare configuration and Call credential API
142 """
143 common.logger.info("Registering credential to the server : %s"%self.server_name)
144 # only for temporary back-comp.
145 if self.credentialType == 'Proxy':
146 # for proxy all works as before....
147 self.moveProxy()
148 # myProxyMoveProxy() # check within the API ( Proxy.py )
149 else:
150 from ProdCommon.Credential.CredentialAPI import CredentialAPI
151 myproxyserver = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
152 configAPI = {'credential' : self.credentialType, \
153 'myProxySvr' : myproxyserver,\
154 'serverDN' : self.server_dn,\
155 'shareDir' : common.work_space.shareDir() ,\
156 'userName' : getUserName(),\
157 'serverName' : self.server_name \
158 }
159 try:
160 CredAPI = CredentialAPI( configAPI )
161 except Exception, err :
162 common.logger.debug("Configuring Credential API: " +str(traceback.format_exc()))
163 raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
164 if not CredAPI.checkCredential(Time=12) :
165 common.logger.info("Please renew the token:\n")
166 try:
167 CredAPI.ManualRenewCredential()
168 except Exception, ex:
169 raise CrabException(str(ex))
170
171 try:
172 dict = CredAPI.registerCredential()
173 except Exception, err:
174 common.logger.debug("Registering Credentials : " +str(traceback.format_exc()))
175 raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
176 self.cfg_params['GRID.proxyInfos'] = dict
177
178 common.logger.info("Credential successfully delegated to the server.\n")
179 return
180 # TO REMOVE
181 def moveProxy( self ):
182 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
183 ## Temporary... to remove soon
184 common.scheduler.checkProxy(minTime=100)
185 try:
186 common.logger.debug("Registering a valid proxy to the server:")
187 flag = " --myproxy"
188 cmd = 'asap-user-register --server '+str(self.server_name) + flag
189 attempt = 3
190 while attempt:
191 common.logger.debug(" executing:\n " + cmd)
192 status, outp = commands.getstatusoutput(cmd)
193 common.logger.debug(outp)
194 if status == 0:
195 break
196 else:
197 attempt = attempt - 1
198 if (attempt == 0):
199 raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
200 except:
201 msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
202 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
203 raise CrabException(msg)
204 return
205
206 def performSubmission(self, firstSubmission=True):
207 # create the communication session with the server frontend
208 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
209 taskXML = ''
210 subOutcome = 0
211
212 # transfer remote dir to server
213 self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
214
215 if firstSubmission==True:
216
217 TotJob = common._db.nJobs()
218 # move the sandbox
219 self.moveISB_SEAPI()
220
221 # first time submit
222 try:
223 self.stateChange( self.submitRange, "SubRequested" )
224 taskXML += common._db.serializeTask( common._db.getTask() )
225 common.logger.debug(taskXML)
226 except Exception, e:
227 self.stateChange( self.submitRange, "Created" )
228 msg = "BossLite ERROR: Unable to serialize task object\n"
229 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
230 msg += str(e)
231 raise CrabException(msg)
232
233 # TODO fix not needed first field
234 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
235 else:
236 # subsequent submissions and resubmit
237 self.stateChange( self.submitRange, "SubRequested" )
238 try:
239 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
240 except Exception, ex: ##change to specific exception
241 ## clean sub. requested status
242 self.stateChange( self.submitRange, "Created" )
243
244
245 if subOutcome != 0:
246 msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
247 self.stateChange( self.submitRange, "Created" )
248 common.logger.debug(msg)
249 raise CrabException('ERROR Jobs NOT submitted.')
250
251 del csCommunicator
252
253 return
254
255
256 def markSubmitting(self):
257 """
258 _markSubmitting_
259 sign local db for jobs sent -submitted- to the server
260 (just for the first submission)
261 """
262 common.logger.debug("Updating submitting jobs %s"%str(self.submitRange))
263 updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
264 common._db.updateRunJob_(self.submitRange, updlist)
265
266