ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.48
Committed: Wed Nov 11 13:13:47 2009 UTC (15 years, 5 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: fede_170310, CRAB_2_7_1_pre9, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_7_1_pre2, CRAB_2_7_1_pre1, CRAB_2_7_0
Branch point for: CRAB_multiout, CRAB_2_7_1_branch
Changes since 1.47: +2 -6 lines
Log Message:
Removed deprecated comments

File Contents

# Content
1 """
2 Get output for server mode
3 """
4
5 __revision__ = "$Id: GetOutputServer.py,v 1.47 2009/10/27 13:49:04 mcinquil Exp $"
6 __version__ = "$Revision: 1.47 $"
7
8 from GetOutput import GetOutput
9 from StatusServer import StatusServer
10 from crab_util import *
11 import common
12 import time
13
14 from ServerCommunicator import ServerCommunicator
15
16 from ProdCommon.Storage.SEAPI.SElement import SElement
17 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18
19 class GetOutputServer( GetOutput, StatusServer ):
20
21 def __init__(self, *args):
22
23 GetOutput.__init__(self, *args)
24
25 # init client server params...
26 CliServerParams(self)
27
28 if self.storage_path[0] != '/':
29 self.storage_path = '/'+self.storage_path
30
31 self.copyTout= setLcgTimeout()
32 if common.scheduler.name().upper() in ['LSF', 'CAF']:
33 self.copyTout= ' '
34
35 return
36
37
38 def getOutput(self):
39
40 # get updated status from server #inherited from StatusServer
41 self.resynchClientSide()
42
43 # understand whether the required output are available
44 self.checkBeforeGet()
45
46 # retrive files
47 filesAndJodId = { }
48
49 filesAndJodId.update( self.retrieveFiles(self.list_id) )
50 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
51
52 # load updated task
53 task = common._db.getTask()
54
55 self.organizeOutput( task, self.list_id )
56
57 self.notifyRetrievalToServer(filesAndJodId)
58 return
59
60 def retrieveFiles(self, filesToRetrieve):
61 """
62 Real get output from server storage
63 """
64
65 self.taskuuid = str(common._db.queryTask('name'))
66 common.logger.debug( "Task name: " + self.taskuuid)
67
68 # create the list with the actual filenames
69 remotedir = os.path.join(self.storage_path, self.taskuuid)
70
71 # list of file to retrieve
72 osbTemplate = remotedir + '/out_files_%s.tgz'
73 osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
74 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
75 osbTemplate = remotedir + '/CMSSW_%s.stdout'
76 osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
77 osbTemplate = remotedir + '/CMSSW_%s.stderr'
78 osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
79 common.logger.debug( "List of OSB files: " +str(osbFiles) )
80
81 copyHere = self.outDir
82 destTemplate = copyHere+'/out_files_%s.tgz'
83 destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
84 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
85 destTemplate = copyHere + '/CMSSW_%s.stdout'
86 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
87 destTemplate = copyHere + '/CMSSW_%s.stderr'
88 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
89
90 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
91
92 try:
93 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
94 except Exception, ex:
95 common.logger.debug( str(ex))
96 msg = "ERROR : Unable to create SE source interface \n"
97 raise CrabException(msg)
98 try:
99 loc = SElement("localhost", "local")
100 except Exception, ex:
101 common.logger.debug( str(ex))
102 msg = "ERROR : Unable to create destination interface \n"
103 raise CrabException(msg)
104
105 ## copy ISB ##
106 sbi = SBinterface( seEl, loc )
107
108 filesAndJodId = {}
109
110 if self.storage_proto in ['globus']:
111 # construct a list of absolute paths of input files
112 # and the destinations to copy them to
113 sourcesList = []
114 destsList = []
115 for i in xrange(len(osbFiles)):
116 sourcesList.append(osbFiles[i])
117 destsList.append(destFiles[i])
118 #if i < len(filesToRetrieve):
119 # filesAndJodId[ filesToRetrieve[i] ] = osbFiles[i]
120
121 # construct logging information
122 toCopy = "\n".join([t[0] + " to " + t[1] for t in map(None, sourcesList, destsList)]) + "\n"
123 common.logger.debug("Retrieving:\n " + toCopy)
124
125 # try to do the copy
126 copy_res = None
127 try:
128 copy_res = sbi.copy( sourcesList, destsList, opt=self.copyTout)
129 except Exception, ex:
130 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
131 msg += str(ex)
132 common.logger.debug(msg)
133 import traceback
134 common.logger.debug( str(traceback.format_exc()) )
135 if copy_res is not None:
136 ## evaluating copy results
137 copy_err_list = []
138 count = 0
139 for ll in map(None, copy_res, sourcesList):
140 exitcode = int(ll[0][0])
141 if exitcode == 0:
142 filesAndJodId[ filesToRetrieve[count] ] = osbFiles[count]
143 else:
144 copy_err_list.append( [ ll[1], ll[0][1] ] )
145 count += 1
146 if len(copy_err_list) > 0:
147 msg = "ERROR : Unable to retrieve output file \n"
148 for problem in copy_err_list:
149 msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
150 else:
151 # retrieve them from SE
152 for i in xrange(len(osbFiles)):
153 source = osbFiles[i]
154 dest = destFiles[i]
155 common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
156 try:
157 sbi.copy( source, dest , opt=self.copyTout)
158 if i < len(filesToRetrieve):
159 filesAndJodId[ filesToRetrieve[i] ] = dest
160 except Exception, ex:
161 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
162 msg += str(ex)
163 common.logger.debug(msg)
164 import traceback
165 common.logger.debug( str(traceback.format_exc()) )
166 continue
167
168 return filesAndJodId
169
170 def notifyRetrievalToServer(self, fileAndJobList):
171 retrievedFilesJodId = []
172
173 for jid in fileAndJobList:
174 if not os.path.exists(fileAndJobList[jid]):
175 # it means the file has been untarred
176 retrievedFilesJodId.append(jid)
177
178 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
179
180 # notify to the server that output have been retrieved successfully. proxy from StatusServer
181 if len(retrievedFilesJodId) > 0:
182 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
183 try:
184 csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
185 except Exception, e:
186 msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
187 common.logger.debug( msg)
188 pass
189 return
190