ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.40
Committed: Wed Jul 1 09:04:43 2009 UTC (15 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.39: +5 -3 lines
Log Message:
set a 10min timeout in the client-storage interaction

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5 spiga 1.40 __revision__ = "$Id: GetOutputServer.py,v 1.39 2009/05/26 10:23:00 spiga Exp $"
6     __version__ = "$Revision: 1.39 $"
7 ewv 1.32
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
19 spiga 1.18 class GetOutputServer( GetOutput, StatusServer ):
20 farinafa 1.14
21     def __init__(self, *args):
22 ewv 1.32
23     GetOutput.__init__(self, *args)
24 spiga 1.21
25 spiga 1.22 # init client server params...
26 ewv 1.32 CliServerParams(self)
27 farinafa 1.14
28 ewv 1.32 if self.storage_path[0] != '/':
29 farinafa 1.14 self.storage_path = '/'+self.storage_path
30 spiga 1.40
31     self.copyTout = ' -t 600 '
32 spiga 1.16
33 spiga 1.1 return
34 farinafa 1.14
35 ewv 1.32
36     def getOutput(self):
37 spiga 1.1
38 farinafa 1.14 # get updated status from server #inherited from StatusServer
39     self.resynchClientSide()
40 mcinquil 1.2
41 farinafa 1.14 # understand whether the required output are available
42     self.checkBeforeGet()
43 spiga 1.18
44     # retrive files
45 farinafa 1.29 filesAndJodId = { }
46 spiga 1.18
47 farinafa 1.29 filesAndJodId.update( self.retrieveFiles(self.list_id) )
48 spiga 1.39 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
49 spiga 1.18
50 spiga 1.36 # load updated task
51     task = common._db.getTask()
52    
53     self.organizeOutput( task, self.list_id )
54 farinafa 1.29
55     self.notifyRetrievalToServer(filesAndJodId)
56 spiga 1.18 return
57    
58 ewv 1.32 def retrieveFiles(self, filesToRetrieve):
59 spiga 1.18 """
60     Real get output from server storage
61     """
62 spiga 1.19
63     self.taskuuid = str(common._db.queryTask('name'))
64 spiga 1.39 common.logger.debug( "Task name: " + self.taskuuid)
65 farinafa 1.14
66 ewv 1.32 # create the list with the actual filenames
67 spiga 1.19 remotedir = os.path.join(self.storage_path, self.taskuuid)
68 ewv 1.32
69 spiga 1.19 # list of file to retrieve
70 ewv 1.32 osbTemplate = remotedir + '/out_files_%s.tgz'
71     osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
72     if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g", "glidein"]:
73     osbTemplate = remotedir + '/CMSSW_%s.stdout'
74     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
75     osbTemplate = remotedir + '/CMSSW_%s.stderr'
76     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
77 spiga 1.39 common.logger.debug( "List of OSB files: " +str(osbFiles) )
78 ewv 1.32
79     copyHere = self.outDir
80     destTemplate = copyHere+'/out_files_%s.tgz'
81     destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
82     if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g","glidein"]:
83 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stdout'
84 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
85 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stderr'
86 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
87 farinafa 1.14
88 spiga 1.39 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
89 spiga 1.19
90 ewv 1.32 try:
91 spiga 1.19 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
92     except Exception, ex:
93 spiga 1.39 common.logger.debug( str(ex))
94 spiga 1.19 msg = "ERROR : Unable to create SE source interface \n"
95     raise CrabException(msg)
96 ewv 1.32 try:
97 spiga 1.19 loc = SElement("localhost", "local")
98     except Exception, ex:
99 spiga 1.39 common.logger.debug( str(ex))
100 spiga 1.19 msg = "ERROR : Unable to create destination interface \n"
101     raise CrabException(msg)
102    
103     ## copy ISB ##
104     sbi = SBinterface( seEl, loc )
105    
106 ewv 1.32 # retrieve them from SE
107 farinafa 1.29 filesAndJodId = {}
108 ewv 1.34 for i in xrange(len(osbFiles)):
109 ewv 1.32 source = osbFiles[i]
110 spiga 1.19 dest = destFiles[i]
111 spiga 1.39 common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
112 farinafa 1.15 try:
113 spiga 1.40 sbi.copy( source, dest, opt=self.copyTout)
114 ewv 1.34 if i < len(filesToRetrieve):
115     filesAndJodId[ filesToRetrieve[i] ] = dest
116 spiga 1.19 except Exception, ex:
117 ewv 1.32 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
118 spiga 1.20 msg += str(ex)
119 spiga 1.39 common.logger.debug(msg)
120 farinafa 1.15 continue
121 farinafa 1.29
122     return filesAndJodId
123    
124     def notifyRetrievalToServer(self, fileAndJobList):
125 ewv 1.32 retrievedFilesJodId = []
126 farinafa 1.29
127     for jid in fileAndJobList:
128     if not os.path.exists(fileAndJobList[jid]):
129     # it means the file has been untarred
130     retrievedFilesJodId.append(jid)
131    
132 spiga 1.39 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
133 farinafa 1.14
134     # notify to the server that output have been retrieved successfully. proxy from StatusServer
135 farinafa 1.23 if len(retrievedFilesJodId) > 0:
136 spiga 1.22 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
137 farinafa 1.28 try:
138     csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
139     except Exception, e:
140     pass
141 farinafa 1.14 return
142 spiga 1.1