ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.46
Committed: Thu Sep 24 19:10:52 2009 UTC (15 years, 7 months ago) by ewv
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_7_0_pre5, CRAB_2_7_0_pre4, CRAB_2_7_0_pre3
Changes since 1.45: +4 -4 lines
Log Message:
Don't transfer glidein files twice

File Contents

# User Rev Content
1 ewv 1.32 """
2     Get output for server mode
3     """
4    
5 ewv 1.46 __revision__ = "$Id: GetOutputServer.py,v 1.44.2.1 2009/09/16 13:26:24 spiga Exp $"
6     __version__ = "$Revision: 1.44.2.1 $"
7 ewv 1.32
8 spiga 1.16 from GetOutput import GetOutput
9     from StatusServer import StatusServer
10 spiga 1.1 from crab_util import *
11     import common
12     import time
13    
14 farinafa 1.14 from ServerCommunicator import ServerCommunicator
15    
16 spiga 1.19 from ProdCommon.Storage.SEAPI.SElement import SElement
17     from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18 farinafa 1.14
class GetOutputServer(GetOutput, StatusServer):
    """
    Retrieve job output from the server storage area (server mode).

    The output sandboxes are copied from the server storage element to the
    local output directory, handed to organizeOutput() and finally
    acknowledged to the server.
    """

    def __init__(self, *args):
        """
        Initialize the GetOutput base and the client/server parameters.

        args -- forwarded unchanged to GetOutput.__init__
        """
        GetOutput.__init__(self, *args)

        # init client server params...
        # NOTE(review): presumably this call sets the storage_* and server_*
        # attributes read below -- confirm in CliServerParams.
        CliServerParams(self)

        # The remote path is used as an absolute one. startswith() also copes
        # with an empty path, where indexing [0] would raise IndexError.
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        # Option passed to the copy call in retrieveFiles(); the LSF and CAF
        # schedulers get a blank option instead of the value of setLcgTimeout()
        self.copyTout = setLcgTimeout()
        if common.scheduler.name().upper() in ['LSF', 'CAF']:
            self.copyTout = ' '

        return

    def getOutput(self):
        """
        Retrieve the output of the selected jobs, organize it locally and
        notify the server about what has been retrieved.
        """
        # get updated status from server (inherited from StatusServer)
        self.resynchClientSide()

        # understand whether the required output are available
        self.checkBeforeGet()

        # retrieve files
        filesAndJodId = self.retrieveFiles(self.list_id)
        common.logger.debug("Files to be organized and notified " + str(filesAndJodId))

        # load updated task
        task = common._db.getTask()

        self.organizeOutput(task, self.list_id)

        self.notifyRetrievalToServer(filesAndJodId)
        return

    def retrieveFiles(self, filesToRetrieve):
        """
        Real get output from server storage.

        filesToRetrieve -- sequence of job ids whose output is requested

        Returns a dictionary mapping every job id whose out_files_<id>.tgz
        archive was copied successfully to the local path of that archive.
        A failed copy is logged at debug level and skipped.

        Raises CrabException when the source or the destination storage
        interface cannot be created.
        """
        self.taskuuid = str(common._db.queryTask('name'))
        common.logger.debug("Task name: " + self.taskuuid)

        remotedir = os.path.join(self.storage_path, self.taskuuid)
        copyHere = self.outDir

        # File name patterns fetched for every job. The archive pattern must
        # stay first: the leading len(filesToRetrieve) entries of the lists
        # built below are the ones associated with a job id.
        patterns = ['out_files_%s.tgz']
        if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g"]:
            patterns.extend(['CMSSW_%s.stdout', 'CMSSW_%s.stderr'])

        # Build source and destination lists together so they cannot get out
        # of step. Only the file name is %-formatted, hence a '%' inside the
        # directory names is harmless.
        osbFiles = []
        destFiles = []
        for pattern in patterns:
            for jid in filesToRetrieve:
                fileName = pattern % str(jid)
                osbFiles.append(remotedir + '/' + fileName)
                destFiles.append(copyHere + '/' + fileName)
        common.logger.debug("List of OSB files: " + str(osbFiles))

        common.logger.info("Starting retrieving output from server " + str(self.storage_name) + "...")

        try:
            seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
        except Exception as ex:
            common.logger.debug(str(ex))
            msg = "ERROR : Unable to create SE source interface \n"
            raise CrabException(msg)
        try:
            loc = SElement("localhost", "local")
        except Exception as ex:
            common.logger.debug(str(ex))
            msg = "ERROR : Unable to create destination interface \n"
            raise CrabException(msg)

        # copy the output files from the storage element to the local side
        sbi = SBinterface(seEl, loc)

        # retrieve them from SE
        filesAndJodId = {}
        nJobs = len(filesToRetrieve)
        for i, (source, dest) in enumerate(zip(osbFiles, destFiles)):
            common.logger.debug("retrieving " + str(source) + " to " + str(dest))
            try:
                sbi.copy(source, dest, opt=self.copyTout)
                # only the archives (first nJobs entries) are recorded
                if i < nJobs:
                    filesAndJodId[filesToRetrieve[i]] = dest
            except Exception as ex:
                # best effort: keep going with the remaining files
                msg = "WARNING: Unable to retrieve output file %s \n" % source
                msg += str(ex)
                common.logger.debug(msg)
                continue

        return filesAndJodId

    def notifyRetrievalToServer(self, fileAndJobList):
        """
        Notify the server about the jobs whose output has been retrieved.

        fileAndJobList -- dictionary job id -> local archive path, as
                          returned by retrieveFiles()

        A job is reported only when its archive is no longer present on the
        local disk, which is taken to mean it has been untarred.
        A communication failure is logged at debug level and not propagated.
        """
        # a missing archive means the file has been untarred
        retrievedFilesJodId = [jid for jid in fileAndJobList
                               if not os.path.exists(fileAndJobList[jid])]

        common.logger.debug("List of retrieved files notified to server: %s" % str(retrievedFilesJodId))

        # notify to the server that output have been retrieved successfully
        if retrievedFilesJodId:
            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
            try:
                csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
            except Exception as e:
                # deliberately non fatal, but keep the reason in the log
                msg = "Client Server comunication failed about outputRetrieved: jobs " + (str(retrievedFilesJodId))
                msg += "\n" + str(e)
                common.logger.debug(msg)
        return
146 spiga 1.1