ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerLocal.py
Revision: 1.1
Committed: Mon Jan 7 18:20:20 2008 UTC (17 years, 3 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre2
Log Message:
introduce a SchedulerLocal class to reuse code for local scheduler such as LSF-PBS/Torque etc, as suggested by Stijn De Weirdt (thanks!)

File Contents

# User Rev Content
1 slacapra 1.1 from Scheduler import Scheduler
2     from crab_exceptions import *
3     from crab_logger import Logger
4     import common
5    
6     import os,string
7    
8     #
9     # Naming convention:
10     # methods starting with 'ws' are responsible to provide
11     # corresponding part of the job script ('ws' stands for 'write script').
12     #
13    
14     class SchedulerLocal(Scheduler) :
15    
16     def configure(self, cfg_params):
17    
18     self.jobtypeName = cfg_params['CRAB.jobtype']
19    
20     name=string.upper(self.name())
21     self.queue = cfg_params.get(name+'.queue',None)
22    
23     self.res = cfg_params.get(name+'.resource',None)
24    
25     self.user = cfg_params.get(name+'.user',None)
26    
27     if (cfg_params.has_key(self.name()+'.env_id')): self.environment_unique_identifier = cfg_params[self.name()+'.env_id']
28    
29     self._taskId = common.taskDB.dict('taskId')
30    
31     self.return_data = 1
32    
33     ## Get local domain name
34     import socket
35     tmp=socket.gethostname()
36     dot=string.find(tmp,'.')
37     if (dot==-1):
38     msg='Unkown domain name. Cannot use local scheduler'
39     raise CrabException(msg)
40     localDomainName = string.split(tmp,'.',1)[-1]
41     ## is this ok?
42     cfg_params['EDG.se_white_list']=localDomainName
43     common.logger.message("Your domain name is "+str(localDomainName)+": only local dataset will be considered")
44    
45     return
46    
47    
48     def sched_parameter(self):
49     """
50     Returns parameter scheduler-specific, to use with BOSS .
51     """
52     index = int(common.jobDB.nJobs()) - 1
53     job = common.job_list[index]
54     jbt = job.type()
55    
56     lastBlock=-1
57     first = []
58     for n in range(common.jobDB.nJobs()):
59     currBlock=common.jobDB.block(n)
60     if (currBlock!=lastBlock):
61     lastBlock = currBlock
62     first.append(n)
63    
64     req = ''
65     req = req + jbt.getRequirements()
66    
67     for i in range(len(first)): # Add loop DS
68     groupReq = req
69     self.param='sched_param_'+str(i)+'.clad'
70     param_file = open(common.work_space.shareDir()+'/'+self.param, 'w')
71    
72     param_file.write('foo = bar;\n') ## Boss complain for empty clad
73     if (self.queue):
74     param_file.write('queue = '+self.queue +';\n')
75     if (self.res): param_file.write('requirement = '+self.res +';\n')
76     pass
77    
78     param_file.close()
79     pass
80    
81     return
82    
83     def userName(self):
84     """ return the user name """
85     if self.user:
86     return self.user
87     else:
88     import pwd,getpass
89     tmp=pwd.getpwnam(getpass.getuser())[4]
90     return tmp.strip()
91    
92     def wsSetupEnvironment(self):
93     """
94     Returns part of a job script which does scheduler-specific work.
95     """
96     if not self.environment_unique_identifier:
97     raise CrabException('environment_unique_identifier not set')
98    
99     txt = '# '+self.name()+' specific stuff\n'
100     txt += '# strip arguments\n'
101     txt += 'echo "strip arguments"\n'
102     txt += 'args=("$@")\n'
103     txt += 'nargs=$#\n'
104     txt += 'shift $nargs\n'
105     txt += "# job number (first parameter for job wrapper)\n"
106     txt += "NJob=${args[0]}\n"
107    
108     txt += 'MonitorJobID=`echo ${NJob}_$'+self.environment_unique_identifier+'`\n'
109     txt += 'SyncGridJobId=`echo $'+self.environment_unique_identifier+'`\n'
110     txt += 'MonitorID=`echo ' + self._taskId + '`\n'
111    
112     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
113     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
114     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
115    
116     txt += 'middleware='+self.name()+' \n'
117    
118     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
119    
120     txt += '\n\n'
121    
122     return txt
123    
124     def createXMLSchScript(self, nj, argsList):
125    
126     """
127     Create a XML-file for BOSS4.
128     """
129    
130     """
131     INDY
132     [begin] FIX-ME:
133     I would pass jobType instead of job
134     """
135     index = nj - 1
136     job = common.job_list[index]
137     jbt = job.type()
138     inp_sandbox = jbt.inputSandbox(index)
139     #out_sandbox = jbt.outputSandbox(index)
140     """
141     [end] FIX-ME
142     """
143    
144    
145     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
146     jt_string = ''
147    
148     xml_fname = str(self.jobtypeName)+'.xml'
149     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
150    
151     #TaskName
152     dir = string.split(common.work_space.topDir(), '/')
153     taskName = dir[len(dir)-2]
154    
155     to_write = ''
156    
157     req=' '
158     req = req + jbt.getRequirements()
159    
160     #TaskName
161     dir = string.split(common.work_space.topDir(), '/')
162     taskName = dir[len(dir)-2]
163    
164     xml.write(str(title))
165    
166     #First check the X509_USER_PROXY. In not there use the default
167     xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + '>\n')
168    
169     xml.write(jt_string)
170    
171     if (to_write != ''):
172     xml.write('<extraTags\n')
173     xml.write(to_write)
174     xml.write('/>\n')
175     pass
176    
177     xml.write('<iterator>\n')
178     xml.write('\t<iteratorRule name="ITR1">\n')
179     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
180     xml.write('\t</iteratorRule>\n')
181     xml.write('\t<iteratorRule name="ITR2">\n')
182     for arg in argsList:
183     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
184     pass
185     xml.write('\t</iteratorRule>\n')
186     #print jobList
187     xml.write('\t<iteratorRule name="ITR3">\n')
188     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
189     xml.write('\t</iteratorRule>\n')
190    
191     xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.name())+'">\n')
192     # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
193     xml.write(jt_string)
194    
195     #executable
196    
197     script = job.scriptFilename()
198     xml.write('<program>\n')
199     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
200     xml.write(jt_string)
201    
202     xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
203     xml.write('<program_types> crabjob </program_types>\n')
204     inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
205    
206     if inp_sandbox != None:
207     for fl in inp_sandbox:
208     inp_box = inp_box + '' + fl + ','
209     pass
210     pass
211    
212     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
213     inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
214     xml.write(inp_box)
215    
216     base = jbt.name()
217     stdout = base + '__ITR3_.stdout'
218     stderr = base + '__ITR3_.stderr'
219    
220     xml.write('<stderr> ' + stderr + '</stderr>\n')
221     xml.write('<stdout> ' + stdout + '</stdout>\n')
222    
223    
224     out_box = stdout + ',' + \
225     stderr + ',.BrokerInfo,'
226    
227     # Stuff to be returned _always_ via sandbox
228     for fl in jbt.output_file_sandbox:
229     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
230     pass
231     pass
232    
233     # via sandbox iif required return_data
234     if int(self.return_data) == 1:
235     for fl in jbt.output_file:
236     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
237     pass
238     pass
239    
240     if out_box[-1] == ',' : out_box = out_box[:-1]
241     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
242     xml.write(out_box)
243    
244     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
245    
246     xml.write('</program>\n')
247     xml.write('</chain>\n')
248    
249     xml.write('</iterator>\n')
250     xml.write('</task>\n')
251    
252     xml.close()
253    
254    
255     return