ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.32
Committed: Fri Sep 19 12:24:44 2008 UTC (16 years, 7 months ago) by ewv
Content type: text/x-python
Branch: MAIN
Changes since 1.31: +42 -27 lines
Log Message:
Fetch stderr/out for Condor scheds

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5     __revision__ = "$Id: file,v 1.27 2008/08/25 16:49:09 ewv Exp $"
6     __version__ = "$Revision: 1.27 $"
7    
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
class GetOutputServer( GetOutput, StatusServer ):
    """
    Get output for jobs handled in server mode.

    Output sandboxes are copied from the server storage area into the local
    output directory, unpacked through the inherited organizeOutput() and the
    server is then told which jobs have been retrieved.
    """

    def __init__(self, *args):
        """
        Initialise the GetOutput part and load the client/server parameters.

        All positional arguments are forwarded unchanged to GetOutput.__init__.
        """
        GetOutput.__init__(self, *args)

        # init client server params...
        # NOTE(review): presumably this is what sets storage_path,
        # storage_name, server_name, ... on self -- confirm in crab_util.
        CliServerParams(self)

        # The remote directory is built by joining this path with the task
        # name, so force it to be absolute. startswith() is used instead of
        # indexing so that an empty path does not raise IndexError.
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        return

    def getOutput(self):
        """
        Retrieve, organize and acknowledge the output of the selected jobs.
        """
        # get updated status from server #inherited from StatusServer
        self.resynchClientSide()

        # understand whether the required output are available
        self.checkBeforeGet()

        # retrieve files
        filesAndJodId = self.retrieveFiles(self.list_id)
        common.logger.debug(5, "Files to be organized and notified " + str(filesAndJodId))

        self.organizeOutput()

        self.notifyRetrievalToServer(filesAndJodId)
        return

    def retrieveFiles(self, filesToRetrieve):
        """
        Real get output from server storage

        filesToRetrieve -- sequence of job ids whose output has to be fetched.

        For every job id the output tarball out_files_<id>.tgz is copied from
        <storage_path>/<task name> on the server storage to self.outDir. For
        the condor_g and glidein schedulers the CMSSW_<id>.stdout and
        CMSSW_<id>.stderr files are copied as well.

        Returns a dictionary mapping each job id whose tarball was copied
        successfully to the local path of that tarball. The stdout/stderr
        files are not recorded there because notifyRetrievalToServer()
        interprets the entries as tarballs.

        Raises CrabException when the source or destination storage interface
        cannot be created. A failure while copying a single file is only
        logged and the remaining files are still attempted.
        """
        # sys.exc_info() is used in the handlers below so that they do not
        # depend on the "except X, e" / "except X as e" syntax generation.
        import sys

        self.taskuuid = str(common._db.queryTask('name'))
        common.logger.debug(3, "Task name: " + self.taskuuid)

        remotedir = os.path.join(self.storage_path, self.taskuuid)
        copyHere = self.outDir

        # file name patterns to fetch for every job, tarball first
        tarballPattern = 'out_files_%s.tgz'
        patterns = [tarballPattern]
        if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g", "glidein"]:
            patterns.extend(['CMSSW_%s.stdout', 'CMSSW_%s.stderr'])

        # One entry per file: (job id, is tarball, remote source, local dest).
        # Source and destination are generated together from the same file
        # name so the two lists can never get out of step.
        transfers = []
        for pattern in patterns:
            isTarball = (pattern == tarballPattern)
            for jid in filesToRetrieve:
                fileName = pattern % str(jid)
                transfers.append((jid,
                                  isTarball,
                                  remotedir + '/' + fileName,
                                  copyHere + '/' + fileName))

        osbFiles = [entry[2] for entry in transfers]
        common.logger.debug(3, "List of OSB files: " +str(osbFiles) )

        common.logger.message("Starting retrieving output from server "+str(self.storage_name)+"...")

        try:
            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
        except Exception:
            common.logger.debug(1, str(sys.exc_info()[1]))
            msg = "ERROR : Unable to create SE source interface \n"
            raise CrabException(msg)
        try:
            loc = SElement("localhost", "local")
        except Exception:
            common.logger.debug(1, str(sys.exc_info()[1]))
            msg = "ERROR : Unable to create destination interface \n"
            raise CrabException(msg)

        ## copy OSB ##
        sbi = SBinterface( seEl, loc )

        # retrieve them from SE
        filesAndJodId = {}
        for jid, isTarball, source, dest in transfers:
            common.logger.debug(1, "retrieving "+ str(source) +" to "+ str(dest) )
            try:
                sbi.copy( source, dest)
            except Exception:
                # best effort: report the failure and go on with the next file
                msg = "WARNING: Unable to retrieve output file %s \n" % source
                msg += str(sys.exc_info()[1])
                common.logger.debug(1,msg)
                continue
            if isTarball:
                filesAndJodId[jid] = dest

        return filesAndJodId

    def notifyRetrievalToServer(self, fileAndJobList):
        """
        Tell the server which jobs had their output retrieved.

        fileAndJobList -- dictionary job id -> local tarball path, as returned
        by retrieveFiles().

        A job is reported only when its tarball is no longer on disk, which is
        taken to mean that it has been untarred. The notification relies on
        self.taskuuid, which is set by retrieveFiles(). A failure while
        contacting the server is logged and otherwise ignored.
        """
        import sys

        retrievedFilesJodId = []

        for jid in fileAndJobList:
            if not os.path.exists(fileAndJobList[jid]):
                # it means the file has been untarred
                retrievedFilesJodId.append(jid)

        common.logger.debug(5, "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )

        # notify to the server that output have been retrieved successfully. proxy from StatusServer
        if retrievedFilesJodId:
            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
            try:
                csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
            except Exception:
                # The output is already on the client, so a failed
                # notification must not abort the command; leave a trace.
                msg = "WARNING: Unable to notify the server about retrieved output \n"
                msg += str(sys.exc_info()[1])
                common.logger.debug(1, msg)
        return
137 spiga 1.1