ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.51
Committed: Tue Jul 27 15:56:13 2010 UTC (14 years, 9 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, CRAB_2_8_2_patch1, CRAB_2_8_2, CRAB_2_8_2_pre5, CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8, CRAB_2_7_8_pre3, CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2, CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5, CRAB_2_7_5_pre3, CRAB_2_7_5_pre2, CRAB_2_7_5_pre1, CRAB_2_7_4_patch1, CRAB_2_7_4, CRAB_2_7_4_pre6, CRAB_2_7_4_pre5, CRAB_2_7_4_pre4, HEAD
Changes since 1.50: +7 -7 lines
Log Message:
Pass logger to SBinterface, fix bug 59901

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5 ewv 1.51 __revision__ = "$Id: GetOutputServer.py,v 1.50 2010/05/04 15:52:14 spiga Exp $"
6     __version__ = "$Revision: 1.50 $"
7 ewv 1.32
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
class GetOutputServer( GetOutput, StatusServer ):
    """
    Get output for server mode.

    The output files of the requested jobs are copied from the server
    storage area into self.outDir, organised locally and the server is
    then told which jobs have been retrieved.
    """

    def __init__(self, *args):
        """
        Build the command object.

        *args is forwarded untouched to GetOutput.__init__.  Afterwards the
        client/server parameters are initialised, the storage path is made
        absolute and the option string used for single-file copies is
        stored in self.copyTout.
        """
        GetOutput.__init__(self, *args)

        # init client server params...
        # NOTE(review): presumably this is what defines storage_path,
        # storage_name, server_name, ... on self -- confirm in crab_util.
        CliServerParams(self)

        # startswith() also copes with an empty path, where indexing [0]
        # used to raise IndexError
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        self.copyTout = setLcgTimeout()
        if common.scheduler.name().upper() in ['LSF', 'CAF']:
            # these schedulers get a blank copy option instead of the timeout
            self.copyTout = ' '

    def getOutput(self):
        """
        Retrieve the output of the jobs listed in self.list_id, organise it
        locally and notify the server about what has been retrieved.
        """
        # get updated status from server (inherited from StatusServer)
        warning_msg = self.resynchClientSide()
        if warning_msg is not None:
            common.logger.info(warning_msg)

        # understand whether the required output are available
        self.checkBeforeGet()

        # retrieve files
        filesAndJodId = self.retrieveFiles(self.list_id)
        common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))

        # load updated task
        task = common._db.getTask()

        self.organizeOutput( task, self.list_id )

        self.notifyRetrievalToServer(filesAndJodId)

    def retrieveFiles(self, filesToRetrieve):
        """
        Real get output from server storage.

        filesToRetrieve -- list of the job ids whose output is wanted.

        Returns a dictionary {job id: path} with one entry per job whose
        out_files_<id>.tgz archive was copied successfully.  Failures of
        the copy itself are logged, not raised.

        Raises CrabException when the source or the destination storage
        interface cannot be created.

        Side effect: sets self.taskuuid, which notifyRetrievalToServer()
        reads afterwards.
        """
        # sys.exc_info() is used in the handlers below so that the module
        # parses on both old and new interpreters
        import sys

        self.taskuuid = str(common._db.queryTask('name'))
        common.logger.debug( "Task name: " + self.taskuuid)

        # files to fetch from the server storage ...
        remotedir = os.path.join(self.storage_path, self.taskuuid)
        osbFiles = self._outputFileNames(remotedir, filesToRetrieve)
        common.logger.debug( "List of OSB files: " +str(osbFiles) )

        # ... and, in the same order, where to put them locally
        destFiles = self._outputFileNames(self.outDir, filesToRetrieve)

        common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")

        try:
            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
        except Exception:
            common.logger.debug( str(sys.exc_info()[1]))
            msg = "ERROR : Unable to create SE source interface \n"
            raise CrabException(msg)
        try:
            loc = SElement("localhost", "local")
        except Exception:
            common.logger.debug( str(sys.exc_info()[1]))
            msg = "ERROR : Unable to create destination interface \n"
            raise CrabException(msg)

        # interface copying from the server storage to the local disk
        sbi = SBinterface(seEl, loc, logger = common.logger.logger)

        if self.storage_proto in ['globus']:
            return self._retrieveInBulk(sbi, osbFiles, destFiles, filesToRetrieve)
        return self._retrieveOneByOne(sbi, osbFiles, destFiles, filesToRetrieve)

    def _outputFileNames(self, baseDir, jobIds):
        """
        Return the output file paths expected under baseDir for jobIds:
        first every out_files_<id>.tgz and then, for the condor_g scheduler
        only, every CMSSW_<id>.stdout followed by every CMSSW_<id>.stderr.

        The first len(jobIds) entries therefore line up with jobIds.
        """
        patterns = ['out_files_%s.tgz']
        if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
            patterns.extend(['CMSSW_%s.stdout', 'CMSSW_%s.stderr'])

        names = []
        for pattern in patterns:
            # only the pattern is %-formatted, so a '%' in baseDir is harmless
            names.extend([baseDir + '/' + pattern % str(jid) for jid in jobIds])
        return names

    def _retrieveInBulk(self, sbi, sources, dests, jobIds):
        """
        Copy all sources to dests with a single sbi.copy() call and return
        {job id: source path} for the archives copied with exit code 0.

        Each element of the copy result is read as (exit code, message).
        Only the first len(jobIds) files are job archives; the remaining
        ones (stdout/stderr) are copied but not recorded.

        NOTE(review): the recorded value is the remote path, while the
        one-by-one copy records the local one -- confirm which of the two
        notifyRetrievalToServer() is meant to test.
        """
        import sys
        import traceback

        retrieved = {}

        # construct logging information
        toCopy = "\n".join([src + " to " + dst for src, dst in zip(sources, dests)]) + "\n"
        common.logger.debug("Retrieving:\n " + toCopy)

        # try to do the copy
        try:
            copy_res = sbi.copy( list(sources), list(dests), opt="tout=300")
        except Exception:
            # the whole batch failed: there is no single file to blame
            msg = "WARNING: Unable to retrieve output files \n"
            msg += str(sys.exc_info()[1])
            common.logger.debug(msg)
            common.logger.debug( str(traceback.format_exc()) )
            return retrieved
        if copy_res is None:
            return retrieved

        ## evaluating copy results
        results = list(copy_res)
        copy_err_list = []
        for count, source in enumerate(sources):
            if count < len(results):
                exitcode = int(results[count][0])
                reason = results[count][1]
            else:
                # fewer results than files: report the file as failed
                exitcode = -1
                reason = "no result returned for this file"
            if exitcode != 0:
                copy_err_list.append( [ source, reason ] )
            elif count < len(jobIds):
                retrieved[ jobIds[count] ] = source

        if copy_err_list:
            msg = "ERROR : Unable to retrieve output file \n"
            for problem in copy_err_list:
                msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
            common.logger.info(msg)
        return retrieved

    def _retrieveOneByOne(self, sbi, sources, dests, jobIds):
        """
        Copy every source to its destination with one sbi.copy() call per
        file and return {job id: local path} for the archives that were
        copied without an exception.

        A failing file is logged and skipped; the others are still tried.
        """
        import sys
        import traceback

        retrieved = {}
        for i, (source, dest) in enumerate(zip(sources, dests)):
            common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
            try:
                sbi.copy( source, dest , opt=self.copyTout)
            except Exception:
                msg = "WARNING: Unable to retrieve output file %s \n" % source
                msg += str(sys.exc_info()[1])
                common.logger.debug(msg)
                common.logger.debug( str(traceback.format_exc()) )
                continue
            # only the leading entries are the per-job archives
            if i < len(jobIds):
                retrieved[ jobIds[i] ] = dest
        return retrieved

    def notifyRetrievalToServer(self, fileAndJobList):
        """
        Tell the server which jobs have had their output retrieved.

        fileAndJobList -- dictionary {job id: path} as returned by
                          retrieveFiles().

        A job is reported when its path does not exist on the local disk.
        A failure of the communication is logged and otherwise ignored.
        Reads self.taskuuid, which retrieveFiles() sets.
        """
        import sys

        # a missing file means the archive has been untarred
        retrievedFilesJodId = [jid for jid in fileAndJobList
                               if not os.path.exists(fileAndJobList[jid])]

        common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )

        # notify to the server that output have been retrieved successfully. proxy from StatusServer
        if retrievedFilesJodId:
            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
            try:
                csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
            except Exception:
                # best effort: the output is already on the local disk
                msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
                common.logger.debug( msg)
                common.logger.debug( str(sys.exc_info()[1]))
        return
193 spiga 1.1