ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.41
Committed: Wed Jul 1 16:03:37 2009 UTC (15 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.40: +3 -5 lines
Log Message:
removed time out. GO operation can take long time for big task. need to discuss here..

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5 spiga 1.41 __revision__ = "$Id: GetOutputServer.py,v 1.40 2009/07/01 09:04:43 spiga Exp $"
6     __version__ = "$Revision: 1.40 $"
7 ewv 1.32
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
19 spiga 1.18 class GetOutputServer( GetOutput, StatusServer ):
20 farinafa 1.14
21     def __init__(self, *args):
22 ewv 1.32
23     GetOutput.__init__(self, *args)
24 spiga 1.21
25 spiga 1.22 # init client server params...
26 ewv 1.32 CliServerParams(self)
27 farinafa 1.14
28 ewv 1.32 if self.storage_path[0] != '/':
29 farinafa 1.14 self.storage_path = '/'+self.storage_path
30 spiga 1.40
31 spiga 1.1 return
32 farinafa 1.14
33 ewv 1.32
34     def getOutput(self):
35 spiga 1.1
36 farinafa 1.14 # get updated status from server #inherited from StatusServer
37     self.resynchClientSide()
38 mcinquil 1.2
39 farinafa 1.14 # understand whether the required output are available
40     self.checkBeforeGet()
41 spiga 1.18
42     # retrive files
43 farinafa 1.29 filesAndJodId = { }
44 spiga 1.18
45 farinafa 1.29 filesAndJodId.update( self.retrieveFiles(self.list_id) )
46 spiga 1.39 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
47 spiga 1.18
48 spiga 1.36 # load updated task
49     task = common._db.getTask()
50    
51     self.organizeOutput( task, self.list_id )
52 farinafa 1.29
53     self.notifyRetrievalToServer(filesAndJodId)
54 spiga 1.18 return
55    
56 ewv 1.32 def retrieveFiles(self, filesToRetrieve):
57 spiga 1.18 """
58     Real get output from server storage
59     """
60 spiga 1.19
61     self.taskuuid = str(common._db.queryTask('name'))
62 spiga 1.39 common.logger.debug( "Task name: " + self.taskuuid)
63 farinafa 1.14
64 ewv 1.32 # create the list with the actual filenames
65 spiga 1.19 remotedir = os.path.join(self.storage_path, self.taskuuid)
66 ewv 1.32
67 spiga 1.19 # list of file to retrieve
68 ewv 1.32 osbTemplate = remotedir + '/out_files_%s.tgz'
69     osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
70     if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g", "glidein"]:
71     osbTemplate = remotedir + '/CMSSW_%s.stdout'
72     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
73     osbTemplate = remotedir + '/CMSSW_%s.stderr'
74     osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
75 spiga 1.39 common.logger.debug( "List of OSB files: " +str(osbFiles) )
76 ewv 1.32
77     copyHere = self.outDir
78     destTemplate = copyHere+'/out_files_%s.tgz'
79     destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
80     if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g","glidein"]:
81 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stdout'
82 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
83 ewv 1.34 destTemplate = copyHere + '/CMSSW_%s.stderr'
84 ewv 1.32 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
85 farinafa 1.14
86 spiga 1.39 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
87 spiga 1.19
88 ewv 1.32 try:
89 spiga 1.19 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
90     except Exception, ex:
91 spiga 1.39 common.logger.debug( str(ex))
92 spiga 1.19 msg = "ERROR : Unable to create SE source interface \n"
93     raise CrabException(msg)
94 ewv 1.32 try:
95 spiga 1.19 loc = SElement("localhost", "local")
96     except Exception, ex:
97 spiga 1.39 common.logger.debug( str(ex))
98 spiga 1.19 msg = "ERROR : Unable to create destination interface \n"
99     raise CrabException(msg)
100    
101     ## copy ISB ##
102     sbi = SBinterface( seEl, loc )
103    
104 ewv 1.32 # retrieve them from SE
105 farinafa 1.29 filesAndJodId = {}
106 ewv 1.34 for i in xrange(len(osbFiles)):
107 ewv 1.32 source = osbFiles[i]
108 spiga 1.19 dest = destFiles[i]
109 spiga 1.39 common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
110 farinafa 1.15 try:
111 spiga 1.41 sbi.copy( source, dest )
112 ewv 1.34 if i < len(filesToRetrieve):
113     filesAndJodId[ filesToRetrieve[i] ] = dest
114 spiga 1.19 except Exception, ex:
115 ewv 1.32 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
116 spiga 1.20 msg += str(ex)
117 spiga 1.39 common.logger.debug(msg)
118 farinafa 1.15 continue
119 farinafa 1.29
120     return filesAndJodId
121    
122     def notifyRetrievalToServer(self, fileAndJobList):
123 ewv 1.32 retrievedFilesJodId = []
124 farinafa 1.29
125     for jid in fileAndJobList:
126     if not os.path.exists(fileAndJobList[jid]):
127     # it means the file has been untarred
128     retrievedFilesJodId.append(jid)
129    
130 spiga 1.39 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
131 farinafa 1.14
132     # notify to the server that output have been retrieved successfully. proxy from StatusServer
133 farinafa 1.23 if len(retrievedFilesJodId) > 0:
134 spiga 1.22 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
135 farinafa 1.28 try:
136     csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
137     except Exception, e:
138     pass
139 farinafa 1.14 return
140 spiga 1.1