ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerLocal.py
Revision: 1.19
Committed: Mon Jun 9 17:50:45 2008 UTC (16 years, 10 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.18: +1 -1 lines
Log Message:
bug fix

File Contents

# User Rev Content
1 slacapra 1.1 from Scheduler import Scheduler
2     from crab_exceptions import *
3     from crab_logger import Logger
4     import common
5    
6     import os,string
7    
8 slacapra 1.2 # Base class for all local schedulers
9 slacapra 1.1
10     class SchedulerLocal(Scheduler) :
11    
12     def configure(self, cfg_params):
13    
14     self.jobtypeName = cfg_params['CRAB.jobtype']
15    
16     name=string.upper(self.name())
17     self.queue = cfg_params.get(name+'.queue',None)
18    
19     self.res = cfg_params.get(name+'.resource',None)
20    
21     if (cfg_params.has_key(self.name()+'.env_id')): self.environment_unique_identifier = cfg_params[self.name()+'.env_id']
22    
23 spiga 1.14 self._taskId=str("_".join(common._db.queryTask('name').split('_')[:-1]))
24 slacapra 1.1
25 slacapra 1.3 self.return_data = int(cfg_params.get('USER.return_data',0))
26    
27     self.copy_data = int(cfg_params.get("USER.copy_data",0))
28     if self.copy_data == 1:
29     self._copyCommand = cfg_params.get('USER.copyCommand','rfcp')
30     self.SE_path= cfg_params.get('USER.storage_path',None)
31     if not self.SE_path:
32     if os.environ.has_key('CASTOR_HOME'):
33     self.SE_path=os.environ['CASTOR_HOME']
34     else:
35     msg='No USER.storage_path has been provided: cannot copy_output'
36     raise CrabException(msg)
37     pass
38     pass
39 slacapra 1.10 self.SE_path+='/'
40 slacapra 1.3
41     if ( self.return_data == 0 and self.copy_data == 0 ):
42     msg = 'Error: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
43     msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
44     raise CrabException(msg)
45    
46     if ( self.return_data == 1 and self.copy_data == 1 ):
47     msg = 'Error: return_data and copy_data cannot be set both to 1\n'
48     msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
49     raise CrabException(msg)
50 slacapra 1.1
51     ## Get local domain name
52     import socket
53     tmp=socket.gethostname()
54     dot=string.find(tmp,'.')
55     if (dot==-1):
56     msg='Unkown domain name. Cannot use local scheduler'
57     raise CrabException(msg)
58     localDomainName = string.split(tmp,'.',1)[-1]
59 slacapra 1.11 #common.taskDB.setDict('localSite',localDomainName)
60 slacapra 1.1 ## is this ok?
61 slacapra 1.16 if not cfg_params.has_key('EDG.se_white_list'):
62     cfg_params['EDG.se_white_list']=localDomainName
63     common.logger.message("Your domain name is "+str(localDomainName)+": only local dataset will be considered")
64     else:
65     common.logger.message("Your se_white_list is set to "+str(cfg_params['EDG.se_white_list'])+": only local dataset will be considered")
66    
67    
68    
69 slacapra 1.1
70 slacapra 1.13 Scheduler.configure(self,cfg_params)
71 slacapra 1.1 return
72    
73     def userName(self):
74     """ return the user name """
75 slacapra 1.5 import pwd,getpass
76     tmp=pwd.getpwnam(getpass.getuser())[4]
77     return "/CN="+tmp.strip()
78 slacapra 1.1
79     def wsSetupEnvironment(self):
80     """
81     Returns part of a job script which does scheduler-specific work.
82     """
83     if not self.environment_unique_identifier:
84     raise CrabException('environment_unique_identifier not set')
85    
86 slacapra 1.12 index = int(common._db.nJobs())
87     job = common.job_list[index-1]
88     jbt = job.type()
89 spiga 1.17 # start with wrapper timing
90 spiga 1.18 txt = 'export TIME_WRAP_INI=`date +%s` \n'
91 spiga 1.17 txt += 'export TIME_STAGEOUT=NULL \n\n'
92    
93 spiga 1.19 txt += '# '+self.name()+' specific stuff\n'
94 slacapra 1.1 txt += '# strip arguments\n'
95     txt += 'echo "strip arguments"\n'
96     txt += 'args=("$@")\n'
97     txt += 'nargs=$#\n'
98     txt += 'shift $nargs\n'
99     txt += "# job number (first parameter for job wrapper)\n"
100 ewv 1.9 txt += "NJob=${args[0]}; export NJob\n"
101 slacapra 1.1
102 slacapra 1.12 txt += "out_files=out_files_${NJob}; export out_files\n"
103     txt += "echo $out_files\n"
104     txt += jbt.outList()
105    
106 slacapra 1.6 txt += 'SyncGridJobId=`echo '+self.environment_unique_identifier+'`\n'
107     txt += 'MonitorJobID=`echo ${NJob}_${SyncGridJobId}`\n'
108 slacapra 1.1 txt += 'MonitorID=`echo ' + self._taskId + '`\n'
109    
110     txt += 'echo "MonitorJobID=`echo $MonitorJobID`" | tee -a $RUNTIME_AREA/$repo \n'
111     txt += 'echo "SyncGridJobId=`echo $SyncGridJobId`" | tee -a $RUNTIME_AREA/$repo \n'
112     txt += 'echo "MonitorID=`echo $MonitorID`" | tee -a $RUNTIME_AREA/$repo\n'
113 slacapra 1.7 txt += 'echo "SyncCE='+self.name()+'.`hostname -d`" | tee -a $RUNTIME_AREA/$repo \n'
114 slacapra 1.1
115     txt += 'middleware='+self.name()+' \n'
116    
117     txt += 'dumpStatus $RUNTIME_AREA/$repo \n'
118    
119 slacapra 1.12 txt += 'InputSandBox=${args[3]}\n'
120    
121 slacapra 1.1 txt += '\n\n'
122    
123     return txt
124    
125 slacapra 1.3 def wsCopyOutput(self):
126     """
127     Write a CopyResults part of a job script, e.g.
128     to copy produced output into a storage element.
129     """
130 slacapra 1.8 txt = '\n'
131     if not self.copy_data: return txt
132 slacapra 1.3
133    
134     txt += '#\n'
135     txt += '# COPY OUTPUT FILE TO '+self.SE_path
136     txt += '#\n\n'
137    
138     txt += 'export SE_PATH='+self.SE_path+'\n'
139    
140     txt += 'export CP_CMD='+self._copyCommand+'\n'
141    
142     txt += 'echo ">>> Copy output files from WN = `hostname` to PATH = $SE_PATH using $CP_CMD :"\n'
143    
144 spiga 1.15 txt += 'if [ $job_exit_code -eq 60302 ]; then\n'
145 slacapra 1.3 txt += ' echo "--> No output file to copy to $SE"\n'
146 spiga 1.15 txt += ' copy_exit_status=$job_exit_code\n'
147 slacapra 1.3 txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
148     txt += 'else\n'
149     txt += ' for out_file in $file_list ; do\n'
150     txt += ' echo "Trying to copy output file to $SE_PATH"\n'
151     txt += ' $CP_CMD $SOFTWARE_DIR/$out_file ${SE_PATH}/$out_file\n'
152     txt += ' copy_exit_status=$?\n'
153     txt += ' echo "COPY_EXIT_STATUS = $copy_exit_status"\n'
154     txt += ' echo "STAGE_OUT = $copy_exit_status"\n'
155     txt += ' if [ $copy_exit_status -ne 0 ]; then\n'
156     txt += ' echo "Problem copying $out_file to $SE $SE_PATH"\n'
157     txt += ' echo "StageOutExitStatus = $copy_exit_status " | tee -a $RUNTIME_AREA/$repo\n'
158 fanzago 1.4 #txt += ' copy_exit_status=60307\n'
159 slacapra 1.3 txt += ' else\n'
160     txt += ' echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
161     txt += ' echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
162     txt += ' echo "output copied into $SE/$SE_PATH directory"\n'
163     txt += ' echo "StageOutExitStatus = 0" | tee -a $RUNTIME_AREA/$repo\n'
164     txt += ' fi\n'
165     txt += ' done\n'
166     txt += 'fi\n'
167     txt += 'exit_status=$copy_exit_status\n'
168    
169     return txt