
Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.7 by gutsche, Tue Jun 13 20:43:00 2006 UTC vs.
Revision 1.244 by spiga, Thu Sep 25 15:08:01 2008 UTC

# Line 2 | Line 2 | from JobType import JobType
2   from crab_logger import Logger
3   from crab_exceptions import *
4   from crab_util import *
5 + from BlackWhiteListParser import SEBlackWhiteListParser
6   import common
6 import PsetManipulator  
7
8 import DBSInfo_EDM
9 import DataDiscovery_EDM
10 import DataLocation_EDM
7   import Scram
8  
9 < import os, string, re
9 > import os, string, glob
10  
11   class Cmssw(JobType):
12 <    def __init__(self, cfg_params):
12 >    def __init__(self, cfg_params, ncjobs,skip_blocks, isNew):
13          JobType.__init__(self, 'CMSSW')
14          common.logger.debug(3,'CMSSW::__init__')
15 +        self.skip_blocks = skip_blocks
16 +
17 +        self.argsList = []
18  
20        self.analisys_common_info = {}
21        # Marco.
19          self._params = {}
20          self.cfg_params = cfg_params
21 +        # init BlackWhiteListParser
22 +        self.blackWhiteListParser = SEBlackWhiteListParser(cfg_params)
23 +
24 +        ### Temporary patch to automatically skip the ISB size check:
25 +        server=self.cfg_params.get('CRAB.server_name',None)
26 +        size = 9.5
27 +        if server: size = 99999
28 +        ### D.S.
29 +        self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size))
30 +
31 +        # number of jobs requested to be created, limit obj splitting
32 +        self.ncjobs = ncjobs
33  
34          log = common.logger
35 <        
35 >
36          self.scram = Scram.Scram(cfg_params)
28        scramArea = ''
37          self.additional_inbox_files = []
38          self.scriptExe = ''
39          self.executable = ''
40 +        self.executable_arch = self.scram.getArch()
41          self.tgz_name = 'default.tgz'
42 +        self.scriptName = 'CMSSW.sh'
43 +        self.pset = ''
44 +        self.datasetPath = ''
45  
46 +        # set FJR file name
47 +        self.fjrFileName = 'crab_fjr.xml'
48  
49          self.version = self.scram.getSWVersion()
50 <        self.setParam_('application', self.version)
51 <        common.analisys_common_info['sw_version'] = self.version
52 <        ### FEDE
53 <        common.analisys_common_info['copy_input_data'] = 0
40 <        common.analisys_common_info['events_management'] = 1
41 <
42 <        ### collect Data cards
50 >        version_array = self.version.split('_')
51 >        self.CMSSW_major = 0
52 >        self.CMSSW_minor = 0
53 >        self.CMSSW_patch = 0
54          try:
55 <         #   self.owner = cfg_params['CMSSW.owner']
56 <         #   log.debug(6, "CMSSW::CMSSW(): owner = "+self.owner)
57 <         #   self.dataset = cfg_params['CMSSW.dataset']
58 <            self.datasetPath = cfg_params['CMSSW.datasetpath']
59 <            log.debug(6, "CMSSW::CMSSW(): datasetPath = "+self.datasetPath)
49 <        except KeyError:
50 <        #    msg = "Error: owner and/or dataset not defined "
51 <            msg = "Error: datasetpath not defined "  
55 >            self.CMSSW_major = int(version_array[1])
56 >            self.CMSSW_minor = int(version_array[2])
57 >            self.CMSSW_patch = int(version_array[3])
58 >        except:
59 >            msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
60              raise CrabException(msg)
61  
62 <        # ML monitoring
55 <        # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
56 <        datasetpath_split = self.datasetPath.split("/")
57 <        self.setParam_('dataset', datasetpath_split[1])
58 <        self.setParam_('owner', datasetpath_split[-1])
62 >        ### collect Data cards
63  
64 +        if not cfg_params.has_key('CMSSW.datasetpath'):
65 +            msg = "Error: datasetpath not defined "
66 +            raise CrabException(msg)
67  
68 +        ### Temporary: added to remove input file control in the case of PU
69 +        self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)
70  
71 +        tmp =  cfg_params['CMSSW.datasetpath']
72 +        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
73  
74 <        self.dataTiers = []
75 < #       try:
76 < #           tmpDataTiers = string.split(cfg_params['CMSSW.data_tier'],',')
77 < #           for tmp in tmpDataTiers:
78 < #               tmp=string.strip(tmp)
79 < #               self.dataTiers.append(tmp)
80 < #               pass
81 < #           pass
82 < #       except KeyError:
72 < #           pass
73 < #       log.debug(6, "Cmssw::Cmssw(): dataTiers = "+str(self.dataTiers))
74 >        if tmp =='':
75 >            msg = "Error: datasetpath not defined "
76 >            raise CrabException(msg)
77 >        elif string.lower(tmp)=='none':
78 >            self.datasetPath = None
79 >            self.selectNoInput = 1
80 >        else:
81 >            self.datasetPath = tmp
82 >            self.selectNoInput = 0
83  
84 +        self.dataTiers = []
85 +        self.debugWrap = ''
86 +        self.debug_wrapper = cfg_params.get('USER.debug_wrapper',False)
87 +        if self.debug_wrapper: self.debugWrap='--debug'
88          ## now the application
89 <        try:
90 <            self.executable = cfg_params['CMSSW.executable']
78 <            self.setParam_('exe', self.executable)
79 <            log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
80 <            msg = "Default executable cmsRun overridden. Switch to " + self.executable
81 <            log.debug(3,msg)
82 <        except KeyError:
83 <            self.executable = 'cmsRun'
84 <            self.setParam_('exe', self.executable)
85 <            msg = "User executable not defined. Use cmsRun"
86 <            log.debug(3,msg)
87 <            pass
89 >        self.executable = cfg_params.get('CMSSW.executable','cmsRun')
90 >        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
91  
92 <        try:
93 <            self.pset = cfg_params['CMSSW.pset']
94 <            log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
92 >        if not cfg_params.has_key('CMSSW.pset'):
93 >            raise CrabException("PSet file missing. Cannot run cmsRun ")
94 >        self.pset = cfg_params['CMSSW.pset']
95 >        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
96 >        if self.pset.lower() != 'none' :
97              if (not os.path.exists(self.pset)):
98                  raise CrabException("User defined PSet file "+self.pset+" does not exist")
99 <        except KeyError:
100 <            raise CrabException("PSet file missing. Cannot run cmsRun ")
99 >        else:
100 >            self.pset = None
101  
102          # output files
103 <        try:
104 <            self.output_file = []
103 >        ## stuff which must be returned always via sandbox
104 >        self.output_file_sandbox = []
105  
106 <            tmp = cfg_params['CMSSW.output_file']
107 <            if tmp != '':
108 <                tmpOutFiles = string.split(cfg_params['CMSSW.output_file'],',')
109 <                log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
110 <                for tmp in tmpOutFiles:
111 <                    tmp=string.strip(tmp)
112 <                    self.output_file.append(tmp)
113 <                    pass
114 <            else:
115 <                log.message("No output file defined: only stdout/err will be available")
116 <                pass
117 <            pass
113 <        except KeyError:
114 <            log.message("No output file defined: only stdout/err will be available")
115 <            pass
106 >        # add fjr report by default via sandbox
107 >        self.output_file_sandbox.append(self.fjrFileName)
108 >
109 >        # other output files to be returned via sandbox or copied to SE
110 >        outfileflag = False
111 >        self.output_file = []
112 >        tmp = cfg_params.get('CMSSW.output_file',None)
113 >        if tmp :
114 >            self.output_file = [x.strip() for x in tmp.split(',')]
115 >            outfileflag = True #output found
116 >        #else:
117 >        #    log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
118  
119          # script_exe file as additional file in inputSandbox
120 <        try:
121 <           self.scriptExe = cfg_params['USER.script_exe']
122 <           self.additional_inbox_files.append(self.scriptExe)
123 <        except KeyError:
124 <           pass
125 <        if self.scriptExe != '':
126 <           if os.path.isfile(self.scriptExe):
127 <              pass
128 <           else:
129 <              log.message("WARNING. file "+self.scriptExe+" not found")
130 <              sys.exit()
131 <                  
120 >        self.scriptExe = cfg_params.get('USER.script_exe',None)
121 >        if self.scriptExe :
122 >            if not os.path.isfile(self.scriptExe):
123 >                msg ="ERROR. file "+self.scriptExe+" not found"
124 >                raise CrabException(msg)
125 >            self.additional_inbox_files.append(string.strip(self.scriptExe))
126 >
127 >        if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
128 >            msg ="Error. script_exe  not defined"
129 >            raise CrabException(msg)
130 >
131 >        # use parent files...
132 >        self.useParent = self.cfg_params.get('CMSSW.use_parent',False)
133 >
134          ## additional input files
135 <        try:
136 <            tmpAddFiles = string.split(cfg_params['CMSSW.additional_input_files'],',')
135 >        if cfg_params.has_key('USER.additional_input_files'):
136 >            tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
137              for tmp in tmpAddFiles:
138 <                if not os.path.exists(tmp):
139 <                    raise CrabException("Additional input file not found: "+tmp)
140 <                tmp=string.strip(tmp)
141 <                self.additional_inbox_files.append(tmp)
138 >                tmp = string.strip(tmp)
139 >                dirname = ''
140 >                if not tmp[0]=="/": dirname = "."
141 >                files = []
142 >                if string.find(tmp,"*")>-1:
143 >                    files = glob.glob(os.path.join(dirname, tmp))
144 >                    if len(files)==0:
145 >                        raise CrabException("No additional input file found with this pattern: "+tmp)
146 >                else:
147 >                    files.append(tmp)
148 >                for file in files:
149 >                    if not os.path.exists(file):
150 >                        raise CrabException("Additional input file not found: "+file)
151 >                    pass
152 >                    self.additional_inbox_files.append(string.strip(file))
153                  pass
154              pass
155 <        except KeyError:
156 <            pass
142 <
143 <        try:
144 <            self.filesPerJob = int(cfg_params['CMSSW.files_per_jobs']) #Daniele
145 <        except KeyError:
146 <            self.filesPerJob = 1
147 <
148 <        ## Max event   will be total_number_of_events ???  Daniele
149 <        try:
150 <            self.maxEv = cfg_params['CMSSW.event_per_job']
151 <        except KeyError:
152 <            self.maxEv = "-1"
153 <        ##  
154 <        try:
155 <            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
156 <        except KeyError:
157 <            msg = 'Must define total_number_of_events'
158 <            raise CrabException(msg)
159 <        
160 <        CEBlackList = []
161 <        try:
162 <            tmpBad = string.split(cfg_params['EDG.ce_black_list'],',')
163 <            for tmp in tmpBad:
164 <                tmp=string.strip(tmp)
165 <                CEBlackList.append(tmp)
166 <        except KeyError:
167 <            pass
155 >            common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
156 >        pass
157  
158 <        self.reCEBlackList=[]
159 <        for bad in CEBlackList:
160 <            self.reCEBlackList.append(re.compile( bad ))
158 >        ## Events per job
159 >        if cfg_params.has_key('CMSSW.events_per_job'):
160 >            self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
161 >            self.selectEventsPerJob = 1
162 >        else:
163 >            self.eventsPerJob = -1
164 >            self.selectEventsPerJob = 0
165  
166 <        common.logger.debug(5,'CEBlackList: '+str(CEBlackList))
166 >        ## number of jobs
167 >        if cfg_params.has_key('CMSSW.number_of_jobs'):
168 >            self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
169 >            self.selectNumberOfJobs = 1
170 >        else:
171 >            self.theNumberOfJobs = 0
172 >            self.selectNumberOfJobs = 0
173  
174 <        CEWhiteList = []
175 <        try:
176 <            tmpGood = string.split(cfg_params['EDG.ce_white_list'],',')
177 <            for tmp in tmpGood:
178 <                tmp=string.strip(tmp)
179 <                CEWhiteList.append(tmp)
180 <        except KeyError:
181 <            pass
174 >        if cfg_params.has_key('CMSSW.total_number_of_events'):
175 >            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
176 >            self.selectTotalNumberEvents = 1
177 >            if self.selectNumberOfJobs  == 1:
178 >                if (self.total_number_of_events != -1) and int(self.total_number_of_events) < int(self.theNumberOfJobs):
179 >                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
180 >                    raise CrabException(msg)
181 >        else:
182 >            self.total_number_of_events = 0
183 >            self.selectTotalNumberEvents = 0
184  
185 <        #print 'CEWhiteList: ',CEWhiteList
186 <        self.reCEWhiteList=[]
187 <        for Good in CEWhiteList:
188 <            self.reCEWhiteList.append(re.compile( Good ))
185 >        if self.pset != None:
186 >             if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
187 >                 msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
188 >                 raise CrabException(msg)
189 >        else:
190 >             if (self.selectNumberOfJobs == 0):
191 >                 msg = 'Must specify  number_of_jobs.'
192 >                 raise CrabException(msg)
193 >
194 >        ## New method of dealing with seeds
195 >        self.incrementSeeds = []
196 >        self.preserveSeeds = []
197 >        if cfg_params.has_key('CMSSW.preserve_seeds'):
198 >            tmpList = cfg_params['CMSSW.preserve_seeds'].split(',')
199 >            for tmp in tmpList:
200 >                tmp.strip()
201 >                self.preserveSeeds.append(tmp)
202 >        if cfg_params.has_key('CMSSW.increment_seeds'):
203 >            tmpList = cfg_params['CMSSW.increment_seeds'].split(',')
204 >            for tmp in tmpList:
205 >                tmp.strip()
206 >                self.incrementSeeds.append(tmp)
207 >
208 >        ## FUTURE: Can remove in CRAB 2.4.0
209 >        self.sourceSeed    = cfg_params.get('CMSSW.pythia_seed',None)
210 >        self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
211 >        self.sourceSeedG4  = cfg_params.get('CMSSW.g4_seed',None)
212 >        self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
213 >        if self.sourceSeed or self.sourceSeedVtx or self.sourceSeedG4 or self.sourceSeedMix:
214 >            msg = 'pythia_seed, vtx_seed, g4_seed, and mix_seed are no longer valid settings. You must use increment_seeds or preserve_seeds'
215 >            raise CrabException(msg)
216  
217 <        common.logger.debug(5,'CEWhiteList: '+str(CEWhiteList))
217 >        self.firstRun = cfg_params.get('CMSSW.first_run',None)
218  
219 <        self.PsetEdit = PsetManipulator.PsetManipulator(self.pset) #Daniele Pset
219 >        # Copy/return
220 >        self.copy_data = int(cfg_params.get('USER.copy_data',0))
221 >        self.return_data = int(cfg_params.get('USER.return_data',0))
222  
223          #DBSDLS-start
224 <        ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
224 >        ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
225          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
226          self.DBSPaths={}  # all dbs paths requested ( --> input to the site local discovery script)
227 +        self.jobDestination=[]  # Site destination(s) for each job (list of lists)
228          ## Perform the data location and discovery (based on DBS/DLS)
229 <        self.DataDiscoveryAndLocation(cfg_params)
230 <        #DBSDLS-end          
231 <
232 <        self.tgzNameWithPath = self.getTarBall(self.executable)
233 <
234 <        self.jobSplitting()  #Daniele job Splitting
235 <        self.PsetEdit.maxEvent(self.maxEv) #Daniele  
236 <        self.PsetEdit.inputModule("INPUT") #Daniele  
237 <        self.PsetEdit.psetWriter(self.configFilename())
238 <        
229 >        ## SL: Don't if NONE is specified as input (pythia use case)
230 >        blockSites = {}
231 >        if self.datasetPath:
232 >            blockSites = self.DataDiscoveryAndLocation(cfg_params)
233 >        #DBSDLS-end
234 >
235 >        ## Select Splitting
236 >        if self.selectNoInput:
237 >            if self.pset == None:
238 >                self.jobSplittingForScript()
239 >            else:
240 >                self.jobSplittingNoInput()
241 >        elif (cfg_params.get('CMSSW.noblockboundary',0)):
242 >            self.jobSplittingNoBlockBoundary(blockSites)
243 >        else:
244 >            self.jobSplittingByBlocks(blockSites)
245  
246 +        # modify Pset only the first time
247 +        if isNew:
248 +            if self.pset != None:
249 +                import PsetManipulator as pp
250 +                PsetEdit = pp.PsetManipulator(self.pset)
251 +                try:
252 +                    # Add FrameworkJobReport to parameter-set, set max events.
253 +                    # Reset later for data jobs by writeCFG which does all modifications
254 +                    PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
255 +                    PsetEdit.maxEvent(self.eventsPerJob)
256 +                    PsetEdit.psetWriter(self.configFilename())
257 +                    ## If present, add TFileService to output files
258 +                    if not int(cfg_params.get('CMSSW.skip_TFileService_output',0)):
259 +                        tfsOutput = PsetEdit.getTFileService()
260 +                        if tfsOutput:
261 +                            if tfsOutput in self.output_file:
262 +                                common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files")
263 +                            else:
264 +                                outfileflag = True #output found
265 +                                self.output_file.append(tfsOutput)
266 +                                common.logger.message("Adding "+tfsOutput+" to output files (from TFileService)")
267 +                            pass
268 +                        pass
269 +                    ## If present and requested, add PoolOutputModule to output files
270 +                    if int(cfg_params.get('CMSSW.get_edm_output',0)):
271 +                        edmOutput = PsetEdit.getPoolOutputModule()
272 +                        if edmOutput:
273 +                            if edmOutput in self.output_file:
274 +                                common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files")
275 +                            else:
276 +                                self.output_file.append(edmOutput)
277 +                                common.logger.message("Adding "+edmOutput+" to output files (from PoolOutputModule)")
278 +                            pass
279 +                        pass
280 +                except CrabException:
281 +                    msg='Error while manipulating ParameterSet: exiting...'
282 +                    raise CrabException(msg)
283 +            ## Prepare inputSandbox TarBall (only the first time)
284 +            self.tgzNameWithPath = self.getTarBall(self.executable)
285  
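The constructor in the new revision replaces the old files_per_jobs logic with three splitting switches and requires that, when a parameter set is given, exactly two of total_number_of_events, events_per_job and number_of_jobs are set. Below is a minimal standalone sketch of that rule; the helper name check_splitting_params is hypothetical and a plain dictionary stands in for the CRAB config parser.

    def check_splitting_params(cfg, has_pset=True):
        # The three splitting knobs; None means "not set in the crab.cfg".
        total  = cfg.get('CMSSW.total_number_of_events')
        perJob = cfg.get('CMSSW.events_per_job')
        nJobs  = cfg.get('CMSSW.number_of_jobs')
        nSet = sum(x is not None for x in (total, perJob, nJobs))
        if has_pset:
            if nSet != 2:
                raise ValueError('Must define exactly two of total_number_of_events, '
                                 'events_per_job, or number_of_jobs.')
        elif nJobs is None:
            raise ValueError('Must specify number_of_jobs.')
        return total, perJob, nJobs

    # e.g. check_splitting_params({'CMSSW.total_number_of_events': 1000,
    #                              'CMSSW.events_per_job': 250})
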
286      def DataDiscoveryAndLocation(self, cfg_params):
287  
288 +        import DataDiscovery
289 +        import DataLocation
290          common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
291  
214        #datasetPath = "/"+self.owner+"/"+self.dataTiers[0]+"/"+self.dataset
215        
292          datasetPath=self.datasetPath
293  
218        ## TODO
219        dataTiersList = ""
220        dataTiers = dataTiersList.split(',')
221
294          ## Contact the DBS
295 +        common.logger.message("Contacting Data Discovery Services ...")
296          try:
297 <            self.pubdata=DataDiscovery_EDM.DataDiscovery_EDM(datasetPath, dataTiers, cfg_params)
297 >            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks)
298              self.pubdata.fetchDBSInfo()
299  
300 <        except DataDiscovery_EDM.NotExistingDatasetError, ex :
300 >        except DataDiscovery.NotExistingDatasetError, ex :
301              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
302              raise CrabException(msg)
303 <
231 <        except DataDiscovery_EDM.NoDataTierinProvenanceError, ex :
303 >        except DataDiscovery.NoDataTierinProvenanceError, ex :
304              msg = 'ERROR ***: failed Data Discovery in DBS : %s'%ex.getErrorMessage()
305              raise CrabException(msg)
306 <        except DataDiscovery_EDM.DataDiscoveryError, ex:
307 <            msg = 'ERROR ***: failed Data Discovery in DBS  %s'%ex.getErrorMessage()
306 >        except DataDiscovery.DataDiscoveryError, ex:
307 >            msg = 'ERROR ***: failed Data Discovery in DBS :  %s'%ex.getErrorMessage()
308              raise CrabException(msg)
309  
310 <        ## get list of all required data in the form of dbs paths  (dbs path = /dataset/datatier/owner)
311 <        ## self.DBSPaths=self.pubdata.getDBSPaths()
312 <        common.logger.message("Required data are :"+self.datasetPath)
313 <
242 <        filesbyblock=self.pubdata.getFiles()
243 <        self.AllInputFiles=filesbyblock.values()
244 <        self.files = self.AllInputFiles        
245 <
246 <        ## TEMP
247 <    #    self.filesTmp = filesbyblock.values()
248 <    #    self.files = []
249 <    #    locPath='rfio:cmsbose2.bo.infn.it:/flatfiles/SE00/cms/fanfani/ProdTest/'
250 <    #    locPath=''
251 <    #    tmp = []
252 <    #    for file in self.filesTmp[0]:
253 <    #        tmp.append(locPath+file)
254 <    #    self.files.append(tmp)
255 <        ## END TEMP
310 >        self.filesbyblock=self.pubdata.getFiles()
311 >        self.eventsbyblock=self.pubdata.getEventsPerBlock()
312 >        self.eventsbyfile=self.pubdata.getEventsPerFile()
313 >        self.parentFiles=self.pubdata.getParent()
314  
315          ## get max number of events
316 <        #common.logger.debug(10,"number of events for primary fileblocks %i"%self.pubdata.getMaxEvents())
259 <        self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
260 <        common.logger.message("\nThe number of available events is %s"%self.maxEvents)
316 >        self.maxEvents=self.pubdata.getMaxEvents()
317  
318          ## Contact the DLS and build a list of sites hosting the fileblocks
319          try:
320 <            dataloc=DataLocation_EDM.DataLocation_EDM(filesbyblock.keys(),cfg_params)
320 >            dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
321              dataloc.fetchDLSInfo()
322 <        except DataLocation_EDM.DataLocationError , ex:
322 >        except DataLocation.DataLocationError , ex:
323              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
324              raise CrabException(msg)
325 <        
326 <        allsites=dataloc.getSites()
327 <        common.logger.debug(5,"sites are %s"%allsites)
328 <        sites=self.checkBlackList(allsites)
329 <        common.logger.debug(5,"sites are (after black list) %s"%sites)
330 <        sites=self.checkWhiteList(sites)
331 <        common.logger.debug(5,"sites are (after white list) %s"%sites)
332 <
333 <        if len(sites)==0:
334 <            msg = 'No sites hosting all the needed data! Exiting... '
335 <            raise CrabException(msg)
336 <
337 <        common.logger.message("List of Sites hosting the data : "+str(sites))
338 <        common.logger.debug(6, "List of Sites: "+str(sites))
339 <        common.analisys_common_info['sites']=sites    ## used in SchedulerEdg.py in createSchScript
340 <        self.setParam_('TargetCE', ','.join(sites))
285 <        return
286 <    
287 <    def jobSplitting(self):
325 >
326 >
327 >        sites = dataloc.getSites()
328 >        allSites = []
329 >        listSites = sites.values()
330 >        for listSite in listSites:
331 >            for oneSite in listSite:
332 >                allSites.append(oneSite)
333 >        allSites = self.uniquelist(allSites)
334 >
335 >        # screen output
336 >        common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
337 >
338 >        return sites
339 >
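DataDiscoveryAndLocation now returns the block-to-sites map from DLS and, for the screen message, flattens it into a duplicate-free list of sites. A small sketch of that flattening step follows; sites_by_block is a hypothetical example in the same shape as the dataloc.getSites() result used above.

    sites_by_block = {
        'block-A': ['T2_IT_Bari', 'T2_DE_DESY'],
        'block-B': ['T2_DE_DESY', 'T1_US_FNAL'],
    }

    all_sites = []
    for site_list in sites_by_block.values():   # one list of hosting sites per block
        for site in site_list:
            all_sites.append(site)

    # drop duplicates, as the uniquelist() helper presumably does
    all_sites = sorted(set(all_sites))
    print(all_sites)   # ['T1_US_FNAL', 'T2_DE_DESY', 'T2_IT_Bari']
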
340 >    def jobSplittingByBlocks(self, blockSites):
341          """
342 <        first implemntation for job splitting  
343 <        """    
344 <      #  print 'eventi totali '+str(self.maxEvents)
345 <      #  print 'eventi totali richiesti dallo user '+str(self.total_number_of_events)
346 <        #print 'files per job '+str(self.filesPerJob)
347 <        common.logger.message('Required '+str(self.filesPerJob)+' files per job ')
348 <        common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
349 <
350 <        ## TODO: SL need to have (from DBS) a detailed list of how many events per each file
351 <        n_tot_files = (len(self.files[0]))
352 <        ## SL: this is wrong if the files have different number of events
353 <        evPerFile = int(self.maxEvents)/n_tot_files
354 <        
355 <        common.logger.debug(5,'Events per File '+str(evPerFile))
356 <
357 <        ## if asked to process all events, do it
358 <        if self.total_number_of_events == -1:
359 <            self.total_number_of_events=self.maxEvents
360 <            self.total_number_of_jobs = int(n_tot_files)*1/int(self.filesPerJob)
361 <            common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for all available events '+str(self.total_number_of_events)+' events')
362 <        
363 <        else:
364 <            self.total_number_of_files = int(self.total_number_of_events/evPerFile)
365 <            ## SL: if ask for less event than what is computed to be available on a
366 <            ##     file, process the first file anyhow.
367 <            if self.total_number_of_files == 0:
368 <                self.total_number_of_files = self.total_number_of_files + 1
369 <
370 <            common.logger.debug(5,'N files  '+str(self.total_number_of_files))
318 <
319 <            check = 0
320 <            
321 <            ## Compute the number of jobs
322 <            #self.total_number_of_jobs = int(n_tot_files)*1/int(self.filesPerJob)
323 <            self.total_number_of_jobs = int(self.total_number_of_files/self.filesPerJob)
324 <            common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
325 <
326 <            ## is there any remainder?
327 <            check = int(self.total_number_of_files) - (int(self.total_number_of_jobs)*self.filesPerJob)
328 <
329 <            common.logger.debug(5,'Check  '+str(check))
330 <
331 <            if check > 0:
332 <                self.total_number_of_jobs =  self.total_number_of_jobs + 1
333 <                common.logger.message('Warning: last job will be created with '+str(check)+' files')
342 >        Perform job splitting. Jobs run over an integer number of files
343 >        and no more than one block.
344 >        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
345 >        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberOfJobs,
346 >                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
347 >                  self.maxEvents, self.filesbyblock
348 >        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
349 >              self.total_number_of_jobs - Total # of jobs
350 >              self.list_of_args - File(s) job will run on (a list of lists)
351 >        """
352 >
353 >        # ---- Handle the possible job splitting configurations ---- #
354 >        if (self.selectTotalNumberEvents):
355 >            totalEventsRequested = self.total_number_of_events
356 >        if (self.selectEventsPerJob):
357 >            eventsPerJobRequested = self.eventsPerJob
358 >            if (self.selectNumberOfJobs):
359 >                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
360 >
361 >        # If user requested all the events in the dataset
362 >        if (totalEventsRequested == -1):
363 >            eventsRemaining=self.maxEvents
364 >        # If user requested more events than are in the dataset
365 >        elif (totalEventsRequested > self.maxEvents):
366 >            eventsRemaining = self.maxEvents
367 >            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
368 >        # If user requested less events than are in the dataset
369 >        else:
370 >            eventsRemaining = totalEventsRequested
371  
372 <            common.logger.message(str(self.total_number_of_jobs)+' jobs will be created for a total of '+str((self.total_number_of_jobs-1)*self.filesPerJob*evPerFile + check*evPerFile)+' events')
373 <            pass
372 >        # If user requested more events per job than are in the dataset
373 >        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
374 >            eventsPerJobRequested = self.maxEvents
375 >
376 >        # For user info at end
377 >        totalEventCount = 0
378 >
379 >        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
380 >            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
381 >
382 >        if (self.selectNumberOfJobs):
383 >            common.logger.message("May not create the exact number_of_jobs requested.")
384 >
385 >        if ( self.ncjobs == 'all' ) :
386 >            totalNumberOfJobs = 999999999
387 >        else :
388 >            totalNumberOfJobs = self.ncjobs
389  
390 +        blocks = blockSites.keys()
391 +        blockCount = 0
392 +        # Backup variable in case self.maxEvents counted events in a non-included block
393 +        numBlocksInDataset = len(blocks)
394 +
395 +        jobCount = 0
396 +        list_of_lists = []
397 +
398 +        # dictionary tracking which jobs belong to which block
399 +        jobsOfBlock = {}
400 +
401 +        # ---- Iterate over the blocks in the dataset until ---- #
402 +        # ---- we've met the requested total # of events    ---- #
403 +        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
404 +            block = blocks[blockCount]
405 +            blockCount += 1
406 +            if block not in jobsOfBlock.keys() :
407 +                jobsOfBlock[block] = []
408 +
409 +            if self.eventsbyblock.has_key(block) :
410 +                numEventsInBlock = self.eventsbyblock[block]
411 +                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
412 +
413 +                files = self.filesbyblock[block]
414 +                numFilesInBlock = len(files)
415 +                if (numFilesInBlock <= 0):
416 +                    continue
417 +                fileCount = 0
418 +
419 +                # ---- New block => New job ---- #
420 +                parString = ""
421 +                # counter for number of events in files currently worked on
422 +                filesEventCount = 0
423 +                # flag if next while loop should touch new file
424 +                newFile = 1
425 +                # job event counter
426 +                jobSkipEventCount = 0
427 +
428 +                # ---- Iterate over the files in the block until we've met the requested ---- #
429 +                # ---- total # of events or we've gone over all the files in this block  ---- #
430 +                pString=''
431 +                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
432 +                    file = files[fileCount]
433 +                    if self.useParent:
434 +                        parent = self.parentFiles[file]
435 +                        for f in parent :
436 +                            pString += '\\\"' + f + '\\\"\,'
437 +                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
438 +                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
439 +                    if newFile :
440 +                        try:
441 +                            numEventsInFile = self.eventsbyfile[file]
442 +                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
443 +                            # increase filesEventCount
444 +                            filesEventCount += numEventsInFile
445 +                            # Add file to current job
446 +                            parString += '\\\"' + file + '\\\"\,'
447 +                            newFile = 0
448 +                        except KeyError:
449 +                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
450 +
451 +                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
452 +                    # if less events in file remain than eventsPerJobRequested
453 +                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
454 +                        # if last file in block
455 +                        if ( fileCount == numFilesInBlock-1 ) :
456 +                            # end job using last file, use remaining events in block
457 +                            # close job and touch new file
458 +                            fullString = parString[:-2]
459 +                            if self.useParent:
460 +                                fullParentString = pString[:-2]
461 +                                list_of_lists.append([fullString,fullParentString,str(-1),str(jobSkipEventCount)])
462 +                            else:
463 +                                list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
464 +                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
465 +                            self.jobDestination.append(blockSites[block])
466 +                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
467 +                            # fill jobs of block dictionary
468 +                            jobsOfBlock[block].append(jobCount+1)
469 +                            # reset counter
470 +                            jobCount = jobCount + 1
471 +                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
472 +                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
473 +                            jobSkipEventCount = 0
474 +                            # reset file
475 +                            pString = ""
476 +                            parString = ""
477 +                            filesEventCount = 0
478 +                            newFile = 1
479 +                            fileCount += 1
480 +                        else :
481 +                            # go to next file
482 +                            newFile = 1
483 +                            fileCount += 1
484 +                    # if events in file equal to eventsPerJobRequested
485 +                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
486 +                        # close job and touch new file
487 +                        fullString = parString[:-2]
488 +                        if self.useParent:
489 +                            fullParentString = pString[:-2]
490 +                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
491 +                        else:
492 +                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
493 +                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
494 +                        self.jobDestination.append(blockSites[block])
495 +                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
496 +                        jobsOfBlock[block].append(jobCount+1)
497 +                        # reset counter
498 +                        jobCount = jobCount + 1
499 +                        totalEventCount = totalEventCount + eventsPerJobRequested
500 +                        eventsRemaining = eventsRemaining - eventsPerJobRequested
501 +                        jobSkipEventCount = 0
502 +                        # reset file
503 +                        pString = ""
504 +                        parString = ""
505 +                        filesEventCount = 0
506 +                        newFile = 1
507 +                        fileCount += 1
508 +
509 +                    # if more events in file remain than eventsPerJobRequested
510 +                    else :
511 +                        # close job but don't touch new file
512 +                        fullString = parString[:-2]
513 +                        if self.useParent:
514 +                            fullParentString = pString[:-2]
515 +                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
516 +                        else:
517 +                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
518 +                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
519 +                        self.jobDestination.append(blockSites[block])
520 +                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
521 +                        jobsOfBlock[block].append(jobCount+1)
522 +                        # increase counter
523 +                        jobCount = jobCount + 1
524 +                        totalEventCount = totalEventCount + eventsPerJobRequested
525 +                        eventsRemaining = eventsRemaining - eventsPerJobRequested
526 +                        # calculate skip events for last file
527 +                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequested
528 +                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
529 +                        # remove all but the last file
530 +                        filesEventCount = self.eventsbyfile[file]
531 +                        if self.useParent:
532 +                            for f in parent : pString += '\\\"' + f + '\\\"\,'
533 +                        parString = '\\\"' + file + '\\\"\,'
534 +                    pass # END if
535 +                pass # END while (iterate over files in the block)
536 +        pass # END while (iterate over blocks in the dataset)
537 +        self.ncjobs = self.total_number_of_jobs = jobCount
538 +        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
539 +            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
540 +        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
541 +
542 +        # screen output
543 +        screenOutput = "List of jobs and available destination sites:\n\n"
544 +
545 +        # keep track of blocks with no sites, to print a warning at the end
546 +        noSiteBlock = []
547 +        bloskNoSite = []
548 +
549 +        blockCounter = 0
550 +        for block in blocks:
551 +            if block in jobsOfBlock.keys() :
552 +                blockCounter += 1
553 +                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
554 +                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)))
555 +                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0:
556 +                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
557 +                    bloskNoSite.append( blockCounter )
558 +
559 +        common.logger.message(screenOutput)
560 +        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
561 +            msg = 'WARNING: No sites are hosting any part of data for block:\n                '
562 +            virgola = ""
563 +            if len(bloskNoSite) > 1:
564 +                virgola = ","
565 +            for block in bloskNoSite:
566 +                msg += ' ' + str(block) + virgola
567 +            msg += '\n               Related jobs:\n                 '
568 +            virgola = ""
569 +            if len(noSiteBlock) > 1:
570 +                virgola = ","
571 +            for range_jobs in noSiteBlock:
572 +                msg += str(range_jobs) + virgola
573 +            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
574 +            if self.cfg_params.has_key('EDG.se_white_list'):
575 +                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
576 +                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
577 +                msg += 'Please check if the dataset is available at this site!)\n'
578 +            if self.cfg_params.has_key('EDG.ce_white_list'):
579 +                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
580 +                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
581 +                msg += 'Please check if the dataset is available at this site!)\n'
582 +
583 +            common.logger.message(msg)
584 +
585 +        self.list_of_args = list_of_lists
586 +        return
587 +
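The inner loop of jobSplittingByBlocks packs files into jobs of roughly eventsPerJobRequested events and records, per job, the file list, a maxEvents limit and a skip-events offset. The following self-contained sketch reproduces that bookkeeping for a single block under simplified assumptions (no parent files, no eventsRemaining cap); split_block and its (filename, n_events) input are illustrative, not part of CRAB.

    def split_block(files_with_events, events_per_job):
        """Return (files, max_events, skip_events) triplets for one block,
        mirroring the [fullString, eventsPerJobRequested, jobSkipEventCount]
        arguments assembled above."""
        jobs = []
        current, count, skip = [], 0, 0   # files in the open job, their events, events to skip
        i, new_file = 0, True
        while i < len(files_with_events):
            fname, nevents = files_with_events[i]
            if new_file:
                current.append(fname)
                count += nevents
                new_file = False
            if count - skip < events_per_job:
                if i == len(files_with_events) - 1:
                    # last file of the block: take everything that is left (-1 = no limit)
                    jobs.append((list(current), -1, skip))
                new_file = True
                i += 1
            elif count - skip == events_per_job:
                jobs.append((list(current), events_per_job, skip))
                current, count, skip = [], 0, 0
                new_file = True
                i += 1
            else:
                jobs.append((list(current), events_per_job, skip))
                # keep only the current file; skip the events already handed out
                skip = events_per_job - (count - skip - nevents)
                current, count = [fname], nevents
        return jobs

    # e.g. split_block([('f1.root', 100), ('f2.root', 100)], 150)
    # -> [(['f1.root', 'f2.root'], 150, 0), (['f2.root'], -1, 50)]
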
588 +    def jobSplittingNoBlockBoundary(self,blockSites):
589 +        """
590 +        Like jobSplittingByBlocks, but do not reset file and event counters at block boundaries, so a job may span blocks."""
591 +        # ---- Handle the possible job splitting configurations ---- #
592 +        if (self.selectTotalNumberEvents):
593 +            totalEventsRequested = self.total_number_of_events
594 +        if (self.selectEventsPerJob):
595 +            eventsPerJobRequested = self.eventsPerJob
596 +            if (self.selectNumberOfJobs):
597 +                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
598 +                                                                                                          
599 +        # If user requested all the events in the dataset
600 +        if (totalEventsRequested == -1):
601 +            eventsRemaining=self.maxEvents
602 +        # If user requested more events than are in the dataset
603 +        elif (totalEventsRequested > self.maxEvents):
604 +            eventsRemaining = self.maxEvents
605 +            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
606 +        # If user requested less events than are in the dataset
607 +        else:
608 +            eventsRemaining = totalEventsRequested
609 +                                                                                                          
610 +        # If user requested more events per job than are in the dataset
611 +        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
612 +            eventsPerJobRequested = self.maxEvents
613 +                                                                                                          
614 +        # For user info at end
615 +        totalEventCount = 0
616 +
617 +        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
618 +            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
619 +                                                                                                          
620 +        if (self.selectNumberOfJobs):
621 +            common.logger.message("May not create the exact number_of_jobs requested.")
622 +                                                                                                          
623 +        if ( self.ncjobs == 'all' ) :
624 +            totalNumberOfJobs = 999999999
625 +        else :
626 +            totalNumberOfJobs = self.ncjobs
627 +                                                                                                          
628 +        blocks = blockSites.keys()
629 +        blockCount = 0
630 +        # Backup variable in case self.maxEvents counted events in a non-included block
631 +        numBlocksInDataset = len(blocks)
632 +                                                                                                          
633 +        jobCount = 0
634          list_of_lists = []
339        for i in xrange(0, int(n_tot_files), self.filesPerJob):
340            list_of_lists.append(self.files[0][i: i+self.filesPerJob])
635  
636 <        self.list_of_files = list_of_lists
637 <      
636 >        #AF
637 >        #AF do not reset input files and event count on block boundary
638 >        #AF
639 >        parString=""
640 >        filesEventCount = 0
641 >        #AF
642 >
643 >        # dictionary tracking which jobs belong to which block
644 >        jobsOfBlock = {}
645 >        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
646 >            block = blocks[blockCount]
647 >            blockCount += 1
648 >            if block not in jobsOfBlock.keys() :
649 >                jobsOfBlock[block] = []
650 >
651 >            if self.eventsbyblock.has_key(block) :
652 >                numEventsInBlock = self.eventsbyblock[block]
653 >                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
654 >                files = self.filesbyblock[block]
655 >                numFilesInBlock = len(files)
656 >                if (numFilesInBlock <= 0):
657 >                    continue
658 >                fileCount = 0
659 >                #AF
660 >                #AF do not reset input files and event count on block boundary
661 >                #AF
662 >                ## ---- New block => New job ---- #
663 >                #parString = ""
664 >                # counter for number of events in files currently worked on
665 >                #filesEventCount = 0
666 >                #AF
667 >                # flag if next while loop should touch new file
668 >                newFile = 1
669 >                # job event counter
670 >                jobSkipEventCount = 0
671 >
672 >                # ---- Iterate over the files in the block until we've met the requested ---- #
673 >                # ---- total # of events or we've gone over all the files in this block  ---- #
674 >                pString=''
675 >                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
676 >                    file = files[fileCount]
677 >                    if self.useParent:
678 >                        parent = self.parentFiles[file]
679 >                        for f in parent :
680 >                            pString += '\\\"' + f + '\\\"\,'
681 >                        common.logger.debug(6, "File "+str(file)+" has the following parents: "+str(parent))
682 >                        common.logger.write("File "+str(file)+" has the following parents: "+str(parent))
683 >                    if newFile :
684 >                        try:
685 >                            numEventsInFile = self.eventsbyfile[file]
686 >                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
687 >                            # increase filesEventCount
688 >                            filesEventCount += numEventsInFile
689 >                            # Add file to current job
690 >                            parString += '\\\"' + file + '\\\"\,'
691 >                            newFile = 0
692 >                        except KeyError:
693 >                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
694 >                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
695 >                    #common.logger.message("AF filesEventCount %s - jobSkipEventCount %s "%(filesEventCount,jobSkipEventCount))  
696 >                    # if less events in file remain than eventsPerJobRequested
697 >                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
698 >                      #AF
699 >                      #AF skip fileboundary part
700 >                      #AF
701 >                            # go to next file
702 >                            newFile = 1
703 >                            fileCount += 1
704 >                    # if events in file equal to eventsPerJobRequested
705 >                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
706 >                        # close job and touch new file
707 >                        fullString = parString[:-2]
708 >                        if self.useParent:
709 >                            fullParentString = pString[:-2]
710 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
711 >                        else:
712 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
713 >                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
714 >                        self.jobDestination.append(blockSites[block])
715 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
716 >                        jobsOfBlock[block].append(jobCount+1)
717 >                        # reset counter
718 >                        jobCount = jobCount + 1
719 >                        totalEventCount = totalEventCount + eventsPerJobRequested
720 >                        eventsRemaining = eventsRemaining - eventsPerJobRequested
721 >                        jobSkipEventCount = 0
722 >                        # reset file
723 >                        pString = ""
724 >                        parString = ""
725 >                        filesEventCount = 0
726 >                        newFile = 1
727 >                        fileCount += 1
728 >
729 >                    # if more events in file remain than eventsPerJobRequested
730 >                    else :
731 >                        # close job but don't touch new file
732 >                        fullString = parString[:-2]
733 >                        if self.useParent:
734 >                            fullParentString = pString[:-2]
735 >                            list_of_lists.append([fullString,fullParentString,str(eventsPerJobRequested),str(jobSkipEventCount)])
736 >                        else:
737 >                            list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
738 >                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
739 >                        self.jobDestination.append(blockSites[block])
740 >                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
741 >                        jobsOfBlock[block].append(jobCount+1)
742 >                        # increase counter
743 >                        jobCount = jobCount + 1
744 >                        totalEventCount = totalEventCount + eventsPerJobRequested
745 >                        eventsRemaining = eventsRemaining - eventsPerJobRequested
746 >                        # calculate skip events for last file
747 >                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequested
748 >                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
749 >                        # remove all but the last file
750 >                        filesEventCount = self.eventsbyfile[file]
751 >                        if self.useParent:
752 >                            for f in parent : pString += '\\\"' + f + '\\\"\,'
753 >                        parString = '\\\"' + file + '\\\"\,'
754 >                    pass # END if
755 >                pass # END while (iterate over files in the block)
756 >        pass # END while (iterate over blocks in the dataset)
757 >        self.ncjobs = self.total_number_of_jobs = jobCount
758 >        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
759 >            common.logger.message("eventsRemaining "+str(eventsRemaining))
760 >            common.logger.message("jobCount "+str(jobCount))
761 >            common.logger.message(" totalNumberOfJobs "+str(totalNumberOfJobs))
762 >            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
763 >        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
764 >
765 >        # screen output
766 >        screenOutput = "List of jobs and available destination sites:\n\n"
767 >
768 >        #AF
769 >        #AF   skip check on  block with no sites
770 >        #AF
771 >        self.list_of_args = list_of_lists
772 >
773 >        return
774 >
775 >
776 >
777 >    def jobSplittingNoInput(self):
778 >        """
779 >        Perform job splitting based on the number of events per job
780 >        """
781 >        common.logger.debug(5,'Splitting per events')
782 >
783 >        if (self.selectEventsPerJob):
784 >            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
785 >        if (self.selectNumberOfJobs):
786 >            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
787 >        if (self.selectTotalNumberEvents):
788 >            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
789 >
790 >        if (self.total_number_of_events < 0):
791 >            msg='Cannot split jobs per Events with "-1" as total number of events'
792 >            raise CrabException(msg)
793 >
794 >        if (self.selectEventsPerJob):
795 >            if (self.selectTotalNumberEvents):
796 >                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
797 >            elif(self.selectNumberOfJobs) :
798 >                self.total_number_of_jobs =self.theNumberOfJobs
799 >                self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
800 >
801 >        elif (self.selectNumberOfJobs) :
802 >            self.total_number_of_jobs = self.theNumberOfJobs
803 >            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
804 >
805 >        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
806 >
807 >        # is there any remainder?
808 >        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
809 >
810 >        common.logger.debug(5,'Check  '+str(check))
811 >
812 >        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each processing '+str(self.eventsPerJob)+' events, for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
813 >        if check > 0:
814 >            common.logger.message('Warning: requested '+str(self.total_number_of_events)+' events, but only '+str(int(self.total_number_of_jobs)*self.eventsPerJob)+' can be processed')
815 >
816 >        # argument is seed number.$i
817 >        self.list_of_args = []
818 >        for i in range(self.total_number_of_jobs):
819 >            ## Since there is no input, any site is good
820 >            self.jobDestination.append([""]) #must be empty to write correctly the xml
821 >            args=[]
822 >            if (self.firstRun):
823 >                ## pythia first run
824 >                args.append(str(self.firstRun)+str(i))
825 >            self.list_of_args.append(args)
826 >
827 >        return
828 >
829 >
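
A minimal, self-contained sketch of the splitting arithmetic used by
jobSplittingNoInput() above; the argument names are illustrative, not CRAB API:

    def split_no_input(total_events=None, events_per_job=None, n_jobs_requested=None):
        # mirror the three cases handled above
        if events_per_job and total_events:
            n_jobs = total_events // events_per_job          # same as the int() cast above
        elif events_per_job and n_jobs_requested:
            n_jobs = n_jobs_requested
            total_events = n_jobs * events_per_job
        else:
            n_jobs = n_jobs_requested
            events_per_job = total_events // n_jobs
        remainder = total_events - n_jobs * events_per_job   # events that cannot be covered
        return n_jobs, events_per_job, remainder

    # 1000 events in chunks of 300 -> 3 jobs, 100 events left over (the "check" warning above)
    print(split_no_input(total_events=1000, events_per_job=300))   # (3, 300, 100)
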
830 >    def jobSplittingForScript(self):
831 >        """
832 >        Perform job splitting based on the number of jobs
833 >        """
834 >        common.logger.debug(5,'Splitting per job')
835 >        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
836 >
837 >        self.total_number_of_jobs = self.theNumberOfJobs
838 >
839 >        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
840 >
841 >        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
842 >
843 >        # argument is seed number.$i
844 >        self.list_of_args = []
845 >        for i in range(self.total_number_of_jobs):
846 >            self.jobDestination.append([""])
847 >            self.list_of_args.append([str(i)])
848          return
849  
850 <    def split(self, jobParams):
851 <
348 <        common.jobDB.load()
349 <        #### Fabio
850 >    def split(self, jobParams,firstJobID):
851 >
852          njobs = self.total_number_of_jobs
853 <        filelist = self.list_of_files
853 >        arglist = self.list_of_args
854          # create the empty structure
855          for i in range(njobs):
856              jobParams.append("")
355        
356        for job in range(njobs):
357            jobParams[job] = filelist[job]
358            common.jobDB.setArguments(job, jobParams[job])
857  
858 <        common.jobDB.save()
858 >        listID=[]
859 >        listField=[]
860 >        for id in range(njobs):
861 >            job = id + int(firstJobID)
862 >            jobParams[id] = arglist[id]
863 >            listID.append(job+1)
864 >            job_ToSave ={}
865 >            concString = ' '
866 >            argu=''
867 >            if len(jobParams[id]):
868 >                argu +=   concString.join(jobParams[id] )
869 >            job_ToSave['arguments']= str(job+1)+' '+argu
870 >            job_ToSave['dlsDestination']= self.jobDestination[id]
871 >            listField.append(job_ToSave)
872 >            msg="Job "+str(job)+" Arguments:   "+str(job+1)+" "+argu+"\n"  \
873 >            +"                     Destination: "+str(self.jobDestination[id])
874 >            common.logger.debug(5,msg)
875 >        common._db.updateJob_(listID,listField)
876 >        self.argsList = (len(jobParams[0])+1)
877 >
878          return
362    
363    def getJobTypeArguments(self, nj, sched):
364        params = common.jobDB.arguments(nj)
365        #print params
366        parString = "\\{"
367        
368        for i in range(len(params) - 1):
369            parString += '\\\"' + params[i] + '\\\"\,'
370        
371        parString += '\\\"' + params[len(params) - 1] + '\\\"\\}'
372        return parString
373  
374    def numberOfJobs(self):
375        # Fabio
879  
880 +    def numberOfJobs(self):
881          return self.total_number_of_jobs
378
379
380
381    def checkBlackList(self, allSites):
382        if len(self.reCEBlackList)==0: return allSites
383        sites = []
384        for site in allSites:
385            common.logger.debug(10,'Site '+site)
386            good=1
387            for re in self.reCEBlackList:
388                if re.search(site):
389                    common.logger.message('CE in black list, skipping site '+site)
390                    good=0
391                pass
392            if good: sites.append(site)
393        if len(sites) == 0:
394            common.logger.debug(3,"No sites found after BlackList")
395        return sites
396
397    def checkWhiteList(self, allSites):
398
399        if len(self.reCEWhiteList)==0: return allSites
400        sites = []
401        for site in allSites:
402            good=0
403            for re in self.reCEWhiteList:
404                if re.search(site):
405                    common.logger.debug(5,'CE in white list, adding site '+site)
406                    good=1
407                if not good: continue
408                sites.append(site)
409        if len(sites) == 0:
410            common.logger.message("No sites found after WhiteList\n")
411        else:
412            common.logger.debug(5,"Selected sites via WhiteList are "+str(sites)+"\n")
413        return sites
882  
883      def getTarBall(self, exe):
884          """
885          Return the TarBall with lib and exe
886          """
887 <        
420 <        # if it exist, just return it
421 <        self.tgzNameWithPath = common.work_space.shareDir()+self.tgz_name
887 >        self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
888          if os.path.exists(self.tgzNameWithPath):
889              return self.tgzNameWithPath
890  
# Line 431 | Line 897 | class Cmssw(JobType):
897  
898          # First of all declare the user Scram area
899          swArea = self.scram.getSWArea_()
434        #print "swArea = ", swArea
435        swVersion = self.scram.getSWVersion()
436        #print "swVersion = ", swVersion
900          swReleaseTop = self.scram.getReleaseTop_()
901 <        #print "swReleaseTop = ", swReleaseTop
439 <        
901 >
902          ## check if working area is release top
903          if swReleaseTop == '' or swArea == swReleaseTop:
904 +            common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
905              return
906  
907 <        filesToBeTarred = []
908 <        ## First find the executable
909 <        if (self.executable != ''):
910 <            exeWithPath = self.scram.findFile_(executable)
911 < #           print exeWithPath
912 <            if ( not exeWithPath ):
913 <                raise CrabException('User executable '+executable+' not found')
914 <
915 <            ## then check if it's private or not
916 <            if exeWithPath.find(swReleaseTop) == -1:
917 <                # the exe is private, so we must ship
918 <                common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
919 <                path = swArea+'/'
920 <                exe = string.replace(exeWithPath, path,'')
921 <                filesToBeTarred.append(exe)
922 <                pass
923 <            else:
924 <                # the exe is from release, we'll find it on WN
907 >        import tarfile
908 >        try: # create tar ball
909 >            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
910 >            ## First find the executable
911 >            if (self.executable != ''):
912 >                exeWithPath = self.scram.findFile_(executable)
913 >                if ( not exeWithPath ):
914 >                    raise CrabException('User executable '+executable+' not found')
915 >
916 >                ## then check if it's private or not
917 >                if exeWithPath.find(swReleaseTop) == -1:
918 >                    # the exe is private, so we must ship
919 >                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
920 >                    path = swArea+'/'
921 >                    # distinguish case when script is in user project area or given by full path somewhere else
922 >                    if exeWithPath.find(path) >= 0 :
923 >                        exe = string.replace(exeWithPath, path,'')
924 >                        tar.add(path+exe,exe)
925 >                    else :
926 >                        tar.add(exeWithPath,os.path.basename(executable))
927 >                    pass
928 >                else:
929 >                    # the exe is from release, we'll find it on WN
930 >                    pass
931 >
932 >            ## Now get the libraries: only those in local working area
933 >            libDir = 'lib'
934 >            lib = swArea+'/' +libDir
935 >            common.logger.debug(5,"lib "+lib+" to be tarred")
936 >            if os.path.exists(lib):
937 >                tar.add(lib,libDir)
938 >
939 >            ## Now check if module dir is present
940 >            moduleDir = 'module'
941 >            module = swArea + '/' + moduleDir
942 >            if os.path.isdir(module):
943 >                tar.add(module,moduleDir)
944 >
945 >            ## Now check if any data dir(s) is present
946 >            self.dataExist = False
947 >            todo_list = [(i, i) for i in  os.listdir(swArea+"/src")]
948 >            while len(todo_list):
949 >                entry, name = todo_list.pop()
950 >                if name.startswith('crab_0_') or  name.startswith('.') or name == 'CVS':
951 >                    continue
952 >                if os.path.isdir(swArea+"/src/"+entry):
953 >                    entryPath = entry + '/'
954 >                    todo_list += [(entryPath + i, i) for i in  os.listdir(swArea+"/src/"+entry)]
955 >                    if name == 'data':
956 >                        self.dataExist=True
957 >                        common.logger.debug(5,"data "+entry+" to be tarred")
958 >                        tar.add(swArea+"/src/"+entry,"src/"+entry)
959 >                    pass
960                  pass
961 <
962 <        ## Now get the libraries: only those in local working area
963 <        libDir = 'lib'
964 <        lib = swArea+'/' +libDir
965 <        common.logger.debug(5,"lib "+lib+" to be tarred")
966 <        if os.path.exists(lib):
967 <            filesToBeTarred.append(libDir)
968 <
969 <        ## Now check if module dir is present
970 <        moduleDir = 'module'
971 <        if os.path.isdir(swArea+'/'+moduleDir):
972 <            filesToBeTarred.append(moduleDir)
973 <
974 <        ## Now check if the Data dir is present
975 <        dataDir = 'src/Data/'
976 <        if os.path.isdir(swArea+'/'+dataDir):
977 <            filesToBeTarred.append(dataDir)
978 <
979 <        ## Create the tar-ball
980 <        if len(filesToBeTarred)>0:
981 <            cwd = os.getcwd()
982 <            os.chdir(swArea)
983 <            tarcmd = 'tar zcvf ' + self.tgzNameWithPath + ' '
984 <            for line in filesToBeTarred:
985 <                tarcmd = tarcmd + line + ' '
986 <            cout = runCommand(tarcmd)
987 <            if not cout:
988 <                raise CrabException('Could not create tar-ball')
989 <            os.chdir(cwd)
990 <        else:
991 <            common.logger.debug(5,"No files to be to be tarred")
992 <        
993 <        return
994 <        
995 <    def wsSetupEnvironment(self, nj):
961 >
962 >            ### CMSSW ParameterSet
963 >            if not self.pset is None:
964 >                cfg_file = common.work_space.jobDir()+self.configFilename()
965 >                tar.add(cfg_file,self.configFilename())
966 >                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
967 >
968 >
969 >            ## Add ProdCommon dir to tar
970 >            prodcommonDir = './'
971 >            prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
972 >            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \
973 >                           'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage']
974 >            for file in neededStuff:
975 >                tar.add(prodcommonPath+file,prodcommonDir+file)
976 >            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
977 >
978 >            ##### ML stuff
979 >            ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
980 >            path=os.environ['CRABDIR'] + '/python/'
981 >            for file in ML_file_list:
982 >                tar.add(path+file,file)
983 >            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
984 >
985 >            ##### Utils
986 >            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
987 >            for file in Utils_file_list:
988 >                tar.add(path+file,file)
989 >            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
990 >
991 >            ##### AdditionalFiles
992 >            for file in self.additional_inbox_files:
993 >                tar.add(file,string.split(file,'/')[-1])
994 >            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
995 >
996 >            tar.close()
997 >        except IOError, exc:
998 >            common.logger.write(str(exc))
999 >            raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
1000 >        except tarfile.TarError, exc:
1001 >            common.logger.write(str(exc))
1002 >            raise CrabException('Could not create tar-ball '+self.tgzNameWithPath)
1003 >
1004 >        ## check for tarball size
1005 >        tarballinfo = os.stat(self.tgzNameWithPath)
1006 >        if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
1007 >            msg  = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \
1008 >               +'MB input sandbox limit \n'
1009 >            msg += '      and not supported by the direct GRID submission system.\n'
1010 >            msg += '      Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
1011 >            msg += '      For further info please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users'
1012 >            raise CrabException(msg)
1013 >
1014 >        ## create tar-ball with ML stuff
1015 >
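
A stand-alone sketch of the getTarBall() technique above: pack selected
directories into a gzipped tarball with the standard tarfile module and then
enforce a size limit via os.stat(), as done with MaxTarBallSize. The paths,
entries and the 9.5 MB default below are illustrative:

    import os, tarfile

    def make_sandbox(tgz_path, entries, max_mb=9.5):
        tar = tarfile.open(tgz_path, "w:gz")
        try:
            for src, arcname in entries:                     # e.g. ('lib', 'lib'), ('module', 'module')
                if os.path.exists(src):
                    tar.add(src, arcname)
        finally:
            tar.close()
        size_mb = os.stat(tgz_path).st_size / (1024.0 * 1024.0)
        if size_mb > max_mb:
            raise RuntimeError("sandbox %.1f MB exceeds the %.1f MB limit" % (size_mb, max_mb))
        return tgz_path
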
1016 >    def wsSetupEnvironment(self, nj=0):
1017          """
1018          Returns part of a job script which prepares
1019          the execution environment for the job 'nj'.
1020          """
1021 +        if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1022 +            psetName = 'pset.py'
1023 +        else:
1024 +            psetName = 'pset.cfg'
1025          # Prepare JobType-independent part
1026 <        txt = ''
1027 <  
1028 <        ## OLI_Daniele at this level  middleware already known
506 <
507 <        txt += 'if [ $middleware == LCG ]; then \n'
1026 >        txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
1027 >        txt += 'echo ">>> setup environment"\n'
1028 >        txt += 'if [ $middleware == LCG ]; then \n'
1029          txt += self.wsSetupCMSLCGEnvironment_()
1030          txt += 'elif [ $middleware == OSG ]; then\n'
1031 <        txt += '    time=`date -u +"%s"`\n'
1032 <        txt += '    WORKING_DIR=$OSG_WN_TMP/cms_$time\n'
1033 <        txt += '    echo "Creating working directory: $WORKING_DIR"\n'
1034 <        txt += '    /bin/mkdir -p $WORKING_DIR\n'
1035 <        txt += '    if [ ! -d $WORKING_DIR ] ;then\n'
515 <        txt += '        echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
516 <        txt += '        echo "JOB_EXIT_STATUS = 10016"\n'
517 <        txt += '        echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
518 <        txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
519 <        txt += '        exit 1\n'
1031 >        txt += '    WORKING_DIR=`/bin/mktemp  -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
1032 >        txt += '    if [ ! $? == 0 ] ;then\n'
1033 >        txt += '        echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
1034 >        txt += '        job_exit_code=10016\n'
1035 >        txt += '        func_exit\n'
1036          txt += '    fi\n'
1037 +        txt += '    echo ">>> Created working directory: $WORKING_DIR"\n'
1038          txt += '\n'
1039          txt += '    echo "Change to working directory: $WORKING_DIR"\n'
1040          txt += '    cd $WORKING_DIR\n'
1041 <        txt += self.wsSetupCMSOSGEnvironment_()
1041 >        txt += '    echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
1042 >        txt += self.wsSetupCMSOSGEnvironment_()
1043          txt += 'fi\n'
1044  
1045          # Prepare JobType-specific part
1046          scram = self.scram.commandName()
1047          txt += '\n\n'
1048 <        txt += 'echo "### SPECIFIC JOB SETUP ENVIRONMENT ###"\n'
1048 >        txt += 'echo ">>> specific cmssw setup environment:"\n'
1049 >        txt += 'echo "CMSSW_VERSION =  '+self.version+'"\n'
1050          txt += scram+' project CMSSW '+self.version+'\n'
1051          txt += 'status=$?\n'
1052          txt += 'if [ $status != 0 ] ; then\n'
1053 <        txt += '   echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
1054 <        txt += '   echo "JOB_EXIT_STATUS = 10034"\n'
1055 <        txt += '   echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
537 <        txt += '   dumpStatus $RUNTIME_AREA/$repo\n'
538 <        ## OLI_Daniele
539 <        txt += '    if [ $middleware == OSG ]; then \n'
540 <        txt += '        echo "Remove working directory: $WORKING_DIR"\n'
541 <        txt += '        cd $RUNTIME_AREA\n'
542 <        txt += '        /bin/rm -rf $WORKING_DIR\n'
543 <        txt += '        if [ -d $WORKING_DIR ] ;then\n'
544 <        txt += '            echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW CMSSW_0_6_1 not found on `hostname`"\n'
545 <        txt += '            echo "JOB_EXIT_STATUS = 10018"\n'
546 <        txt += '            echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
547 <        txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
548 <        txt += '        fi\n'
549 <        txt += '    fi \n'
550 <        txt += '   exit 1 \n'
1053 >        txt += '    echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
1054 >        txt += '    job_exit_code=10034\n'
1055 >        txt += '    func_exit\n'
1056          txt += 'fi \n'
552        txt += 'echo "CMSSW_VERSION =  '+self.version+'"\n'
1057          txt += 'cd '+self.version+'\n'
1058 <        ### needed grep for bug in scramv1 ###
1058 >        txt += 'SOFTWARE_DIR=`pwd`\n'
1059 >        txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1060          txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
1061 <
1061 >        txt += 'if [ $? != 0 ] ; then\n'
1062 >        txt += '    echo "ERROR ==> Problem with the command: "\n'
1063 >        txt += '    echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n'
1064 >        txt += '    job_exit_code=10034\n'
1065 >        txt += '    func_exit\n'
1066 >        txt += 'fi \n'
1067          # Handle the arguments:
1068          txt += "\n"
1069          txt += "## number of arguments (first argument always jobnumber)\n"
1070          txt += "\n"
1071 <        txt += "narg=$#\n"
562 <        txt += "if [ $narg -lt 2 ]\n"
1071 >        txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
1072          txt += "then\n"
1073 <        txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$narg+ \n"
1074 <        txt += '    echo "JOB_EXIT_STATUS = 50113"\n'
1075 <        txt += '    echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
567 <        txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
568 <        ## OLI_Daniele
569 <        txt += '    if [ $middleware == OSG ]; then \n'
570 <        txt += '        echo "Remove working directory: $WORKING_DIR"\n'
571 <        txt += '        cd $RUNTIME_AREA\n'
572 <        txt += '        /bin/rm -rf $WORKING_DIR\n'
573 <        txt += '        if [ -d $WORKING_DIR ] ;then\n'
574 <        txt += '            echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
575 <        txt += '            echo "JOB_EXIT_STATUS = 50114"\n'
576 <        txt += '            echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
577 <        txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
578 <        txt += '        fi\n'
579 <        txt += '    fi \n'
580 <        txt += "    exit 1\n"
1073 >        txt += "    echo 'ERROR ==> Too few arguments' +$nargs+ \n"
1074 >        txt += '    job_exit_code=50113\n'
1075 >        txt += "    func_exit\n"
1076          txt += "fi\n"
1077          txt += "\n"
1078  
1079          # Prepare job-specific part
1080          job = common.job_list[nj]
1081 <        pset = os.path.basename(job.configFilename())
1082 <        txt += '\n'
1083 <        txt += 'InputFiles=$2\n'
1084 <        txt += 'echo "<$InputFiles>"\n'
1085 <        #txt += 'echo sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' \n'
591 <        txt += 'sed "s#{\'INPUT\'}#$InputFiles#" $RUNTIME_AREA/'+pset+' > pset.cfg\n'
592 <        #txt += 'sed "s#{\'INPUT\'}#${InputFiles}#" $RUNTIME_AREA/'+pset+' > pset1.cfg\n'
1081 >        if (self.datasetPath):
1082 >            self.primaryDataset = self.datasetPath.split("/")[1]
1083 >            DataTier = self.datasetPath.split("/")[2]
1084 >            txt += '\n'
1085 >            txt += 'DatasetPath='+self.datasetPath+'\n'
1086  
1087 <        if len(self.additional_inbox_files) > 0:
1088 <            for file in self.additional_inbox_files:
1089 <                txt += 'if [ -e $RUNTIME_AREA/'+file+' ] ; then\n'
597 <                txt += '   cp $RUNTIME_AREA/'+file+' .\n'
598 <                txt += '   chmod +x '+file+'\n'
599 <                txt += 'fi\n'
600 <            pass
1087 >            txt += 'PrimaryDataset='+self.primaryDataset +'\n'
1088 >            txt += 'DataTier='+DataTier+'\n'
1089 >            txt += 'ApplicationFamily=cmsRun\n'
1090  
1091 <        txt += 'echo "### END JOB SETUP ENVIRONMENT ###"\n\n'
1091 >        else:
1092 >            self.primaryDataset = 'null'
1093 >            txt += 'DatasetPath=MCDataTier\n'
1094 >            txt += 'PrimaryDataset=null\n'
1095 >            txt += 'DataTier=null\n'
1096 >            txt += 'ApplicationFamily=MCDataTier\n'
1097 >        if self.pset != None:
1098 >            pset = os.path.basename(job.configFilename())
1099 >            txt += '\n'
1100 >            txt += 'cp  $RUNTIME_AREA/'+pset+' .\n'
1101 >            if (self.datasetPath): # standard job
1102 >                txt += 'InputFiles=${args[1]}; export InputFiles\n'
1103 >                if (self.useParent):
1104 >                    txt += 'ParentFiles=${args[2]}; export ParentFiles\n'
1105 >                    txt += 'MaxEvents=${args[3]}; export MaxEvents\n'
1106 >                    txt += 'SkipEvents=${args[4]}; export SkipEvents\n'
1107 >                else:
1108 >                    txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
1109 >                    txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
1110 >                txt += 'echo "Inputfiles:<$InputFiles>"\n'
1111 >                if (self.useParent): txt += 'echo "ParentFiles:<$ParentFiles>"\n'
1112 >                txt += 'echo "MaxEvents:<$MaxEvents>"\n'
1113 >                txt += 'echo "SkipEvents:<$SkipEvents>"\n'
1114 >            else:  # pythia like job
1115 >                txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
1116 >                txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
1117 >                txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
1118 >                txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
1119 >                if (self.firstRun):
1120 >                    txt += 'FirstRun=${args[1]}; export FirstRun\n'
1121 >                    txt += 'echo "FirstRun: <$FirstRun>"\n'
1122  
1123 <        txt += '\n'
1124 <        txt += 'echo "***** cat pset.cfg *********"\n'
1125 <        txt += 'cat pset.cfg\n'
1126 <        txt += 'echo "****** end pset.cfg ********"\n'
1127 <        txt += '\n'
1128 <        # txt += 'echo "***** cat pset1.cfg *********"\n'
1129 <        # txt += 'cat pset1.cfg\n'
1130 <        # txt += 'echo "****** end pset1.cfg ********"\n'
1123 >            txt += 'mv -f ' + pset + ' ' + psetName + '\n'
1124 >
1125 >
1126 >        if self.pset != None:
1127 >            # FUTURE: Can simplify for 2_1_x and higher
1128 >            txt += '\n'
1129 >            if self.debug_wrapper==True:
1130 >                txt += 'echo "***** cat ' + psetName + ' *********"\n'
1131 >                txt += 'cat ' + psetName + '\n'
1132 >                txt += 'echo "****** end ' + psetName + ' ********"\n'
1133 >                txt += '\n'
1134 >            if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1135 >                txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n'
1136 >            else:
1137 >                txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
1138 >            txt += 'echo "PSETHASH = $PSETHASH" \n'
1139 >            txt += '\n'
1140          return txt
1141  
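
The wrapper above creates its OSG scratch area with /bin/mktemp -d; a rough
Python counterpart of that step (osg_wn_tmp is a placeholder path, and the
error corresponds to the job_exit_code=10016 branch):

    import tempfile

    def make_working_dir(osg_wn_tmp):
        try:
            return tempfile.mkdtemp(prefix='cms_', dir=osg_wn_tmp)   # unique working directory
        except OSError as exc:
            raise RuntimeError('could not create working directory: %s' % exc)
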
1142 <    def wsBuildExe(self, nj):
1142 >    def wsUntarSoftware(self, nj=0):
1143          """
1144          Put in the script the commands to build an executable
1145          or a library.
1146          """
1147  
1148 <        txt = ""
1148 >        txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'
1149  
1150          if os.path.isfile(self.tgzNameWithPath):
1151 <            txt += 'echo "tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'"\n'
1151 >            txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
1152              txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
1153 +            if  self.debug_wrapper:
1154 +                txt += 'ls -Al \n'
1155              txt += 'untar_status=$? \n'
1156              txt += 'if [ $untar_status -ne 0 ]; then \n'
1157 <            txt += '   echo "SET_EXE 1 ==> ERROR Untarring .tgz file failed"\n'
1158 <            txt += '   echo "JOB_EXIT_STATUS = $untar_status" \n'
1159 <            txt += '   echo "JobExitCode=$untar_status" | tee -a $RUNTIME_AREA/$repo\n'
630 <            txt += '   if [ $middleware == OSG ]; then \n'
631 <            txt += '       echo "Remove working directory: $WORKING_DIR"\n'
632 <            txt += '       cd $RUNTIME_AREA\n'
633 <            txt += '       /bin/rm -rf $WORKING_DIR\n'
634 <            txt += '       if [ -d $WORKING_DIR ] ;then\n'
635 <            txt += '        echo "SET_EXE 50999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Untarring .tgz file failed"\n'
636 <            txt += '        echo "JOB_EXIT_STATUS = 50999"\n'
637 <            txt += '        echo "JobExitCode=50999" | tee -a $RUNTIME_AREA/$repo\n'
638 <            txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
639 <            txt += '       fi\n'
640 <            txt += '   fi \n'
641 <            txt += '   \n'
642 <            txt += '   exit 1 \n'
1157 >            txt += '   echo "ERROR ==> Untarring .tgz file failed"\n'
1158 >            txt += '   job_exit_code=$untar_status\n'
1159 >            txt += '   func_exit\n'
1160              txt += 'else \n'
1161              txt += '   echo "Successful untar" \n'
1162              txt += 'fi \n'
1163 +            txt += '\n'
1164 +            txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
1165 +            txt += 'if [ -z "$PYTHONPATH" ]; then\n'
1166 +            txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
1167 +            txt += 'else\n'
1168 +            txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
1169 +            txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
1170 +            txt += 'fi\n'
1171 +            txt += '\n'
1172 +
1173              pass
1174 <        
1174 >
1175          return txt
1176  
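
What the wsUntarSoftware() fragment above does on the worker node, sketched in
Python for clarity (runtime_area and the tarball name are placeholders):

    import os, tarfile

    def untar_sandbox(runtime_area, tgz_name='default.tgz'):
        tar = tarfile.open(os.path.join(runtime_area, tgz_name), 'r:gz')
        try:
            tar.extractall('.')                              # same effect as 'tar xzvf' above
        finally:
            tar.close()
        old = os.environ.get('PYTHONPATH', '')
        os.environ['PYTHONPATH'] = runtime_area + ('/:' + old if old else '/')
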
1177 <    def modifySteeringCards(self, nj):
1177 >    def wsBuildExe(self, nj=0):
1178          """
1179 <        modify the card provided by the user,
1180 <        writing a new card into share dir
1179 >        Put in the script the commands to build an executable
1180 >        or a library.
1181          """
1182 <        
1182 >
1183 >        txt = '\n#Written by cms_cmssw::wsBuildExe\n'
1184 >        txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'
1185 >
1186 >        txt += 'rm -r lib/ module/ \n'
1187 >        txt += 'mv $RUNTIME_AREA/lib/ . \n'
1188 >        txt += 'mv $RUNTIME_AREA/module/ . \n'
1189 >        if self.dataExist == True:
1190 >            txt += 'rm -r src/ \n'
1191 >            txt += 'mv $RUNTIME_AREA/src/ . \n'
1192 >        if len(self.additional_inbox_files)>0:
1193 >            for file in self.additional_inbox_files:
1194 >                txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n'
1195 >        # txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
1196 >        # txt += 'mv $RUNTIME_AREA/IMProv/ . \n'
1197 >
1198 >        txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
1199 >        txt += 'if [ -z "$PYTHONPATH" ]; then\n'
1200 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
1201 >        txt += 'else\n'
1202 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
1203 >        txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
1204 >        txt += 'fi\n'
1205 >        txt += '\n'
1206 >
1207 >        return txt
1208 >
1209 >
1210      def executableName(self):
1211 <        return self.executable
1211 >        if self.scriptExe:
1212 >            return "sh "
1213 >        else:
1214 >            return self.executable
1215  
1216      def executableArgs(self):
1217 <        return " -p pset.cfg"
1217 >        # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
1218 >        if self.scriptExe: # CarlosDaniele
1219 >            return   self.scriptExe + " $NJob"
1220 >        else:
1221 >            ex_args = ""
1222 >            # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
1223 >            # Framework job report
1224 >            if (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5) or (self.CMSSW_major >= 2):
1225 >                ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
1226 >            # Type of config file
1227 >            if self.CMSSW_major >= 2 :
1228 >                ex_args += " -p pset.py"
1229 >            else:
1230 >                ex_args += " -p pset.cfg"
1231 >            return ex_args
1232  
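
A hedged sketch of the version gating in executableArgs(): the -j (framework
job report) option is added from CMSSW 1_5_x on, and a python configuration is
passed from CMSSW 2_x on. The file names below are examples:

    def cmsrun_args(major, minor, njob=1):
        args = ""
        if (major >= 1 and minor >= 5) or (major >= 2):
            args += " -j crab_fjr_%d.xml" % njob
        args += " -p pset.py" if major >= 2 else " -p pset.cfg"
        return args

    print(cmsrun_args(2, 2))   # -j crab_fjr_1.xml -p pset.py
    print(cmsrun_args(1, 3))   # -p pset.cfg
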
1233      def inputSandbox(self, nj):
1234          """
1235          Returns a list of filenames to be put in JDL input sandbox.
1236          """
1237          inp_box = []
667        # dict added to delete duplicate from input sandbox file list
668        seen = {}
669        ## code
1238          if os.path.isfile(self.tgzNameWithPath):
1239              inp_box.append(self.tgzNameWithPath)
1240 <        ## config
673 <        inp_box.append(common.job_list[nj].configFilename())
674 <        ## additional input files
675 <        #for file in self.additional_inbox_files:
676 <        #    inp_box.append(common.work_space.cwdDir()+file)
1240 >        inp_box.append(common.work_space.jobDir() + self.scriptName)
1241          return inp_box
1242  
1243      def outputSandbox(self, nj):
# Line 682 | Line 1246 | class Cmssw(JobType):
1246          """
1247          out_box = []
1248  
685        stdout=common.job_list[nj].stdout()
686        stderr=common.job_list[nj].stderr()
687
1249          ## User Declared output files
1250 <        for out in self.output_file:
1251 <            n_out = nj + 1
1252 <            out_box.append(self.numberFile_(out,str(n_out)))
1250 >        for out in (self.output_file+self.output_file_sandbox):
1251 >            n_out = nj + 1
1252 >            out_box.append(numberFile(out,str(n_out)))
1253          return out_box
693        return []
1254  
695    def prepareSteeringCards(self):
696        """
697        Make initial modifications of the user's steering card file.
698        """
699        return
1255  
1256      def wsRenameOutput(self, nj):
1257          """
1258          Returns part of a job script which renames the produced files.
1259          """
1260  
1261 <        txt = '\n'
1262 <        txt += '# directory content\n'
1263 <        txt += 'ls \n'
1264 <        file_list = ''
1265 <        for fileWithSuffix in self.output_file:
1266 <            output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
1267 <            file_list=file_list+output_file_num+' '
1261 >        txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
1262 >        txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1263 >        txt += 'echo ">>> current directory content:"\n'
1264 >        if self.debug_wrapper:
1265 >            txt += 'ls -Al\n'
1266 >        txt += '\n'
1267 >
1268 >        for fileWithSuffix in (self.output_file):
1269 >            output_file_num = numberFile(fileWithSuffix, '$NJob')
1270              txt += '\n'
1271              txt += '# check output file\n'
1272 <            txt += 'ls '+fileWithSuffix+'\n'
1273 <            txt += 'exe_result=$?\n'
1274 <            txt += 'if [ $exe_result -ne 0 ] ; then\n'
1275 <            txt += '   echo "ERROR: No output file to manage"\n'
1276 <            txt += '   echo "JOB_EXIT_STATUS = $exe_result"\n'
1277 <            txt += '   echo "JobExitCode=60302" | tee -a $RUNTIME_AREA/$repo\n'
1278 <            txt += '   dumpStatus $RUNTIME_AREA/$repo\n'
1279 <            ### OLI_DANIELE
1280 <            if common.scheduler.boss_scheduler_name == 'condor_g':
1272 >            txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
1273 >            if (self.copy_data == 1):  # For OSG nodes, file is in $WORKING_DIR, should not be moved to $RUNTIME_AREA
1274 >                txt += '    mv '+fileWithSuffix+' '+output_file_num+'\n'
1275 >                txt += '    ln -s `pwd`/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1276 >            else:
1277 >                txt += '    mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
1278 >                txt += '    ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1279 >            txt += 'else\n'
1280 >            txt += '    job_exit_code=60302\n'
1281 >            txt += '    echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
1282 >            if common.scheduler.name().upper() == 'CONDOR_G':
1283                  txt += '    if [ $middleware == OSG ]; then \n'
1284                  txt += '        echo "prepare dummy output file"\n'
1285                  txt += '        echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
1286                  txt += '    fi \n'
728            txt += 'else\n'
729            txt += '   cp '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
1287              txt += 'fi\n'
1288 <      
1289 <        txt += 'cd $RUNTIME_AREA\n'
1290 <        file_list=file_list[:-1]
1291 <        txt += 'file_list="'+file_list+'"\n'
1292 <        ### OLI_DANIELE
736 <        txt += 'if [ $middleware == OSG ]; then\n'  
737 <        txt += '    cd $RUNTIME_AREA\n'
738 <        txt += '    echo "Remove working directory: $WORKING_DIR"\n'
739 <        txt += '    /bin/rm -rf $WORKING_DIR\n'
740 <        txt += '    if [ -d $WORKING_DIR ] ;then\n'
741 <        txt += '        echo "SET_EXE 60999 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
742 <        txt += '        echo "JOB_EXIT_STATUS = 60999"\n'
743 <        txt += '        echo "JobExitCode=60999" | tee -a $RUNTIME_AREA/$repo\n'
744 <        txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
745 <        txt += '    fi\n'
746 <        txt += 'fi\n'
1288 >        file_list = []
1289 >        for fileWithSuffix in (self.output_file):
1290 >             file_list.append(numberFile(fileWithSuffix, '$NJob'))
1291 >
1292 >        txt += 'file_list="'+string.join(file_list,' ')+'"\n'
1293          txt += '\n'
1294 +        txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
1295 +        txt += 'echo ">>> current directory content:"\n'
1296 +        if self.debug_wrapper:
1297 +            txt += 'ls -Al\n'
1298 +        txt += '\n'
1299 +        txt += 'cd $RUNTIME_AREA\n'
1300 +        txt += 'echo ">>> current directory (RUNTIME_AREA):  $RUNTIME_AREA"\n'
1301          return txt
1302  
1303 <    def numberFile_(self, file, txt):
751 <        """
752 <        append _'txt' before last extension of a file
753 <        """
754 <        p = string.split(file,".")
755 <        # take away last extension
756 <        name = p[0]
757 <        for x in p[1:-1]:
758 <           name=name+"."+x
759 <        # add "_txt"
760 <        if len(p)>1:
761 <          ext = p[len(p)-1]
762 <          #result = name + '_' + str(txt) + "." + ext
763 <          result = name + '_' + txt + "." + ext
764 <        else:
765 <          #result = name + '_' + str(txt)
766 <          result = name + '_' + txt
767 <        
768 <        return result
769 <
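
The numberFile_() helper removed above is replaced by calls to a shared
numberFile() utility (presumably provided by the crab_util import); a minimal
re-implementation of the same idea, inserting '_<txt>' before the last
extension of a file name:

    def number_file(filename, txt):
        parts = filename.split('.')
        if len(parts) > 1:
            return '.'.join(parts[:-1]) + '_' + str(txt) + '.' + parts[-1]
        return filename + '_' + str(txt)

    print(number_file('output.root', '$NJob'))   # output_$NJob.root
    print(number_file('histo.ana.root', 3))      # histo.ana_3.root
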
770 <    def getRequirements(self):
1303 >    def getRequirements(self, nj=[]):
1304          """
1305 <        return job requirements to add to jdl files
1305 >        return job requirements to add to jdl files
1306          """
1307          req = ''
1308 <        if common.analisys_common_info['sites']:
1309 <            if common.analisys_common_info['sw_version']:
1310 <                req='Member("VO-cms-' + \
1311 <                     common.analisys_common_info['sw_version'] + \
1312 <                     '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1313 <            if len(common.analisys_common_info['sites'])>0:
1314 <                req = req + ' && ('
1315 <                for i in range(len(common.analisys_common_info['sites'])):
1316 <                    req = req + 'other.GlueCEInfoHostName == "' \
1317 <                         + common.analisys_common_info['sites'][i] + '"'
1318 <                    if ( i < (int(len(common.analisys_common_info['sites']) - 1)) ):
1319 <                        req = req + ' || '
1320 <            req = req + ')'
788 <        #print "req = ", req
1308 >        if self.version:
1309 >            req='Member("VO-cms-' + \
1310 >                 self.version + \
1311 >                 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1312 >        if self.executable_arch:
1313 >            req+=' && Member("VO-cms-' + \
1314 >                 self.executable_arch + \
1315 >                 '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
1316 >
1317 >        req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
1318 >        if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"):
1319 >            req += ' && other.GlueCEStateStatus == "Production" '
1320 >
1321          return req
1322  
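
A sketch of how getRequirements() composes the JDL requirements string from
the software version and architecture tags; the values passed below are
examples only:

    def glue_requirements(version, arch, outbound_ip=True):
        req = 'Member("VO-cms-%s", other.GlueHostApplicationSoftwareRunTimeEnvironment)' % version
        if arch:
            req += ' && Member("VO-cms-%s", other.GlueHostApplicationSoftwareRunTimeEnvironment)' % arch
        if outbound_ip:
            req += ' && (other.GlueHostNetworkAdapterOutboundIP)'
        return req

    print(glue_requirements('CMSSW_2_1_8', 'slc4_ia32_gcc345'))
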
1323      def configFilename(self):
1324          """ return the config filename """
1325 <        return self.name()+'.cfg'
1325 >        # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
1326 >        if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
1327 >          return self.name()+'.py'
1328 >        else:
1329 >          return self.name()+'.cfg'
1330  
795    ### OLI_DANIELE
1331      def wsSetupCMSOSGEnvironment_(self):
1332          """
1333          Returns part of a job script which prepares
1334          the execution environment and which is common for all CMS jobs.
1335          """
1336 <        txt = '\n'
1337 <        txt += '   echo "### SETUP CMS OSG  ENVIRONMENT ###"\n'
1338 <        txt += '   if [ -f $GRID3_APP_DIR/cmssoft/cmsset_default.sh ] ;then\n'
1339 <        txt += '      # Use $GRID3_APP_DIR/cmssoft/cmsset_default.sh to setup cms software\n'
1340 <        txt += '       source $GRID3_APP_DIR/cmssoft/cmsset_default.sh '+self.version+'\n'
1341 <        txt += '   elif [ -f $OSG_APP/cmssoft/cmsset_default.sh ] ;then\n'
1342 <        txt += '      # Use $OSG_APP/cmssoft/cmsset_default.sh to setup cms software\n'
1343 <        txt += '       source $OSG_APP/cmssoft/cmsset_default.sh '+self.version+'\n'
1344 <        txt += '   else\n'
1345 <        txt += '       echo "SET_CMS_ENV 10020 ==> ERROR $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
1346 <        txt += '       echo "JOB_EXIT_STATUS = 10020"\n'
1347 <        txt += '       echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1348 <        txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
814 <        txt += '       exit 1\n'
815 <        txt += '\n'
816 <        txt += '       echo "Remove working directory: $WORKING_DIR"\n'
817 <        txt += '       cd $RUNTIME_AREA\n'
818 <        txt += '       /bin/rm -rf $WORKING_DIR\n'
819 <        txt += '       if [ -d $WORKING_DIR ] ;then\n'
820 <        txt += '            echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $GRID3_APP_DIR/cmssoft/cmsset_default.sh and $OSG_APP/cmssoft/cmsset_default.sh file not found"\n'
821 <        txt += '            echo "JOB_EXIT_STATUS = 10017"\n'
822 <        txt += '            echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
823 <        txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
824 <        txt += '       fi\n'
825 <        txt += '\n'
826 <        txt += '       exit 1\n'
827 <        txt += '   fi\n'
1336 >        txt = '\n#Written by cms_cmssw::wsSetupCMSOSGEnvironment_\n'
1337 >        txt += '    echo ">>> setup CMS OSG environment:"\n'
1338 >        txt += '    echo "set SCRAM ARCH to ' + self.executable_arch + '"\n'
1339 >        txt += '    export SCRAM_ARCH='+self.executable_arch+'\n'
1340 >        txt += '    echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
1341 >        txt += '    if [ -f $OSG_APP/cmssoft/cms/cmsset_default.sh ] ;then\n'
1342 >        txt += '      # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
1343 >        txt += '        source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
1344 >        txt += '    else\n'
1345 >        txt += '        echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1346 >        txt += '        job_exit_code=10020\n'
1347 >        txt += '        func_exit\n'
1348 >        txt += '    fi\n'
1349          txt += '\n'
1350 <        txt += '   echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
1351 <        txt += '   echo " END SETUP CMS OSG  ENVIRONMENT "\n'
1350 >        txt += '    echo "==> setup cms environment ok"\n'
1351 >        txt += '    echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
1352  
1353          return txt
1354 <
834 <    ### OLI_DANIELE
1354 >
1355      def wsSetupCMSLCGEnvironment_(self):
1356          """
1357          Returns part of a job script which prepares
1358          the execution environment and which is common for all CMS jobs.
1359          """
1360 <        txt  = '   \n'
1361 <        txt += '   echo " ### SETUP CMS LCG  ENVIRONMENT ### "\n'
1362 <        txt += '   if [ ! $VO_CMS_SW_DIR ] ;then\n'
1363 <        txt += '       echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n'
1364 <        txt += '       echo "JOB_EXIT_STATUS = 10031" \n'
1365 <        txt += '       echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n'
1366 <        txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
1367 <        txt += '       exit 1\n'
1368 <        txt += '   else\n'
1369 <        txt += '       echo "Sourcing environment... "\n'
1370 <        txt += '       if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1371 <        txt += '           echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1372 <        txt += '           echo "JOB_EXIT_STATUS = 10020"\n'
1373 <        txt += '           echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1374 <        txt += '           dumpStatus $RUNTIME_AREA/$repo\n'
1375 <        txt += '           exit 1\n'
1376 <        txt += '       fi\n'
1377 <        txt += '       echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1378 <        txt += '       source $VO_CMS_SW_DIR/cmsset_default.sh\n'
1379 <        txt += '       result=$?\n'
1380 <        txt += '       if [ $result -ne 0 ]; then\n'
1381 <        txt += '           echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1382 <        txt += '           echo "JOB_EXIT_STATUS = 10032"\n'
1383 <        txt += '           echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n'
1384 <        txt += '           dumpStatus $RUNTIME_AREA/$repo\n'
1385 <        txt += '           exit 1\n'
1386 <        txt += '       fi\n'
1387 <        txt += '   fi\n'
1388 <        txt += '   \n'
1389 <        txt += '   string=`cat /etc/redhat-release`\n'
1390 <        txt += '   echo $string\n'
1391 <        txt += '   if [[ $string = *alhalla* ]]; then\n'
1392 <        txt += '       echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1393 <        txt += '   elif [[ $string = *Enterprise* ]] || [[ $string = *cientific* ]]; then\n'
1394 <        txt += '       export SCRAM_ARCH=slc3_ia32_gcc323\n'
1395 <        txt += '       echo "SCRAM_ARCH= $SCRAM_ARCH"\n'
1396 <        txt += '   else\n'
1397 <        txt += '       echo "SET_CMS_ENV 10033 ==> ERROR OS unknown, LCG environment not initialized"\n'
1398 <        txt += '       echo "JOB_EXIT_STATUS = 10033"\n'
1399 <        txt += '       echo "JobExitCode=10033" | tee -a $RUNTIME_AREA/$repo\n'
1400 <        txt += '       dumpStatus $RUNTIME_AREA/$repo\n'
1401 <        txt += '       exit 1\n'
1402 <        txt += '   fi\n'
1403 <        txt += '   echo "SET_CMS_ENV 0 ==> setup cms environment ok"\n'
1404 <        txt += '   echo "### END SETUP CMS LCG ENVIRONMENT ###"\n'
1360 >        txt = '\n#Written by cms_cmssw::wsSetupCMSLCGEnvironment_\n'
1361 >        txt += '    echo ">>> setup CMS LCG environment:"\n'
1362 >        txt += '    echo "set SCRAM ARCH and BUILD_ARCH to ' + self.executable_arch + ' ###"\n'
1363 >        txt += '    export SCRAM_ARCH='+self.executable_arch+'\n'
1364 >        txt += '    export BUILD_ARCH='+self.executable_arch+'\n'
1365 >        txt += '    if [ ! $VO_CMS_SW_DIR ] ;then\n'
1366 >        txt += '        echo "ERROR ==> CMS software dir not found on WN `hostname`"\n'
1367 >        txt += '        job_exit_code=10031\n'
1368 >        txt += '        func_exit\n'
1369 >        txt += '    else\n'
1370 >        txt += '        echo "Sourcing environment... "\n'
1371 >        txt += '        if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1372 >        txt += '            echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1373 >        txt += '            job_exit_code=10020\n'
1374 >        txt += '            func_exit\n'
1375 >        txt += '        fi\n'
1376 >        txt += '        echo "sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1377 >        txt += '        source $VO_CMS_SW_DIR/cmsset_default.sh\n'
1378 >        txt += '        result=$?\n'
1379 >        txt += '        if [ $result -ne 0 ]; then\n'
1380 >        txt += '            echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1381 >        txt += '            job_exit_code=10032\n'
1382 >        txt += '            func_exit\n'
1383 >        txt += '        fi\n'
1384 >        txt += '    fi\n'
1385 >        txt += '    \n'
1386 >        txt += '    echo "==> setup cms environment ok"\n'
1387 >        return txt
1388 >
1389 >    def wsModifyReport(self, nj):
1390 >        """
1391 >        insert the part of the script that modifies the FrameworkJob Report
1392 >        """
1393 >        txt = '\n#Written by cms_cmssw::wsModifyReport\n'
1394 >        publish_data = int(self.cfg_params.get('USER.publish_data',0))
1395 >        if (publish_data == 1):
1396 >
1397 >            txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n'
1398 >            txt += '    FOR_LFN=$LFNBaseName/${PSETHASH}/\n'
1399 >            txt += 'else\n'
1400 >            txt += '    FOR_LFN=/copy_problems/ \n'
1401 >            txt += '    SE=""\n'
1402 >            txt += '    SE_PATH=""\n'
1403 >            txt += 'fi\n'
1404 >
1405 >            txt += 'echo ">>> Modify Job Report:" \n'
1406 >            txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1407 >            txt += 'ProcessedDataset= $procDataset \n'
1408 >            txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
1409 >            txt += 'echo "SE = $SE"\n'
1410 >            txt += 'echo "SE_PATH = $SE_PATH"\n'
1411 >            txt += 'echo "FOR_LFN = $FOR_LFN" \n'
1412 >            txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
1413 >            args = '$RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier ' \
1414 >                   '$User -$ProcessedDataset-$PSETHASH $ApplicationFamily '+ \
1415 >                    '  $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
1416 >            txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)
1417 >            txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)
1418 >            txt += 'modifyReport_result=$?\n'
1419 >            txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
1420 >            txt += '    modifyReport_result=70500\n'
1421 >            txt += '    job_exit_code=$modifyReport_result\n'
1422 >            txt += '    echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n'
1423 >            txt += '    echo "WARNING: Problem with ModifyJobReport"\n'
1424 >            txt += 'else\n'
1425 >            txt += '    mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1426 >            txt += 'fi\n'
1427 >        return txt
1428 >
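
The publication branch above is driven by a single flag read from the user
configuration; a minimal sketch of that cfg_params.get pattern, using an
illustrative stand-in for the parsed crab.cfg:

    cfg_params = {'USER.publish_data': '1'}          # hypothetical parsed configuration

    publish_data = int(cfg_params.get('USER.publish_data', 0))
    if publish_data == 1:
        print('job report will be rewritten with the publication LFN')
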
1429 >    def wsParseFJR(self):
1430 >        """
1431 >        Parse the FrameworkJobReport to obtain useful infos
1432 >        """
1433 >        txt = '\n#Written by cms_cmssw::wsParseFJR\n'
1434 >        txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
1435 >        txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
1436 >        txt += '    if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
1437 >        txt += '        cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n'
1438 >        if self.debug_wrapper :
1439 >            txt += '        echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
1440 >        txt += '        executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n'
1441 >        txt += '        if [ $executable_exit_status -eq 50115 ];then\n'
1442 >        txt += '            echo ">>> crab_fjr.xml contents: "\n'
1443 >        txt += '            cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1444 >        txt += '            echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
1445 >        txt += '        elif [ $executable_exit_status -eq -999 ];then\n'
1446 >        txt += '            echo "ExitStatus from FrameworkJobReport not available. Using exit code of executable from command line."\n'
1447 >        txt += '        else\n'
1448 >        txt += '            echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
1449 >        txt += '        fi\n'
1450 >        txt += '    else\n'
1451 >        txt += '        echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1452 >        txt += '    fi\n'
1453 >          #### Patch to check input data reading for CMSSW16x; hopefully we'll remove it asap
1454 >        txt += '    if [ $executable_exit_status -eq 0 ];then\n'
1455 >        txt += '      echo ">>> Executable succeeded $executable_exit_status"\n'
1456 >        if (self.datasetPath and not (self.dataset_pu or self.useParent)) :
1457 >          # VERIFY PROCESSED DATA
1458 >            txt += '      echo ">>> Verify list of processed files:"\n'
1459 >            txt += '      echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n'
1460 >            txt += '      python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n'
1461 >            txt += '      cat input-files.txt  | sort | uniq > tmp.txt\n'
1462 >            txt += '      mv tmp.txt input-files.txt\n'
1463 >            txt += '      echo "cat input-files.txt"\n'
1464 >            txt += '      echo "----------------------"\n'
1465 >            txt += '      cat input-files.txt\n'
1466 >            txt += '      cat processed-files.txt | sort | uniq > tmp.txt\n'
1467 >            txt += '      mv tmp.txt processed-files.txt\n'
1468 >            txt += '      echo "----------------------"\n'
1469 >            txt += '      echo "cat processed-files.txt"\n'
1470 >            txt += '      echo "----------------------"\n'
1471 >            txt += '      cat processed-files.txt\n'
1472 >            txt += '      echo "----------------------"\n'
1473 >            txt += '      diff -q input-files.txt processed-files.txt\n'
1474 >            txt += '      fileverify_status=$?\n'
1475 >            txt += '      if [ $fileverify_status -ne 0 ]; then\n'
1476 >            txt += '         executable_exit_status=30001\n'
1477 >            txt += '         echo "ERROR ==> not all input files processed"\n'
1478 >            txt += '         echo "      ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
1479 >            txt += '         echo "      ==> diff input-files.txt processed-files.txt"\n'
1480 >            txt += '      fi\n'
1481 >        txt += '    elif [ $executable_exit_status -ne 0 ] || [ $executable_exit_status -ne 50015 ] || [ $executable_exit_status -ne 50017 ];then\n'
1482 >        txt += '      echo ">>> Executable failed  $executable_exit_status"\n'
1483 >        txt += '      func_exit\n'
1484 >        txt += '    fi\n'
1485 >        txt += '\n'
1486 >        txt += 'else\n'
1487 >        txt += '    echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1488 >        txt += 'fi\n'
1489 >        txt += '\n'
1490 >        txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1491 >        txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1492 >        txt += 'job_exit_code=$executable_exit_status\n'
1493 >
1494          return txt
1495  
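
The file-verification step scripted above compares the unique list of input
LFNs against the LFNs reported in crab_fjr.xml (a diff of two sorted lists); a
small Python sketch of the same check, flagging inputs that were not processed
(as with executable_exit_status=30001 in the wrapper):

    def all_inputs_processed(input_files, processed_files):
        missing = set(input_files) - set(processed_files)
        return (len(missing) == 0, sorted(missing))

    ok, missing = all_inputs_processed(['/store/a.root', '/store/b.root'],
                                       ['/store/a.root'])
    print((ok, missing))   # (False, ['/store/b.root'])
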
1496      def setParam_(self, param, value):
# Line 889 | Line 1498 | class Cmssw(JobType):
1498  
1499      def getParams(self):
1500          return self._params
1501 +
1502 +    def uniquelist(self, old):
1503 +        """
1504 +        remove duplicates from a list
1505 +        """
1506 +        nd={}
1507 +        for e in old:
1508 +            nd[e]=0
1509 +        return nd.keys()
1510 +
1511 +    def outList(self):
1512 +        """
1513 +        build and export the list of output files to be checked
1514 +        """
1515 +        txt = ''
1516 +        txt += 'echo ">>> list of expected files on output sandbox"\n'
1517 +        listOutFiles = []
1518 +        stdout = 'CMSSW_$NJob.stdout'
1519 +        stderr = 'CMSSW_$NJob.stderr'
1520 +        if (self.return_data == 1):
1521 +            for file in (self.output_file+self.output_file_sandbox):
1522 +                listOutFiles.append(numberFile(file, '$NJob'))
1523 +            listOutFiles.append(stdout)
1524 +            listOutFiles.append(stderr)
1525 +        else:
1526 +            for file in (self.output_file_sandbox):
1527 +                listOutFiles.append(numberFile(file, '$NJob'))
1528 +            listOutFiles.append(stdout)
1529 +            listOutFiles.append(stderr)
1530 +        txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n'
1531 +        txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n'
1532 +        txt += 'export filesToCheck\n'
1533 +        return txt

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines