1 |
"""
|
2 |
Get output for server mode
|
3 |
"""
|
4 |
|
5 |
__revision__ = "$Id: GetOutputServer.py,v 1.50 2010/05/04 15:52:14 spiga Exp $"
|
6 |
__version__ = "$Revision: 1.50 $"
|
7 |
|
8 |
from GetOutput import GetOutput
|
9 |
from StatusServer import StatusServer
|
10 |
from crab_util import *
|
11 |
import common
|
12 |
import time
|
13 |
|
14 |
from ServerCommunicator import ServerCommunicator
|
15 |
|
16 |
from ProdCommon.Storage.SEAPI.SElement import SElement
|
17 |
from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
|
18 |
|
19 |
class GetOutputServer( GetOutput, StatusServer ):
|
20 |
|
21 |
def __init__(self, *args):
|
22 |
|
23 |
GetOutput.__init__(self, *args)
|
24 |
|
25 |
# init client server params...
|
26 |
CliServerParams(self)
|
27 |
|
28 |
if self.storage_path[0] != '/':
|
29 |
self.storage_path = '/'+self.storage_path
|
30 |
|
31 |
self.copyTout= setLcgTimeout()
|
32 |
if common.scheduler.name().upper() in ['LSF', 'CAF']:
|
33 |
self.copyTout= ' '
|
34 |
|
35 |
return
|
36 |
|
37 |
|
38 |
def getOutput(self):
|
39 |
|
40 |
# get updated status from server #inherited from StatusServer
|
41 |
warning_msg = self.resynchClientSide()
|
42 |
if warning_msg is not None:
|
43 |
common.logger.info(warning_msg)
|
44 |
|
45 |
|
46 |
# understand whether the required output are available
|
47 |
self.checkBeforeGet()
|
48 |
|
49 |
# retrive files
|
50 |
filesAndJodId = { }
|
51 |
|
52 |
filesAndJodId.update( self.retrieveFiles(self.list_id) )
|
53 |
common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
|
54 |
|
55 |
# load updated task
|
56 |
task = common._db.getTask()
|
57 |
|
58 |
self.organizeOutput( task, self.list_id )
|
59 |
|
60 |
self.notifyRetrievalToServer(filesAndJodId)
|
61 |
return
|
62 |
|
63 |
def retrieveFiles(self, filesToRetrieve):
|
64 |
"""
|
65 |
Real get output from server storage
|
66 |
"""
|
67 |
|
68 |
self.taskuuid = str(common._db.queryTask('name'))
|
69 |
common.logger.debug( "Task name: " + self.taskuuid)
|
70 |
|
71 |
# create the list with the actual filenames
|
72 |
remotedir = os.path.join(self.storage_path, self.taskuuid)
|
73 |
|
74 |
# list of file to retrieve
|
75 |
osbTemplate = remotedir + '/out_files_%s.tgz'
|
76 |
osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
|
77 |
if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
|
78 |
osbTemplate = remotedir + '/CMSSW_%s.stdout'
|
79 |
osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
|
80 |
osbTemplate = remotedir + '/CMSSW_%s.stderr'
|
81 |
osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
|
82 |
common.logger.debug( "List of OSB files: " +str(osbFiles) )
|
83 |
|
84 |
copyHere = self.outDir
|
85 |
destTemplate = copyHere+'/out_files_%s.tgz'
|
86 |
destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
|
87 |
if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
|
88 |
destTemplate = copyHere + '/CMSSW_%s.stdout'
|
89 |
destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
|
90 |
destTemplate = copyHere + '/CMSSW_%s.stderr'
|
91 |
destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
|
92 |
|
93 |
common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
|
94 |
|
95 |
try:
|
96 |
seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
|
97 |
except Exception, ex:
|
98 |
common.logger.debug( str(ex))
|
99 |
msg = "ERROR : Unable to create SE source interface \n"
|
100 |
raise CrabException(msg)
|
101 |
try:
|
102 |
loc = SElement("localhost", "local")
|
103 |
except Exception, ex:
|
104 |
common.logger.debug( str(ex))
|
105 |
msg = "ERROR : Unable to create destination interface \n"
|
106 |
raise CrabException(msg)
|
107 |
|
108 |
## copy ISB ##
|
109 |
sbi = SBinterface(seEl, loc, logger = common.logger.logger)
|
110 |
|
111 |
filesAndJodId = {}
|
112 |
|
113 |
if self.storage_proto in ['globus']:
|
114 |
# construct a list of absolute paths of input files
|
115 |
# and the destinations to copy them to
|
116 |
sourcesList = []
|
117 |
destsList = []
|
118 |
for i in xrange(len(osbFiles)):
|
119 |
sourcesList.append(osbFiles[i])
|
120 |
destsList.append(destFiles[i])
|
121 |
#if i < len(filesToRetrieve):
|
122 |
# filesAndJodId[ filesToRetrieve[i] ] = osbFiles[i]
|
123 |
|
124 |
# construct logging information
|
125 |
toCopy = "\n".join([t[0] + " to " + t[1] for t in map(None, sourcesList, destsList)]) + "\n"
|
126 |
common.logger.debug("Retrieving:\n " + toCopy)
|
127 |
|
128 |
# try to do the copy
|
129 |
copy_res = None
|
130 |
try:
|
131 |
copy_res = sbi.copy( sourcesList, destsList, opt="tout=300")
|
132 |
except Exception, ex:
|
133 |
msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
|
134 |
msg += str(ex)
|
135 |
common.logger.debug(msg)
|
136 |
import traceback
|
137 |
common.logger.debug( str(traceback.format_exc()) )
|
138 |
if copy_res is not None:
|
139 |
## evaluating copy results
|
140 |
copy_err_list = []
|
141 |
count = 0
|
142 |
for ll in map(None, copy_res, sourcesList):
|
143 |
exitcode = int(ll[0][0])
|
144 |
if exitcode == 0:
|
145 |
filesAndJodId[ filesToRetrieve[count] ] = osbFiles[count]
|
146 |
else:
|
147 |
copy_err_list.append( [ ll[1], ll[0][1] ] )
|
148 |
count += 1
|
149 |
if len(copy_err_list) > 0:
|
150 |
msg = "ERROR : Unable to retrieve output file \n"
|
151 |
for problem in copy_err_list:
|
152 |
msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
|
153 |
else:
|
154 |
# retrieve them from SE
|
155 |
for i in xrange(len(osbFiles)):
|
156 |
source = osbFiles[i]
|
157 |
dest = destFiles[i]
|
158 |
common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
|
159 |
try:
|
160 |
sbi.copy( source, dest , opt=self.copyTout)
|
161 |
if i < len(filesToRetrieve):
|
162 |
filesAndJodId[ filesToRetrieve[i] ] = dest
|
163 |
except Exception, ex:
|
164 |
msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
|
165 |
msg += str(ex)
|
166 |
common.logger.debug(msg)
|
167 |
import traceback
|
168 |
common.logger.debug( str(traceback.format_exc()) )
|
169 |
continue
|
170 |
|
171 |
return filesAndJodId
|
172 |
|
173 |
def notifyRetrievalToServer(self, fileAndJobList):
|
174 |
retrievedFilesJodId = []
|
175 |
|
176 |
for jid in fileAndJobList:
|
177 |
if not os.path.exists(fileAndJobList[jid]):
|
178 |
# it means the file has been untarred
|
179 |
retrievedFilesJodId.append(jid)
|
180 |
|
181 |
common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
|
182 |
|
183 |
# notify to the server that output have been retrieved successfully. proxy from StatusServer
|
184 |
if len(retrievedFilesJodId) > 0:
|
185 |
csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
|
186 |
try:
|
187 |
csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
|
188 |
except Exception, e:
|
189 |
msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
|
190 |
common.logger.debug( msg)
|
191 |
pass
|
192 |
return
|
193 |
|