ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/GetOutputServer.py
Revision: 1.17
Committed: Fri Apr 11 14:54:22 2008 UTC (17 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
Changes since 1.16: +3 -5 lines
Log Message:
Many changes to have LSF working with BossLite
Introduce Killer class to handle -kill which works again
Work_space::res() returns the correct output directory also when the user has set a non-default one; likewise for logDir()
USER.outputdir is not to be used anywhere outside workspace class
Some cleanup in submit logic, to reduce call of Scheduler specific classes from Submitter.py
crab -clean works as well (well, almost: the directory still needs to be removed twice)
Fill startDirectory and outputDirectory to Task
GetOutput checks status and not schedulerStatus (not standard)
Some cleanup in the use of BlackWhiteListParser
No explicit check of scheduler concrete type in Submitter at listMatch level: move different behaviour in SchedulerXYZ implementation
Plus other things I'm forgetting...

Stefano

File Contents

# User Rev Content
1 spiga 1.16 from GetOutput import GetOutput
2     from StatusServer import StatusServer
3 spiga 1.1 from crab_util import *
4     import common
5     import time
6    
7 farinafa 1.14 import traceback
8     from xml.dom import minidom
9     from ServerCommunicator import ServerCommunicator
10     from ServerConfig import *
11    
12    
class GetOutputServer(StatusServer, GetOutput):
    """Retrieve job output archives from the storage element used by the server.

    The server/storage coordinates are read from the ServerConfig entry named
    by the 'CRAB.server_name' configuration parameter.  run() copies every
    available 'out_<jobid>.tgz' archive into the local results directory,
    removes the remote copy and tells the server which outputs were fetched.
    """

    def __init__(self, *args):
        """Store the configuration and resolve server/storage parameters.

        Arguments (positional):
            args[0] -- configuration parameters (accessed like a dict).
            args[1] -- the jobs to operate on; stored as self.jobs.

        Raises:
            CrabException -- when 'CRAB.server_name' or one of the expected
                             server configuration keys is missing.
        """
        self.cfg_params = args[0]
        self.jobs = args[1]

        self.server_name = None
        self.server_port = None

        self.storage_name = None
        self.storage_path = None
        self.storage_proto = None
        self.storage_port = None

        self.srvCfg = {}

        try:
            self.srvCfg = ServerConfig(self.cfg_params['CRAB.server_name']).config()
            self.server_name = str(self.srvCfg['serverName'])
            self.server_port = int(self.srvCfg['serverPort'])
            self.storage_name = str(self.srvCfg['storageName'])
            self.storage_path = str(self.srvCfg['storagePath'])
            self.storage_proto = str(self.srvCfg['storageProtocol'])
            # Kept as a string: it is concatenated into the remote URL in run().
            self.storage_port = str(self.srvCfg['storagePort'])
        except KeyError:
            msg = 'No server selected or port specified.'
            msg = msg + 'Please specify a server in the crab cfg file'
            raise CrabException(msg)

        # The remote URL is built as '<host>:<port>' + path, so the path must be
        # absolute.  startswith() also copes with an empty storage path.
        if not self.storage_path.startswith('/'):
            self.storage_path = '/' + self.storage_path

        self.outDir = common.work_space.resDir()
        # NOTE(review): logDir is taken from resDir(), same as outDir -- confirm
        # whether work_space.logDir() was intended here.
        self.logDir = common.work_space.resDir()
        self.return_data = self.cfg_params.get('USER.return_data', 0)

        self.possible_status = [
            'Undefined',
            'Submitted',
            'Waiting',
            'Ready',
            'Scheduled',
            'Running',
            'Done',
            'Cancelled',
            'Aborted',
            'Unknown',
            # The comma below matters: without it the two adjacent literals are
            # concatenated into the single entry 'Done(failed)Cleared'.
            'Done(failed)',
            'Cleared',
        ]

    def run(self):
        """Fetch the available outputs and notify the server about them.

        Job ids whose archive could not be copied are dropped from
        self.list_id; the server is only notified when at least one archive
        was retrieved.
        """
        common.logger.debug(5, "GetOutput server::run() called")
        common.scheduler.checkProxy()

        # get updated status from server #inherited from StatusServer
        self.resynchClientSide()

        # understand whether the required output are available
        # (presumably this is what fills self.list_id -- verify in GetOutput)
        self.checkBeforeGet()
        filesToRetrieve = self.list_id

        # create the templates for the actual filenames
        taskuuid = str(common._db.queryTask('name'))
        remotedir = os.path.join(self.storage_path, taskuuid)

        osbTemplate = self.storage_proto + '://' + self.storage_name + \
            ':' + self.storage_port + remotedir + '/out_%s.tgz'
        destTemplate = 'file://' + common.work_space.resDir() + '/out_%s.tgz'

        # retrieve them from SE #TODO replace this with SE-API
        # Successes are collected in a separate list: deleting entries from
        # the list being indexed shifts the remaining elements, which drops
        # the wrong job ids and eventually raises IndexError.
        retrieved = []
        for jid in list(filesToRetrieve):
            osbFile = osbTemplate % str(jid)
            destFile = destTemplate % str(jid)
            if self._fetchOutputFile(osbFile, destFile):
                retrieved.append(jid)

        # filesToRetrieve is the same object as self.list_id; update it in
        # place so the attribute keeps reflecting only the retrieved jobs.
        filesToRetrieve[:] = retrieved

        # notify to the server that output have been retrieved successfully. proxy from StatusServer
        if len(filesToRetrieve) > 0:
            common.logger.debug(5, "List of retrieved files notified to server: %s" % str(filesToRetrieve))
            csCommunicator = ServerCommunicator(self.server_name, self.server_port, self.cfg_params, self.proxy)
            csCommunicator.outputRetrieved(taskuuid, filesToRetrieve)

    def _fetchOutputFile(self, osbFile, destFile):
        """Copy one archive from the SE to destFile, then delete the remote copy.

        Returns True when the copy succeeded (a failed remote clean-up is only
        reported), False when the copy failed or an exception was raised.
        """
        # Portable spelling of "discard stdout and stderr": os.system() runs
        # the command through /bin/sh, which is not always bash.
        quiet = ' > /dev/null 2>&1'
        try:
            cmd = 'lcg-cp --vo cms %s %s' % (osbFile, destFile)
            out = os.system(cmd + quiet)
            common.logger.debug(5, cmd)
            if out != 0:
                # Single-argument print(...) behaves the same as a Python 2
                # print statement.
                print("Unable to retrieve output file %s" % osbFile)
                return False

            ##
            ## TODO check if sizes are ok/the transfer was safe. Simpler with the API
            ##

            ## clean-up SE
            cmd = 'lcg-del --vo cms %s' % osbFile
            out = os.system(cmd + quiet)
            common.logger.debug(5, cmd)
            if out != 0:
                print("Unable to clean up SE for output file %s. The deletion will be performed by the server" % osbFile)
        except Exception:
            # Best effort: any unexpected error counts as a failed retrieval.
            print("Unable to retrieve output file %s" % osbFile)
            return False
        return True
124 spiga 1.1
125