ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.79
Committed: Thu Jun 11 09:29:26 2009 UTC (15 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_6_0, CRAB_2_6_0_pre14, CRAB_2_6_0_pre13, CRAB_2_6_0_pre12, CRAB_2_6_0_pre11, CRAB_2_6_0_pre10, CRAB_2_6_0_pre9
Changes since 1.78: +7 -0 lines
Log Message:
prepare list of jobs to be submitted after the database has been filled

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     from crab_util import *
3 spiga 1.3 import common
4 spiga 1.1 from ApmonIf import ApmonIf
5 corvo 1.2
6 farinafa 1.23 import os, errno, time, sys, re
7 farinafa 1.48 import commands, traceback
8 farinafa 1.23 import zlib
9 farinafa 1.10
10 spiga 1.34 from Submitter import Submitter
11 farinafa 1.20 from ServerCommunicator import ServerCommunicator
12 farinafa 1.32
13     from ProdCommon.Storage.SEAPI.SElement import SElement
14     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15 spiga 1.61 from ProdCommon.Storage.SEAPI.Exceptions import *
16 spiga 1.51
17 farinafa 1.24
18 spiga 1.34 class SubmitterServer( Submitter ):
19 farinafa 1.29 def __init__(self, cfg_params, parsed_range, val):
20 slacapra 1.76 self.srvCfg = {}
21 farinafa 1.24 self.cfg_params = cfg_params
22 farinafa 1.29 self.submitRange = []
23 spiga 1.51 self.credentialType = 'Proxy'
24     if common.scheduler.name().upper() in ['LSF', 'CAF']:
25     self.credentialType = 'Token'
26 mcinquil 1.56
27 spiga 1.34 Submitter.__init__(self, cfg_params, parsed_range, val)
28 spiga 1.36
29     # init client server params...
30     CliServerParams(self)
31 farinafa 1.6
32 farinafa 1.24 # path fix
33     if self.storage_path[0]!='/':
34     self.storage_path = '/'+self.storage_path
35 mcinquil 1.43
36     self.taskuuid = str(common._db.queryTask('name'))
37 spiga 1.55
38 farinafa 1.20 return
39 spiga 1.1
40     def run(self):
41 farinafa 1.20 """
42     The main method of the class: submit jobs in range self.nj_list
43     """
44 slacapra 1.76 common.logger.debug("SubmitterServer::run() called")
45 spiga 1.34
46 spiga 1.79 start = time.time()
47    
48     self.BuildJobList()
49    
50 spiga 1.34 self.submitRange = self.nj_list
51 spiga 1.55
52 spiga 1.34 check = self.checkIfCreate()
53 farinafa 1.31
54 spiga 1.34 if check == 0 :
55 farinafa 1.20
56 spiga 1.34 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
57 spiga 1.51 self.manageCredential()
58 spiga 1.35
59     # check if it is the first submission
60 spiga 1.39 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
61 farinafa 1.20
62 spiga 1.34 # standard submission to the server
63     self.performSubmission(isFirstSubmission)
64    
65 spiga 1.79 stop = time.time()
66     common.logger.debug("Submission Time: "+str(stop - start))
67    
68 spiga 1.78 msg = 'Total of %d jobs submitted'%len(self.submitRange)
69 spiga 1.73 common.logger.info(msg)
70 spiga 1.34
71 farinafa 1.20 return
72 farinafa 1.32
73     def moveISB_SEAPI(self):
74     ## get task info from BL ##
75 spiga 1.73 common.logger.debug("Task name: " + self.taskuuid)
76 farinafa 1.32 isblist = common._db.queryTask('globalSandbox').split(',')
77 spiga 1.73 common.logger.debug("List of ISB files: " +str(isblist) )
78 farinafa 1.32
79     # init SE interface
80 spiga 1.73 common.logger.info("Starting sending the project to the storage "+str(self.storage_name)+"...")
81 spiga 1.33 try:
82     seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
83     except Exception, ex:
84 spiga 1.73 common.logger.debug(str(ex))
85 spiga 1.33 msg = "ERROR : Unable to create SE destination interface \n"
86     msg +="Project "+ self.taskuuid +" not Submitted \n"
87     raise CrabException(msg)
88    
89     try:
90     loc = SElement("localhost", "local")
91     except Exception, ex:
92 spiga 1.73 common.logger.debug(str(ex))
93 spiga 1.33 msg = "ERROR : Unable to create SE source interface \n"
94     msg +="Project "+ self.taskuuid +" not Submitted \n"
95     raise CrabException(msg)
96    
97    
98     ### it should not be there... To move into SE API. DS
99 farinafa 1.32
100     # create remote dir for gsiftp
101 spiga 1.36 if self.storage_proto in ['gridftp','rfio']:
102 farinafa 1.32 try:
103     action = SBinterface( seEl )
104 spiga 1.61 action.createDir( self.remotedir )
105     except AlreadyExistsException, ex:
106     msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
107     msg +='\t%s'%str(ex)
108 spiga 1.73 common.logger.debug(msg)
109 spiga 1.61 except OperationException, ex:
110 spiga 1.73 common.logger.debug(str(ex.detail))
111 mcinquil 1.66 msg = "ERROR: Unable to create project destination on the Storage Element %s\n"%str(ex)
112     msg +="Project "+ self.taskuuid +" not Submitted \n"
113     raise CrabException(msg)
114     except AuthorizationException, ex:
115 spiga 1.73 common.logger.debug(str(ex.detail))
116 mcinquil 1.66 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
117 farinafa 1.32 msg +="Project "+ self.taskuuid +" not Submitted \n"
118     raise CrabException(msg)
119    
120     ## copy ISB ##
121     sbi = SBinterface( loc, seEl )
122    
123     for filetocopy in isblist:
124     source = os.path.abspath(filetocopy)
125     dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
126 spiga 1.73 common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
127 farinafa 1.32 try:
128 spiga 1.36 sbi.copy( source, dest)
129 mcinquil 1.66 except AuthorizationException, ex:
130 spiga 1.73 common.logger.debug(str(ex.detail))
131 mcinquil 1.66 msg = "ERROR: Unable to create project destination on the Storage Element: %s\n"%str(ex)
132     msg +="Project "+ self.taskuuid +" not Submitted \n"
133     raise CrabException(msg)
134 farinafa 1.32 except Exception, ex:
135 spiga 1.73 common.logger.debug(str(ex))
136 mcinquil 1.66 msg = "ERROR : Unable to ship the project to the server %s\n"%str(ex)
137 farinafa 1.32 msg +="Project "+ self.taskuuid +" not Submitted \n"
138     raise CrabException(msg)
139    
140     ## if here then project submitted ##
141     msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
142 spiga 1.73 common.logger.debug(msg)
143 farinafa 1.32 return
144    
145 spiga 1.51
146     def manageCredential(self):
147     """
148     Prepare configuration and Call credential API
149     """
150 spiga 1.75 common.logger.info("Registering credential to the server : %s"%self.server_name)
151 spiga 1.51 # only for temporary back-comp.
152 spiga 1.57 if self.credentialType == 'Proxy':
153     # for proxy all works as before....
154     self.moveProxy()
155     # myProxyMoveProxy() # check within the API ( Proxy.py )
156     else:
157 spiga 1.62 from ProdCommon.Credential.CredentialAPI import CredentialAPI
158 spiga 1.74 myproxyserver = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
159 spiga 1.51 configAPI = {'credential' : self.credentialType, \
160     'myProxySvr' : myproxyserver,\
161     'serverDN' : self.server_dn,\
162     'shareDir' : common.work_space.shareDir() ,\
163 spiga 1.69 'userName' : getUserName(),\
164 spiga 1.57 'serverName' : self.server_name \
165 spiga 1.51 }
166     try:
167     CredAPI = CredentialAPI( configAPI )
168     except Exception, err :
169 spiga 1.73 common.logger.debug("Configuring Credential API: " +str(traceback.format_exc()))
170 spiga 1.51 raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err))
171 spiga 1.67 if not CredAPI.checkCredential(Time=12) :
172 spiga 1.73 common.logger.info("Please renew the token:\n")
173 spiga 1.65 try:
174     CredAPI.ManualRenewCredential()
175     except Exception, ex:
176     raise CrabException(str(ex))
177 mcinquil 1.66
178 spiga 1.51 try:
179 spiga 1.68 dict = CredAPI.registerCredential()
180 spiga 1.51 except Exception, err:
181 spiga 1.73 common.logger.debug("Registering Credentials : " +str(traceback.format_exc()))
182 spiga 1.51 raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
183 spiga 1.74 self.cfg_params['GRID.proxyInfos'] = dict
184 farinafa 1.49
185 spiga 1.73 common.logger.info("Credential successfully delegated to the server.\n")
186 farinafa 1.49 return
187 spiga 1.51 # TO REMOVE
188 spiga 1.57 def moveProxy( self ):
189 farinafa 1.49 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
190 spiga 1.60 ## Temporary... to remove soon
191     common.scheduler.checkProxy(minTime=100)
192 spiga 1.57 try:
193 spiga 1.73 common.logger.debug("Registering a valid proxy to the server:")
194 spiga 1.57 flag = " --myproxy"
195     cmd = 'asap-user-register --server '+str(self.server_name) + flag
196     attempt = 3
197     while attempt:
198 spiga 1.73 common.logger.debug(" executing:\n " + cmd)
199 spiga 1.57 status, outp = commands.getstatusoutput(cmd)
200 spiga 1.73 common.logger.debug(outp)
201 spiga 1.57 if status == 0:
202     break
203     else:
204     attempt = attempt - 1
205     if (attempt == 0):
206     raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
207     except:
208     msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
209     msg +="Project "+str(self.taskuuid)+" not Submitted \n"
210     raise CrabException(msg)
211 farinafa 1.49 return
212 farinafa 1.20
213     def performSubmission(self, firstSubmission=True):
214     # create the communication session with the server frontend
215 spiga 1.36 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
216 farinafa 1.20 taskXML = ''
217     subOutcome = 0
218    
219     # transfer remote dir to server
220     self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
221    
222     if firstSubmission==True:
223 spiga 1.55
224     TotJob = common._db.nJobs()
225 spiga 1.35 # move the sandbox
226     self.moveISB_SEAPI()
227    
228 farinafa 1.20 # first time submit
229     try:
230 mcinquil 1.72 self.stateChange( self.submitRange, "SubRequested" )
231 farinafa 1.46 taskXML += common._db.serializeTask( common._db.getTask() )
232 spiga 1.73 common.logger.debug(taskXML)
233 farinafa 1.20 except Exception, e:
234 mcinquil 1.72 self.stateChange( self.submitRange, "Created" )
235 farinafa 1.20 msg = "BossLite ERROR: Unable to serialize task object\n"
236 farinafa 1.24 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
237 farinafa 1.20 msg += str(e)
238     raise CrabException(msg)
239 farinafa 1.23
240     # TODO fix not needed first field
241 spiga 1.55 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
242 farinafa 1.8 else:
243 farinafa 1.20 # subsequent submissions and resubmit
244 mcinquil 1.72 self.stateChange( self.submitRange, "SubRequested" )
245 mcinquil 1.71 try:
246     subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
247     except Exception, ex: ##change to specific exception
248     ## clean sub. requested status
249 mcinquil 1.72 self.stateChange( self.submitRange, "Created" )
250 mcinquil 1.71
251 spiga 1.3
252 farinafa 1.20 if subOutcome != 0:
253     msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
254 mcinquil 1.72 self.stateChange( self.submitRange, "Created" )
255 spiga 1.77 common.logger.debug(msg)
256     raise CrabException('ERROR Jobs NOT submitted.')
257 farinafa 1.6
258 farinafa 1.30 del csCommunicator
259    
260 farinafa 1.31 return
261 farinafa 1.6
262 mcinquil 1.56
263 mcinquil 1.71 def markSubmitting(self):
264 mcinquil 1.56 """
265     _markSubmitting_
266     sign local db for jobs sent -submitted- to the server
267     (just for the first submission)
268     """
269 spiga 1.73 common.logger.debug("Updating submitting jobs %s"%str(self.submitRange))
270 mcinquil 1.56 updlist = [{'statusScheduler':'Submitting', 'status':'CS'}] * len(self.submitRange)
271     common._db.updateRunJob_(self.submitRange, updlist)
272    
273 mcinquil 1.71