ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
Revision: 1.38
Committed: Tue May 20 20:48:08 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_1_pre2
Changes since 1.37: +3 -1 lines
Log Message:
temporary rollback

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 import common
4 from ApmonIf import ApmonIf
5
6 import os, errno, time, sys, re
7 import commands
8 import zlib
9
10 from Submitter import Submitter
11 from ServerCommunicator import ServerCommunicator
12
13 from ProdCommon.Storage.SEAPI.SElement import SElement
14 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15
16 class SubmitterServer( Submitter ):
17 def __init__(self, cfg_params, parsed_range, val):
18 self.srvCfg = {}
19 self.cfg_params = cfg_params
20 self.submitRange = []
21 self.dontMoveProxy = False
22 if string.lower(self.cfg_params.get("CRAB.scheduler")) in ['lsf','caf']:
23 self.dontMoveProxy = True
24
25 Submitter.__init__(self, cfg_params, parsed_range, val)
26
27 # init client server params...
28 CliServerParams(self)
29
30 # path fix
31 if self.storage_path[0]!='/':
32 self.storage_path = '/'+self.storage_path
33 return
34
35 def run(self):
36 """
37 The main method of the class: submit jobs in range self.nj_list
38 """
39 common.logger.debug(5, "SubmitterServer::run() called")
40
41 self.submitRange = self.nj_list
42
43 check = self.checkIfCreate()
44
45 if check == 0 :
46 isFirstSubmission = False
47
48 self.taskuuid = str(common._db.queryTask('name'))
49 self.remotedir = os.path.join(self.storage_path, self.taskuuid)
50 self.moveProxy(self.dontMoveProxy)
51
52 # check if it is the first submission
53 n_createdJob = len(common._db.queryAttrRunJob({'status':'C'},'status'))
54 if n_createdJob == len(self.complete_List): isFirstSubmission = True
55
56 # standard submission to the server
57 self.performSubmission(isFirstSubmission)
58
59 msg = '\nTotal of %d jobs submitted'%len(self.submitRange)
60 common.logger.message(msg)
61
62 return
63
64 def moveISB_SEAPI(self):
65 ## get task info from BL ##
66 common.logger.debug(3, "Task name: " + self.taskuuid)
67 isblist = common._db.queryTask('globalSandbox').split(',')
68 common.logger.debug(3, "List of ISB files: " +str(isblist) )
69
70 # init SE interface
71 common.logger.message("Starting sending the project to the storage "+str(self.storage_name)+"...")
72 try:
73 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
74 except Exception, ex:
75 common.logger.debug(1, str(ex))
76 msg = "ERROR : Unable to create SE destination interface \n"
77 msg +="Project "+ self.taskuuid +" not Submitted \n"
78 raise CrabException(msg)
79
80 try:
81 loc = SElement("localhost", "local")
82 except Exception, ex:
83 common.logger.debug(1, str(ex))
84 msg = "ERROR : Unable to create SE source interface \n"
85 msg +="Project "+ self.taskuuid +" not Submitted \n"
86 raise CrabException(msg)
87
88
89 ### it should not be there... To move into SE API. DS
90
91 # create remote dir for gsiftp
92 if self.storage_proto in ['gridftp','rfio']:
93 try:
94 action = SBinterface( seEl )
95 action.createDir( self.remotedir)
96 except Exception, ex:
97 common.logger.debug(1, str(ex))
98 msg = "ERROR : Unable to create project destination on the Storage Element \n"
99 msg +="Project "+ self.taskuuid +" not Submitted \n"
100 raise CrabException(msg)
101
102 ## copy ISB ##
103 sbi = SBinterface( loc, seEl )
104
105 for filetocopy in isblist:
106 source = os.path.abspath(filetocopy)
107 dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
108 common.logger.debug(1, "Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
109 try:
110 sbi.copy( source, dest)
111 except Exception, ex:
112 common.logger.debug(1, str(ex))
113 msg = "ERROR : Unable to ship the project to the server \n"
114 msg +="Project "+ self.taskuuid +" not Submitted \n"
115 raise CrabException(msg)
116
117 ## if here then project submitted ##
118 msg = 'Project '+ self.taskuuid +' files successfully submitted to the supporting storage element.\n'
119 common.logger.debug(3,msg)
120 return
121
122 def moveProxy(self,dontMove):
123
124 WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
125 if dontMove==True:
126 msg = 'Submittig to local resources...proxy not needed.'
127 common.logger.debug(5, msg)
128 else:
129 ## register proxy ##
130 common.scheduler.checkProxy()
131 try:
132 flag = " --myproxy"
133 common.logger.message("Registering a valid proxy to the server\n")
134 cmd = 'asap-user-register --server '+str(self.server_name) + flag
135 attempt = 3
136 while attempt:
137 common.logger.debug(3, " executing:\n " + cmd)
138 status, outp = commands.getstatusoutput(cmd)
139 common.logger.debug(3, outp)
140 if status == 0:
141 common.logger.message("Proxy successfully delegated to the server.")
142 break
143 else:
144 attempt = attempt - 1
145 if (attempt == 0):
146 raise CrabException("ASAP ERROR: Unable to ship a valid proxy to the server "+str(self.server_name)+"\n")
147 except:
148 msg = "ASAP ERROR: Unable to ship a valid proxy to the server \n"
149 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
150 raise CrabException(msg)
151 return None
152 return
153
154 def performSubmission(self, firstSubmission=True):
155 # create the communication session with the server frontend
156 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
157 taskXML = ''
158 subOutcome = 0
159
160 # transfer remote dir to server
161 self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
162
163 if firstSubmission==True:
164 # move the sandbox
165 self.moveISB_SEAPI()
166
167 # first time submit
168 try:
169 task = common._db.getTask()
170
171 # set the paths refered to SE remotedir
172 # NOTE WMS/JDL supports only gsiftp protocol for base ISB/OSB
173 surlpreamble = '' #'gsiftp://%s:%s'%(self.storage_name, str(self.storage_port) )
174 remoteSBlist = [surlpreamble + os.path.join(self.remotedir, os.path.basename(f)) \
175 for f in common._db.queryTask('globalSandbox').split(',') ]
176 task['globalSandbox'] = ','.join(remoteSBlist)
177 task['outputDirectory'] = self.remotedir
178 task['scriptName'] = surlpreamble + os.path.join( self.remotedir, \
179 os.path.basename(common._db.queryTask('scriptName')) )
180 task['cfgName'] = surlpreamble + os.path.join( self.remotedir, \
181 os.path.basename(common._db.queryTask('cfgName')) )
182
183 for j in task.jobs:
184 j['executable'] = os.path.basename(j['executable'])
185 # buggy, only the local file needed #surlpreamble + os.path.join( self.remotedir, os.path.basename(j['executable']) )
186 #
187
188 taskXML += common._db.serializeTask(task)
189 common.logger.debug(5, taskXML)
190 except Exception, e:
191 msg = "BossLite ERROR: Unable to serialize task object\n"
192 msg +="Project "+str(self.taskuuid)+" not Submitted \n"
193 msg += str(e)
194 raise CrabException(msg)
195
196 # TODO fix not needed first field
197 subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange)
198 else:
199 # subsequent submissions and resubmit
200 subOutcome = csCommunicator.subsequentJobSubmit(self.taskuuid, self.submitRange)
201
202 if subOutcome != 0:
203 msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
204 raise CrabException(msg)
205
206 del csCommunicator
207
208 # update runningjobs status
209 updList = [{'statusScheduler':'Submitted', 'status':'S'}] * len(self.submitRange)
210 common._db.updateRunJob_(self.submitRange, updList)
211 return
212
213