3 |
|
import common |
4 |
|
from ApmonIf import ApmonIf |
5 |
|
|
6 |
< |
import os, errno, time, sys, re |
6 |
> |
import os, errno, time, sys, re |
7 |
|
import commands, traceback |
8 |
|
import zlib |
9 |
|
|
10 |
|
from Submitter import Submitter |
11 |
< |
from ServerCommunicator import ServerCommunicator |
11 |
> |
from ServerCommunicator import ServerCommunicator |
12 |
|
|
13 |
|
from ProdCommon.Storage.SEAPI.SElement import SElement |
14 |
|
from ProdCommon.Storage.SEAPI.SBinterface import SBinterface |
15 |
|
from ProdCommon.Storage.SEAPI.Exceptions import * |
16 |
|
|
17 |
< |
|
17 |
> |
|
18 |
|
class SubmitterServer( Submitter ): |
19 |
|
def __init__(self, cfg_params, parsed_range, val): |
20 |
|
self.srvCfg = {} |
21 |
|
self.cfg_params = cfg_params |
22 |
|
self.submitRange = [] |
23 |
< |
self.credentialType = 'Proxy' |
24 |
< |
self.copyTout= ' -t 600 ' |
23 |
> |
self.credentialType = 'Proxy' |
24 |
> |
self.copyTout= ' -t 600 ' |
25 |
|
if common.scheduler.name().upper() in ['LSF', 'CAF']: |
26 |
< |
self.credentialType = 'Token' |
27 |
< |
self.copyTout= ' ' |
26 |
> |
self.credentialType = 'Token' |
27 |
> |
self.copyTout= ' ' |
28 |
> |
|
29 |
> |
Submitter.__init__(self, cfg_params, parsed_range, val) |
30 |
|
|
29 |
– |
Submitter.__init__(self, cfg_params, parsed_range, val) |
30 |
– |
|
31 |
|
# init client server params... |
32 |
< |
CliServerParams(self) |
32 |
> |
CliServerParams(self) |
33 |
|
|
34 |
|
# path fix |
35 |
|
if self.storage_path[0]!='/': |
36 |
|
self.storage_path = '/'+self.storage_path |
37 |
|
|
38 |
|
self.taskuuid = str(common._db.queryTask('name')) |
39 |
< |
|
40 |
< |
|
39 |
> |
self.limitJobs = False |
40 |
> |
|
41 |
|
return |
42 |
|
|
43 |
|
def run(self): |
51 |
|
self.BuildJobList() |
52 |
|
|
53 |
|
self.submitRange = self.nj_list |
54 |
< |
|
55 |
< |
check = self.checkIfCreate() |
54 |
> |
|
55 |
> |
check = self.checkIfCreate() |
56 |
|
|
57 |
|
if check == 0 : |
58 |
|
|
59 |
|
self.remotedir = os.path.join(self.storage_path, self.taskuuid) |
60 |
|
self.manageCredential() |
61 |
< |
|
62 |
< |
# check if it is the first submission |
63 |
< |
isFirstSubmission = common._db.checkIfNeverSubmittedBefore() |
61 |
> |
|
62 |
> |
# check if it is the first submission |
63 |
> |
isFirstSubmission = common._db.checkIfNeverSubmittedBefore() |
64 |
|
|
65 |
|
# standard submission to the server |
66 |
|
self.performSubmission(isFirstSubmission) |
67 |
< |
|
67 |
> |
|
68 |
|
stop = time.time() |
69 |
|
common.logger.debug("Submission Time: "+str(stop - start)) |
70 |
|
|
71 |
< |
msg = 'Total of %d jobs submitted'%len(self.submitRange) |
71 |
> |
msg = 'Total of %d jobs submitted'%len(self.submitRange) |
72 |
|
common.logger.info(msg) |
73 |
< |
|
73 |
> |
|
74 |
|
return |
75 |
|
|
76 |
|
def moveISB_SEAPI(self): |
78 |
|
common.logger.debug("Task name: " + self.taskuuid) |
79 |
|
isblist = common._db.queryTask('globalSandbox').split(',') |
80 |
|
common.logger.debug("List of ISB files: " +str(isblist) ) |
81 |
< |
|
81 |
> |
|
82 |
|
# init SE interface |
83 |
|
common.logger.info("Starting sending the project to the storage "+str(self.storage_name)+"...") |
84 |
< |
try: |
84 |
> |
try: |
85 |
|
seEl = SElement(self.storage_name, self.storage_proto, self.storage_port) |
86 |
|
except Exception, ex: |
87 |
|
common.logger.debug(str(ex)) |
88 |
|
msg = "ERROR : Unable to create SE destination interface \n" |
89 |
|
msg +="Project "+ self.taskuuid +" not Submitted \n" |
90 |
|
raise CrabException(msg) |
91 |
< |
|
92 |
< |
try: |
91 |
> |
|
92 |
> |
try: |
93 |
|
loc = SElement("localhost", "local") |
94 |
|
except Exception, ex: |
95 |
|
common.logger.debug(str(ex)) |
100 |
|
|
101 |
|
### it should not be there... To move into SE API. DS |
102 |
|
|
103 |
< |
# create remote dir for gsiftp |
103 |
> |
# create remote dir for gsiftp |
104 |
|
if self.storage_proto in ['gridftp','rfio']: |
105 |
|
try: |
106 |
< |
action = SBinterface( seEl ) |
106 |
> |
action = SBinterface( seEl ) |
107 |
|
action.createDir( self.remotedir ) |
108 |
|
except AlreadyExistsException, ex: |
109 |
< |
msg = "Project %s already exist on the Storage Element \n"%self.taskuuid |
109 |
> |
msg = "Project %s already exist on the Storage Element \n"%self.taskuuid |
110 |
|
msg +='\t%s'%str(ex) |
111 |
|
common.logger.debug(msg) |
112 |
|
except OperationException, ex: |
124 |
|
sbi = SBinterface( loc, seEl ) |
125 |
|
|
126 |
|
for filetocopy in isblist: |
127 |
< |
source = os.path.abspath(filetocopy) |
127 |
> |
source = os.path.abspath(filetocopy) |
128 |
|
dest = os.path.join(self.remotedir, os.path.basename(filetocopy)) |
129 |
|
common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name) |
130 |
|
try: |
146 |
|
return |
147 |
|
|
148 |
|
|
149 |
< |
def manageCredential(self): |
149 |
> |
def manageCredential(self): |
150 |
|
""" |
151 |
< |
Prepare configuration and Call credential API |
151 |
> |
Prepare configuration and Call credential API |
152 |
|
""" |
153 |
|
common.logger.info("Registering credential to the server : %s"%self.server_name) |
154 |
< |
# only for temporary back-comp. |
155 |
< |
if self.credentialType == 'Proxy': |
154 |
> |
# only for temporary back-comp. |
155 |
> |
if self.credentialType == 'Proxy': |
156 |
|
# for proxy all works as before.... |
157 |
|
self.moveProxy() |
158 |
< |
# myProxyMoveProxy() # check within the API ( Proxy.py ) |
158 |
> |
# myProxyMoveProxy() # check within the API ( Proxy.py ) |
159 |
|
else: |
160 |
|
from ProdCommon.Credential.CredentialAPI import CredentialAPI |
161 |
|
myproxyserver = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch') |
168 |
|
'logger' : common.logger() \ |
169 |
|
} |
170 |
|
try: |
171 |
< |
CredAPI = CredentialAPI( configAPI ) |
172 |
< |
except Exception, err : |
171 |
> |
CredAPI = CredentialAPI( configAPI ) |
172 |
> |
except Exception, err : |
173 |
|
common.logger.debug("Configuring Credential API: " +str(traceback.format_exc())) |
174 |
|
raise CrabException("ERROR: Unable to configure Credential Client API %s\n"%str(err)) |
175 |
|
if not CredAPI.checkCredential(Time=12) : |
180 |
|
raise CrabException(str(ex)) |
181 |
|
|
182 |
|
try: |
183 |
< |
dict = CredAPI.registerCredential() |
183 |
> |
dict = CredAPI.registerCredential() |
184 |
|
except Exception, err: |
185 |
|
common.logger.debug("Registering Credentials : " +str(traceback.format_exc())) |
186 |
|
raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name )) |
191 |
|
# TO REMOVE |
192 |
|
def moveProxy( self ): |
193 |
|
WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0]) |
194 |
< |
## Temporary... to remove soon |
194 |
> |
## Temporary... to remove soon |
195 |
|
common.scheduler.checkProxy(minTime=100) |
196 |
|
try: |
197 |
|
common.logger.debug("Registering a valid proxy to the server:") |
224 |
|
self.cfg_params['CRAB.se_remote_dir'] = self.remotedir |
225 |
|
|
226 |
|
if firstSubmission==True: |
227 |
< |
|
228 |
< |
TotJob = common._db.nJobs() |
227 |
> |
|
228 |
> |
TotJob = common._db.nJobs() |
229 |
|
# move the sandbox |
230 |
< |
self.moveISB_SEAPI() |
230 |
> |
self.moveISB_SEAPI() |
231 |
|
|
232 |
|
# first time submit |
233 |
|
try: |
241 |
|
msg += str(e) |
242 |
|
raise CrabException(msg) |
243 |
|
|
244 |
< |
# TODO fix not needed first field |
244 |
> |
# TODO fix not needed first field |
245 |
|
subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob) |
246 |
|
else: |
247 |
|
# subsequent submissions and resubmit |
256 |
|
if subOutcome != 0: |
257 |
|
msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome |
258 |
|
self.stateChange( self.submitRange, "Created" ) |
259 |
< |
common.logger.debug(msg) |
259 |
> |
common.logger.debug(msg) |
260 |
|
raise CrabException('ERROR Jobs NOT submitted.') |
261 |
|
|
262 |
|
del csCommunicator |