ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
(Generate patch)

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.195 by slacapra, Wed May 28 15:42:01 2008 UTC vs.
Revision 1.300 by spiga, Wed May 20 13:51:45 2009 UTC

# Line 2 | Line 2 | from JobType import JobType
2   from crab_logger import Logger
3   from crab_exceptions import *
4   from crab_util import *
5 from BlackWhiteListParser import BlackWhiteListParser
5   import common
6   import Scram
7 < from LFNBaseName import *
7 > from Splitter import JobSplitter
8  
9 + from IMProv.IMProvNode import IMProvNode
10   import os, string, glob
11  
12   class Cmssw(JobType):
13 <    def __init__(self, cfg_params, ncjobs):
13 >    def __init__(self, cfg_params, ncjobs,skip_blocks, isNew):
14          JobType.__init__(self, 'CMSSW')
15          common.logger.debug(3,'CMSSW::__init__')
16 <
17 <        self.argsList = []
16 >        self.skip_blocks = skip_blocks
17 >        self.argsList = 1
18  
19          self._params = {}
20          self.cfg_params = cfg_params
21        # init BlackWhiteListParser
22        self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
21  
22 <        self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))
22 >        ### Temporary patch to automatically skip the ISB size check:
23 >        server=self.cfg_params.get('CRAB.server_name',None)
24 >        size = 9.5
25 >        if server or common.scheduler.name().upper() in ['LSF','CAF']: size = 99999
26 >        ### D.S.
27 >        self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size))
28  
29          # number of jobs requested to be created, limit obj splitting
30          self.ncjobs = ncjobs
# Line 33 | Line 36 | class Cmssw(JobType):
36          self.scriptExe = ''
37          self.executable = ''
38          self.executable_arch = self.scram.getArch()
39 <        self.tgz_name = 'default.tgz'
39 >        self.tgz_name = 'default.tar.gz'
40 >        self.tar_name = 'default.tar'
41          self.scriptName = 'CMSSW.sh'
42          self.pset = ''
43          self.datasetPath = ''
44  
45 +        self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
46          # set FJR file name
47          self.fjrFileName = 'crab_fjr.xml'
48  
49          self.version = self.scram.getSWVersion()
50 <        version_array = self.version.split('_')
46 <        self.CMSSW_major = 0
47 <        self.CMSSW_minor = 0
48 <        self.CMSSW_patch = 0
50 >        common.logger.write("CMSSW version is: "+str(self.version))
51          try:
52 <            self.CMSSW_major = int(version_array[1])
51 <            self.CMSSW_minor = int(version_array[2])
52 <            self.CMSSW_patch = int(version_array[3])
52 >            type, self.CMSSW_major, self.CMSSW_minor, self.CMSSW_patch = tuple(self.version.split('_'))
53          except:
54              msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
55              raise CrabException(msg)
56  
57 +        if self.CMSSW_major < 1 or (self.CMSSW_major == 1 and self.CMSSW_minor < 5):
58 +            msg = "CRAB supports CMSSW >= 1_5_x only. Use an older CRAB version."
59 +            raise CrabException(msg)
60 +            """
61 +            As CMSSW versions are dropped we can drop more code:
62 +            1.X dropped: drop support for running .cfg on WN
63 +            2.0 dropped: drop all support for cfg here and in writeCfg
64 +            2.0 dropped: Recheck the random number seed support
65 +            """
66 +
67          ### collect Data cards
68  
69 <        if not cfg_params.has_key('CMSSW.datasetpath'):
70 <            msg = "Error: datasetpath not defined "
71 <            raise CrabException(msg)
69 >
70 >        ### Temporary: added to remove input file control in the case of PU
71 >        self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)
72 >
73          tmp =  cfg_params['CMSSW.datasetpath']
74          log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
75 <        if string.lower(tmp)=='none':
75 >
76 >        if tmp =='':
77 >            msg = "Error: datasetpath not defined "
78 >            raise CrabException(msg)
79 >        elif string.lower(tmp)=='none':
80              self.datasetPath = None
81              self.selectNoInput = 1
82          else:
# Line 70 | Line 85 | class Cmssw(JobType):
85  
86          self.dataTiers = []
87  
88 <        self.debug_pset = cfg_params.get('USER.debug_pset',False)
88 >        self.debugWrap=''
89 >        self.debug_wrapper = int(cfg_params.get('USER.debug_wrapper',0))
90 >        if self.debug_wrapper == 1: self.debugWrap='--debug'
91  
92          ## now the application
93 +        self.managedGenerators = ['madgraph','comphep']
94 +        self.generator = cfg_params.get('CMSSW.generator','pythia').lower()
95          self.executable = cfg_params.get('CMSSW.executable','cmsRun')
96          log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
97  
# Line 94 | Line 113 | class Cmssw(JobType):
113          self.output_file_sandbox.append(self.fjrFileName)
114  
115          # other output files to be returned via sandbox or copied to SE
116 +        outfileflag = False
117          self.output_file = []
118          tmp = cfg_params.get('CMSSW.output_file',None)
119          if tmp :
120 <            tmpOutFiles = string.split(tmp,',')
121 <            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
122 <            for tmp in tmpOutFiles:
123 <                tmp=string.strip(tmp)
104 <                self.output_file.append(tmp)
105 <                pass
106 <        else:
107 <            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
108 <        pass
120 >            self.output_file = [x.strip() for x in tmp.split(',')]
121 >            outfileflag = True #output found
122 >        #else:
123 >        #    log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
124  
125          # script_exe file as additional file in inputSandbox
126          self.scriptExe = cfg_params.get('USER.script_exe',None)
# Line 119 | Line 134 | class Cmssw(JobType):
134              msg ="Error. script_exe  not defined"
135              raise CrabException(msg)
136  
137 +        # use parent files...
138 +        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
139 +
140          ## additional input files
141          if cfg_params.has_key('USER.additional_input_files'):
142              tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
# Line 143 | Line 161 | class Cmssw(JobType):
161              common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
162          pass
163  
146        ## Events per job
147        if cfg_params.has_key('CMSSW.events_per_job'):
148            self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
149            self.selectEventsPerJob = 1
150        else:
151            self.eventsPerJob = -1
152            self.selectEventsPerJob = 0
153
154        ## number of jobs
155        if cfg_params.has_key('CMSSW.number_of_jobs'):
156            self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
157            self.selectNumberOfJobs = 1
158        else:
159            self.theNumberOfJobs = 0
160            self.selectNumberOfJobs = 0
161
162        if cfg_params.has_key('CMSSW.total_number_of_events'):
163            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
164            self.selectTotalNumberEvents = 1
165            if self.selectNumberOfJobs  == 1:
166                if int(self.total_number_of_events) < int(self.theNumberOfJobs):
167                    msg = 'Must specify at least one event per job. total_number_of_events > number_of_jobs '
168                    raise CrabException(msg)
169        else:
170            self.total_number_of_events = 0
171            self.selectTotalNumberEvents = 0
172
173        if self.pset != None:
174             if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
175                 msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
176                 raise CrabException(msg)
177        else:
178             if (self.selectNumberOfJobs == 0):
179                 msg = 'Must specify  number_of_jobs.'
180                 raise CrabException(msg)
164  
165          ## New method of dealing with seeds
166          self.incrementSeeds = []
# Line 193 | Line 176 | class Cmssw(JobType):
176                  tmp.strip()
177                  self.incrementSeeds.append(tmp)
178  
196        ## Old method of dealing with seeds
197        ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
198        ## remove
199        self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
200        if self.sourceSeed:
201            print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
202            self.incrementSeeds.append('sourceSeed')
203            self.incrementSeeds.append('theSource')
204
205        self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
206        if self.sourceSeedVtx:
207            print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
208            self.incrementSeeds.append('VtxSmeared')
209
210        self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
211        if self.sourceSeedG4:
212            print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
213            self.incrementSeeds.append('g4SimHits')
214
215        self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
216        if self.sourceSeedMix:
217            print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
218            self.incrementSeeds.append('mix')
219
179          self.firstRun = cfg_params.get('CMSSW.first_run',None)
180  
222        if self.pset != None: #CarlosDaniele
223            import PsetManipulator as pp
224            PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset
225
181          # Copy/return
227
182          self.copy_data = int(cfg_params.get('USER.copy_data',0))
183          self.return_data = int(cfg_params.get('USER.return_data',0))
184  
185 +        self.conf = {}
186 +        self.conf['pubdata'] = None
187 +        # number of jobs requested to be created, limit obj splitting DD
188          #DBSDLS-start
189          ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
190          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
# Line 239 | Line 196 | class Cmssw(JobType):
196          if self.datasetPath:
197              blockSites = self.DataDiscoveryAndLocation(cfg_params)
198          #DBSDLS-end
199 <
199 >        self.conf['blockSites']=blockSites
200  
201          ## Select Splitting
202 +        splitByRun = int(cfg_params.get('CMSSW.split_by_run',0))
203 +
204          if self.selectNoInput:
205              if self.pset == None:
206 <                self.jobSplittingForScript()
206 >                self.algo = 'ForScript'
207              else:
208 <                self.jobSplittingNoInput()
208 >                self.algo = 'NoInput'
209 >                self.conf['managedGenerators']=self.managedGenerators
210 >                self.conf['generator']=self.generator
211 >        elif splitByRun ==1:
212 >            self.algo = 'RunBased'
213          else:
214 <            self.jobSplittingByBlocks(blockSites)
214 >            self.algo = 'EventBased'
215 >
216 > #        self.algo = 'LumiBased'
217 >        splitter = JobSplitter(self.cfg_params,self.conf)
218 >        self.dict = splitter.Algos()[self.algo]()
219 >
220 >        self.argsFile= '%s/arguments.xml'%common.work_space.shareDir()
221 >        self.rootArgsFilename= 'arguments'
222 >        # modify Pset only the first time
223 >        if (isNew and self.pset != None): self.ModifyPset()
224 >
225 >        ## Prepare inputSandbox TarBall (only the first time)
226 >        self.tarNameWithPath = self.getTarBall(self.executable)
227 >
228 >
229 >    def ModifyPset(self):
230 >        import PsetManipulator as pp
231 >        PsetEdit = pp.PsetManipulator(self.pset)
232 >        try:
233 >            # Add FrameworkJobReport to parameter-set, set max events.
234 >            # Reset later for data jobs by writeCFG which does all modifications
235 >            PsetEdit.maxEvent(1)
236 >            PsetEdit.skipEvent(0)
237 >            PsetEdit.psetWriter(self.configFilename())
238 >            ## If present, add TFileService to output files
239 >            if not int(self.cfg_params.get('CMSSW.skip_TFileService_output',0)):
240 >                tfsOutput = PsetEdit.getTFileService()
241 >                if tfsOutput:
242 >                    if tfsOutput in self.output_file:
243 >                        common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files")
244 >                    else:
245 >                        outfileflag = True #output found
246 >                        self.output_file.append(tfsOutput)
247 >                        common.logger.message("Adding "+tfsOutput+" (from TFileService) to list of output files")
248 >                    pass
249 >                pass
250 >            ## If present and requested, add PoolOutputModule to output files
251 >            if int(self.cfg_params.get('CMSSW.get_edm_output',0)):
252 >                edmOutput = PsetEdit.getPoolOutputModule()
253 >                if edmOutput:
254 >                    if edmOutput in self.output_file:
255 >                        common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files")
256 >                    else:
257 >                        self.output_file.append(edmOutput)
258 >                        common.logger.message("Adding "+edmOutput+" (from PoolOutputModule) to list of output files")
259 >                    pass
260 >                pass
261 >            # not required: check anyhow if present, to avoid accidental T2 overload
262 >            else:
263 >                edmOutput = PsetEdit.getPoolOutputModule()
264 >                if edmOutput and (edmOutput not in self.output_file):
265 >                    msg = "ERROR: a PoolOutputModule is present in your ParameteSet %s \n"%self.pset
266 >                    msg +="         but the file produced ( %s ) is not in the list of output files\n"%edmOutput
267 >                    msg += "WARNING: please remove it. If you want to keep it, add the file to output_files or use CMSSW.get_edm_output\n"
268 >                    raise CrabException(msg)
269 >                pass
270 >            pass
271 >        except CrabException, msg:
272 >            common.logger.message(str(msg))
273 >            msg='Error while manipulating ParameterSet (see previous message, if any): exiting...'
274 >            raise CrabException(msg)
275  
253        # modify Pset
254        if self.pset != None:
255            try:
256                # Add FrameworkJobReport to parameter-set, set max events.
257                # Reset later for data jobs by writeCFG which does all modifications
258                PsetEdit.addCrabFJR(self.fjrFileName) # FUTURE: Job report addition not needed by CMSSW>1.5
259                PsetEdit.maxEvent(self.eventsPerJob)
260                PsetEdit.psetWriter(self.configFilename())
261            except:
262                msg='Error while manipulating ParameterSet: exiting...'
263                raise CrabException(msg)
264        self.tgzNameWithPath = self.getTarBall(self.executable)
276  
277      def DataDiscoveryAndLocation(self, cfg_params):
278  
# Line 274 | Line 285 | class Cmssw(JobType):
285          ## Contact the DBS
286          common.logger.message("Contacting Data Discovery Services ...")
287          try:
288 <            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
288 >            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks)
289              self.pubdata.fetchDBSInfo()
290  
291          except DataDiscovery.NotExistingDatasetError, ex :
# Line 288 | Line 299 | class Cmssw(JobType):
299              raise CrabException(msg)
300  
301          self.filesbyblock=self.pubdata.getFiles()
302 <        self.eventsbyblock=self.pubdata.getEventsPerBlock()
303 <        self.eventsbyfile=self.pubdata.getEventsPerFile()
302 >        #print self.filesbyblock
303 >        self.conf['pubdata']=self.pubdata
304  
305          ## get max number of events
306          self.maxEvents=self.pubdata.getMaxEvents()
# Line 298 | Line 309 | class Cmssw(JobType):
309          try:
310              dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
311              dataloc.fetchDLSInfo()
312 +
313          except DataLocation.DataLocationError , ex:
314              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
315              raise CrabException(msg)
316  
317  
318 <        sites = dataloc.getSites()
318 >        unsorted_sites = dataloc.getSites()
319 >        #print "Unsorted :",unsorted_sites
320 >        sites = self.filesbyblock.fromkeys(self.filesbyblock,'')
321 >        for lfn in self.filesbyblock.keys():
322 >            #print lfn
323 >            if unsorted_sites.has_key(lfn):
324 >                #print "Found ",lfn
325 >                sites[lfn]=unsorted_sites[lfn]
326 >            else:
327 >                #print "Not Found ",lfn
328 >                sites[lfn]=[]
329 >        #print sites
330 >
331 >        #print "Sorted :",sites
332 >        if len(sites)==0:
333 >            msg = 'ERROR ***: no location for any of the blocks of this dataset: \n\t %s \n'%datasetPath
334 >            msg += "\tMaybe the dataset is located only at T1's (or at T0), where analysis jobs are not allowed\n"
335 >            msg += "\tPlease check DataDiscovery page https://cmsweb.cern.ch/dbs_discovery/\n"
336 >            raise CrabException(msg)
337 >
338          allSites = []
339          listSites = sites.values()
340          for listSite in listSites:
341              for oneSite in listSite:
342                  allSites.append(oneSite)
343 <        allSites = self.uniquelist(allSites)
343 >        [allSites.append(it) for it in allSites if not allSites.count(it)]
344 >
345  
346          # screen output
347          common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
348  
349          return sites
350  
319    def jobSplittingByBlocks(self, blockSites):
320        """
321        Perform job splitting. Jobs run over an integer number of files
322        and no more than one block.
323        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
324        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
325                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
326                  self.maxEvents, self.filesbyblock
327        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
328              self.total_number_of_jobs - Total # of jobs
329              self.list_of_args - File(s) job will run on (a list of lists)
330        """
331
332        # ---- Handle the possible job splitting configurations ---- #
333        if (self.selectTotalNumberEvents):
334            totalEventsRequested = self.total_number_of_events
335        if (self.selectEventsPerJob):
336            eventsPerJobRequested = self.eventsPerJob
337            if (self.selectNumberOfJobs):
338                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
339
340        # If user requested all the events in the dataset
341        if (totalEventsRequested == -1):
342            eventsRemaining=self.maxEvents
343        # If user requested more events than are in the dataset
344        elif (totalEventsRequested > self.maxEvents):
345            eventsRemaining = self.maxEvents
346            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
347        # If user requested less events than are in the dataset
348        else:
349            eventsRemaining = totalEventsRequested
350
351        # If user requested more events per job than are in the dataset
352        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
353            eventsPerJobRequested = self.maxEvents
354
355        # For user info at end
356        totalEventCount = 0
357
358        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
359            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
360
361        if (self.selectNumberOfJobs):
362            common.logger.message("May not create the exact number_of_jobs requested.")
363
364        if ( self.ncjobs == 'all' ) :
365            totalNumberOfJobs = 999999999
366        else :
367            totalNumberOfJobs = self.ncjobs
368
369        blocks = blockSites.keys()
370        blockCount = 0
371        # Backup variable in case self.maxEvents counted events in a non-included block
372        numBlocksInDataset = len(blocks)
373
374        jobCount = 0
375        list_of_lists = []
376
377        # list tracking which jobs are in which jobs belong to which block
378        jobsOfBlock = {}
379
380        # ---- Iterate over the blocks in the dataset until ---- #
381        # ---- we've met the requested total # of events    ---- #
382        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
383            block = blocks[blockCount]
384            blockCount += 1
385            if block not in jobsOfBlock.keys() :
386                jobsOfBlock[block] = []
387
388            if self.eventsbyblock.has_key(block) :
389                numEventsInBlock = self.eventsbyblock[block]
390                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
391
392                files = self.filesbyblock[block]
393                numFilesInBlock = len(files)
394                if (numFilesInBlock <= 0):
395                    continue
396                fileCount = 0
397
398                # ---- New block => New job ---- #
399                parString = ""
400                # counter for number of events in files currently worked on
401                filesEventCount = 0
402                # flag if next while loop should touch new file
403                newFile = 1
404                # job event counter
405                jobSkipEventCount = 0
406
407                # ---- Iterate over the files in the block until we've met the requested ---- #
408                # ---- total # of events or we've gone over all the files in this block  ---- #
409                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
410                    file = files[fileCount]
411                    if newFile :
412                        try:
413                            numEventsInFile = self.eventsbyfile[file]
414                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
415                            # increase filesEventCount
416                            filesEventCount += numEventsInFile
417                            # Add file to current job
418                            parString += '\\\"' + file + '\\\"\,'
419                            newFile = 0
420                        except KeyError:
421                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
422
423                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
424                    # if less events in file remain than eventsPerJobRequested
425                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
426                        # if last file in block
427                        if ( fileCount == numFilesInBlock-1 ) :
428                            # end job using last file, use remaining events in block
429                            # close job and touch new file
430                            fullString = parString[:-2]
431                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
432                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
433                            self.jobDestination.append(blockSites[block])
434                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
435                            # fill jobs of block dictionary
436                            jobsOfBlock[block].append(jobCount+1)
437                            # reset counter
438                            jobCount = jobCount + 1
439                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
440                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
441                            jobSkipEventCount = 0
442                            # reset file
443                            parString = ""
444                            filesEventCount = 0
445                            newFile = 1
446                            fileCount += 1
447                        else :
448                            # go to next file
449                            newFile = 1
450                            fileCount += 1
451                    # if events in file equal to eventsPerJobRequested
452                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
453                        # close job and touch new file
454                        fullString = parString[:-2]
455                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
456                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
457                        self.jobDestination.append(blockSites[block])
458                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
459                        jobsOfBlock[block].append(jobCount+1)
460                        # reset counter
461                        jobCount = jobCount + 1
462                        totalEventCount = totalEventCount + eventsPerJobRequested
463                        eventsRemaining = eventsRemaining - eventsPerJobRequested
464                        jobSkipEventCount = 0
465                        # reset file
466                        parString = ""
467                        filesEventCount = 0
468                        newFile = 1
469                        fileCount += 1
470
471                    # if more events in file remain than eventsPerJobRequested
472                    else :
473                        # close job but don't touch new file
474                        fullString = parString[:-2]
475                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
476                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
477                        self.jobDestination.append(blockSites[block])
478                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
479                        jobsOfBlock[block].append(jobCount+1)
480                        # increase counter
481                        jobCount = jobCount + 1
482                        totalEventCount = totalEventCount + eventsPerJobRequested
483                        eventsRemaining = eventsRemaining - eventsPerJobRequested
484                        # calculate skip events for last file
485                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
486                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
487                        # remove all but the last file
488                        filesEventCount = self.eventsbyfile[file]
489                        parString = '\\\"' + file + '\\\"\,'
490                    pass # END if
491                pass # END while (iterate over files in the block)
492        pass # END while (iterate over blocks in the dataset)
493        self.ncjobs = self.total_number_of_jobs = jobCount
494        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
495            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
496        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
497
498        # screen output
499        screenOutput = "List of jobs and available destination sites:\n\n"
351  
352 <        # keep trace of block with no sites to print a warning at the end
502 <        noSiteBlock = []
503 <        bloskNoSite = []
504 <
505 <        blockCounter = 0
506 <        for block in blocks:
507 <            if block in jobsOfBlock.keys() :
508 <                blockCounter += 1
509 <                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
510 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)))
511 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0:
512 <                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
513 <                    bloskNoSite.append( blockCounter )
514 <
515 <        common.logger.message(screenOutput)
516 <        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
517 <            msg = 'WARNING: No sites are hosting any part of data for block:\n                '
518 <            virgola = ""
519 <            if len(bloskNoSite) > 1:
520 <                virgola = ","
521 <            for block in bloskNoSite:
522 <                msg += ' ' + str(block) + virgola
523 <            msg += '\n               Related jobs:\n                 '
524 <            virgola = ""
525 <            if len(noSiteBlock) > 1:
526 <                virgola = ","
527 <            for range_jobs in noSiteBlock:
528 <                msg += str(range_jobs) + virgola
529 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
530 <            if self.cfg_params.has_key('EDG.se_white_list'):
531 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
532 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
533 <                msg += 'Please check if the dataset is available at this site!)\n'
534 <            if self.cfg_params.has_key('EDG.ce_white_list'):
535 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
536 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
537 <                msg += 'Please check if the dataset is available at this site!)\n'
352 >    def split(self, jobParams,firstJobID):
353  
354 <            common.logger.message(msg)
355 <
356 <        self.list_of_args = list_of_lists
542 <        return
543 <
544 <    def jobSplittingNoInput(self):
545 <        """
546 <        Perform job splitting based on number of event per job
547 <        """
548 <        common.logger.debug(5,'Splitting per events')
354 >        jobParams = self.dict['args']
355 >        njobs = self.dict['njobs']
356 >        self.jobDestination = self.dict['jobDestination']
357  
358 <        if (self.selectEventsPerJob):
359 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
552 <        if (self.selectNumberOfJobs):
553 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
554 <        if (self.selectTotalNumberEvents):
555 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
556 <
557 <        if (self.total_number_of_events < 0):
558 <            msg='Cannot split jobs per Events with "-1" as total number of events'
559 <            raise CrabException(msg)
358 >        if njobs==0:
359 >            raise CrabException("Ask to split "+str(njobs)+" jobs: aborting")
360  
561        if (self.selectEventsPerJob):
562            if (self.selectTotalNumberEvents):
563                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
564            elif(self.selectNumberOfJobs) :
565                self.total_number_of_jobs =self.theNumberOfJobs
566                self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
567
568        elif (self.selectNumberOfJobs) :
569            self.total_number_of_jobs = self.theNumberOfJobs
570            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
571
572        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
573
574        # is there any remainder?
575        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
576
577        common.logger.debug(5,'Check  '+str(check))
578
579        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
580        if check > 0:
581            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
582
583        # argument is seed number.$i
584        self.list_of_args = []
585        for i in range(self.total_number_of_jobs):
586            ## Since there is no input, any site is good
587            self.jobDestination.append([""]) #must be empty to write correctly the xml
588            args=[]
589            if (self.firstRun):
590                ## pythia first run
591                args.append(str(self.firstRun)+str(i))
592            self.list_of_args.append(args)
593
594        return
595
596
597    def jobSplittingForScript(self):
598        """
599        Perform job splitting based on number of job
600        """
601        common.logger.debug(5,'Splitting per job')
602        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
603
604        self.total_number_of_jobs = self.theNumberOfJobs
605
606        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
607
608        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
609
610        # argument is seed number.$i
611        self.list_of_args = []
612        for i in range(self.total_number_of_jobs):
613            self.jobDestination.append([""])
614            self.list_of_args.append([str(i)])
615        return
616
617    def split(self, jobParams):
618
619        njobs = self.total_number_of_jobs
620        arglist = self.list_of_args
361          # create the empty structure
362          for i in range(njobs):
363              jobParams.append("")
364  
365          listID=[]
366          listField=[]
367 <        for job in range(njobs):
368 <            jobParams[job] = arglist[job]
367 >        listDictions=[]
368 >        exist= os.path.exists(self.argsFile)
369 >        for id in range(njobs):
370 >            job = id + int(firstJobID)
371              listID.append(job+1)
372              job_ToSave ={}
373              concString = ' '
374              argu=''
375 <            if len(jobParams[job]):
376 <                argu +=   concString.join(jobParams[job] )
377 <            job_ToSave['arguments']= str(job+1)+' '+argu
378 <            job_ToSave['dlsDestination']= self.jobDestination[job]
375 >            str_argu = str(job+1)
376 >            if len(jobParams[id]):
377 >                argu = {'JobID': job+1}
378 >                for i in range(len(jobParams[id])):
379 >                    argu[self.dict['params'][i]]=jobParams[id][i]
380 >                # just for debug
381 >                str_argu += concString.join(jobParams[id])
382 >            listDictions.append(argu)
383 >            job_ToSave['arguments']= str(job+1)
384 >            job_ToSave['dlsDestination']= self.jobDestination[id]
385              listField.append(job_ToSave)
386 <            msg="Job "+str(job)+" Arguments:   "+str(job+1)+" "+argu+"\n"  \
387 <            +"                     Destination: "+str(self.jobDestination[job])
386 >            msg="Job  %s  Arguments:  %s\n"%(str(job+1),str_argu)
387 >            msg+="\t  Destination: %s "%(str(self.jobDestination[id]))
388              common.logger.debug(5,msg)
389 +        # write xml
390 +        if len(listDictions):
391 +            if exist==False: self.CreateXML()
392 +            self.addEntry(listDictions)
393 +            self.addXMLfile()
394          common._db.updateJob_(listID,listField)
395 <        self.argsList = (len(jobParams[0])+1)
395 >        self.zipTarFile()
396 >        return
397 >      
398 >    def addXMLfile(self):
399 >
400 >        import tarfile
401 >       # try:
402 >        print self.argsFile
403 >        tar = tarfile.open(self.tarNameWithPath, "a")
404 >        tar.add(self.argsFile, os.path.basename(self.argsFile))
405 >        tar.close()
406 >       ## except:
407 >       #     pass
408  
409 +  
410 +    def CreateXML(self):
411 +        """
412 +        """
413 +        result = IMProvNode( self.rootArgsFilename )
414 +        outfile = file( self.argsFile, 'w').write(str(result))
415 +        return
416 +
417 +    def addEntry(self, listDictions):
418 +        """
419 +        _addEntry_
420 +
421 +        add an entry to the xml file
422 +        """
423 +        from IMProv.IMProvLoader import loadIMProvFile
424 +        ## load xml
425 +        improvDoc = loadIMProvFile(self.argsFile)
426 +        entrname= 'Job'
427 +        for dictions in listDictions:
428 +           report = IMProvNode(entrname , None, **dictions)
429 +           improvDoc.addNode(report)
430 +        outfile = file( self.argsFile, 'w').write(str(improvDoc))
431          return
432  
433      def numberOfJobs(self):
434 <        return self.total_number_of_jobs
434 >        return self.dict['njobs']
435  
436      def getTarBall(self, exe):
437          """
438          Return the TarBall with lib and exe
439          """
440 <        self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
441 <        if os.path.exists(self.tgzNameWithPath):
442 <            return self.tgzNameWithPath
440 >        self.tarNameWithPath = common.work_space.pathForTgz()+self.tar_name
441 >        if os.path.exists(self.tarNameWithPath):
442 >            return self.tarNameWithPath
443  
444          # Prepare a tar gzipped file with user binaries.
445          self.buildTar_(exe)
446  
447 <        return string.strip(self.tgzNameWithPath)
447 >        return string.strip(self.tarNameWithPath)
448  
449      def buildTar_(self, executable):
450  
# Line 672 | Line 459 | class Cmssw(JobType):
459  
460          import tarfile
461          try: # create tar ball
462 <            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
462 >            #tar = tarfile.open(self.tgzNameWithPath, "w:gz")
463 >            tar = tarfile.open(self.tarNameWithPath, "w")
464              ## First find the executable
465              if (self.executable != ''):
466                  exeWithPath = self.scram.findFile_(executable)
# Line 696 | Line 484 | class Cmssw(JobType):
484                      pass
485  
486              ## Now get the libraries: only those in local working area
487 +            tar.dereference=True
488              libDir = 'lib'
489              lib = swArea+'/' +libDir
490              common.logger.debug(5,"lib "+lib+" to be tarred")
# Line 707 | Line 496 | class Cmssw(JobType):
496              module = swArea + '/' + moduleDir
497              if os.path.isdir(module):
498                  tar.add(module,moduleDir)
499 +            tar.dereference=False
500  
501              ## Now check if any data dir(s) is present
712            swAreaLen=len(swArea)
502              self.dataExist = False
503 <            for root, dirs, files in os.walk(swArea):
504 <                if "data" in dirs:
505 <                    self.dataExist=True
506 <                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
507 <                    tar.add(root+"/data",root[swAreaLen:]+"/data")
503 >            todo_list = [(i, i) for i in  os.listdir(swArea+"/src")]
504 >            while len(todo_list):
505 >                entry, name = todo_list.pop()
506 >                if name.startswith('crab_0_') or  name.startswith('.') or name == 'CVS':
507 >                    continue
508 >                if os.path.isdir(swArea+"/src/"+entry):
509 >                    entryPath = entry + '/'
510 >                    todo_list += [(entryPath + i, i) for i in  os.listdir(swArea+"/src/"+entry)]
511 >                    if name == 'data':
512 >                        self.dataExist=True
513 >                        common.logger.debug(5,"data "+entry+" to be tarred")
514 >                        tar.add(swArea+"/src/"+entry,"src/"+entry)
515 >                    pass
516 >                pass
517  
518              ### CMSSW ParameterSet
519              if not self.pset is None:
520                  cfg_file = common.work_space.jobDir()+self.configFilename()
521                  tar.add(cfg_file,self.configFilename())
724                common.logger.debug(5,"File added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
522  
523  
524              ## Add ProdCommon dir to tar
525 <            prodcommonDir = 'ProdCommon'
526 <            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
527 <            if os.path.isdir(prodcommonPath):
528 <                tar.add(prodcommonPath,prodcommonDir)
529 <            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
525 >            prodcommonDir = './'
526 >            prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
527 >            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \
528 >                           'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage', \
529 >                           'WMCore/__init__.py','WMCore/Algorithms']
530 >            for file in neededStuff:
531 >                tar.add(prodcommonPath+file,prodcommonDir+file)
532  
533              ##### ML stuff
534              ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
535              path=os.environ['CRABDIR'] + '/python/'
536              for file in ML_file_list:
537                  tar.add(path+file,file)
739            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
538  
539              ##### Utils
540 <            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']
540 >            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
541              for file in Utils_file_list:
542                  tar.add(path+file,file)
745            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
543  
544              ##### AdditionalFiles
545 +            tar.dereference=True
546              for file in self.additional_inbox_files:
547                  tar.add(file,string.split(file,'/')[-1])
548 <            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
548 >            tar.dereference=False
549 >            common.logger.debug(5,"Files in "+self.tarNameWithPath+" : "+str(tar.getnames()))
550  
551              tar.close()
552 <        except :
553 <            raise CrabException('Could not create tar-ball')
552 >        except IOError, exc:
553 >            common.logger.write(str(exc))
554 >            raise CrabException('Could not create tar-ball '+self.tarNameWithPath)
555 >        except tarfile.TarError, exc:
556 >            common.logger.write(str(exc))
557 >            raise CrabException('Could not create tar-ball '+self.tarNameWithPath)
558 >  
559 >    def zipTarFile(self):  
560 >
561 >        cmd = "gzip -c %s > %s "%(self.tarNameWithPath,self.tgzNameWithPath)
562 >        res=runCommand(cmd)
563  
756        ## check for tarball size
564          tarballinfo = os.stat(self.tgzNameWithPath)
565          if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
566 <            raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
566 >            msg  = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \
567 >               +'MB input sandbox limit \n'
568 >            msg += '      and not supported by the direct GRID submission system.\n'
569 >            msg += '      Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
570 >            msg += '      For further infos please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users'
571 >            raise CrabException(msg)
572  
573          ## create tar-ball with ML stuff
574  
# Line 765 | Line 577 | class Cmssw(JobType):
577          Returns part of a job script which prepares
578          the execution environment for the job 'nj'.
579          """
580 +        # FUTURE: Drop support for .cfg when possible
581          if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
582              psetName = 'pset.py'
583          else:
# Line 772 | Line 585 | class Cmssw(JobType):
585          # Prepare JobType-independent part
586          txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
587          txt += 'echo ">>> setup environment"\n'
588 <        txt += 'if [ $middleware == LCG ]; then \n'
588 >        txt += 'if [ $middleware == LCG ] || [ $middleware == CAF ] || [ $middleware == LSF ]; then \n'
589          txt += self.wsSetupCMSLCGEnvironment_()
590          txt += 'elif [ $middleware == OSG ]; then\n'
591          txt += '    WORKING_DIR=`/bin/mktemp  -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
# Line 787 | Line 600 | class Cmssw(JobType):
600          txt += '    cd $WORKING_DIR\n'
601          txt += '    echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
602          txt += self.wsSetupCMSOSGEnvironment_()
603 +        #Setup SGE Environment
604 +        txt += 'elif [ $middleware == SGE ]; then\n'
605 +        txt += self.wsSetupCMSLCGEnvironment_()
606 +
607 +        txt += 'elif [ $middleware == ARC ]; then\n'
608 +        txt += self.wsSetupCMSLCGEnvironment_()
609 +
610          txt += 'fi\n'
611  
612          # Prepare JobType-specific part
# Line 802 | Line 622 | class Cmssw(JobType):
622          txt += '    func_exit\n'
623          txt += 'fi \n'
624          txt += 'cd '+self.version+'\n'
625 <        txt += 'SOFTWARE_DIR=`pwd`\n'
625 >        txt += 'SOFTWARE_DIR=`pwd`; export SOFTWARE_DIR\n'
626          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
627          txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
628          txt += 'if [ $? != 0 ] ; then\n'
# Line 826 | Line 646 | class Cmssw(JobType):
646          # Prepare job-specific part
647          job = common.job_list[nj]
648          if (self.datasetPath):
649 +            self.primaryDataset = self.datasetPath.split("/")[1]
650 +            DataTier = self.datasetPath.split("/")[2]
651              txt += '\n'
652              txt += 'DatasetPath='+self.datasetPath+'\n'
653  
654 <            datasetpath_split = self.datasetPath.split("/")
655 <
834 <            txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
835 <            txt += 'DataTier='+datasetpath_split[2]+'\n'
654 >            txt += 'PrimaryDataset='+self.primaryDataset +'\n'
655 >            txt += 'DataTier='+DataTier+'\n'
656              txt += 'ApplicationFamily=cmsRun\n'
657  
658          else:
659 +            self.primaryDataset = 'null'
660              txt += 'DatasetPath=MCDataTier\n'
661              txt += 'PrimaryDataset=null\n'
662              txt += 'DataTier=null\n'
# Line 844 | Line 665 | class Cmssw(JobType):
665              pset = os.path.basename(job.configFilename())
666              txt += '\n'
667              txt += 'cp  $RUNTIME_AREA/'+pset+' .\n'
668 <            if (self.datasetPath): # standard job
669 <                txt += 'InputFiles=${args[1]}; export InputFiles\n'
670 <                txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
671 <                txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
672 <                txt += 'echo "Inputfiles:<$InputFiles>"\n'
852 <                txt += 'echo "MaxEvents:<$MaxEvents>"\n'
853 <                txt += 'echo "SkipEvents:<$SkipEvents>"\n'
854 <            else:  # pythia like job
855 <                txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
856 <                txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
857 <                txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
858 <                txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
859 <                if (self.firstRun):
860 <                    txt += 'FirstRun=${args[1]}; export FirstRun\n'
861 <                    txt += 'echo "FirstRun: <$FirstRun>"\n'
668 >
669 >            txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
670 >            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
671 >            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
672 >            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
673  
674              txt += 'mv -f ' + pset + ' ' + psetName + '\n'
675  
# Line 866 | Line 677 | class Cmssw(JobType):
677          if self.pset != None:
678              # FUTURE: Can simply for 2_1_x and higher
679              txt += '\n'
680 <            if self.debug_pset==True:
680 >            if self.debug_wrapper == 1:
681                  txt += 'echo "***** cat ' + psetName + ' *********"\n'
682                  txt += 'cat ' + psetName + '\n'
683                  txt += 'echo "****** end ' + psetName + ' ********"\n'
684                  txt += '\n'
685 <            txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
685 >                txt += 'echo "***********************" \n'
686 >                txt += 'which edmConfigHash \n'
687 >                txt += 'echo "***********************" \n'
688 >            if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
689 >                txt += 'edmConfigHash ' + psetName + ' \n'
690 >                txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n'
691 >            else:
692 >                txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
693              txt += 'echo "PSETHASH = $PSETHASH" \n'
694 +            #### FEDE temporary fix for noEdm files #####
695 +            txt += 'if [ -z "$PSETHASH" ]; then \n'
696 +            txt += '   export PSETHASH=null\n'
697 +            txt += 'fi \n'
698 +            #############################################
699              txt += '\n'
700          return txt
701  
# Line 886 | Line 709 | class Cmssw(JobType):
709  
710          if os.path.isfile(self.tgzNameWithPath):
711              txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
712 <            txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
713 <            txt += 'ls -Al \n'
712 >            txt += 'tar zxvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
713 >            if  self.debug_wrapper==1 :
714 >                txt += 'ls -Al \n'
715              txt += 'untar_status=$? \n'
716              txt += 'if [ $untar_status -ne 0 ]; then \n'
717              txt += '   echo "ERROR ==> Untarring .tgz file failed"\n'
# Line 897 | Line 721 | class Cmssw(JobType):
721              txt += '   echo "Successful untar" \n'
722              txt += 'fi \n'
723              txt += '\n'
724 <            txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
724 >            txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
725              txt += 'if [ -z "$PYTHONPATH" ]; then\n'
726 <            txt += '   export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
726 >            txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
727              txt += 'else\n'
728 <            txt += '   export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
728 >            txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
729              txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
730              txt += 'fi\n'
731              txt += '\n'
# Line 928 | Line 752 | class Cmssw(JobType):
752          if len(self.additional_inbox_files)>0:
753              for file in self.additional_inbox_files:
754                  txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n'
755 <        txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
755 >        # txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
756 >        # txt += 'mv $RUNTIME_AREA/IMProv/ . \n'
757  
758 +        txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
759          txt += 'if [ -z "$PYTHONPATH" ]; then\n'
760 <        txt += '   export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
760 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
761          txt += 'else\n'
762 <        txt += '   export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
762 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
763          txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
764          txt += 'fi\n'
765          txt += '\n'
766  
767          return txt
768  
943    def modifySteeringCards(self, nj):
944        """
945        modify the card provided by the user,
946        writing a new card into share dir
947        """
769  
770      def executableName(self):
771          if self.scriptExe:
# Line 954 | Line 775 | class Cmssw(JobType):
775  
776      def executableArgs(self):
777          # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
778 <        if self.scriptExe:#CarlosDaniele
779 <            return   self.scriptExe + " $NJob"
778 >        if self.scriptExe:
779 >            return self.scriptExe + " $NJob"
780          else:
781              ex_args = ""
782 <            # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
783 <            # Framework job report
963 <            if (self.CMSSW_major >= 1 and self.CMSSW_minor >= 5) or (self.CMSSW_major >= 2):
964 <                ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
965 <            # Type of config file
782 >            ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
783 >            # Type of config file depends on CMSSW version
784              if self.CMSSW_major >= 2 :
785                  ex_args += " -p pset.py"
786              else:
# Line 976 | Line 794 | class Cmssw(JobType):
794          inp_box = []
795          if os.path.isfile(self.tgzNameWithPath):
796              inp_box.append(self.tgzNameWithPath)
797 <        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
980 <        inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper)
797 >        inp_box.append(common.work_space.jobDir() + self.scriptName)
798          return inp_box
799  
800      def outputSandbox(self, nj):
# Line 989 | Line 806 | class Cmssw(JobType):
806          ## User Declared output files
807          for out in (self.output_file+self.output_file_sandbox):
808              n_out = nj + 1
809 <            out_box.append(self.numberFile_(out,str(n_out)))
809 >            out_box.append(numberFile(out,str(n_out)))
810          return out_box
811  
995    def prepareSteeringCards(self):
996        """
997        Make initial modifications of the user's steering card file.
998        """
999        return
812  
813      def wsRenameOutput(self, nj):
814          """
# Line 1006 | Line 818 | class Cmssw(JobType):
818          txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
819          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
820          txt += 'echo ">>> current directory content:"\n'
821 <        txt += 'ls \n'
821 >        if self.debug_wrapper==1:
822 >            txt += 'ls -Al\n'
823          txt += '\n'
824  
825          for fileWithSuffix in (self.output_file):
826 <            output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
826 >            output_file_num = numberFile(fileWithSuffix, '$NJob')
827              txt += '\n'
828              txt += '# check output file\n'
829              txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
# Line 1031 | Line 844 | class Cmssw(JobType):
844              txt += 'fi\n'
845          file_list = []
846          for fileWithSuffix in (self.output_file):
847 <             file_list.append(self.numberFile_(fileWithSuffix, '$NJob'))
847 >             file_list.append(numberFile('$SOFTWARE_DIR/'+fileWithSuffix, '$NJob'))
848  
849 <        txt += 'file_list="'+string.join(file_list,' ')+'"\n'
849 >        txt += 'file_list="'+string.join(file_list,',')+'"\n'
850          txt += '\n'
851          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
852          txt += 'echo ">>> current directory content:"\n'
853 <        txt += 'ls \n'
853 >        if self.debug_wrapper==1:
854 >            txt += 'ls -Al\n'
855          txt += '\n'
856          txt += 'cd $RUNTIME_AREA\n'
857          txt += 'echo ">>> current directory (RUNTIME_AREA):  $RUNTIME_AREA"\n'
858          return txt
859  
1046    def numberFile_(self, file, txt):
1047        """
1048        append _'txt' before last extension of a file
1049        """
1050        p = string.split(file,".")
1051        # take away last extension
1052        name = p[0]
1053        for x in p[1:-1]:
1054            name=name+"."+x
1055        # add "_txt"
1056        if len(p)>1:
1057            ext = p[len(p)-1]
1058            result = name + '_' + txt + "." + ext
1059        else:
1060            result = name + '_' + txt
1061
1062        return result
1063
860      def getRequirements(self, nj=[]):
861          """
862          return job requirements to add to jdl files
# Line 1076 | Line 872 | class Cmssw(JobType):
872                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
873  
874          req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
875 <        if common.scheduler.name() == "glitecoll":
875 >        if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"):
876              req += ' && other.GlueCEStateStatus == "Production" '
877  
878          return req
# Line 1147 | Line 943 | class Cmssw(JobType):
943          txt += '    echo "==> setup cms environment ok"\n'
944          return txt
945  
946 <    def modifyReport(self, nj):
946 >    def wsModifyReport(self, nj):
947          """
948          insert the part of the script that modifies the FrameworkJob Report
949          """
950 <        txt = '\n#Written by cms_cmssw::modifyReport\n'
950 >
951 >        txt = ''
952          publish_data = int(self.cfg_params.get('USER.publish_data',0))
953 <        if (publish_data == 1):
954 <            processedDataset = self.cfg_params['USER.publish_data_name']
955 <            LFNBaseName = LFNBase(processedDataset)
953 >        #if (publish_data == 1):
954 >        if (self.copy_data == 1):
955 >            txt = '\n#Written by cms_cmssw::wsModifyReport\n'
956 >            publish_data = int(self.cfg_params.get('USER.publish_data',0))
957 >
958  
959 <            txt += 'if [ $copy_exit_status -eq 0 ]; then\n'
960 <            txt += '    FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName)
959 >            txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n'
960 >            txt += '    FOR_LFN=$LFNBaseName\n'
961              txt += 'else\n'
962              txt += '    FOR_LFN=/copy_problems/ \n'
1164            txt += '    SE=""\n'
1165            txt += '    SE_PATH=""\n'
963              txt += 'fi\n'
964  
965              txt += 'echo ">>> Modify Job Report:" \n'
966 <            txt += 'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1170 <            txt += 'ProcessedDataset='+processedDataset+'\n'
1171 <            txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
966 >            txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
967              txt += 'echo "SE = $SE"\n'
968              txt += 'echo "SE_PATH = $SE_PATH"\n'
969              txt += 'echo "FOR_LFN = $FOR_LFN" \n'
970              txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
971 <            txt += 'echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
972 <            txt += '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n'
971 >
972 >
973 >            args = 'fjr $RUNTIME_AREA/crab_fjr_$NJob.xml n_job $NJob for_lfn $FOR_LFN PrimaryDataset $PrimaryDataset  ApplicationFamily $ApplicationFamily ApplicationName $executable cmssw_version $CMSSW_VERSION psethash $PSETHASH se_name $SE se_path $SE_PATH'
974 >            if (publish_data == 1):
975 >                processedDataset = self.cfg_params['USER.publish_data_name']
976 >                txt += 'ProcessedDataset='+processedDataset+'\n'
977 >                txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
978 >                args += ' UserProcessedDataset $USER-$ProcessedDataset-$PSETHASH'
979 >
980 >            txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'"\n'
981 >            txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'\n'
982              txt += 'modifyReport_result=$?\n'
983              txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
984              txt += '    modifyReport_result=70500\n'
# Line 1194 | Line 998 | class Cmssw(JobType):
998          txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
999          txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
1000          txt += '    if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
1001 <        txt += '        cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --MonitorID $MonitorID --MonitorJobID $MonitorJobID`\n'
1002 <        txt += '        echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
1003 <        txt += '        tmp_executable_exit_status=`echo $cmd_out | awk -F\; \'{print $1}\' | awk -F \' \' \'{print $NF}\'`\n'
1004 <        txt += '        if [ -n $tmp_executable_exit_status ];then\n'
1201 <        txt += '            executable_exit_status=$tmp_executable_exit_status\n'
1202 <        txt += '        fi\n'
1001 >        txt += '        cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n'
1002 >        if self.debug_wrapper==1 :
1003 >            txt += '        echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
1004 >        txt += '        executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n'
1005          txt += '        if [ $executable_exit_status -eq 50115 ];then\n'
1006          txt += '            echo ">>> crab_fjr.xml contents: "\n'
1007 <        txt += '            cat $RUNTIME_AREA/crab_fjr_NJob.xml\n'
1007 >        txt += '            cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1008          txt += '            echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
1009 +        txt += '        elif [ $executable_exit_status -eq -999 ];then\n'
1010 +        txt += '            echo "ExitStatus from FrameworkJobReport not available. Using exit code of executable from command line."\n'
1011          txt += '        else\n'
1012          txt += '            echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
1013          txt += '        fi\n'
# Line 1211 | Line 1015 | class Cmssw(JobType):
1015          txt += '        echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1016          txt += '    fi\n'
1017            #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap
1018 <
1019 <        if self.datasetPath:
1018 >        txt += '    if [ $executable_exit_status -eq 0 ];then\n'
1019 >        txt += '        echo ">>> Executable succeded  $executable_exit_status"\n'
1020 >        ## This cannot more work given the changes on the Job argumentsJob  
1021 >        """
1022 >        if (self.datasetPath and not (self.dataset_pu or self.useParent==1)) :
1023            # VERIFY PROCESSED DATA
1024 <            txt += '    if [ $executable_exit_status -eq 0 ];then\n'
1025 <            txt += '      echo ">>> Verify list of processed files:"\n'
1026 <            txt += '      echo $InputFiles |tr -d "\\\\" |tr "," "\\n"|tr -d "\\"" > input-files.txt\n'
1027 <            txt += '      grep LFN $RUNTIME_AREA/crab_fjr_$NJob.xml |cut -d">" -f2|cut -d"<" -f1|grep "/" > processed-files.txt\n'
1028 <            txt += '      cat input-files.txt  | sort | uniq > tmp.txt\n'
1029 <            txt += '      mv tmp.txt input-files.txt\n'
1030 <            txt += '      echo "cat input-files.txt"\n'
1031 <            txt += '      echo "----------------------"\n'
1032 <            txt += '      cat input-files.txt\n'
1033 <            txt += '      cat processed-files.txt | sort | uniq > tmp.txt\n'
1034 <            txt += '      mv tmp.txt processed-files.txt\n'
1035 <            txt += '      echo "----------------------"\n'
1036 <            txt += '      echo "cat processed-files.txt"\n'
1037 <            txt += '      echo "----------------------"\n'
1038 <            txt += '      cat processed-files.txt\n'
1039 <            txt += '      echo "----------------------"\n'
1040 <            txt += '      diff -q input-files.txt processed-files.txt\n'
1041 <            txt += '      fileverify_status=$?\n'
1042 <            txt += '      if [ $fileverify_status -ne 0 ]; then\n'
1043 <            txt += '         executable_exit_status=30001\n'
1044 <            txt += '         echo "ERROR ==> not all input files processed"\n'
1045 <            txt += '         echo "      ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
1046 <            txt += '         echo "      ==> diff input-files.txt processed-files.txt"\n'
1047 <            txt += '      fi\n'
1048 <            txt += '    fi\n'
1242 <            txt += '\n'
1024 >            txt += '        echo ">>> Verify list of processed files:"\n'
1025 >            txt += '        echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n'
1026 >            txt += '        python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n'
1027 >            txt += '        cat input-files.txt  | sort | uniq > tmp.txt\n'
1028 >            txt += '        mv tmp.txt input-files.txt\n'
1029 >            txt += '        echo "cat input-files.txt"\n'
1030 >            txt += '        echo "----------------------"\n'
1031 >            txt += '        cat input-files.txt\n'
1032 >            txt += '        cat processed-files.txt | sort | uniq > tmp.txt\n'
1033 >            txt += '        mv tmp.txt processed-files.txt\n'
1034 >            txt += '        echo "----------------------"\n'
1035 >            txt += '        echo "cat processed-files.txt"\n'
1036 >            txt += '        echo "----------------------"\n'
1037 >            txt += '        cat processed-files.txt\n'
1038 >            txt += '        echo "----------------------"\n'
1039 >            txt += '        diff -qbB input-files.txt processed-files.txt\n'
1040 >            txt += '        fileverify_status=$?\n'
1041 >            txt += '        if [ $fileverify_status -ne 0 ]; then\n'
1042 >            txt += '            executable_exit_status=30001\n'
1043 >            txt += '            echo "ERROR ==> not all input files processed"\n'
1044 >            txt += '            echo "      ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
1045 >            txt += '            echo "      ==> diff input-files.txt processed-files.txt"\n'
1046 >            txt += '        fi\n'
1047 >        """
1048 >        txt += '    fi\n'
1049          txt += 'else\n'
1050          txt += '    echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1051          txt += 'fi\n'
1052          txt += '\n'
1053 +        txt += 'if [ $executable_exit_status -ne 0 ] && [ $executable_exit_status -ne 50115 ] && [ $executable_exit_status -ne 50117 ] && [ $executable_exit_status -ne 30001 ];then\n'
1054 +        txt += '    echo ">>> Executable failed  $executable_exit_status"\n'
1055 +        txt += '    echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1056 +        txt += '    echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1057 +        txt += '    job_exit_code=$executable_exit_status\n'
1058 +        txt += '    func_exit\n'
1059 +        txt += 'fi\n\n'
1060          txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1061          txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1062          txt += 'job_exit_code=$executable_exit_status\n'
# Line 1256 | Line 1069 | class Cmssw(JobType):
1069      def getParams(self):
1070          return self._params
1071  
1072 <    def uniquelist(self, old):
1260 <        """
1261 <        remove duplicates from a list
1262 <        """
1263 <        nd={}
1264 <        for e in old:
1265 <            nd[e]=0
1266 <        return nd.keys()
1267 <
1268 <    def outList(self):
1072 >    def outList(self,list=False):
1073          """
1074          check the dimension of the output files
1075          """
# Line 1274 | Line 1078 | class Cmssw(JobType):
1078          listOutFiles = []
1079          stdout = 'CMSSW_$NJob.stdout'
1080          stderr = 'CMSSW_$NJob.stderr'
1081 +        if len(self.output_file) <= 0:
1082 +            msg ="WARNING: no output file names have been defined!!\n"
1083 +            msg+="\tno output files will be reported back/staged\n"
1084 +            common.logger.message(msg)
1085          if (self.return_data == 1):
1086              for file in (self.output_file+self.output_file_sandbox):
1087 <                listOutFiles.append(self.numberFile_(file, '$NJob'))
1087 >                listOutFiles.append(numberFile(file, '$NJob'))
1088              listOutFiles.append(stdout)
1089              listOutFiles.append(stderr)
1090          else:
1091              for file in (self.output_file_sandbox):
1092 <                listOutFiles.append(self.numberFile_(file, '$NJob'))
1092 >                listOutFiles.append(numberFile(file, '$NJob'))
1093              listOutFiles.append(stdout)
1094              listOutFiles.append(stderr)
1095          txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n'
1096          txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n'
1097          txt += 'export filesToCheck\n'
1098 +
1099 +        if list : return self.output_file
1100          return txt

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines