# ViewVC Help
# View File | Revision Log | Show Annotations | Root Listing
# root/cvsroot/COMP/CRAB/python/SubmitterServer.py
# Revision: 1.58
# Committed: Tue Dec 2 15:34:34 2008 UTC (16 years, 5 months ago) by spiga
# Content type: text/x-python
# Branch: MAIN
# CVS Tags: CRAB_2_4_3_pre5
# Changes since 1.57: +4 -3 lines
# Log Message:
# Fix path for import credential API. minor change

# File Contents

# Content
from Actor import *
from crab_util import *
import common
from ApmonIf import ApmonIf

import os, errno, time, sys, re
import commands, traceback
import zlib

from Submitter import Submitter
from ServerCommunicator import ServerCommunicator

from ProdCommon.Storage.SEAPI.SElement import SElement
from ProdCommon.Storage.SEAPI.SBinterface import SBinterface


class SubmitterServer( Submitter ):
    """
    Submitter variant that delegates the task to a remote server: it
    registers the user credential, copies the input sandbox to a storage
    element and then asks the server front-end to submit the jobs.

    NOTE(review): the attributes storage_name, storage_proto, storage_port,
    storage_path, server_name, server_port and server_dn are read here but
    never assigned in this class; presumably CliServerParams(self) sets
    them -- confirm against crab_util.  Likewise nj_list and
    checkIfCreate() are assumed to come from the Submitter base class.
    """

    # How many times the proxy registration command is run before giving up.
    _PROXY_REGISTER_ATTEMPTS = 3

    def __init__(self, cfg_params, parsed_range, val):
        """
        Store the configuration, pick the credential type from the
        scheduler name and run the base class initialisation.

        cfg_params, parsed_range and val are forwarded unchanged to
        Submitter.__init__.
        """
        self.srvCfg = {}
        self.cfg_params = cfg_params
        self.submitRange = []

        # 'Token' for the LSF and CAF schedulers, 'Proxy' for all others
        self.credentialType = 'Proxy'
        if common.scheduler.name().upper() in ['LSF', 'CAF']:
            self.credentialType = 'Token'

        Submitter.__init__(self, cfg_params, parsed_range, val)

        # init client server params...
        CliServerParams(self)

        # path fix: the storage path must be absolute.  startswith() is
        # used instead of indexing so an empty path becomes '/' rather
        # than raising IndexError.
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        self.taskuuid = str(common._db.queryTask('name'))

        return

    def run(self):
        """
        The main method of the class: submit jobs in range self.nj_list
        """
        common.logger.debug(5, "SubmitterServer::run() called")

        self.submitRange = self.nj_list

        # Nothing is sent to the server unless checkIfCreate() returns 0
        if self.checkIfCreate() != 0:
            return

        self.remotedir = os.path.join(self.storage_path, self.taskuuid)
        self.manageCredential()

        # check if it is the first submission
        isFirstSubmission = common._db.checkIfNeverSubmittedBefore()

        # standard submission to the server
        self.performSubmission(isFirstSubmission)

        msg = '\nTotal of %d jobs submitted'%len(self.submitRange)
        common.logger.message(msg)

        return

    def _isbTransferError(self, reason):
        """
        Log the exception currently being handled (debug level 1) and
        return the CrabException telling the user that the project was
        not submitted.  Must be called from inside an except clause.

        reason -- first line of the user message, newline terminated.
        """
        # sys.exc_info() yields the active exception without depending on
        # the interpreter-specific "except X, name" syntax
        common.logger.debug(1, str(sys.exc_info()[1]))
        msg = reason
        msg +="Project "+ self.taskuuid +" not Submitted \n"
        return CrabException(msg)

    def moveISB_SEAPI(self):
        """
        Copy every file listed in the task 'globalSandbox' field from the
        local host to self.remotedir on the configured storage element.

        Raises CrabException if an SE interface cannot be created, if the
        remote directory cannot be created or if a copy fails.
        """
        ## get task info from BL ##
        common.logger.debug(3, "Task name: " + self.taskuuid)
        # the sandbox is stored as a comma separated list of file names
        isblist = common._db.queryTask('globalSandbox').split(',')
        common.logger.debug(3, "List of ISB files: " +str(isblist) )

        # init SE interface
        common.logger.message("Starting sending the project to the storage "+str(self.storage_name)+"...")
        try:
            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
        except Exception:
            raise self._isbTransferError("ERROR : Unable to create SE destination interface \n")

        try:
            loc = SElement("localhost", "local")
        except Exception:
            raise self._isbTransferError("ERROR : Unable to create SE source interface \n")

        ### it should not be there... To move into SE API. DS

        # create remote dir for gsiftp (and rfio)
        if self.storage_proto in ['gridftp','rfio']:
            opt = ''
            if self.storage_proto == 'rfio':
                opt = '777' # REMOVE me
            try:
                action = SBinterface( seEl )
                action.createDir( self.remotedir, opt )
            except Exception:
                raise self._isbTransferError("ERROR : Unable to create project destination on the Storage Element \n")

        ## copy ISB ##
        sbi = SBinterface( loc, seEl )

        for filetocopy in isblist:
            basename = os.path.basename(filetocopy)
            source = os.path.abspath(filetocopy)
            dest = os.path.join(self.remotedir, basename)
            common.logger.debug(1, "Sending "+ basename +" to "+ self.storage_name)
            try:
                sbi.copy( source, dest)
            except Exception:
                raise self._isbTransferError("ERROR : Unable to ship the project to the server \n")

        ## if here then project submitted ##
        msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
        common.logger.debug(3,msg)
        return

    def manageCredential(self):
        """
        Prepare configuration and Call credential API

        For the 'Proxy' credential type the legacy moveProxy() path is
        used.  Otherwise CredentialAPI.registerCredential('submit') is
        called and its result is stored in cfg_params['EDG.proxyInfos'].

        Raises CrabException if the API cannot be configured or the
        credential cannot be registered.
        """
        common.logger.message("Registering credential to the server")
        # only for temporary back-comp.
        if self.credentialType == 'Proxy':
            # for proxy all works as before....
            self.moveProxy()
            # myProxyMoveProxy() # check within the API ( Proxy.py )
        else:
            #from ProdCommon.Credential.CredentialAPI import CredentialAPI
            from CredentialAPI import CredentialAPI
            myproxyserver = self.cfg_params.get('EDG.proxy_server', 'myproxy.cern.ch')
            configAPI = {'credential' : self.credentialType,
                         'myProxySvr' : myproxyserver,
                         'serverDN' : self.server_dn,
                         'shareDir' : common.work_space.shareDir(),
                         'userName' : UnixUserName(),
                         'serverName' : self.server_name,
                         }
            try:
                CredAPI = CredentialAPI( configAPI )
            except Exception:
                err = sys.exc_info()[1]
                common.logger.debug(3, "Configuring Credential API: " +str(traceback.format_exc()))
                raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
            try:
                # named proxyInfos so the builtin dict is not shadowed
                proxyInfos = CredAPI.registerCredential('submit')
            except Exception:
                common.logger.debug(3, "Registering credential: " +str(traceback.format_exc()))
                raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
            self.cfg_params['EDG.proxyInfos'] = proxyInfos

        common.logger.message("Credential successfully delegated to the server.\n")
        return

    # TO REMOVE
    def moveProxy( self ):
        """
        Register the user proxy on the server by running the
        'asap-user-register' command, retrying up to
        _PROXY_REGISTER_ATTEMPTS times until it exits with status 0.

        Raises CrabException if no attempt succeeds.  KeyboardInterrupt
        and SystemExit are propagated unchanged.
        """
        common.scheduler.checkProxy(Time=100)
        server = str(self.server_name)
        registered = False
        try:
            common.logger.message("Registering a valid proxy to the server:")
            cmd = 'asap-user-register --server ' + server + " --myproxy"
            for _attempt in range(self._PROXY_REGISTER_ATTEMPTS):
                common.logger.debug(3, " executing:\n " + cmd)
                status, outp = commands.getstatusoutput(cmd)
                common.logger.debug(3, outp)
                if status == 0:
                    registered = True
                    break
        except (KeyboardInterrupt, SystemExit):
            # a user interrupt must not be reported as a proxy failure
            raise
        except Exception:
            # keep the real cause in the debug log; the user gets the
            # summary message built below
            common.logger.debug(3, "Registering proxy: " + str(traceback.format_exc()))

        if not registered:
            msg = "ASAP ERROR: Unable to ship a valid proxy to the server " + server + "\n"
            msg +="Project "+str(self.taskuuid)+" not Submitted \n"
            raise CrabException(msg)
        return

    def performSubmission(self, firstSubmission=True):
        """
        Ask the server front-end to submit the jobs in self.submitRange.

        firstSubmission -- when true the sandbox is shipped, the task is
        serialized and sent with submitNewTask(), and the jobs are then
        marked as submitting in the local db; when false only
        subsequentJobSubmit() is called.

        Raises CrabException if the task cannot be serialized or the
        server call returns a non-zero outcome.
        """
        # create the communication session with the server frontend
        csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
        taskXML = ''
        subOutcome = 0

        # transfer remote dir to server
        self.cfg_params['CRAB.se_remote_dir'] = self.remotedir

        # evaluate the flag once so the submission branch and the
        # bookkeeping below can never disagree
        isFirst = bool(firstSubmission)

        if isFirst:

            TotJob = common._db.nJobs()
            # move the sandbox
            self.moveISB_SEAPI()

            # first time submit
            try:
                taskXML += common._db.serializeTask( common._db.getTask() )
                common.logger.debug(5, taskXML)
            except Exception:
                e = sys.exc_info()[1]
                msg = "BossLite ERROR: Unable to serialize task object\n"
                msg +="Project "+str(self.taskuuid)+" not Submitted \n"
                msg += str(e)
                raise CrabException(msg)

            # TODO fix not needed first field
            subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
        else:
            # subsequent submissions and resubmit
            subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)

        if subOutcome != 0:
            msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
            raise CrabException(msg)

        if isFirst:
            self.markSubmitting(firstSubmission)

        del csCommunicator

        return

    def markSubmitting(self, firstSubmission):
        """
        _markSubmitting_
        sign local db for jobs sent -submitted- to the server
        (just for the first submission)

        firstSubmission is accepted for interface compatibility and is
        not used.
        """
        common.logger.debug(4, "Updating submitting jobs %s"%str(self.submitRange))
        # one independent dict per job: "[d] * n" would hand the db layer
        # n references to the same dict object
        updlist = [{'statusScheduler':'Submitting', 'status':'CS'}
                   for _job in self.submitRange]
        common._db.updateRunJob_(self.submitRange, updlist)
