ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.68
Committed: Fri Feb 6 18:05:47 2009 UTC (16 years, 2 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_5_0_pre5, CRAB_2_5_0_pre4, CRAB_2_5_0_pre3, CRAB_2_5_0_pre2
Changes since 1.67: +1 -1 lines
Log Message:
adapt to CredentialApi changes

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     from crab_util import *
3 spiga 1.3 import common
4 spiga 1.1 from ApmonIf import ApmonIf
5 corvo 1.2
6 farinafa 1.23 import os, errno, time, sys, re
7 farinafa 1.48 import commands, traceback
8 farinafa 1.23 import zlib
9 farinafa 1.10
10 spiga 1.34 from Submitter import Submitter
11 farinafa 1.20 from ServerCommunicator import ServerCommunicator
12 farinafa 1.32
13     from ProdCommon.Storage.SEAPI.SElement import SElement
14     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 spiga 1.61 from ProdCommon.Storage.SEAPI.Exceptions import *
16 spiga 1.51
17 farinafa 1.24
18 spiga 1.34 class SubmitterServer( Submitter ):
19 farinafa 1.29 def __init__(self, cfg_params, parsed_range, val):
20 farinafa 1.24 self.srvCfg = {}
21     self.cfg_params = cfg_params
22 farinafa 1.29 self.submitRange = []
23 spiga 1.51 self.credentialType = 'Proxy'
24     if common.scheduler.name().upper() in ['LSF', 'CAF']:
25     self.credentialType = 'Token'
26 mcinquil 1.56
27 spiga 1.34 Submitter.__init__(self, cfg_params, parsed_range, val)
28 spiga 1.36
29     # init client server params...
30     CliServerParams(self)
31 farinafa 1.6
32 farinafa 1.24 # path fix
33     if self.storage_path[0]!='/':
34     self.storage_path = '/'+self.storage_path
35 mcinquil 1.43
36     self.taskuuid = str(common._db.queryTask('name'))
37 spiga 1.55
38 farinafa 1.20 return
39 spiga 1.1
40     def run(self):
41 farinafa 1.20 """
42     The main method of the class: submit jobs in range self.nj_list
43     """
44 spiga 1.34 common.logger.debug(5, "SubmitterServer::run() called")
45    
46     self.submitRange = self.nj_list
47 spiga 1.55
48 spiga 1.34 check = self.checkIfCreate()
49 farinafa 1.31
50 spiga 1.34 if check == 0 :
51 farinafa 1.20
52 spiga 1.34 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
53 spiga 1.51 self.manageCredential()
54 spiga 1.35
55     # check if it is the first submission
56 spiga 1.39 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
57 farinafa 1.20
58 spiga 1.34 # standard submission to the server
59     self.performSubmission(isFirstSubmission)
60    
61     msg = '\nTotal of %d jobs submitted'%len(self.submitRange)
62     common.logger.message(msg)
63    
64 farinafa 1.20 return
65 farinafa 1.32
66     def moveISB_SEAPI(self):
67     ## get task info from BL ##
68     common.logger.debug(3, "Task name: " + self.taskuuid)
69     isblist = common._db.queryTask('globalSandbox').split(',')
70     common.logger.debug(3, "List of ISB files: " +str(isblist) )
71    
72     # init SE interface
73     common.logger.message("Starting sending the project to the storage "+str(self.storage_name)+"...")
74 spiga 1.33 try:
75     seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
76     except Exception, ex:
77     common.logger.debug(1, str(ex))
78     msg = "ERROR : Unable to create SE destination interface \n"
79     msg +="Project "+ self.taskuuid +" not Submitted \n"
80     raise CrabException(msg)
81    
82     try:
83     loc = SElement("localhost", "local")
84     except Exception, ex:
85     common.logger.debug(1, str(ex))
86     msg = "ERROR : Unable to create SE source interface \n"
87     msg +="Project "+ self.taskuuid +" not Submitted \n"
88     raise CrabException(msg)
89    
90    
91     ### it should not be there... To move into SE API. DS
92 farinafa 1.32
93     # create remote dir for gsiftp
94 spiga 1.36 if self.storage_proto in ['gridftp','rfio']:
95 farinafa 1.32 try:
96     action = SBinterface( seEl )
97 spiga 1.61 action.createDir( self.remotedir )
98     except AlreadyExistsException, ex:
99     msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
100     msg +='\t%s'%str(ex)
101     common.logger.debug(1, msg)
102     except OperationException, ex:
103 mcinquil 1.66 common.logger.debug(1, str(ex.detail))
104     msg = "ERROR: Unable to create project destination on the Storage Element %s\n"%str(ex)
105     msg +="Project "+ self.taskuuid +" not Submitted \n"
106     raise CrabException(msg)
107     except AuthorizationException, ex:
108     common.logger.debug(1, str(ex.detail))
109     msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
110 farinafa 1.32 msg +="Project "+ self.taskuuid +" not Submitted \n"
111     raise CrabException(msg)
112    
113     ## copy ISB ##
114     sbi = SBinterface( loc, seEl )
115    
116     for filetocopy in isblist:
117     source = os.path.abspath(filetocopy)
118     dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
119     common.logger.debug(1, "Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
120     try:
121 spiga 1.36 sbi.copy( source, dest)
122 mcinquil 1.66 except AuthorizationException, ex:
123     common.logger.debug(1, str(ex.detail))
124     msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
125     msg +="Project "+ self.taskuuid +" not Submitted \n"
126     raise CrabException(msg)
127 farinafa 1.32 except Exception, ex:
128     common.logger.debug(1, str(ex))
129 mcinquil 1.66 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
130 farinafa 1.32 msg +="Project "+ self.taskuuid +" not Submitted \n"
131     raise CrabException(msg)
132    
133     ## if here then project submitted ##
134     msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
135     common.logger.debug(3,msg)
136     return
137    
138 spiga 1.51
139     def manageCredential(self):
140     """
141     Prepare configuration and Call credential API
142     """
143 spiga 1.58 common.logger.message("Registering credential to the server")
144 spiga 1.51 # only for temporary back-comp.
145 spiga 1.57 if self.credentialType == 'Proxy':
146     # for proxy all works as before....
147     self.moveProxy()
148     # myProxyMoveProxy() # check within the API ( Proxy.py )
149     else:
150 spiga 1.62 from ProdCommon.Credential.CredentialAPI import CredentialAPI
151 spiga 1.51 myproxyserver = self.cfg_params.get('EDG.proxy_server', 'myproxy.cern.ch')
152     configAPI = {'credential' : self.credentialType, \
153     'myProxySvr' : myproxyserver,\
154     'serverDN' : self.server_dn,\
155     'shareDir' : common.work_space.shareDir() ,\
156 spiga 1.57 'userName' : UnixUserName(),\
157     'serverName' : self.server_name \
158 spiga 1.51 }
159     try:
160     CredAPI = CredentialAPI( configAPI )
161     except Exception, err :
162     common.logger.debug(3, "Configuring Credential API: " +str(traceback.format_exc()))
163     raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
164 spiga 1.67 if not CredAPI.checkCredential(Time=12) :
165 spiga 1.65 common.logger.message("Please renew the token:\n")
166     try:
167     CredAPI.ManualRenewCredential()
168     except Exception, ex:
169     raise CrabException(str(ex))
170 mcinquil 1.66
171 spiga 1.51 try:
172 spiga 1.68 dict = CredAPI.registerCredential()
173 spiga 1.51 except Exception, err:
174 spiga 1.63 common.logger.debug(3, "Registering Credentials : " +str(traceback.format_exc()))
175 spiga 1.51 raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
176     self.cfg_params['EDG.proxyInfos'] = dict
177 farinafa 1.49
178 spiga 1.58 common.logger.message("Credential successfully delegated to the server.\n")
179 farinafa 1.49 return
180 spiga 1.51 # TO REMOVE
181 spiga 1.57 def moveProxy( self ):
182 farinafa 1.49 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
183 spiga 1.60 ## Temporary... to remove soon
184     common.scheduler.checkProxy(minTime=100)
185 spiga 1.57 try:
186     common.logger.message("Registering a valid proxy to the server:")
187     flag = " --myproxy"
188     cmd = 'asap-user-register --server '+str(self.server_name) + flag
189     attempt = 3
190     while attempt:
191     common.logger.debug(3, " executing:\n " + cmd)
192     status, outp = commands.getstatusoutput(cmd)
193     common.logger.debug(3, outp)
194     if status == 0:
195     break
196     else:
197     attempt = attempt - 1
198     if (attempt == 0):
199     raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
200     except:
201     msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
202     msg +="Project "+str(self.taskuuid)+" not Submitted \n"
203     raise CrabException(msg)
204 farinafa 1.49 return
205 farinafa 1.20
206     def performSubmission(self, firstSubmission=True):
207     # create the communication session with the server frontend
208 spiga 1.36 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
209 farinafa 1.20 taskXML = ''
210     subOutcome = 0
211    
212     # transfer remote dir to server
213     self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
214    
215     if firstSubmission==True:
216 spiga 1.55
217     TotJob = common._db.nJobs()
218 spiga 1.35 # move the sandbox
219     self.moveISB_SEAPI()
220    
221 farinafa 1.20 # first time submit
222     try:
223 farinafa 1.46 taskXML += common._db.serializeTask( common._db.getTask() )
224 farinafa 1.20 common.logger.debug(5, taskXML)
225     except Exception, e:
226     msg = "BossLite ERROR: Unable to serialize task object\n"
227 farinafa 1.24 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
228 farinafa 1.20 msg += str(e)
229     raise CrabException(msg)
230 farinafa 1.23
231     # TODO fix not needed first field
232 spiga 1.55 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
233 farinafa 1.8 else:
234 farinafa 1.20 # subsequent submissions and resubmit
235 farinafa 1.24 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
236 spiga 1.3
237 farinafa 1.20 if subOutcome != 0:
238     msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
239 spiga 1.3 raise CrabException(msg)
240 mcinquil 1.56 elif firstSubmission is True:
241     self.markSubmitting(firstSubmission)
242 farinafa 1.6
243 farinafa 1.30 del csCommunicator
244    
245 farinafa 1.31 return
246 farinafa 1.6
247 mcinquil 1.56
248     def markSubmitting(self, firstSubmission):
249     """
250     _markSubmitting_
251     sign local db for jobs sent -submitted- to the server
252     (just for the first submission)
253     """
254     common.logger.debug(4, "Updating submitting jobs %s"%str(self.submitRange))
255     updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
256     common._db.updateRunJob_(self.submitRange, updlist)
257