ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.44
Committed: Wed Jul 22 17:55:18 2009 UTC (15 years, 9 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: test_1, CRAB_2_7_0_pre1, CRAB_2_6_2, CRAB_2_6_2_pre2, CRAB_2_6_2_pre1, CRAB_2_6_1
Branch point for: CRAB_2_6_X_br
Changes since 1.43: +5 -3 lines
Log Message:
fix time out for server at caf

File Contents

# Content
1 """
2 Get output for server mode
3 """
4
5 __revision__ = "$Id: GetOutputServer.py,v 1.43 2009/07/03 10:25:46 slacapra Exp $"
6 __version__ = "$Revision: 1.43 $"
7
8 from GetOutput import GetOutput
9 from StatusServer import StatusServer
10 from crab_util import *
11 import common
12 import time
13
14 from ServerCommunicator import ServerCommunicator
15
16 from ProdCommon.Storage.SEAPI.SElement import SElement
17 from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
18
19 class GetOutputServer( GetOutput, StatusServer ):
20
21 def __init__(self, *args):
22
23 GetOutput.__init__(self, *args)
24
25 # init client server params...
26 CliServerParams(self)
27
28 if self.storage_path[0] != '/':
29 self.storage_path = '/'+self.storage_path
30
31 self.copyTout= ' -t 600 '
32 if common.scheduler.name().upper() in ['LSF', 'CAF']:
33 self.copyTout= ' '
34
35 return
36
37
38 def getOutput(self):
39
40 # get updated status from server #inherited from StatusServer
41 self.resynchClientSide()
42
43 # understand whether the required output are available
44 self.checkBeforeGet()
45
46 # retrive files
47 filesAndJodId = { }
48
49 filesAndJodId.update( self.retrieveFiles(self.list_id) )
50 common.logger.debug( "Files to be organized and notified " + str(filesAndJodId))
51
52 # load updated task
53 task = common._db.getTask()
54
55 self.organizeOutput( task, self.list_id )
56
57 self.notifyRetrievalToServer(filesAndJodId)
58 return
59
60 def retrieveFiles(self, filesToRetrieve):
61 """
62 Real get output from server storage
63 """
64
65 self.taskuuid = str(common._db.queryTask('name'))
66 common.logger.debug( "Task name: " + self.taskuuid)
67
68 # create the list with the actual filenames
69 remotedir = os.path.join(self.storage_path, self.taskuuid)
70
71 # list of file to retrieve
72 osbTemplate = remotedir + '/out_files_%s.tgz'
73 osbFiles = [osbTemplate % str(jid) for jid in filesToRetrieve]
74 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g", "glidein"]:
75 osbTemplate = remotedir + '/CMSSW_%s.stdout'
76 osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
77 osbTemplate = remotedir + '/CMSSW_%s.stderr'
78 osbFiles.extend([osbTemplate % str(jid) for jid in filesToRetrieve])
79 common.logger.debug( "List of OSB files: " +str(osbFiles) )
80
81 copyHere = self.outDir
82 destTemplate = copyHere+'/out_files_%s.tgz'
83 destFiles = [ destTemplate % str(jid) for jid in filesToRetrieve ]
84 if self.cfg_params['CRAB.scheduler'].lower() in ["condor_g","glidein"]:
85 destTemplate = copyHere + '/CMSSW_%s.stdout'
86 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
87 destTemplate = copyHere + '/CMSSW_%s.stderr'
88 destFiles.extend([destTemplate % str(jid) for jid in filesToRetrieve])
89
90 common.logger.info("Starting retrieving output from server "+str(self.storage_name)+"...")
91
92 try:
93 seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
94 except Exception, ex:
95 common.logger.debug( str(ex))
96 msg = "ERROR : Unable to create SE source interface \n"
97 raise CrabException(msg)
98 try:
99 loc = SElement("localhost", "local")
100 except Exception, ex:
101 common.logger.debug( str(ex))
102 msg = "ERROR : Unable to create destination interface \n"
103 raise CrabException(msg)
104
105 ## copy ISB ##
106 sbi = SBinterface( seEl, loc )
107
108 # retrieve them from SE
109 filesAndJodId = {}
110 for i in xrange(len(osbFiles)):
111 source = osbFiles[i]
112 dest = destFiles[i]
113 common.logger.debug( "retrieving "+ str(source) +" to "+ str(dest) )
114 try:
115 sbi.copy( source, dest , opt=self.copyTout)
116 if i < len(filesToRetrieve):
117 filesAndJodId[ filesToRetrieve[i] ] = dest
118 except Exception, ex:
119 msg = "WARNING: Unable to retrieve output file %s \n" % osbFiles[i]
120 msg += str(ex)
121 common.logger.debug(msg)
122 continue
123
124 return filesAndJodId
125
126 def notifyRetrievalToServer(self, fileAndJobList):
127 retrievedFilesJodId = []
128
129 for jid in fileAndJobList:
130 if not os.path.exists(fileAndJobList[jid]):
131 # it means the file has been untarred
132 retrievedFilesJodId.append(jid)
133
134 common.logger.debug( "List of retrieved files notified to server: %s"%str(retrievedFilesJodId) )
135
136 # notify to the server that output have been retrieved successfully. proxy from StatusServer
137 if len(retrievedFilesJodId) > 0:
138 csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params)
139 try:
140 csCommunicator.outputRetrieved(self.taskuuid, retrievedFilesJodId)
141 except Exception, e:
142 msg = "Client Server comunication failed about outputRetrieved: jobs "+(str(retrievedFilesJodId))
143 common.logger.debug( msg)
144 pass
145 return
146