ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerLocal.py
Revision: 1.12
Committed: Fri Apr 11 14:54:22 2008 UTC (17 years ago) by slacapra
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre7
Changes since 1.11: +9 -133 lines
Log Message:
Many changes to have LSF working with BossLite
Introduce Killer class to handle -kill which works again
Work_space::res() returns the correct output directory even when the user has set a non-default one; likewise for logDir()
USER.outputdir is not to be used anywhere outside workspace class
Some cleanup in submit logic, to reduce call of Scheduler specific classes from Submitter.py
crab -clean works as well (well, almost: the directory still needs to be removed twice)
Fill startDirectory and outputDirectory to Task
GetOutput checks status and not schedulerStatus (not standard)
Some cleanup in the use of BlackWhiteListParser
No explicit check of scheduler concrete type in Submitter at listMatch level: move different behaviour in SchedulerXYZ implementation
Plus other things I'm forgetting...

Stefano

File Contents

# User Rev Content
1 slacapra 1.1 from Scheduler import Scheduler
2     from crab_exceptions import *
3     from crab_logger import Logger
4     import common
5    
6     import os,string
7    
8 slacapra 1.2 # Base class for all local schedulers
9 slacapra 1.1
class SchedulerLocal(Scheduler):
    """Base class for all local schedulers.

    Reads the scheduler-related settings from the CRAB configuration and
    produces the scheduler-specific fragments of the job wrapper script.
    """

    def configure(self, cfg_params):
        """Initialise the scheduler from the CRAB configuration.

        Parameters:
            cfg_params: dict-like configuration. 'CRAB.jobtype' is mandatory;
                '<NAME>.queue', '<NAME>.resource', '<name>.env_id',
                'USER.return_data', 'USER.copy_data', 'USER.copyCommand'
                and 'USER.storage_path' are optional.

        Side effects:
            Sets cfg_params['EDG.se_white_list'] to the local domain name and
            logs that domain through common.logger.

        Raises:
            CrabException: if copy_data is 1 and neither USER.storage_path
                nor $CASTOR_HOME is available, if return_data and copy_data
                are both 0 or both 1, or if the host name has no domain part.
        """
        Scheduler.configure(self, cfg_params)

        self.jobtypeName = cfg_params['CRAB.jobtype']

        name = self.name().upper()
        self.queue = cfg_params.get(name + '.queue', None)
        self.res = cfg_params.get(name + '.resource', None)

        # NOTE(review): unlike queue/resource above, this key is built from the
        # non-uppercased scheduler name -- confirm that this is intended.
        env_id_key = self.name() + '.env_id'
        if env_id_key in cfg_params:
            self.environment_unique_identifier = cfg_params[env_id_key]

        self._taskId = common._db.queryTask('name')  ## DS--BL

        self.return_data = int(cfg_params.get('USER.return_data', 0))
        self.copy_data = int(cfg_params.get("USER.copy_data", 0))

        if self.copy_data == 1:
            self._copyCommand = cfg_params.get('USER.copyCommand', 'rfcp')
            self.SE_path = cfg_params.get('USER.storage_path', None)
            if not self.SE_path:
                # Fall back to the user's CASTOR home when no path is given.
                if 'CASTOR_HOME' not in os.environ:
                    msg = 'No USER.storage_path has been provided: cannot copy_output'
                    raise CrabException(msg)
                self.SE_path = os.environ['CASTOR_HOME']
            # SE_path only exists in this branch, so the separator is added here.
            self.SE_path += '/'

        if self.return_data == 0 and self.copy_data == 0:
            msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
            msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        if self.return_data == 1 and self.copy_data == 1:
            msg = 'Error: return_data and copy_data cannot be set both to 1\n'
            msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
            raise CrabException(msg)

        ## Get local domain name
        import socket
        hostName = socket.gethostname()
        if '.' not in hostName:
            msg = 'Unkown domain name. Cannot use local scheduler'
            raise CrabException(msg)
        # Everything after the first dot is taken as the domain.
        localDomainName = hostName.split('.', 1)[-1]
        ## is this ok?
        cfg_params['EDG.se_white_list'] = localDomainName
        common.logger.message("Your domain name is "+str(localDomainName)+": only local dataset will be considered")

    def userName(self):
        """Return '/CN=' followed by the stripped GECOS field of the current user."""
        import pwd, getpass
        # Index 4 of a passwd entry is the GECOS (comment) field.
        gecos = pwd.getpwnam(getpass.getuser())[4]
        return "/CN=" + gecos.strip()

    def wsSetupEnvironment(self):
        """
        Returns part of a job script which does scheduler-specific work.

        Raises:
            CrabException: if environment_unique_identifier is missing or empty.
        """
        # configure() only sets the attribute when the '.env_id' key is present,
        # so look it up defensively: a missing attribute must trigger the
        # CrabException below rather than an AttributeError.
        uniqueId = getattr(self, 'environment_unique_identifier', None)
        if not uniqueId:
            raise CrabException('environment_unique_identifier not set')

        # The job type is taken from the last job in common.job_list.
        index = int(common._db.nJobs())
        job = common.job_list[index-1]
        jbt = job.type()

        parts = [
            '# '+self.name()+' specific stuff\n',
            '# strip arguments\n',
            'echo "strip arguments"\n',
            'args=("$@")\n',
            'nargs=$#\n',
            'shift $nargs\n',
            "# job number (first parameter for job wrapper)\n",
            "NJob=${args[0]}; export NJob\n",
            "out_files=out_files_${NJob}; export out_files\n",
            "echo $out_files\n",
            jbt.outList(),
            'SyncGridJobId=`echo '+uniqueId+'`\n',
            'MonitorJobID=`echo ${NJob}_${SyncGridJobId}`\n',
            'MonitorID=`echo ' + self._taskId + '`\n',
            'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n',
            'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n',
            'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n',
            'echo "SyncCE='+self.name()+'.`hostname -d`" | tee -a $RUNTIME_AREA/$repo \n',
            'middleware='+self.name()+' \n',
            'dumpStatus $RUNTIME_AREA/$repo \n',
            'InputSandBox=${args[3]}\n',
            '\n\n',
        ]
        return ''.join(parts)

    def wsCopyOutput(self):
        """
        Write a CopyResults part of a job script, e.g.
        to copy produced output into a storage element.

        Returns only a newline when copy_data is not enabled.
        """
        if not self.copy_data:
            return '\n'

        # NOTE(review): the generated script refers to $SE, which is not set by
        # any text produced in this class, and exit_status reflects only the
        # last file copied -- confirm both against the rest of the wrapper.
        parts = [
            '\n',
            '#\n',
            # Terminate the header line so the closing '#' lands on its own line.
            '# COPY OUTPUT FILE TO '+self.SE_path+'\n',
            '#\n\n',
            'export SE_PATH='+self.SE_path+'\n',
            'export CP_CMD='+self._copyCommand+'\n',
            'echo ">>> Copy output files from WN = `hostname` to PATH = $SE_PATH using $CP_CMD :"\n',
            'if [ $output_exit_status -eq 60302 ]; then\n',
            ' echo "--> No output file to copy to $SE"\n',
            ' copy_exit_status=$output_exit_status\n',
            ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n',
            'else\n',
            ' for out_file in $file_list ; do\n',
            ' echo "Trying to copy output file to $SE_PATH"\n',
            ' $CP_CMD $SOFTWARE_DIR/$out_file ${SE_PATH}/$out_file\n',
            ' copy_exit_status=$?\n',
            ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n',
            ' echo "STAGE_OUT = $copy_exit_status"\n',
            ' if [ $copy_exit_status -ne 0 ]; then\n',
            ' echo "Problem copying $out_file to $SE $SE_PATH"\n',
            ' echo "StageOutExitStatus = $copy_exit_status " | tee -a $RUNTIME_AREA/$repo\n',
            ' else\n',
            ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n',
            ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n',
            ' echo "output copied into $SE/$SE_PATH directory"\n',
            ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n',
            ' fi\n',
            ' done\n',
            'fi\n',
            'exit_status=$copy_exit_status\n',
        ]
        return ''.join(parts)