ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.51
Committed: Tue Jul 27 15:56:13 2010 UTC (14 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8, CRAB_2_7_8_pre3, CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2, CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1, CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, HEAD
Changes since 1.50: +7 -7 lines
Log Message:
Pass logger to SBinterface, fix bug 59901

File Contents

# Content
1 """
2 Get output for server mode
3 """
4
# Version-control keyword strings in RCS/CVS "$Id$" / "$Revision$" format.
# They are plain module metadata: nothing in the visible part of this module
# reads them.
__revision__ = "$Id: GetOutputServer.py,v 1.50 2010/05/04 15:52:14 spiga Exp $"
__version__ = "$Revision: 1.50 $"
7
8 from GetOutput import GetOutput
9 from StatusServer import StatusServer
10 from crab_util import *
11 import common
12 import time
13
14 from ServerCommunicator import ServerCommunicator
15
16 from ProdCommon.Storage.SEAPI.SElement import SElement
17 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18
19 class GetOutputServer( GetOutput, StatusServer ):
20
21 def __init__(self, *args):
22
23 GetOutput.__init__(self, *args)
24
25 # init client server params...
26 CliServerParams(self)
27
28 if self.storage_path[0] != '/':
29 self.storage_path = '/'+self.storage_path
30
31 self.copyTout= setLcgTimeout()
32 if common.scheduler.name().upper() in ['LSF', 'CAF']:
33 self.copyTout= ' '
34
35 return
36
37
38 def getOutput(self):
39
40 # get updated status from server #inherited from StatusServer
41 warning_msg = self.resynchClientSide()
42 if warning_msg is not None:
43 common.logger.info(warning_msg)
44
45
46 # understand whether the required output are available
47 self.checkBeforeGet()
48
49 # retrive files
50 filesAndJodId = { }
51
52 filesAndJodId.update( self.retrieveFiles(self.list_id) )
53 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
54
55 # load updated task
56 task = common._db.getTask()
57
58 self.organizeOutput( task, self.list_id )
59
60 self.notifyRetrievalToServer(filesAndJodId)
61 return
62
63 def retrieveFiles(self, filesToRetrieve):
64 """
65 Real get output from server storage
66 """
67
68 self.taskuuid = str(common._db.queryTask('name'))
69 common.logger.debug( "Task name: " + self.taskuuid)
70
71 # create the list with the actual filenames
72 remotedir = os.path.join(self.storage_path, self.taskuuid)
73
74 # list of file to retrieve
75 osbTemplate = remotedir + '/out_files_%s.tgz'
76 osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
77 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
78 osbTemplate = remotedir + '/CMSSW_%s.stdout'
79 osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
80 osbTemplate = remotedir + '/CMSSW_%s.stderr'
81 osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
82 common.logger.debug( "List of OSB files: " +str(osbFiles) )
83
84 copyHere = self.outDir
85 destTemplate = copyHere+'/out_files_%s.tgz'
86 destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
87 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
88 destTemplate = copyHere + '/CMSSW_%s.stdout'
89 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
90 destTemplate = copyHere + '/CMSSW_%s.stderr'
91 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
92
93 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
94
95 try:
96 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
97 except Exception, ex:
98 common.logger.debug( str(ex))
99 msg = "ERROR : Unable to create SE source interface \n"
100 raise CrabException(msg)
101 try:
102 loc = SElement("localhost", "local")
103 except Exception, ex:
104 common.logger.debug( str(ex))
105 msg = "ERROR : Unable to create destination interface \n"
106 raise CrabException(msg)
107
108 ## copy ISB ##
109 sbi = SBinterface(seEl, loc, logger = common.logger.logger)
110
111 filesAndJodId = {}
112
113 if self.storage_proto in ['globus']:
114 # construct a list of absolute paths of input files
115 # and the destinations to copy them to
116 sourcesList = []
117 destsList = []
118 for i in xrange(len(osbFiles)):
119 sourcesList.append(osbFiles[i])
120 destsList.append(destFiles[i])
121 #if i < len(filesToRetrieve):
122 # filesAndJodId[ filesToRetrieve[i] ] = osbFiles[i]
123
124 # construct logging information
125 toCopy = "\n".join([t[0] + " to " + t[1] for t in map(None, sourcesList, destsList)]) + "\n"
126 common.logger.debug("Retrieving:\n " + toCopy)
127
128 # try to do the copy
129 copy_res = None
130 try:
131 copy_res = sbi.copy( sourcesList, destsList, opt="tout=300")
132 except Exception, ex:
133 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
134 msg += str(ex)
135 common.logger.debug(msg)
136 import traceback
137 common.logger.debug( str(traceback.format_exc()) )
138 if copy_res is not None:
139 ## evaluating copy results
140 copy_err_list = []
141 count = 0
142 for ll in map(None, copy_res, sourcesList):
143 exitcode = int(ll[0][0])
144 if exitcode == 0:
145 filesAndJodId[ filesToRetrieve[count] ] = osbFiles[count]
146 else:
147 copy_err_list.append( [ ll[1], ll[0][1] ] )
148 count += 1
149 if len(copy_err_list) > 0:
150 msg = "ERROR : Unable to retrieve output file \n"
151 for problem in copy_err_list:
152 msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
153 else:
154 # retrieve them from SE
155 for i in xrange(len(osbFiles)):
156 source = osbFiles[i]
157 dest = destFiles[i]
158 common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
159 try:
160 sbi.copy( source, dest , opt=self.copyTout)
161 if i < len(filesToRetrieve):
162 filesAndJodId[ filesToRetrieve[i] ] = dest
163 except Exception, ex:
164 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
165 msg += str(ex)
166 common.logger.debug(msg)
167 import traceback
168 common.logger.debug( str(traceback.format_exc()) )
169 continue
170
171 return filesAndJodId
172
173 def notifyRetrievalToServer(self, fileAndJobList):
174 retrievedFilesJodId = []
175
176 for jid in fileAndJobList:
177 if not os.path.exists(fileAndJobList[jid]):
178 # it means the file has been untarred
179 retrievedFilesJodId.append(jid)
180
181 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
182
183 # notify to the server that output have been retrieved successfully. proxy from StatusServer
184 if len(retrievedFilesJodId) > 0:
185 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
186 try:
187 csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
188 except Exception, e:
189 msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
190 common.logger.debug( msg)
191 pass
192 return
193