ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerLocal.py
Revision: 1.23
Committed: Fri Jun 27 14:21:26 2008 UTC (16 years, 10 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
Changes since 1.22: +78 -28 lines
Log Message:
move rfcp function from SchedulerLocal to cmscp in crab_template and added some changes for publication using CAF and LSF

File Contents

# User Rev Content
1 slacapra 1.1 from Scheduler import Scheduler
2     from crab_exceptions import *
3     from crab_logger import Logger
4     import common
5 fanzago 1.23 from LFNBaseName import *
6 slacapra 1.1
7     import os,string
8    
9 slacapra 1.2 # Base class for all local scheduler
10 slacapra 1.1
class SchedulerLocal(Scheduler) :
    """
    Base class for all local schedulers.

    Reads the scheduler-specific and [USER] options from the CRAB
    configuration, and generates the scheduler-specific fragments of the job
    wrapper script: environment setup, output stage-out and exit handling.
    """

    def configure(self, cfg_params):
        """
        Configure the local scheduler from cfg_params.

        Sets queue/resource, publication and stage-out options. If the user
        gave no 'EDG.se_white_list', it is restricted to the local domain
        name. Finally delegates to Scheduler.configure.

        Raises CrabException when options are missing or inconsistent, or
        when the host has no domain name.
        """
        self.jobtypeName = cfg_params['CRAB.jobtype']

        name = self.name().upper()
        self.queue = cfg_params.get(name+'.queue', None)

        self.res = cfg_params.get(name+'.resource', None)

        # NOTE(review): env_id is looked up with self.name() as-is, whereas
        # queue/resource use the upper-cased name -- confirm which is intended.
        if (self.name()+'.env_id') in cfg_params:
            self.environment_unique_identifier = cfg_params[self.name()+'.env_id']

        # Task name with its last '_'-separated component dropped.
        self._taskId = str("_".join(common._db.queryTask('name').split('_')[:-1]))

        self.return_data = int(cfg_params.get('USER.return_data', 0))

        ## publication options (raw value is kept; it is compared via int())
        self.publish_data = cfg_params.get("USER.publish_data", 0)
        if int(self.publish_data) == 1:
            self.publish_data_name = cfg_params.get('USER.publish_data_name', None)
            if not self.publish_data_name:
                msg = "Error. The [USER] section does not have 'publish_data_name'"
                raise CrabException(msg)
            self.SE = cfg_params.get("USER.storage_element", None)
            if not self.SE:
                msg = "Error. The [USER] section does not have 'storage_element'.\n"
                msg = msg + "Please fill this field if you want to publish your data"
                raise CrabException(msg)

        self.copy_data = int(cfg_params.get("USER.copy_data", 0))
        if self.copy_data == 1:
            self.SE_path = cfg_params.get('USER.storage_path', None)
            if not self.SE_path:
                # do not allow CASTOR_HOME if publish_data is enabled
                if int(self.publish_data) == 0 and 'CASTOR_HOME' in os.environ:
                    self.SE_path = os.environ['CASTOR_HOME']
                else:
                    msg = 'No USER.storage_path has been provided: cannot copy_output'
                    raise CrabException(msg)
            # Ensure exactly one trailing '/': wsCopyOutput concatenates
            # further path components directly onto SE_path.
            if not self.SE_path.endswith('/'):
                self.SE_path += '/'

        if (self.return_data == 0 and self.copy_data == 0):
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if (self.return_data == 1 and self.copy_data == 1):
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        ## Get local domain name: everything after the first '.' of the hostname
        import socket
        hostname = socket.gethostname()
        if '.' not in hostname:
            msg = 'Unknown domain name. Cannot use local scheduler'
            raise CrabException(msg)
        localDomainName = hostname.split('.', 1)[-1]

        ## Unless the user chose a white list, only consider local datasets.
        if 'EDG.se_white_list' not in cfg_params:
            cfg_params['EDG.se_white_list'] = localDomainName
            common.logger.message("Your domain name is "+str(localDomainName)+": only local dataset will be considered")
        else:
            common.logger.message("Your se_white_list is set to "+str(cfg_params['EDG.se_white_list'])+": only local dataset will be considered")

        Scheduler.configure(self, cfg_params)
        return

    def userName(self):
        """ return the user name """
        import pwd, getpass
        # Index 4 of the passwd entry is pw_gecos (the user's full-name field).
        tmp = pwd.getpwnam(getpass.getuser())[4]
        return "/CN="+tmp.strip()

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.

        The fragment starts the wrapper timers, strips the positional
        arguments (the job number is args[0], the input sandbox args[3]),
        defines the monitoring ids and sets 'middleware' to the upper-cased
        scheduler name.

        Raises CrabException if environment_unique_identifier was not set.
        """
        # getattr: configure() only sets this attribute when '<name>.env_id'
        # is configured, so it may be missing entirely.
        if not getattr(self, 'environment_unique_identifier', None):
            raise CrabException('environment_unique_identifier not set')

        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()
        # start with wrapper timing
        txt = 'export TIME_WRAP_INI=`date +%s` \n'
        txt += 'export TIME_STAGEOUT=-2 \n\n'

        txt += '# '+self.name()+' specific stuff\n'
        txt += '# strip arguments\n'
        txt += 'echo "strip arguments"\n'
        txt += 'args=("$@")\n'
        txt += 'nargs=$#\n'
        txt += 'shift $nargs\n'
        txt += "# job number (first parameter for job wrapper)\n"
        txt += "NJob=${args[0]}; export NJob\n"

        txt += "out_files=out_files_${NJob}; export out_files\n"
        txt += "echo $out_files\n"
        txt += jbt.outList()

        txt += 'SyncGridJobId=`echo '+self.environment_unique_identifier+'`\n'
        txt += 'MonitorJobID=`echo ${NJob}_${SyncGridJobId}`\n'
        txt += 'MonitorID=`echo ' + self._taskId + '`\n'

        txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
        txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
        txt += 'echo "SyncCE='+self.name()+'.`hostname -d`" | tee -a $RUNTIME_AREA/$repo \n'
        # 'middleware' is passed as the first argument to cmscp in wsCopyOutput.
        txt += 'middleware='+self.name().upper()+' \n'

        txt += 'dumpStatus $RUNTIME_AREA/$repo \n'

        txt += 'InputSandBox=${args[3]}\n'

        txt += '\n\n'

        return txt

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        Returns a fragment containing only a newline when copy_data is off.
        With publication enabled, the publication sub-directory is appended
        to SE_path and checked with 'verifySePath'. Each file in $file_list
        is then copied with 'cmscp'. A missing file sets copy status 60302.
        Any non-zero copy status overrides job_exit_code.

        NOTE(review): with publication enabled this method mutates
        self.SE_path, so calling it twice appends the sub-directory twice.
        NOTE(review): $cmscp_exit_status is presumably set by the cmscp shell
        function of the wrapper template -- confirm.
        """
        txt = '\n'
        if not self.copy_data:
            return txt

        if int(self.publish_data) == 1:
            self.path_add = PFNportion(self.publish_data_name,LocalUser=True) +'_${PSETHASH}/'

            #### FEDE we can generalize this function for each scheduler ....
            txt += '#\n'
            txt += '# publication = 1 --> verify is the SE path exists '+self.SE_path+self.path_add+'\n'
            txt += '#\n\n'
            txt += 'verifySePath ' + self.SE_path + ' ' + self.path_add + '\n'

            self.SE_path = self.SE_path + self.path_add

        txt += '#\n'
        txt += '# COPY OUTPUT FILE TO '+self.SE_path+ '\n'
        txt += '#\n\n'

        if int(self.publish_data) == 1:
            txt += 'export SE='+self.SE+'\n'
        txt += 'export SE_PATH='+self.SE_path+'\n'

        txt += 'echo ">>> Copy output files from WN = `hostname` to SE_PATH = $SE_PATH :"\n'
        txt += 'export TIME_STAGEOUT_INI=`date +%s` \n'
        txt += 'copy_exit_status=0\n'
        txt += 'for out_file in $file_list ; do\n'
        txt += ' if [ -e $SOFTWARE_DIR/$out_file ] ; then\n'
        txt += ' echo "Trying to copy output file to $SE_PATH"\n'
        txt += ' cmscp $middleware $SOFTWARE_DIR/$out_file $out_file ${SE_PATH}\n'
        txt += ' if [ $cmscp_exit_status -ne 0 ]; then\n'
        txt += ' echo "Problem copying $out_file to $SE_PATH"\n'
        txt += ' copy_exit_status=$cmscp_exit_status\n'
        txt += ' else\n'
        txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
        txt += ' fi\n'
        txt += ' else\n'
        txt += ' copy_exit_status=60302\n'
        txt += ' echo "StageOutExitStatus = $copy_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
        txt += ' echo "StageOutExitStatusReason = file to copy not found" | tee -a $RUNTIME_AREA/$repo\n'
        txt += ' fi\n'
        txt += 'done\n'
        # On any copy failure, blank SE/SE_PATH and propagate the status
        # as the job exit code.
        txt += 'if [ $copy_exit_status -ne 0 ]; then\n'
        txt += ' SE=""\n'
        txt += ' SE_PATH=""\n'
        txt += ' job_exit_code=$copy_exit_status\n'
        txt += 'fi\n'
        txt += 'export TIME_STAGEOUT_END=`date +%s` \n'
        txt += 'let "TIME_STAGEOUT = TIME_STAGEOUT_END - TIME_STAGEOUT_INI" \n'

        return txt

    def wsExitFunc_comm(self):
        """
        Return the common body of the wrapper's exit function.

        It fills crab_fjr_$NJob.xml with the error code and the timing
        (via fillCrabFjr.py, when PYTHONPATH is set and the script exists),
        reports missing output files, records wrapper and stage-out times,
        and dumps the final job exit code to $RUNTIME_AREA/$repo.
        """
        txt = ''
        txt += ' if [ $PYTHONPATH ]; then \n'
        txt += ' if [ ! -s $RUNTIME_AREA/fillCrabFjr.py ]; then \n'
        txt += ' echo "WARNING: it is not possible to create crab_fjr.xml to final report" \n'
        txt += ' else \n'
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --errorcode $job_exit_code $executable_exit_status \n'
        txt += ' fi\n'
        txt += ' fi\n'
        txt += ' cd $RUNTIME_AREA \n'
        txt += ' for file in $filesToCheck ; do\n'
        txt += ' if [ -e $file ]; then\n'
        txt += ' echo "tarring file $file in $out_files"\n'
        txt += ' else\n'
        txt += ' echo "WARNING: output file $file not found!"\n'
        txt += ' fi\n'
        txt += ' done\n'
        txt += ' export TIME_WRAP_END=`date +%s`\n'
        # Bug fix: the start time is exported as TIME_WRAP_INI by
        # wsSetupEnvironment; the previous TIME_WRAP_END_INI was never set,
        # so bash evaluated it as 0 and TIME_WRAP became an epoch timestamp.
        txt += ' let "TIME_WRAP = TIME_WRAP_END - TIME_WRAP_INI" \n'
        txt += ' if [ $PYTHONPATH ]; then \n'
        txt += ' if [ ! -s $RUNTIME_AREA/fillCrabFjr.py ]; then \n'
        txt += ' echo "WARNING: it is not possible to create crab_fjr.xml to final report" \n'
        txt += ' else \n'
        # call timing FJR filling
        txt += ' python $RUNTIME_AREA/fillCrabFjr.py $RUNTIME_AREA/crab_fjr_$NJob.xml --timing $TIME_WRAP $TIME_EXE $TIME_STAGEOUT \n'
        txt += ' echo "CrabWrapperTime=$TIME_WRAP" >> $RUNTIME_AREA/$repo \n'
        # TIME_STAGEOUT stays negative (initialised to -2) when no stage-out ran.
        txt += ' if [ $TIME_STAGEOUT -lt 0 ]; then \n'
        txt += ' export TIME_STAGEOUT=NULL \n'
        txt += ' fi\n'
        txt += ' echo "CrabStageoutTime=$TIME_STAGEOUT" >> $RUNTIME_AREA/$repo \n'
        txt += ' fi\n'
        txt += ' fi\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo \n\n'
        txt += ' echo "JOB_EXIT_STATUS = $job_exit_code"\n'
        txt += ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n'
        txt += ' dumpStatus $RUNTIME_AREA/$repo\n'

        return txt
264