ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.48
Committed: Wed Nov 11 13:13:47 2009 UTC (15 years, 5 months ago) by mcinquil
Content type: text/x-python
Branch: MAIN
CVS Tags: fede_170310, CRAB_2_7_1_pre9, CRAB_LumiMask, CRAB_2_7_lumi, from_LimiMask, CRAB_2_7_1_pre8, CRAB_2_7_1_pre6, CRAB_2_7_1_pre5, CRAB_2_7_1_wmbs_pre4, CRAB_2_7_1_pre4, CRAB_2_7_1_pre3, CRAB_2_7_1_pre2, CRAB_2_7_1_pre1, CRAB_2_7_0
Branch point for: CRAB_multiout, CRAB_2_7_1_branch
Changes since 1.47: +2 -6 lines
Log Message:
Removed deprecated comments

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5 mcinquil 1.48 __revision__ = "$Id: GetOutputServer.py,v 1.47 2009/10/27 13:49:04 mcinquil Exp $"
6     __version__ = "$Revision: 1.47 $"
7 ewv 1.32
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
19 spiga 1.18 class GetOutputServer( GetOutput, StatusServer ):
20 farinafa 1.14
21     def __init__(self, *args):
22 ewv 1.32
23     GetOutput.__init__(self, *args)
24 spiga 1.21
25 spiga 1.22 # init client server params...
26 ewv 1.32 CliServerParams(self)
27 farinafa 1.14
28 ewv 1.32 if self.storage_path[0] != '/':
29 farinafa 1.14 self.storage_path = '/'+self.storage_path
30 spiga 1.42
31 spiga 1.45 self.copyTout= setLcgTimeout()
32 spiga 1.44 if common.scheduler.name().upper() in ['LSF', 'CAF']:
33     self.copyTout= ' '
34 spiga 1.42
35 spiga 1.1 return
36 farinafa 1.14
37 ewv 1.32
38     def getOutput(self):
39 spiga 1.1
40 farinafa 1.14 # get updated status from server #inherited from StatusServer
41     self.resynchClientSide()
42 mcinquil 1.2
43 farinafa 1.14 # understand whether the required output are available
44     self.checkBeforeGet()
45 spiga 1.18
46     # retrive files
47 farinafa 1.29 filesAndJodId = { }
48 spiga 1.18
49 farinafa 1.29 filesAndJodId.update( self.retrieveFiles(self.list_id) )
50 spiga 1.39 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
51 spiga 1.18
52 spiga 1.36 # load updated task
53     task = common._db.getTask()
54    
55     self.organizeOutput( task, self.list_id )
56 farinafa 1.29
57     self.notifyRetrievalToServer(filesAndJodId)
58 spiga 1.18 return
59    
60 ewv 1.32 def retrieveFiles(self, filesToRetrieve):
61 spiga 1.18 """
62     Real get output from server storage
63     """
64 spiga 1.19
65     self.taskuuid = str(common._db.queryTask('name'))
66 spiga 1.39 common.logger.debug( "Task name: " + self.taskuuid)
67 farinafa 1.14
68 ewv 1.32 # create the list with the actual filenames
69 spiga 1.19 remotedir = os.path.join(self.storage_path, self.taskuuid)
70 ewv 1.32
71 spiga 1.19 # list of file to retrieve
72 ewv 1.32 osbTemplate = remotedir + '/out_files_%s.tgz'
73     osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
74 ewv 1.46 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
75 ewv 1.32 osbTemplate = remotedir + '/CMSSW_%s.stdout'
76     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
77     osbTemplate = remotedir + '/CMSSW_%s.stderr'
78     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
79 spiga 1.39 common.logger.debug( "List of OSB files: " +str(osbFiles) )
80 ewv 1.32
81     copyHere = self.outDir
82     destTemplate = copyHere+'/out_files_%s.tgz'
83     destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
84 ewv 1.46 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
85 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stdout'
86 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
87 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stderr'
88 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
89 farinafa 1.14
90 spiga 1.39 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
91 spiga 1.19
92 ewv 1.32 try:
93 spiga 1.19 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
94     except Exception, ex:
95 spiga 1.39 common.logger.debug( str(ex))
96 spiga 1.19 msg = "ERROR : Unable to create SE source interface \n"
97     raise CrabException(msg)
98 ewv 1.32 try:
99 spiga 1.19 loc = SElement("localhost", "local")
100     except Exception, ex:
101 spiga 1.39 common.logger.debug( str(ex))
102 spiga 1.19 msg = "ERROR : Unable to create destination interface \n"
103     raise CrabException(msg)
104    
105     ## copy ISB ##
106     sbi = SBinterface( seEl, loc )
107    
108 farinafa 1.29 filesAndJodId = {}
109 mcinquil 1.47
110     if self.storage_proto in ['globus']:
111     # construct a list of absolute paths of input files
112     # and the destinations to copy them to
113     sourcesList = []
114     destsList = []
115     for i in xrange(len(osbFiles)):
116     sourcesList.append(osbFiles[i])
117     destsList.append(destFiles[i])
118     #if i < len(filesToRetrieve):
119     # filesAndJodId[ filesToRetrieve[i] ] = osbFiles[i]
120    
121     # construct logging information
122     toCopy = "\n".join([t[0] + " to " + t[1] for t in map(None, sourcesList, destsList)]) + "\n"
123     common.logger.debug("Retrieving:\n " + toCopy)
124    
125     # try to do the copy
126     copy_res = None
127 farinafa 1.15 try:
128 mcinquil 1.47 copy_res = sbi.copy( sourcesList, destsList, opt=self.copyTout)
129 spiga 1.19 except Exception, ex:
130 ewv 1.32 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
131 spiga 1.20 msg += str(ex)
132 spiga 1.39 common.logger.debug(msg)
133 mcinquil 1.47 import traceback
134     common.logger.debug( str(traceback.format_exc()) )
135     if copy_res is not None:
136     ## evaluating copy results
137     copy_err_list = []
138     count = 0
139     for ll in map(None, copy_res, sourcesList):
140     exitcode = int(ll[0][0])
141     if exitcode == 0:
142     filesAndJodId[ filesToRetrieve[count] ] = osbFiles[count]
143     else:
144     copy_err_list.append( [ ll[1], ll[0][1] ] )
145     count += 1
146     if len(copy_err_list) > 0:
147     msg = "ERROR : Unable to retrieve output file \n"
148     for problem in copy_err_list:
149     msg += " Problem transferring [%s]: '%s'\n" %(problem[0],problem[1])
150     else:
151     # retrieve them from SE
152     for i in xrange(len(osbFiles)):
153     source = osbFiles[i]
154     dest = destFiles[i]
155     common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
156     try:
157     sbi.copy( source, dest , opt=self.copyTout)
158     if i < len(filesToRetrieve):
159     filesAndJodId[ filesToRetrieve[i] ] = dest
160     except Exception, ex:
161     msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
162     msg += str(ex)
163     common.logger.debug(msg)
164     import traceback
165     common.logger.debug( str(traceback.format_exc()) )
166     continue
167 farinafa 1.29
168     return filesAndJodId
169    
170     def notifyRetrievalToServer(self, fileAndJobList):
171 ewv 1.32 retrievedFilesJodId = []
172 farinafa 1.29
173     for jid in fileAndJobList:
174     if not os.path.exists(fileAndJobList[jid]):
175     # it means the file has been untarred
176     retrievedFilesJodId.append(jid)
177    
178 spiga 1.39 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
179 farinafa 1.14
180     # notify to the server that output have been retrieved successfully. proxy from StatusServer
181 farinafa 1.23 if len(retrievedFilesJodId) > 0:
182 spiga 1.22 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
183 farinafa 1.28 try:
184     csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
185     except Exception, e:
186 slacapra 1.43 msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
187     common.logger.debug( msg)
188 farinafa 1.28 pass
189 farinafa 1.14 return
190 spiga 1.1