ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerLocal.py
Revision: 1.7
Committed: Tue Feb 5 15:25:32 2008 UTC (17 years, 2 months ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_1_0_pre4
Changes since 1.6: +2 -0 lines
Log Message:
further fix for dashboard

File Contents

# User Rev Content
1 slacapra 1.1 from Scheduler import Scheduler
2     from crab_exceptions import *
3     from crab_logger import Logger
4     import common
5    
6     import os,string
7    
8 slacapra 1.2 # Base class for all local scheduler
9 slacapra 1.1
10     class SchedulerLocal(Scheduler) :
11    
12     def configure(self, cfg_params):
13 slacapra 1.2 Scheduler.configure(self,cfg_params)
14 slacapra 1.1
15     self.jobtypeName = cfg_params['CRAB.jobtype']
16    
17     name=string.upper(self.name())
18     self.queue = cfg_params.get(name+'.queue',None)
19    
20     self.res = cfg_params.get(name+'.resource',None)
21    
22     if (cfg_params.has_key(self.name()+'.env_id')): self.environment_unique_identifier = cfg_params[self.name()+'.env_id']
23    
24     self._taskId = common.taskDB.dict('taskId')
25    
26 slacapra 1.3 self.return_data = int(cfg_params.get('USER.return_data',0))
27    
28     self.copy_data = int(cfg_params.get("USER.copy_data",0))
29     if self.copy_data == 1:
30     self._copyCommand = cfg_params.get('USER.copyCommand','rfcp')
31     self.SE_path= cfg_params.get('USER.storage_path',None)
32     self.SE_path+='/'
33     if not self.SE_path:
34     if os.environ.has_key('CASTOR_HOME'):
35     self.SE_path=os.environ['CASTOR_HOME']
36     else:
37     msg='No USER.storage_path has been provided: cannot copy_output'
38     raise CrabException(msg)
39     pass
40     pass
41    
42     if ( self.return_data == 0 and self.copy_data == 0 ):
43     msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
44     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
45     raise CrabException(msg)
46    
47     if ( self.return_data == 1 and self.copy_data == 1 ):
48     msg = 'Error: return_data and copy_data cannot be set both to 1\n'
49     msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
50     raise CrabException(msg)
51 slacapra 1.1
52     ## Get local domain name
53     import socket
54     tmp=socket.gethostname()
55     dot=string.find(tmp,'.')
56     if (dot==-1):
57     msg='Unkown domain name. Cannot use local scheduler'
58     raise CrabException(msg)
59     localDomainName = string.split(tmp,'.',1)[-1]
60 slacapra 1.7 common.taskDB.setDict('localSite',localDomainName)
61 slacapra 1.1 ## is this ok?
62     cfg_params['EDG.se_white_list']=localDomainName
63     common.logger.message("Your domain name is "+str(localDomainName)+": only local dataset will be considered")
64    
65     return
66    
67     def userName(self):
68     """ return the user name """
69 slacapra 1.5 import pwd,getpass
70     tmp=pwd.getpwnam(getpass.getuser())[4]
71     return "/CN="+tmp.strip()
72 slacapra 1.1
73     def wsSetupEnvironment(self):
74     """
75     Returns part of a job script which does scheduler-specific work.
76     """
77     if not self.environment_unique_identifier:
78     raise CrabException('environment_unique_identifier not set')
79    
80     txt = '# '+self.name()+' specific stuff\n'
81     txt += '# strip arguments\n'
82     txt += 'echo "strip arguments"\n'
83     txt += 'args=("$@")\n'
84     txt += 'nargs=$#\n'
85     txt += 'shift $nargs\n'
86     txt += "# job number (first parameter for job wrapper)\n"
87     txt += "NJob=${args[0]}\n"
88    
89 slacapra 1.6 txt += 'SyncGridJobId=`echo '+self.environment_unique_identifier+'`\n'
90     txt += 'MonitorJobID=`echo ${NJob}_${SyncGridJobId}`\n'
91 slacapra 1.1 txt += 'MonitorID=`echo ' + self._taskId + '`\n'
92    
93     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
94     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
95     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
96 slacapra 1.7 txt += 'echo "SyncCE='+self.name()+'.`hostname -d`" | tee -a $RUNTIME_AREA/$repo \n'
97 slacapra 1.1
98     txt += 'middleware='+self.name()+' \n'
99    
100     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
101    
102     txt += '\n\n'
103    
104     return txt
105    
106 slacapra 1.3 def wsCopyOutput(self):
107     """
108     Write a CopyResults part of a job script, e.g.
109     to copy produced output into a storage element.
110     """
111     if not self.copy_data: return
112    
113     txt = '\n'
114    
115     txt += '#\n'
116     txt += '# COPY OUTPUT FILE TO '+self.SE_path
117     txt += '#\n\n'
118    
119     txt += 'export SE_PATH='+self.SE_path+'\n'
120    
121     txt += 'export CP_CMD='+self._copyCommand+'\n'
122    
123     txt += 'echo ">>> Copy output files from WN = `hostname` to PATH = $SE_PATH using $CP_CMD :"\n'
124    
125     txt += 'if [ $output_exit_status -eq 60302 ]; then\n'
126     txt += ' echo "--> No output file to copy to $SE"\n'
127     txt += ' copy_exit_status=$output_exit_status\n'
128     txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
129     txt += 'else\n'
130     txt += ' for out_file in $file_list ; do\n'
131     txt += ' echo "Trying to copy output file to $SE_PATH"\n'
132     txt += ' $CP_CMD $SOFTWARE_DIR/$out_file ${SE_PATH}/$out_file\n'
133     txt += ' copy_exit_status=$?\n'
134     txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
135     txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
136     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
137     txt += ' echo "Problem copying $out_file to $SE $SE_PATH"\n'
138     txt += ' echo "StageOutExitStatus = $copy_exit_status " | tee -a $RUNTIME_AREA/$repo\n'
139 fanzago 1.4 #txt += ' copy_exit_status=60307\n'
140 slacapra 1.3 txt += ' else\n'
141     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
142     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
143     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
144     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
145     txt += ' fi\n'
146     txt += ' done\n'
147     txt += 'fi\n'
148     txt += 'exit_status=$copy_exit_status\n'
149    
150     return txt
151    
152 slacapra 1.1 def createXMLSchScript(self, nj, argsList):
153    
154     """
155     Create a XML-file for BOSS4.
156     """
157    
158     """
159     INDY
160     [begin] FIX-ME:
161     I would pass jobType instead of job
162     """
163     index = nj - 1
164     job = common.job_list[index]
165     jbt = job.type()
166     inp_sandbox = jbt.inputSandbox(index)
167     #out_sandbox = jbt.outputSandbox(index)
168     """
169     [end] FIX-ME
170     """
171    
172    
173     title = '<?xml version="1.0" encoding="UTF-8" standalone="no"?>\n'
174     jt_string = ''
175    
176     xml_fname = str(self.jobtypeName)+'.xml'
177     xml = open(common.work_space.shareDir()+'/'+xml_fname, 'a')
178    
179     #TaskName
180     dir = string.split(common.work_space.topDir(), '/')
181     taskName = dir[len(dir)-2]
182    
183     to_write = ''
184    
185     req=' '
186     req = req + jbt.getRequirements()
187    
188     #TaskName
189     dir = string.split(common.work_space.topDir(), '/')
190     taskName = dir[len(dir)-2]
191    
192     xml.write(str(title))
193    
194     #First check the X509_USER_PROXY. In not there use the default
195     xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + '>\n')
196    
197     xml.write(jt_string)
198    
199     if (to_write != ''):
200     xml.write('<extraTags\n')
201     xml.write(to_write)
202     xml.write('/>\n')
203     pass
204    
205     xml.write('<iterator>\n')
206     xml.write('\t<iteratorRule name="ITR1">\n')
207     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ' </ruleElement>\n')
208     xml.write('\t</iteratorRule>\n')
209     xml.write('\t<iteratorRule name="ITR2">\n')
210     for arg in argsList:
211     xml.write('\t\t<ruleElement> <![CDATA[\n'+ arg + '\n\t\t]]> </ruleElement>\n')
212     pass
213     xml.write('\t</iteratorRule>\n')
214     #print jobList
215     xml.write('\t<iteratorRule name="ITR3">\n')
216     xml.write('\t\t<ruleElement> 1:'+ str(nj) + ':1:6 </ruleElement>\n')
217     xml.write('\t</iteratorRule>\n')
218    
219     xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.name())+'">\n')
220     # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
221     xml.write(jt_string)
222    
223     #executable
224    
225     script = job.scriptFilename()
226     xml.write('<program>\n')
227     xml.write('<exec> ' + os.path.basename(script) +' </exec>\n')
228     xml.write(jt_string)
229    
230     xml.write('<args> <![CDATA[\n _ITR2_ \n]]> </args>\n')
231     xml.write('<program_types> crabjob </program_types>\n')
232     inp_box = common.work_space.pathForTgz() + 'job/' + jbt.scriptName + ','
233    
234     if inp_sandbox != None:
235     for fl in inp_sandbox:
236     inp_box = inp_box + '' + fl + ','
237     pass
238     pass
239    
240     if inp_box[-1] == ',' : inp_box = inp_box[:-1]
241     inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
242     xml.write(inp_box)
243    
244     base = jbt.name()
245     stdout = base + '__ITR3_.stdout'
246     stderr = base + '__ITR3_.stderr'
247    
248     xml.write('<stderr> ' + stderr + '</stderr>\n')
249     xml.write('<stdout> ' + stdout + '</stdout>\n')
250    
251    
252     out_box = stdout + ',' + \
253     stderr + ',.BrokerInfo,'
254    
255     # Stuff to be returned _always_ via sandbox
256     for fl in jbt.output_file_sandbox:
257     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
258     pass
259     pass
260    
261     # via sandbox iif required return_data
262     if int(self.return_data) == 1:
263     for fl in jbt.output_file:
264     out_box = out_box + '' + jbt.numberFile_(fl, '_ITR1_') + ','
265     pass
266     pass
267    
268     if out_box[-1] == ',' : out_box = out_box[:-1]
269     out_box = '<outfiles> <![CDATA[\n' + out_box + '\n]]></outfiles>\n'
270     xml.write(out_box)
271    
272     xml.write('<BossAttr> crabjob.INTERNAL_ID=_ITR1_ </BossAttr>\n')
273    
274     xml.write('</program>\n')
275     xml.write('</chain>\n')
276    
277     xml.write('</iterator>\n')
278     xml.write('</task>\n')
279    
280     xml.close()
281    
282    
283     return