ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerGlite.py
Revision: 1.51
Committed: Sat May 3 17:14:17 2008 UTC (16 years, 11 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_2_0_pre16
Changes since 1.50: +59 -0 lines
Log Message:
Removed func_exit() from the template: it is now written by crab and configured according to scheduler-specific needs. Restored the check on the OSB size when the scheduler is gLite. Reorganized the configuration parameters of the real scheduler (i.e. the BLite script interacting with the real scheduler): the dictionary is now prepared by the different schedulers, so any needed parameter can be shipped.

File Contents

# User Rev Content
1 slacapra 1.31 from SchedulerGrid import SchedulerGrid
2 spiga 1.2 from crab_logger import Logger
3     from crab_exceptions import *
4     from crab_util import *
5 fanzago 1.4 from GliteConfig import *
6 spiga 1.47 import EdgLoggingInfo
7 spiga 1.2 import common
8    
9     import os, sys, time
10    
11 slacapra 1.31 class SchedulerGlite(SchedulerGrid):
def __init__(self, name="GLITE"):
    """
    Initialise the gLite scheduler.

    name -- scheduler name, handed unchanged to SchedulerGrid.__init__
    """
    SchedulerGrid.__init__(self, name)

    # Output sandbox size limit: wsExitFunc() writes this value into the
    # generated func_exit shell function as $limit (presumably the same
    # unit as the size column printed by ls -- TODO confirm).
    self.OSBsize = 55000
16    
def configure(self, cfg_params):
    """
    Configure the scheduler from the user configuration.

    Runs the SchedulerGrid configuration first, then calls
    self.checkProxy() and records the gLite job identifier name.

    cfg_params -- configuration parameters, passed to SchedulerGrid.configure
    """
    SchedulerGrid.configure(self, cfg_params)
    self.checkProxy()
    # Presumably the name of the environment variable holding the job id
    # on the worker node -- verify against the users of this attribute.
    self.environment_unique_identifier = 'GLITE_WMS_JOBID'
21 slacapra 1.3
def realSchedParams(self, cfg_params):
    """
    Return dictionary with specific parameters, to use
    with real scheduler

    cfg_params -- mapping of configuration keys to values; the keys
                  'EDG.rb' and 'EDG.wms_service' are read.

    Side effects: sets self.rb_param_file and self.wms_service.

    Returns a dictionary with the keys:
      'service' -- value of 'EDG.wms_service' ('' when not configured)
      'config'  -- result of common.scheduler.rb_configure() for 'EDG.rb',
                   or '' when 'EDG.rb' is not configured
    """
    self.rb_param_file = ''
    # Membership test with 'in' instead of dict.has_key(): has_key() is
    # Python-2-only, 'in' behaves the same and works everywhere.
    if 'EDG.rb' in cfg_params:
        self.rb_param_file = common.scheduler.rb_configure(cfg_params.get("EDG.rb"))
    self.wms_service = cfg_params.get("EDG.wms_service", '')

    return {
        'service': self.wms_service,
        'config': self.rb_param_file,
    }
35    
36    
def rb_configure(self, RB):
    """
    Resolve the configuration for the given RB.

    RB -- value identifying the RB; handed to GliteConfig

    Returns whatever GliteConfig(RB).config() yields when that value is
    truthy, otherwise None. None is also returned when RB itself is
    empty/None, in which case GliteConfig is never instantiated.
    """
    if not RB:
        return None

    config_file = GliteConfig(RB).config()
    if not config_file:
        return None
    return config_file
48 fanzago 1.4
def ce_list(self):
    """
    Returns string with requirement CE related

    The requirement fragment is built from self.EDG_ce_white_list and
    self.EDG_ce_black_list (each an iterable of strings, or an
    empty/None value when unset):
      * white list: a RegExp match on other.GlueCEUniqueId per entry,
        OR-ed together when there is more than one entry;
      * black list: a negated RegExp match per entry, AND-ed together;
      * a fixed clause excluding CEs whose id matches "blah" is always
        appended.
    Entries are stripped of surrounding whitespace.

    Returns the tuple (requirement string, white list, black list).
    """
    req = ''

    # str.strip() instead of string.strip(): the latter relied on the
    # 'string' module leaking in through a star import and does not exist
    # any more in Python 3.
    white_terms = []
    for ce in (self.EDG_ce_white_list or []):
        white_terms.append('RegExp("' + ce.strip() + '", other.GlueCEUniqueId)')

    if len(white_terms) == 1:
        req += " && (" + white_terms[0] + ") "
    elif white_terms:
        # Produces " && ( (A)  || (B) ) " -- spacing kept exactly as the
        # previous implementation emitted it.
        alternatives = ["(" + term + ") " for term in white_terms]
        req += " && ( " + " || ".join(alternatives) + ") "

    black_terms = []
    for ce in (self.EDG_ce_black_list or []):
        black_terms.append('(!RegExp("' + ce.strip() + '", other.GlueCEUniqueId))')
    if black_terms:
        req += " && (" + '&&'.join(black_terms) + ") "

    # requirement added to skip gliteCE
    req += '&& (!RegExp("blah", other.GlueCEUniqueId))'

    return req, self.EDG_ce_white_list, self.EDG_ce_black_list
85 spiga 1.36
def se_list(self, id, dest):
    """
    Build the SE-related part of the requirement string.

    id   -- job identifier, forwarded to self.findSites_
    dest -- list of candidate destinations, forwarded to self.findSites_

    Each host returned by self.findSites_(id, dest) becomes a Member()
    test on other.GlueCESEBindGroupSEUniqueID; the tests are OR-ed.
    Returns '' when no host is left.
    """
    members = [' Member("' + host + '" , other.GlueCESEBindGroupSEUniqueID) '
               for host in self.findSites_(id, dest)]
    if not members:
        return ''
    return " && (" + '||'.join(members) + ") "
101    
def jdlParam(self):
    """
    Return the additional JDL parameters as JDL statements.

    Every entry of self.EDG_addJdlParam is stripped of surrounding
    whitespace and terminated with ';' and a newline. Returns '' when
    no additional parameter is configured.

    Side effect: a trailing empty entry of self.EDG_addJdlParam is
    removed from the attribute itself.
    """
    if not self.EDG_addJdlParam:
        return ''

    # Drop a trailing empty entry so it does not produce a bare ';' line.
    if self.EDG_addJdlParam[-1] == '':
        self.EDG_addJdlParam = self.EDG_addJdlParam[:-1]

    # str.strip() instead of string.strip(): the latter relied on the
    # 'string' module leaking in through a star import and does not exist
    # any more in Python 3.
    return ''.join([p.strip() + ';\n' for p in self.EDG_addJdlParam])
113    
def specific_req(self):
    """
    Build the requirement clauses for the configured time limits.

    A clause on other.GlueCEPolicyMaxWallClockTime is emitted when
    self.EDG_clock_time is set, and one on other.GlueCEPolicyMaxCPUTime
    when self.EDG_cpu_time is set (both are concatenated as strings).
    Every clause is prefixed with ' && ', so the result is meant to be
    appended to an already existing requirement expression.
    Returns '' when neither limit is set.
    """
    clauses = []
    if self.EDG_clock_time:
        clauses.append(' && ' + 'other.GlueCEPolicyMaxWallClockTime>=' + self.EDG_clock_time)
    if self.EDG_cpu_time:
        clauses.append(' && ' + ' other.GlueCEPolicyMaxCPUTime>=' + self.EDG_cpu_time)
    return ''.join(clauses)
128 ewv 1.28
def sched_fix_parameter(self):
    """
    Compute the task-wide requirement string and store it in the task.

    The requirements of the job type of common.job_list[nJobs - 1]
    (nJobs taken from common._db.nJobs()) are combined with
    self.EDG_requirements, if any, and saved under the 'jobType' key
    through common._db.updateTask_(). Nothing is returned.
    """
    last_index = int(common._db.nJobs()) - 1
    job_type = common.job_list[last_index].type()

    requirements = '' + job_type.getRequirements()

    if self.EDG_requirements:
        # A single blank is treated as "no requirement so far": in that
        # case the user requirements are appended without ' && '.
        if requirements != ' ':
            requirements += ' && '
        requirements += self.EDG_requirements

    common._db.updateTask_({'jobType': requirements})  ## DS--BL
145 spiga 1.39
def sched_parameter(self, i, task):
    """
    Return the scheduler-specific JDL fragment for one job.

    i    -- 1-based job index; task.jobs[i-1] is the job looked up
    task -- task object: task['jobType'] provides the base requirement
            string and task.jobs[i-1]['dlsDestination'] the destinations

    The fragment holds the Requirements expression (base requirements
    plus specific_req(), se_list() and the string part of ce_list()),
    the additional JDL parameters when configured, and the
    MyProxyServer, VirtualOrganisation, RetryCount and ShallowRetryCount
    statements.
    """
    destination = task.jobs[i - 1]['dlsDestination']  ## DS--BL
    requirements = '' + task['jobType']

    statements = [
        'Requirements = ' + requirements + self.specific_req()
        + self.se_list(i, destination) + self.ce_list()[0] + ';\n'  ## BL--DS
    ]
    if self.EDG_addJdlParam:
        statements.append(self.jdlParam())  ## BL--DS
    statements.append('MyProxyServer = "' + self.proxyServer + '";\n')
    statements.append('VirtualOrganisation = "' + self.VO + '";\n')
    statements.append('RetryCount = ' + str(self.EDG_retry_count) + ';\n')
    statements.append('ShallowRetryCount = ' + str(self.EDG_shallow_retry_count) + ';\n')

    return ''.join(statements)
165 ewv 1.42
def decodeLogInfo(self, file):
    """
    Extract the main information from a logging info file.

    file -- handed as-is to EdgLoggingInfo.decodeReason (whether it is a
            path or an open file object is not visible here)

    Returns the value produced by EdgLoggingInfo().decodeReason(file).
    """
    return EdgLoggingInfo.EdgLoggingInfo().decodeReason(file)
173    
def findSites_(self, n, sites):
    """
    Filter the candidate sites through the black and white lists.

    n     -- job identifier, forwarded to the black/white list parser
    sites -- sequence of candidate sites

    Returns an empty list when the first entry of sites is the empty
    string. Otherwise returns the sites surviving
    self.blackWhiteListParser.checkBlackList(); when any survive, the
    result of checkWhiteList() applied to them is returned instead.
    """
    # An empty first entry means there is nothing to select.
    if len(sites) > 0 and sites[0] == "":
        return []

    ##Addedd Daniele
    allowed = self.blackWhiteListParser.checkBlackList(sites, n)
    if len(allowed) == 0:
        return allowed
    return self.blackWhiteListParser.checkWhiteList(allowed, n)
187    
188 spiga 1.51
def wsExitFunc(self):
    """
    Return the text of the func_exit() shell function for the job wrapper.

    The generated function consists of the common part supplied by
    self.wsExitFunc_common() followed by the gLite-specific output
    sandbox check:
      * the output files are packed once to measure the tarball size
        (third column of 'ls -gGrta'), and the tarball is removed again;
      * the size is compared with the limit self.OSBsize;
      * when the limit is exceeded, job_exit_code is set to 70000 and
        only the CMSSW stdout/stderr files are packed; otherwise the
        complete file list is packed;
      * the exit code is reported and the function exits with it.
    """
    pieces = [
        '\n',
        '#\n',
        '# EXECUTE THIS FUNCTION BEFORE EXIT \n',
        '#\n\n',
        'func_exit() { \n',
        self.wsExitFunc_common(),
        ### specific Glite check for OSB
        ' tar zcvf ${out_files}.tgz ${final_list}\n',
        ' tmp_size=`ls -gGrta ${out_files}.tgz | awk \'{ print $3 }\'`\n',
        ' rm ${out_files}.tgz\n',
        ' size=`expr $tmp_size`\n',
        ' echo "Total Output dimension: $size"\n',
        ' limit=' + str(self.OSBsize) + ' \n',
        ' echo "WARNING: output files size limit is set to: $limit"\n',
        # Compare against $size, the value computed just above. The script
        # text produced here never sets $sum, so testing it made the '['
        # command fail and the limit was never enforced. The operands are
        # quoted so that an empty value cannot break the test syntax.
        ' if [ "$limit" -lt "$size" ]; then\n',
        ' exceed=1\n',
        ' job_exit_code=70000\n',
        ' echo "Output Sanbox too big. Produced output is lost "\n',
        ' else\n',
        ' exceed=0\n',
        ' echo "Total Output dimension $size is fine."\n',
        ' fi\n',
        ' echo "JOB_EXIT_STATUS = $job_exit_code"\n',
        ' echo "JobExitCode=$job_exit_code" >> $RUNTIME_AREA/$repo\n',
        ' dumpStatus $RUNTIME_AREA/$repo\n',
        ' if [ $exceed -ne 1 ]; then\n',
        ' tar zcvf ${out_files}.tgz ${final_list}\n',
        ' else\n',
        ' tar zcvf ${out_files}.tgz CMSSW_${NJob}.stdout CMSSW_${NJob}.stderr\n',
        ' fi\n',
        ' exit $job_exit_code\n',
        '}\n',
    ]
    return ''.join(pieces)
229    
def userName(self):
    """
    Return the user name.

    The name is the output of 'voms-proxy-info -identity' (run through
    runCommand) with surrounding whitespace removed.
    NOTE(review): if runCommand can return a non-string (e.g. None) on
    failure, the strip() call raises -- confirm against runCommand.
    """
    return runCommand("voms-proxy-info -identity").strip()