ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.47
Committed: Tue Oct 27 13:49:04 2009 UTC (15 years, 6 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_0_pre8, CRAB_2_7_0_pre7, CRAB_2_7_0_pre6
Branch point for: Lumi2_8
Changes since 1.46: +59 -11 lines
Log Message:
Adding list usage with globus plugin

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5 mcinquil 1.47 __revision__ = "$Id: GetOutputServer.py,v 1.46 2009/09/24 19:10:52 ewv Exp $"
6     __version__ = "$Revision: 1.46 $"
7 ewv 1.32
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
19 spiga 1.18 class GetOutputServer( GetOutput, StatusServer ):
20 farinafa 1.14
21     def __init__(self, *args):
22 ewv 1.32
23     GetOutput.__init__(self, *args)
24 spiga 1.21
25 spiga 1.22 # init client server params...
26 ewv 1.32 CliServerParams(self)
27 farinafa 1.14
28 ewv 1.32 if self.storage_path[0] != '/':
29 farinafa 1.14 self.storage_path = '/'+self.storage_path
30 spiga 1.42
31 spiga 1.45 self.copyTout= setLcgTimeout()
32 spiga 1.44 if common.scheduler.name().upper() in ['LSF', 'CAF']:
33     self.copyTout= ' '
34 spiga 1.42
35 spiga 1.1 return
36 farinafa 1.14
37 ewv 1.32
38     def getOutput(self):
39 spiga 1.1
40 farinafa 1.14 # get updated status from server #inherited from StatusServer
41     self.resynchClientSide()
42 mcinquil 1.2
43 farinafa 1.14 # understand whether the required output are available
44     self.checkBeforeGet()
45 spiga 1.18
46     # retrive files
47 farinafa 1.29 filesAndJodId = { }
48 spiga 1.18
49 farinafa 1.29 filesAndJodId.update( self.retrieveFiles(self.list_id) )
50 spiga 1.39 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
51 spiga 1.18
52 spiga 1.36 # load updated task
53     task = common._db.getTask()
54    
55     self.organizeOutput( task, self.list_id )
56 farinafa 1.29
57     self.notifyRetrievalToServer(filesAndJodId)
58 spiga 1.18 return
59    
60 ewv 1.32 def retrieveFiles(self, filesToRetrieve):
61 spiga 1.18 """
62     Real get output from server storage
63     """
64 spiga 1.19
65     self.taskuuid = str(common._db.queryTask('name'))
66 spiga 1.39 common.logger.debug( "Task name: " + self.taskuuid)
67 farinafa 1.14
68 ewv 1.32 # create the list with the actual filenames
69 spiga 1.19 remotedir = os.path.join(self.storage_path, self.taskuuid)
70 ewv 1.32
71 spiga 1.19 # list of file to retrieve
72 ewv 1.32 osbTemplate = remotedir + '/out_files_%s.tgz'
73     osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
74 ewv 1.46 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
75 ewv 1.32 osbTemplate = remotedir + '/CMSSW_%s.stdout'
76     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
77     osbTemplate = remotedir + '/CMSSW_%s.stderr'
78     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
79 spiga 1.39 common.logger.debug( "List of OSB files: " +str(osbFiles) )
80 ewv 1.32
81     copyHere = self.outDir
82     destTemplate = copyHere+'/out_files_%s.tgz'
83     destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
84 ewv 1.46 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
85 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stdout'
86 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
87 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stderr'
88 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
89 farinafa 1.14
90 spiga 1.39 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
91 spiga 1.19
92 ewv 1.32 try:
93 spiga 1.19 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
94     except Exception, ex:
95 spiga 1.39 common.logger.debug( str(ex))
96 spiga 1.19 msg = "ERROR : Unable to create SE source interface \n"
97     raise CrabException(msg)
98 ewv 1.32 try:
99 spiga 1.19 loc = SElement("localhost", "local")
100     except Exception, ex:
101 spiga 1.39 common.logger.debug( str(ex))
102 spiga 1.19 msg = "ERROR : Unable to create destination interface \n"
103     raise CrabException(msg)
104    
105     ## copy ISB ##
106     sbi = SBinterface( seEl, loc )
107    
108 farinafa 1.29 filesAndJodId = {}
109 mcinquil 1.47
110     if self.storage_proto in ['globus']:
111     #print "[moveISB_SEAPI] doing globus-url-copy"
112     # construct a list of absolute paths of input files
113     # and the destinations to copy them to
114     sourcesList = []
115     destsList = []
116     for i in xrange(len(osbFiles)):
117     sourcesList.append(osbFiles[i])
118     destsList.append(destFiles[i])
119     #if i < len(filesToRetrieve):
120     # filesAndJodId[ filesToRetrieve[i] ] = osbFiles[i]
121    
122     # construct logging information
123     toCopy = "\n".join([t[0] + " to " + t[1] for t in map(None, sourcesList, destsList)]) + "\n"
124     common.logger.debug("Retrieving:\n " + toCopy)
125    
126     # try to do the copy
127     copy_res = None
128 farinafa 1.15 try:
129 mcinquil 1.47 copy_res = sbi.copy( sourcesList, destsList, opt=self.copyTout)
130 spiga 1.19 except Exception, ex:
131 ewv 1.32 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
132 spiga 1.20 msg += str(ex)
133 spiga 1.39 common.logger.debug(msg)
134 mcinquil 1.47 import traceback
135     common.logger.debug( str(traceback.format_exc()) )
136     if copy_res is not None:
137     ## evaluating copy results
138     copy_err_list = []
139     count = 0
140     #print copy_res
141     for ll in map(None, copy_res, sourcesList):
142     exitcode = int(ll[0][0])
143     if exitcode == 0:
144     filesAndJodId[ filesToRetrieve[count] ] = osbFiles[count]
145     else:
146     copy_err_list.append( [ ll[1], ll[0][1] ] )
147     count += 1
148     ## now raise an exception, but the submission could be retried
149     if len(copy_err_list) > 0:
150     msg = "ERROR : Unable to retrieve output file \n"
151     for problem in copy_err_list:
152     msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
153     #raise CrabException(msg)
154     else:
155     # retrieve them from SE
156     for i in xrange(len(osbFiles)):
157     source = osbFiles[i]
158     dest = destFiles[i]
159     common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
160     try:
161     sbi.copy( source, dest , opt=self.copyTout)
162     if i < len(filesToRetrieve):
163     filesAndJodId[ filesToRetrieve[i] ] = dest
164     except Exception, ex:
165     msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
166     msg += str(ex)
167     common.logger.debug(msg)
168     import traceback
169     common.logger.debug( str(traceback.format_exc()) )
170     continue
171 farinafa 1.29
172     return filesAndJodId
173    
174     def notifyRetrievalToServer(self, fileAndJobList):
175 ewv 1.32 retrievedFilesJodId = []
176 farinafa 1.29
177     for jid in fileAndJobList:
178     if not os.path.exists(fileAndJobList[jid]):
179     # it means the file has been untarred
180     retrievedFilesJodId.append(jid)
181    
182 spiga 1.39 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
183 farinafa 1.14
184     # notify to the server that output have been retrieved successfully. proxy from StatusServer
185 farinafa 1.23 if len(retrievedFilesJodId) > 0:
186 spiga 1.22 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
187 farinafa 1.28 try:
188     csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
189     except Exception, e:
190 slacapra 1.43 msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
191     common.logger.debug( msg)
192 farinafa 1.28 pass
193 farinafa 1.14 return
194 spiga 1.1