ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.62
Committed: Fri Dec 5 11:58:05 2008 UTC (16 years, 4 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.61: +1 -2 lines
Log Message:
use credential API from ProdCOmmon

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     from crab_util import *
3 spiga 1.3 import common
4 spiga 1.1 from ApmonIf import ApmonIf
5 corvo 1.2
6 farinafa 1.23 import os, errno, time, sys, re
7 farinafa 1.48 import commands, traceback
8 farinafa 1.23 import zlib
9 farinafa 1.10
10 spiga 1.34 from Submitter import Submitter
11 farinafa 1.20 from ServerCommunicator import ServerCommunicator
12 farinafa 1.32
13     from ProdCommon.Storage.SEAPI.SElement import SElement
14     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 spiga 1.61 from ProdCommon.Storage.SEAPI.Exceptions import *
16 spiga 1.51
17 farinafa 1.24
18 spiga 1.34 class SubmitterServer( Submitter ):
19 farinafa 1.29 def __init__(self, cfg_params, parsed_range, val):
20 farinafa 1.24 self.srvCfg = {}
21     self.cfg_params = cfg_params
22 farinafa 1.29 self.submitRange = []
23 spiga 1.51 self.credentialType = 'Proxy'
24     if common.scheduler.name().upper() in ['LSF', 'CAF']:
25     self.credentialType = 'Token'
26 mcinquil 1.56
27 spiga 1.34 Submitter.__init__(self, cfg_params, parsed_range, val)
28 spiga 1.36
29     # init client server params...
30     CliServerParams(self)
31 farinafa 1.6
32 farinafa 1.24 # path fix
33     if self.storage_path[0]!='/':
34     self.storage_path = '/'+self.storage_path
35 mcinquil 1.43
36     self.taskuuid = str(common._db.queryTask('name'))
37 spiga 1.55
38 farinafa 1.20 return
39 spiga 1.1
40     def run(self):
41 farinafa 1.20 """
42     The main method of the class: submit jobs in range self.nj_list
43     """
44 spiga 1.34 common.logger.debug(5, "SubmitterServer::run() called")
45    
46     self.submitRange = self.nj_list
47 spiga 1.55
48 spiga 1.34 check = self.checkIfCreate()
49 farinafa 1.31
50 spiga 1.34 if check == 0 :
51 farinafa 1.20
52 spiga 1.34 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
53 spiga 1.51 self.manageCredential()
54 spiga 1.35
55     # check if it is the first submission
56 spiga 1.39 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
57 farinafa 1.20
58 spiga 1.34 # standard submission to the server
59     self.performSubmission(isFirstSubmission)
60    
61     msg = '\nTotal of %d jobs submitted'%len(self.submitRange)
62     common.logger.message(msg)
63    
64 farinafa 1.20 return
65 farinafa 1.32
66     def moveISB_SEAPI(self):
67     ## get task info from BL ##
68     common.logger.debug(3, "Task name: " + self.taskuuid)
69     isblist = common._db.queryTask('globalSandbox').split(',')
70     common.logger.debug(3, "List of ISB files: " +str(isblist) )
71    
72     # init SE interface
73     common.logger.message("Starting sending the project to the storage "+str(self.storage_name)+"...")
74 spiga 1.33 try:
75     seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
76     except Exception, ex:
77     common.logger.debug(1, str(ex))
78     msg = "ERROR : Unable to create SE destination interface \n"
79     msg +="Project "+ self.taskuuid +" not Submitted \n"
80     raise CrabException(msg)
81    
82     try:
83     loc = SElement("localhost", "local")
84     except Exception, ex:
85     common.logger.debug(1, str(ex))
86     msg = "ERROR : Unable to create SE source interface \n"
87     msg +="Project "+ self.taskuuid +" not Submitted \n"
88     raise CrabException(msg)
89    
90    
91     ### it should not be there... To move into SE API. DS
92 farinafa 1.32
93     # create remote dir for gsiftp
94 spiga 1.36 if self.storage_proto in ['gridftp','rfio']:
95 farinafa 1.32 try:
96     action = SBinterface( seEl )
97 spiga 1.61 action.createDir( self.remotedir )
98     except AlreadyExistsException, ex:
99     msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
100     msg +='\t%s'%str(ex)
101     common.logger.debug(1, msg)
102     except OperationException, ex:
103 farinafa 1.32 common.logger.debug(1, str(ex))
104     msg = "ERROR : Unable to create project destination on the Storage Element \n"
105     msg +="Project "+ self.taskuuid +" not Submitted \n"
106     raise CrabException(msg)
107 spiga 1.61 if self.storage_proto == 'rfio':
108     opt = '777' # REMOVE me
109     try:
110     action.setGrant( self.remotedir, opt)
111     except Exception, ex:
112     common.logger.debug(1, str(ex))
113     msg = "ERROR : Unable to change permission on the Storage Element \n"
114     msg +="Project "+ self.taskuuid +" not Submitted \n"
115     raise CrabException(msg)
116 farinafa 1.32
117     ## copy ISB ##
118     sbi = SBinterface( loc, seEl )
119    
120     for filetocopy in isblist:
121     source = os.path.abspath(filetocopy)
122     dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
123     common.logger.debug(1, "Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
124     try:
125 spiga 1.36 sbi.copy( source, dest)
126 farinafa 1.32 except Exception, ex:
127     common.logger.debug(1, str(ex))
128     msg = "ERROR : Unable to ship the project to the server \n"
129     msg +="Project "+ self.taskuuid +" not Submitted \n"
130     raise CrabException(msg)
131    
132     ## if here then project submitted ##
133     msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
134     common.logger.debug(3,msg)
135     return
136    
137 spiga 1.51
138     def manageCredential(self):
139     """
140     Prepare configuration and Call credential API
141     """
142 spiga 1.58 common.logger.message("Registering credential to the server")
143 spiga 1.51 # only for temporary back-comp.
144 spiga 1.57 if self.credentialType == 'Proxy':
145     # for proxy all works as before....
146     self.moveProxy()
147     # myProxyMoveProxy() # check within the API ( Proxy.py )
148     else:
149 spiga 1.62 from ProdCommon.Credential.CredentialAPI import CredentialAPI
150 spiga 1.51 myproxyserver = self.cfg_params.get('EDG.proxy_server', 'myproxy.cern.ch')
151     configAPI = {'credential' : self.credentialType, \
152     'myProxySvr' : myproxyserver,\
153     'serverDN' : self.server_dn,\
154     'shareDir' : common.work_space.shareDir() ,\
155 spiga 1.57 'userName' : UnixUserName(),\
156     'serverName' : self.server_name \
157 spiga 1.51 }
158     try:
159     CredAPI = CredentialAPI( configAPI )
160     except Exception, err :
161     common.logger.debug(3, "Configuring Credential API: " +str(traceback.format_exc()))
162     raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
163     try:
164 spiga 1.57 dict = CredAPI.registerCredential('submit')
165 spiga 1.51 except Exception, err:
166     common.logger.debug(3, "Configuring Credential API: " +str(traceback.format_exc()))
167     raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
168     self.cfg_params['EDG.proxyInfos'] = dict
169 farinafa 1.49
170 spiga 1.58 common.logger.message("Credential successfully delegated to the server.\n")
171 farinafa 1.49 return
172 spiga 1.51 # TO REMOVE
173 spiga 1.57 def moveProxy( self ):
174 farinafa 1.49 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
175 spiga 1.60 ## Temporary... to remove soon
176     common.scheduler.checkProxy(minTime=100)
177 spiga 1.57 try:
178     common.logger.message("Registering a valid proxy to the server:")
179     flag = " --myproxy"
180     cmd = 'asap-user-register --server '+str(self.server_name) + flag
181     attempt = 3
182     while attempt:
183     common.logger.debug(3, " executing:\n " + cmd)
184     status, outp = commands.getstatusoutput(cmd)
185     common.logger.debug(3, outp)
186     if status == 0:
187     break
188     else:
189     attempt = attempt - 1
190     if (attempt == 0):
191     raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
192     except:
193     msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
194     msg +="Project "+str(self.taskuuid)+" not Submitted \n"
195     raise CrabException(msg)
196 farinafa 1.49 return
197 farinafa 1.20
198     def performSubmission(self, firstSubmission=True):
199     # create the communication session with the server frontend
200 spiga 1.36 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
201 farinafa 1.20 taskXML = ''
202     subOutcome = 0
203    
204     # transfer remote dir to server
205     self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
206    
207     if firstSubmission==True:
208 spiga 1.55
209     TotJob = common._db.nJobs()
210 spiga 1.35 # move the sandbox
211     self.moveISB_SEAPI()
212    
213 farinafa 1.20 # first time submit
214     try:
215 farinafa 1.46 taskXML += common._db.serializeTask( common._db.getTask() )
216 farinafa 1.20 common.logger.debug(5, taskXML)
217     except Exception, e:
218     msg = "BossLite ERROR: Unable to serialize task object\n"
219 farinafa 1.24 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
220 farinafa 1.20 msg += str(e)
221     raise CrabException(msg)
222 farinafa 1.23
223     # TODO fix not needed first field
224 spiga 1.55 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
225 farinafa 1.8 else:
226 farinafa 1.20 # subsequent submissions and resubmit
227 farinafa 1.24 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
228 spiga 1.3
229 farinafa 1.20 if subOutcome != 0:
230     msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
231 spiga 1.3 raise CrabException(msg)
232 mcinquil 1.56 elif firstSubmission is True:
233     self.markSubmitting(firstSubmission)
234 farinafa 1.6
235 farinafa 1.30 del csCommunicator
236    
237 farinafa 1.31 return
238 farinafa 1.6
239 mcinquil 1.56
240     def markSubmitting(self, firstSubmission):
241     """
242     _markSubmitting_
243     sign local db for jobs sent -submitted- to the server
244     (just for the first submission)
245     """
246     common.logger.debug(4, "Updating submitting jobs %s"%str(self.submitRange))
247     updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
248     common._db.updateRunJob_(self.submitRange, updlist)
249