ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.63
Committed: Fri Dec 5 16:54:35 2008 UTC (16 years, 4 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3_pre7
Changes since 1.62: +1 -1 lines
Log Message:
typo

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands, traceback
8 import zlib
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 from ProdCommon.Storage.SEAPI.Exceptions import *
16
17
18 class SubmitterServer( Submitter ):
19 def __init__(self, cfg_params, parsed_range, val):
20 self.srvCfg = {}
21 self.cfg_params = cfg_params
22 self.submitRange = []
23 self.credentialType = 'Proxy'
24 if common.scheduler.name().upper() in ['LSF', 'CAF']:
25 self.credentialType = 'Token'
26
27 Submitter.__init__(self, cfg_params, parsed_range, val)
28
29 # init client server params...
30 CliServerParams(self)
31
32 # path fix
33 if self.storage_path[0]!='/':
34 self.storage_path = '/'+self.storage_path
35
36 self.taskuuid = str(common._db.queryTask('name'))
37
38 return
39
40 def run(self):
41 """
42 The main method of the class: submit jobs in range self.nj_list
43 """
44 common.logger.debug(5, "SubmitterServer::run() called")
45
46 self.submitRange = self.nj_list
47
48 check = self.checkIfCreate()
49
50 if check == 0 :
51
52 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
53 self.manageCredential()
54
55 # check if it is the first submission
56 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
57
58 # standard submission to the server
59 self.performSubmission(isFirstSubmission)
60
61 msg = '\nTotal of %d jobs submitted'%len(self.submitRange)
62 common.logger.message(msg)
63
64 return
65
66 def moveISB_SEAPI(self):
67 ## get task info from BL ##
68 common.logger.debug(3, "Task name: " + self.taskuuid)
69 isblist = common._db.queryTask('globalSandbox').split(',')
70 common.logger.debug(3, "List of ISB files: " +str(isblist) )
71
72 # init SE interface
73 common.logger.message("Starting sending the project to the storage "+str(self.storage_name)+"...")
74 try:
75 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
76 except Exception, ex:
77 common.logger.debug(1, str(ex))
78 msg = "ERROR : Unable to create SE destination interface \n"
79 msg +="Project "+ self.taskuuid +" not Submitted \n"
80 raise CrabException(msg)
81
82 try:
83 loc = SElement("localhost", "local")
84 except Exception, ex:
85 common.logger.debug(1, str(ex))
86 msg = "ERROR : Unable to create SE source interface \n"
87 msg +="Project "+ self.taskuuid +" not Submitted \n"
88 raise CrabException(msg)
89
90
91 ### it should not be there... To move into SE API. DS
92
93 # create remote dir for gsiftp
94 if self.storage_proto in ['gridftp','rfio']:
95 try:
96 action = SBinterface( seEl )
97 action.createDir( self.remotedir )
98 except AlreadyExistsException, ex:
99 msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
100 msg +='\t%s'%str(ex)
101 common.logger.debug(1, msg)
102 except OperationException, ex:
103 common.logger.debug(1, str(ex))
104 msg = "ERROR : Unable to create project destination on the Storage Element \n"
105 msg +="Project "+ self.taskuuid +" not Submitted \n"
106 raise CrabException(msg)
107 if self.storage_proto == 'rfio':
108 opt = '777' # REMOVE me
109 try:
110 action.setGrant( self.remotedir, opt)
111 except Exception, ex:
112 common.logger.debug(1, str(ex))
113 msg = "ERROR : Unable to change permission on the Storage Element \n"
114 msg +="Project "+ self.taskuuid +" not Submitted \n"
115 raise CrabException(msg)
116
117 ## copy ISB ##
118 sbi = SBinterface( loc, seEl )
119
120 for filetocopy in isblist:
121 source = os.path.abspath(filetocopy)
122 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
123 common.logger.debug(1, "Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
124 try:
125 sbi.copy( source, dest)
126 except Exception, ex:
127 common.logger.debug(1, str(ex))
128 msg = "ERROR : Unable to ship the project to the server \n"
129 msg +="Project "+ self.taskuuid +" not Submitted \n"
130 raise CrabException(msg)
131
132 ## if here then project submitted ##
133 msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
134 common.logger.debug(3,msg)
135 return
136
137
138 def manageCredential(self):
139 """
140 Prepare configuration and Call credential API
141 """
142 common.logger.message("Registering credential to the server")
143 # only for temporary back-comp.
144 if self.credentialType == 'Proxy':
145 # for proxy all works as before....
146 self.moveProxy()
147 # myProxyMoveProxy() # check within the API ( Proxy.py )
148 else:
149 from ProdCommon.Credential.CredentialAPI import CredentialAPI
150 myproxyserver = self.cfg_params.get('EDG.proxy_server', 'myproxy.cern.ch')
151 configAPI = {'credential' : self.credentialType, \
152 'myProxySvr' : myproxyserver,\
153 'serverDN' : self.server_dn,\
154 'shareDir' : common.work_space.shareDir() ,\
155 'userName' : UnixUserName(),\
156 'serverName' : self.server_name \
157 }
158 try:
159 CredAPI = CredentialAPI( configAPI )
160 except Exception, err :
161 common.logger.debug(3, "Configuring Credential API: " +str(traceback.format_exc()))
162 raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
163 try:
164 dict = CredAPI.registerCredential('submit')
165 except Exception, err:
166 common.logger.debug(3, "Registering Credentials : " +str(traceback.format_exc()))
167 raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
168 self.cfg_params['EDG.proxyInfos'] = dict
169
170 common.logger.message("Credential successfully delegated to the server.\n")
171 return
172 # TO REMOVE
173 def moveProxy( self ):
174 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
175 ## Temporary... to remove soon
176 common.scheduler.checkProxy(minTime=100)
177 try:
178 common.logger.message("Registering a valid proxy to the server:")
179 flag = " --myproxy"
180 cmd = 'asap-user-register --server '+str(self.server_name) + flag
181 attempt = 3
182 while attempt:
183 common.logger.debug(3, " executing:\n " + cmd)
184 status, outp = commands.getstatusoutput(cmd)
185 common.logger.debug(3, outp)
186 if status == 0:
187 break
188 else:
189 attempt = attempt - 1
190 if (attempt == 0):
191 raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
192 except:
193 msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
194 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
195 raise CrabException(msg)
196 return
197
198 def performSubmission(self, firstSubmission=True):
199 # create the communication session with the server frontend
200 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
201 taskXML = ''
202 subOutcome = 0
203
204 # transfer remote dir to server
205 self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
206
207 if firstSubmission==True:
208
209 TotJob = common._db.nJobs()
210 # move the sandbox
211 self.moveISB_SEAPI()
212
213 # first time submit
214 try:
215 taskXML += common._db.serializeTask( common._db.getTask() )
216 common.logger.debug(5, taskXML)
217 except Exception, e:
218 msg = "BossLite ERROR: Unable to serialize task object\n"
219 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
220 msg += str(e)
221 raise CrabException(msg)
222
223 # TODO fix not needed first field
224 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
225 else:
226 # subsequent submissions and resubmit
227 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
228
229 if subOutcome != 0:
230 msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
231 raise CrabException(msg)
232 elif firstSubmission is True:
233 self.markSubmitting(firstSubmission)
234
235 del csCommunicator
236
237 return
238
239
240 def markSubmitting(self, firstSubmission):
241 """
242 _markSubmitting_
243 sign local db for jobs sent -submitted- to the server
244 (just for the first submission)
245 """
246 common.logger.debug(4, "Updating submitting jobs %s"%str(self.submitRange))
247 updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
248 common._db.updateRunJob_(self.submitRange, updlist)
249