ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.46
Committed: Mon Jun 16 09:40:42 2008 UTC (16 years, 10 months ago) by farinafa
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_3_2_pre1, CRAB_2_4_0_test
Branch point for: AnaDataSet
Changes since 1.45: +1 -20 lines
Log Message:
Moved all the task object manipulations to serverside

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands
8 import zlib
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15
16 class SubmitterServer( Submitter ):
17 def __init__(self, cfg_params, parsed_range, val):
18 self.srvCfg = {}
19 self.cfg_params = cfg_params
20 self.submitRange = []
21 self.dontMoveProxy = False
22 if string.lower(self.cfg_params.get("CRAB.scheduler")) in ['lsf','caf']:
23 self.dontMoveProxy = True
24
25 Submitter.__init__(self, cfg_params, parsed_range, val)
26
27 # init client server params...
28 CliServerParams(self)
29
30 # path fix
31 if self.storage_path[0]!='/':
32 self.storage_path = '/'+self.storage_path
33
34 self.taskuuid = str(common._db.queryTask('name'))
35
36 return
37
38 def run(self):
39 """
40 The main method of the class: submit jobs in range self.nj_list
41 """
42 common.logger.debug(5, "SubmitterServer::run() called")
43
44 self.submitRange = self.nj_list
45
46 check = self.checkIfCreate()
47
48 if check == 0 :
49
50 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
51 self.moveProxy(self.dontMoveProxy)
52
53 # check if it is the first submission
54 isFirstSubmission = common._db.checkIfNeverSubmittedBefore()
55
56 # standard submission to the server
57 self.performSubmission(isFirstSubmission)
58
59 msg = '\nTotal of %d jobs submitted'%len(self.submitRange)
60 common.logger.message(msg)
61
62 return
63
64 def moveISB_SEAPI(self):
65 ## get task info from BL ##
66 common.logger.debug(3, "Task name: " + self.taskuuid)
67 isblist = common._db.queryTask('globalSandbox').split(',')
68 common.logger.debug(3, "List of ISB files: " +str(isblist) )
69
70 # init SE interface
71 common.logger.message("Starting sending the project to the storage "+str(self.storage_name)+"...")
72 try:
73 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
74 except Exception, ex:
75 common.logger.debug(1, str(ex))
76 msg = "ERROR : Unable to create SE destination interface \n"
77 msg +="Project "+ self.taskuuid +" not Submitted \n"
78 raise CrabException(msg)
79
80 try:
81 loc = SElement("localhost", "local")
82 except Exception, ex:
83 common.logger.debug(1, str(ex))
84 msg = "ERROR : Unable to create SE source interface \n"
85 msg +="Project "+ self.taskuuid +" not Submitted \n"
86 raise CrabException(msg)
87
88
89 ### it should not be there... To move into SE API. DS
90
91 # create remote dir for gsiftp
92 if self.storage_proto in ['gridftp','rfio']:
93 try:
94 action = SBinterface( seEl )
95 action.createDir( self.remotedir)
96 except Exception, ex:
97 common.logger.debug(1, str(ex))
98 msg = "ERROR : Unable to create project destination on the Storage Element \n"
99 msg +="Project "+ self.taskuuid +" not Submitted \n"
100 raise CrabException(msg)
101
102 ## copy ISB ##
103 sbi = SBinterface( loc, seEl )
104
105 for filetocopy in isblist:
106 source = os.path.abspath(filetocopy)
107 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
108 common.logger.debug(1, "Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
109 try:
110 sbi.copy( source, dest)
111 except Exception, ex:
112 common.logger.debug(1, str(ex))
113 msg = "ERROR : Unable to ship the project to the server \n"
114 msg +="Project "+ self.taskuuid +" not Submitted \n"
115 raise CrabException(msg)
116
117 ## if here then project submitted ##
118 msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
119 common.logger.debug(3,msg)
120 return
121
122 def moveProxy(self,dontMove):
123
124 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
125 if dontMove==True:
126 msg = 'Submittig to local resources...proxy not needed.\n'
127 common.logger.debug(5, msg)
128 else:
129 ## register proxy ##
130 common.scheduler.checkProxy(deep=1)
131 try:
132 flag = " --myproxy"
133 common.logger.message("Registering a valid proxy to the server:")
134 cmd = 'asap-user-register --server '+str(self.server_name) + flag
135 attempt = 3
136 while attempt:
137 common.logger.debug(3, " executing:\n " + cmd)
138 status, outp = commands.getstatusoutput(cmd)
139 common.logger.debug(3, outp)
140 if status == 0:
141 common.logger.message("Proxy successfully delegated to the server.\n")
142 break
143 else:
144 attempt = attempt - 1
145 if (attempt == 0):
146 raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
147 except:
148 msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
149 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
150 raise CrabException(msg)
151 return None
152 return
153
154 def performSubmission(self, firstSubmission=True):
155 # create the communication session with the server frontend
156 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
157 taskXML = ''
158 subOutcome = 0
159
160 # transfer remote dir to server
161 self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
162
163 if firstSubmission==True:
164 # move the sandbox
165 self.moveISB_SEAPI()
166
167 # first time submit
168 try:
169 taskXML += common._db.serializeTask( common._db.getTask() )
170 common.logger.debug(5, taskXML)
171 except Exception, e:
172 msg = "BossLite ERROR: Unable to serialize task object\n"
173 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
174 msg += str(e)
175 raise CrabException(msg)
176
177 # TODO fix not needed first field
178 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange)
179 else:
180 # subsequent submissions and resubmit
181 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
182
183 if subOutcome != 0:
184 msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
185 raise CrabException(msg)
186
187 del csCommunicator
188
189 # update runningjobs status
190 updlist = [{'statusScheduler':'Submitting', 'status':'SSE'}] * len(self.submitRange)
191 common._db.updateRunJob_(self.submitRange, updlist)
192 return
193
194