ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
(Generate patch)

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.169 by spiga, Tue Apr 1 14:53:36 2008 UTC vs.
Revision 1.300 by spiga, Wed May 20 13:51:45 2009 UTC

# Line 2 | Line 2 | from JobType import JobType
2   from crab_logger import Logger
3   from crab_exceptions import *
4   from crab_util import *
5 from BlackWhiteListParser import BlackWhiteListParser
5   import common
6   import Scram
7 + from Splitter import JobSplitter
8  
9 + from IMProv.IMProvNode import IMProvNode
10   import os, string, glob
11  
12   class Cmssw(JobType):
13 <    def __init__(self, cfg_params, ncjobs):
13 >    def __init__(self, cfg_params, ncjobs,skip_blocks, isNew):
14          JobType.__init__(self, 'CMSSW')
15          common.logger.debug(3,'CMSSW::__init__')
16 <
17 <        self.argsList = []
16 >        self.skip_blocks = skip_blocks
17 >        self.argsList = 1
18  
19          self._params = {}
20          self.cfg_params = cfg_params
20        # init BlackWhiteListParser
21        self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
21  
22 <        self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))
22 >        ### Temporary patch to automatically skip the ISB size check:
23 >        server=self.cfg_params.get('CRAB.server_name',None)
24 >        size = 9.5
25 >        if server or common.scheduler.name().upper() in ['LSF','CAF']: size = 99999
26 >        ### D.S.
27 >        self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',size))
28  
29          # number of jobs requested to be created, limit obj splitting
30          self.ncjobs = ncjobs
# Line 32 | Line 36 | class Cmssw(JobType):
36          self.scriptExe = ''
37          self.executable = ''
38          self.executable_arch = self.scram.getArch()
39 <        self.tgz_name = 'default.tgz'
40 <        self.additional_tgz_name = 'additional.tgz'
39 >        self.tgz_name = 'default.tar.gz'
40 >        self.tar_name = 'default.tar'
41          self.scriptName = 'CMSSW.sh'
42 <        self.pset = ''      #scrip use case Da
43 <        self.datasetPath = '' #scrip use case Da
42 >        self.pset = ''
43 >        self.datasetPath = ''
44  
45 +        self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
46          # set FJR file name
47          self.fjrFileName = 'crab_fjr.xml'
48  
49          self.version = self.scram.getSWVersion()
50 +        common.logger.write("CMSSW version is: "+str(self.version))
51 +        try:
52 +            type, self.CMSSW_major, self.CMSSW_minor, self.CMSSW_patch = tuple(self.version.split('_'))
53 +        except:
54 +            msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
55 +            raise CrabException(msg)
56  
57 <        #
58 <        # Try to block creation in case of arch/version mismatch
59 <        #
60 <
61 < #        a = string.split(self.version, "_")
62 < #
63 < #        if int(a[1]) == 1 and (int(a[2]) < 5 and self.executable_arch.find('slc4') == 0):
64 < #            msg = "Warning: You are using %s version of CMSSW  with %s architecture. \n--> Did you compile your libraries with SLC3? Otherwise you can find some problems running on SLC4 Grid nodes.\n"%(self.version, self.executable_arch)
65 < #            common.logger.message(msg)
55 < #        if int(a[1]) == 1 and (int(a[2]) >= 5 and self.executable_arch.find('slc3') == 0):
56 < #            msg = "Error: CMS does not support %s with %s architecture"%(self.version, self.executable_arch)
57 < #            raise CrabException(msg)
58 < #
59 <
60 <        self.setParam_('application', self.version)
57 >        if self.CMSSW_major < 1 or (self.CMSSW_major == 1 and self.CMSSW_minor < 5):
58 >            msg = "CRAB supports CMSSW >= 1_5_x only. Use an older CRAB version."
59 >            raise CrabException(msg)
60 >            """
61 >            As CMSSW versions are dropped we can drop more code:
62 >            1.X dropped: drop support for running .cfg on WN
63 >            2.0 dropped: drop all support for cfg here and in writeCfg
64 >            2.0 dropped: Recheck the random number seed support
65 >            """
66  
67          ### collect Data cards
68  
69 <        if not cfg_params.has_key('CMSSW.datasetpath'):
70 <            msg = "Error: datasetpath not defined "
71 <            raise CrabException(msg)
69 >
70 >        ### Temporary: added to remove input file control in the case of PU
71 >        self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)
72 >
73          tmp =  cfg_params['CMSSW.datasetpath']
74          log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
75 <        if string.lower(tmp)=='none':
75 >
76 >        if tmp =='':
77 >            msg = "Error: datasetpath not defined "
78 >            raise CrabException(msg)
79 >        elif string.lower(tmp)=='none':
80              self.datasetPath = None
81              self.selectNoInput = 1
82          else:
83              self.datasetPath = tmp
84              self.selectNoInput = 0
85  
76        # ML monitoring
77        # split dataset path style: /PreProdR3Minbias/SIM/GEN-SIM
78        if not self.datasetPath:
79            self.setParam_('dataset', 'None')
80            self.setParam_('owner', 'None')
81        else:
82            ## SL what is supposed to fail here?
83            try:
84                datasetpath_split = self.datasetPath.split("/")
85                # standard style
86                self.setParam_('datasetFull', self.datasetPath)
87                self.setParam_('dataset', datasetpath_split[1])
88                self.setParam_('owner', datasetpath_split[2])
89            except:
90                self.setParam_('dataset', self.datasetPath)
91                self.setParam_('owner', self.datasetPath)
92
93        self.setParam_('taskId', common._db.queryTask('name')) ## new BL--DS
94
86          self.dataTiers = []
87  
88 +        self.debugWrap=''
89 +        self.debug_wrapper = int(cfg_params.get('USER.debug_wrapper',0))
90 +        if self.debug_wrapper == 1: self.debugWrap='--debug'
91 +
92          ## now the application
93 +        self.managedGenerators = ['madgraph','comphep']
94 +        self.generator = cfg_params.get('CMSSW.generator','pythia').lower()
95          self.executable = cfg_params.get('CMSSW.executable','cmsRun')
99        self.setParam_('exe', self.executable)
96          log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
97  
98          if not cfg_params.has_key('CMSSW.pset'):
# Line 117 | Line 113 | class Cmssw(JobType):
113          self.output_file_sandbox.append(self.fjrFileName)
114  
115          # other output files to be returned via sandbox or copied to SE
116 +        outfileflag = False
117          self.output_file = []
118          tmp = cfg_params.get('CMSSW.output_file',None)
119          if tmp :
120 <            tmpOutFiles = string.split(tmp,',')
121 <            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
122 <            for tmp in tmpOutFiles:
123 <                tmp=string.strip(tmp)
127 <                self.output_file.append(tmp)
128 <                pass
129 <        else:
130 <            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
131 <        pass
120 >            self.output_file = [x.strip() for x in tmp.split(',')]
121 >            outfileflag = True #output found
122 >        #else:
123 >        #    log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
124  
125          # script_exe file as additional file in inputSandbox
126          self.scriptExe = cfg_params.get('USER.script_exe',None)
127          if self.scriptExe :
128 <           if not os.path.isfile(self.scriptExe):
129 <              msg ="ERROR. file "+self.scriptExe+" not found"
130 <              raise CrabException(msg)
131 <           self.additional_inbox_files.append(string.strip(self.scriptExe))
128 >            if not os.path.isfile(self.scriptExe):
129 >                msg ="ERROR. file "+self.scriptExe+" not found"
130 >                raise CrabException(msg)
131 >            self.additional_inbox_files.append(string.strip(self.scriptExe))
132  
141        #CarlosDaniele
133          if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
134 <           msg ="Error. script_exe  not defined"
135 <           raise CrabException(msg)
134 >            msg ="Error. script_exe  not defined"
135 >            raise CrabException(msg)
136 >
137 >        # use parent files...
138 >        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
139  
140          ## additional input files
141          if cfg_params.has_key('USER.additional_input_files'):
# Line 161 | Line 155 | class Cmssw(JobType):
155                      if not os.path.exists(file):
156                          raise CrabException("Additional input file not found: "+file)
157                      pass
164                    # fname = string.split(file, '/')[-1]
165                    # storedFile = common.work_space.pathForTgz()+'share/'+fname
166                    # shutil.copyfile(file, storedFile)
158                      self.additional_inbox_files.append(string.strip(file))
159                  pass
160              pass
161              common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
162          pass
163  
173        ## Events per job
174        if cfg_params.has_key('CMSSW.events_per_job'):
175            self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
176            self.selectEventsPerJob = 1
177        else:
178            self.eventsPerJob = -1
179            self.selectEventsPerJob = 0
180
181        ## number of jobs
182        if cfg_params.has_key('CMSSW.number_of_jobs'):
183            self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
184            self.selectNumberOfJobs = 1
185        else:
186            self.theNumberOfJobs = 0
187            self.selectNumberOfJobs = 0
188
189        if cfg_params.has_key('CMSSW.total_number_of_events'):
190            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
191            self.selectTotalNumberEvents = 1
192        else:
193            self.total_number_of_events = 0
194            self.selectTotalNumberEvents = 0
195
196        if self.pset != None: #CarlosDaniele
197             if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
198                 msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
199                 raise CrabException(msg)
200        else:
201             if (self.selectNumberOfJobs == 0):
202                 msg = 'Must specify  number_of_jobs.'
203                 raise CrabException(msg)
164  
165          ## New method of dealing with seeds
166          self.incrementSeeds = []
# Line 216 | Line 176 | class Cmssw(JobType):
176                  tmp.strip()
177                  self.incrementSeeds.append(tmp)
178  
219        ## Old method of dealing with seeds
220        ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
221        ## remove
222        self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
223        if self.sourceSeed:
224          print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
225          self.incrementSeeds.append('sourceSeed')
226
227        self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
228        if self.sourceSeedVtx:
229          print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
230          self.incrementSeeds.append('VtxSmeared')
231
232        self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
233        if self.sourceSeedG4:
234          print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
235          self.incrementSeeds.append('g4SimHits')
236
237        self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
238        if self.sourceSeedMix:
239          print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
240          self.incrementSeeds.append('mix')
241
179          self.firstRun = cfg_params.get('CMSSW.first_run',None)
180  
244        if self.pset != None: #CarlosDaniele
245            import PsetManipulator as pp
246            PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset
247
181          # Copy/return
249
182          self.copy_data = int(cfg_params.get('USER.copy_data',0))
183          self.return_data = int(cfg_params.get('USER.return_data',0))
184  
185 +        self.conf = {}
186 +        self.conf['pubdata'] = None
187 +        # number of jobs requested to be created, limit obj splitting DD
188          #DBSDLS-start
189          ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
190          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
# Line 261 | Line 196 | class Cmssw(JobType):
196          if self.datasetPath:
197              blockSites = self.DataDiscoveryAndLocation(cfg_params)
198          #DBSDLS-end
199 <
265 <        self.tgzNameWithPath = self.getTarBall(self.executable)
199 >        self.conf['blockSites']=blockSites
200  
201          ## Select Splitting
202 +        splitByRun = int(cfg_params.get('CMSSW.split_by_run',0))
203 +
204          if self.selectNoInput:
205 <            if self.pset == None: #CarlosDaniele
206 <                self.jobSplittingForScript()
205 >            if self.pset == None:
206 >                self.algo = 'ForScript'
207              else:
208 <                self.jobSplittingNoInput()
208 >                self.algo = 'NoInput'
209 >                self.conf['managedGenerators']=self.managedGenerators
210 >                self.conf['generator']=self.generator
211 >        elif splitByRun ==1:
212 >            self.algo = 'RunBased'
213          else:
214 <            self.jobSplittingByBlocks(blockSites)
214 >            self.algo = 'EventBased'
215 >
216 > #        self.algo = 'LumiBased'
217 >        splitter = JobSplitter(self.cfg_params,self.conf)
218 >        self.dict = splitter.Algos()[self.algo]()
219 >
220 >        self.argsFile= '%s/arguments.xml'%common.work_space.shareDir()
221 >        self.rootArgsFilename= 'arguments'
222 >        # modify Pset only the first time
223 >        if (isNew and self.pset != None): self.ModifyPset()
224 >
225 >        ## Prepare inputSandbox TarBall (only the first time)
226 >        self.tarNameWithPath = self.getTarBall(self.executable)
227 >
228 >
229 >    def ModifyPset(self):
230 >        import PsetManipulator as pp
231 >        PsetEdit = pp.PsetManipulator(self.pset)
232 >        try:
233 >            # Add FrameworkJobReport to parameter-set, set max events.
234 >            # Reset later for data jobs by writeCFG which does all modifications
235 >            PsetEdit.maxEvent(1)
236 >            PsetEdit.skipEvent(0)
237 >            PsetEdit.psetWriter(self.configFilename())
238 >            ## If present, add TFileService to output files
239 >            if not int(self.cfg_params.get('CMSSW.skip_TFileService_output',0)):
240 >                tfsOutput = PsetEdit.getTFileService()
241 >                if tfsOutput:
242 >                    if tfsOutput in self.output_file:
243 >                        common.logger.debug(5,"Output from TFileService "+tfsOutput+" already in output files")
244 >                    else:
245 >                        outfileflag = True #output found
246 >                        self.output_file.append(tfsOutput)
247 >                        common.logger.message("Adding "+tfsOutput+" (from TFileService) to list of output files")
248 >                    pass
249 >                pass
250 >            ## If present and requested, add PoolOutputModule to output files
251 >            if int(self.cfg_params.get('CMSSW.get_edm_output',0)):
252 >                edmOutput = PsetEdit.getPoolOutputModule()
253 >                if edmOutput:
254 >                    if edmOutput in self.output_file:
255 >                        common.logger.debug(5,"Output from PoolOutputModule "+edmOutput+" already in output files")
256 >                    else:
257 >                        self.output_file.append(edmOutput)
258 >                        common.logger.message("Adding "+edmOutput+" (from PoolOutputModule) to list of output files")
259 >                    pass
260 >                pass
261 >            # not required: check anyhow if present, to avoid accidental T2 overload
262 >            else:
263 >                edmOutput = PsetEdit.getPoolOutputModule()
264 >                if edmOutput and (edmOutput not in self.output_file):
265 >                    msg = "ERROR: a PoolOutputModule is present in your ParameteSet %s \n"%self.pset
266 >                    msg +="         but the file produced ( %s ) is not in the list of output files\n"%edmOutput
267 >                    msg += "WARNING: please remove it. If you want to keep it, add the file to output_files or use CMSSW.get_edm_output\n"
268 >                    raise CrabException(msg)
269 >                pass
270 >            pass
271 >        except CrabException, msg:
272 >            common.logger.message(str(msg))
273 >            msg='Error while manipulating ParameterSet (see previous message, if any): exiting...'
274 >            raise CrabException(msg)
275  
276        # modify Pset
277        if self.pset != None: #CarlosDaniele
278            try:
279                # Add FrameworkJobReport to parameter-set, set max events.
280                # Reset later for data jobs by writeCFG which does all modifications
281                PsetEdit.addCrabFJR(self.fjrFileName)
282                PsetEdit.maxEvent(self.eventsPerJob)
283                PsetEdit.psetWriter(self.configFilename())
284            except:
285                msg='Error while manipuliating ParameterSet: exiting...'
286                raise CrabException(msg)
276  
277      def DataDiscoveryAndLocation(self, cfg_params):
278  
# Line 296 | Line 285 | class Cmssw(JobType):
285          ## Contact the DBS
286          common.logger.message("Contacting Data Discovery Services ...")
287          try:
288 <            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
288 >            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks)
289              self.pubdata.fetchDBSInfo()
290  
291          except DataDiscovery.NotExistingDatasetError, ex :
# Line 310 | Line 299 | class Cmssw(JobType):
299              raise CrabException(msg)
300  
301          self.filesbyblock=self.pubdata.getFiles()
302 <        self.eventsbyblock=self.pubdata.getEventsPerBlock()
303 <        self.eventsbyfile=self.pubdata.getEventsPerFile()
302 >        #print self.filesbyblock
303 >        self.conf['pubdata']=self.pubdata
304  
305          ## get max number of events
306 <        self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
306 >        self.maxEvents=self.pubdata.getMaxEvents()
307  
308          ## Contact the DLS and build a list of sites hosting the fileblocks
309          try:
310              dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
311              dataloc.fetchDLSInfo()
312 +
313          except DataLocation.DataLocationError , ex:
314              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
315              raise CrabException(msg)
316  
317  
318 <        sites = dataloc.getSites()
318 >        unsorted_sites = dataloc.getSites()
319 >        #print "Unsorted :",unsorted_sites
320 >        sites = self.filesbyblock.fromkeys(self.filesbyblock,'')
321 >        for lfn in self.filesbyblock.keys():
322 >            #print lfn
323 >            if unsorted_sites.has_key(lfn):
324 >                #print "Found ",lfn
325 >                sites[lfn]=unsorted_sites[lfn]
326 >            else:
327 >                #print "Not Found ",lfn
328 >                sites[lfn]=[]
329 >        #print sites
330 >
331 >        #print "Sorted :",sites
332 >        if len(sites)==0:
333 >            msg = 'ERROR ***: no location for any of the blocks of this dataset: \n\t %s \n'%datasetPath
334 >            msg += "\tMaybe the dataset is located only at T1's (or at T0), where analysis jobs are not allowed\n"
335 >            msg += "\tPlease check DataDiscovery page https://cmsweb.cern.ch/dbs_discovery/\n"
336 >            raise CrabException(msg)
337 >
338          allSites = []
339          listSites = sites.values()
340          for listSite in listSites:
341              for oneSite in listSite:
342                  allSites.append(oneSite)
343 <        allSites = self.uniquelist(allSites)
343 >        [allSites.append(it) for it in allSites if not allSites.count(it)]
344 >
345  
346          # screen output
347          common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
348  
349          return sites
350  
341  # to Be Removed  DS -- BL
342  #  def setArgsList(self, argsList):
343  #      self.argsList = argsList
344
345    def jobSplittingByBlocks(self, blockSites):
346        """
347        Perform job splitting. Jobs run over an integer number of files
348        and no more than one block.
349        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
350        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
351                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
352                  self.maxEvents, self.filesbyblock
353        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
354              self.total_number_of_jobs - Total # of jobs
355              self.list_of_args - File(s) job will run on (a list of lists)
356        """
357
358        # ---- Handle the possible job splitting configurations ---- #
359        if (self.selectTotalNumberEvents):
360            totalEventsRequested = self.total_number_of_events
361        if (self.selectEventsPerJob):
362            eventsPerJobRequested = self.eventsPerJob
363            if (self.selectNumberOfJobs):
364                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
365
366        # If user requested all the events in the dataset
367        if (totalEventsRequested == -1):
368            eventsRemaining=self.maxEvents
369        # If user requested more events than are in the dataset
370        elif (totalEventsRequested > self.maxEvents):
371            eventsRemaining = self.maxEvents
372            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
373        # If user requested less events than are in the dataset
374        else:
375            eventsRemaining = totalEventsRequested
376
377        # If user requested more events per job than are in the dataset
378        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
379            eventsPerJobRequested = self.maxEvents
380
381        # For user info at end
382        totalEventCount = 0
383
384        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
385            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
386
387        if (self.selectNumberOfJobs):
388            common.logger.message("May not create the exact number_of_jobs requested.")
389
390        if ( self.ncjobs == 'all' ) :
391            totalNumberOfJobs = 999999999
392        else :
393            totalNumberOfJobs = self.ncjobs
394
395
396        blocks = blockSites.keys()
397        blockCount = 0
398        # Backup variable in case self.maxEvents counted events in a non-included block
399        numBlocksInDataset = len(blocks)
400
401        jobCount = 0
402        list_of_lists = []
403
404        # list tracking which jobs are in which jobs belong to which block
405        jobsOfBlock = {}
406
407        # ---- Iterate over the blocks in the dataset until ---- #
408        # ---- we've met the requested total # of events    ---- #
409        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
410            block = blocks[blockCount]
411            blockCount += 1
412            if block not in jobsOfBlock.keys() :
413                jobsOfBlock[block] = []
414
415            if self.eventsbyblock.has_key(block) :
416                numEventsInBlock = self.eventsbyblock[block]
417                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
418
419                files = self.filesbyblock[block]
420                numFilesInBlock = len(files)
421                if (numFilesInBlock <= 0):
422                    continue
423                fileCount = 0
424
425                # ---- New block => New job ---- #
426                parString = ""
427                # counter for number of events in files currently worked on
428                filesEventCount = 0
429                # flag if next while loop should touch new file
430                newFile = 1
431                # job event counter
432                jobSkipEventCount = 0
433
434                # ---- Iterate over the files in the block until we've met the requested ---- #
435                # ---- total # of events or we've gone over all the files in this block  ---- #
436                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
437                    file = files[fileCount]
438                    if newFile :
439                        try:
440                            numEventsInFile = self.eventsbyfile[file]
441                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
442                            # increase filesEventCount
443                            filesEventCount += numEventsInFile
444                            # Add file to current job
445                            parString += '\\\"' + file + '\\\"\,'
446                            newFile = 0
447                        except KeyError:
448                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
449
450
451                    # if less events in file remain than eventsPerJobRequested
452                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested ) :
453                        # if last file in block
454                        if ( fileCount == numFilesInBlock-1 ) :
455                            # end job using last file, use remaining events in block
456                            # close job and touch new file
457                            fullString = parString[:-2]
458                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
459                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
460                            self.jobDestination.append(blockSites[block])
461                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
462                            # fill jobs of block dictionary
463                            jobsOfBlock[block].append(jobCount+1)
464                            # reset counter
465                            jobCount = jobCount + 1
466                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
467                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
468                            jobSkipEventCount = 0
469                            # reset file
470                            parString = ""
471                            filesEventCount = 0
472                            newFile = 1
473                            fileCount += 1
474                        else :
475                            # go to next file
476                            newFile = 1
477                            fileCount += 1
478                    # if events in file equal to eventsPerJobRequested
479                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
480                        # close job and touch new file
481                        fullString = parString[:-2]
482                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
483                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
484                        self.jobDestination.append(blockSites[block])
485                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
486                        jobsOfBlock[block].append(jobCount+1)
487                        # reset counter
488                        jobCount = jobCount + 1
489                        totalEventCount = totalEventCount + eventsPerJobRequested
490                        eventsRemaining = eventsRemaining - eventsPerJobRequested
491                        jobSkipEventCount = 0
492                        # reset file
493                        parString = ""
494                        filesEventCount = 0
495                        newFile = 1
496                        fileCount += 1
497
498                    # if more events in file remain than eventsPerJobRequested
499                    else :
500                        # close job but don't touch new file
501                        fullString = parString[:-2]
502                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
503                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
504                        self.jobDestination.append(blockSites[block])
505                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
506                        jobsOfBlock[block].append(jobCount+1)
507                        # increase counter
508                        jobCount = jobCount + 1
509                        totalEventCount = totalEventCount + eventsPerJobRequested
510                        eventsRemaining = eventsRemaining - eventsPerJobRequested
511                        # calculate skip events for last file
512                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
513                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
514                        # remove all but the last file
515                        filesEventCount = self.eventsbyfile[file]
516                        parString = '\\\"' + file + '\\\"\,'
517                    pass # END if
518                pass # END while (iterate over files in the block)
519        pass # END while (iterate over blocks in the dataset)
520        self.ncjobs = self.total_number_of_jobs = jobCount
521        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
522            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
523        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
524
525        # screen output
526        screenOutput = "List of jobs and available destination sites:\n\n"
527
528        # keep trace of block with no sites to print a warning at the end
529        noSiteBlock = []
530        bloskNoSite = []
531
532        blockCounter = 0
533        for block in blocks:
534            if block in jobsOfBlock.keys() :
535                blockCounter += 1
536                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)))
537                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0:
538                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
539                    bloskNoSite.append( blockCounter )
540
541        common.logger.message(screenOutput)
542        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
543            msg = 'WARNING: No sites are hosting any part of data for block:\n                '
544            virgola = ""
545            if len(bloskNoSite) > 1:
546                virgola = ","
547            for block in bloskNoSite:
548                msg += ' ' + str(block) + virgola
549            msg += '\n               Related jobs:\n                 '
550            virgola = ""
551            if len(noSiteBlock) > 1:
552                virgola = ","
553            for range_jobs in noSiteBlock:
554                msg += str(range_jobs) + virgola
555            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
556            if self.cfg_params.has_key('EDG.se_white_list'):
557                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
558                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
559                msg += 'Please check if the dataset is available at this site!)\n'
560            if self.cfg_params.has_key('EDG.ce_white_list'):
561                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
562                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
563                msg += 'Please check if the dataset is available at this site!)\n'
564
565            common.logger.message(msg)
566
567        self.list_of_args = list_of_lists
568        return
569
570    def jobSplittingNoInput(self):
571        """
572        Perform job splitting based on number of event per job
573        """
574        common.logger.debug(5,'Splitting per events')
575
576        if (self.selectEventsPerJob):
577            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
578        if (self.selectNumberOfJobs):
579            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
580        if (self.selectTotalNumberEvents):
581            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
582
583        if (self.total_number_of_events < 0):
584            msg='Cannot split jobs per Events with "-1" as total number of events'
585            raise CrabException(msg)
586
587        if (self.selectEventsPerJob):
588            if (self.selectTotalNumberEvents):
589                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
590            elif(self.selectNumberOfJobs) :
591                self.total_number_of_jobs =self.theNumberOfJobs
592                self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
593
594        elif (self.selectNumberOfJobs) :
595            self.total_number_of_jobs = self.theNumberOfJobs
596            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
597
598        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
599
600        # is there any remainder?
601        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
602
603        common.logger.debug(5,'Check  '+str(check))
604
605        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
606        if check > 0:
607            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
608
609        # argument is seed number.$i
610        self.list_of_args = []
611        for i in range(self.total_number_of_jobs):
612            ## Since there is no input, any site is good
613            self.jobDestination.append([""]) #must be empty to write correctly the xml
614            args=[]
615            if (self.firstRun):
616                ## pythia first run
617                args.append(str(self.firstRun)+str(i))
618            self.list_of_args.append(args)
351  
352 <        return
621 <
622 <
623 <    def jobSplittingForScript(self):#CarlosDaniele
624 <        """
625 <        Perform job splitting based on number of job
626 <        """
627 <        common.logger.debug(5,'Splitting per job')
628 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
629 <
630 <        self.total_number_of_jobs = self.theNumberOfJobs
352 >    def split(self, jobParams,firstJobID):
353  
354 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
354 >        jobParams = self.dict['args']
355 >        njobs = self.dict['njobs']
356 >        self.jobDestination = self.dict['jobDestination']
357  
358 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
358 >        if njobs==0:
359 >            raise CrabException("Ask to split "+str(njobs)+" jobs: aborting")
360  
636        # argument is seed number.$i
637        self.list_of_args = []
638        for i in range(self.total_number_of_jobs):
639            ## Since there is no input, any site is good
640           # self.jobDestination.append(["Any"])
641            self.jobDestination.append([""])
642            ## no random seed
643            self.list_of_args.append([str(i)])
644        return
645
646    def split(self, jobParams):
647
648        #### Fabio
649        njobs = self.total_number_of_jobs
650        arglist = self.list_of_args
361          # create the empty structure
362          for i in range(njobs):
363              jobParams.append("")
364  
365          listID=[]
366          listField=[]
367 <        for job in range(njobs):
368 <            jobParams[job] = arglist[job]
367 >        listDictions=[]
368 >        exist= os.path.exists(self.argsFile)
369 >        for id in range(njobs):
370 >            job = id + int(firstJobID)
371              listID.append(job+1)
372              job_ToSave ={}
373              concString = ' '
374              argu=''
375 <            if len(jobParams[job]):
376 <                argu +=   concString.join(jobParams[job] )
377 <            job_ToSave['arguments']= str(job+1)+' '+argu## new BL--DS
378 <            job_ToSave['dlsDestination']= self.jobDestination[job]## new BL--DS
379 <            #common._db.updateJob_(job,job_ToSave)## new BL--DS
375 >            str_argu = str(job+1)
376 >            if len(jobParams[id]):
377 >                argu = {'JobID': job+1}
378 >                for i in range(len(jobParams[id])):
379 >                    argu[self.dict['params'][i]]=jobParams[id][i]
380 >                # just for debug
381 >                str_argu += concString.join(jobParams[id])
382 >            listDictions.append(argu)
383 >            job_ToSave['arguments']= str(job+1)
384 >            job_ToSave['dlsDestination']= self.jobDestination[id]
385              listField.append(job_ToSave)
386 <            msg="Job "+str(job)+" Arguments:   "+str(job+1)+" "+argu+"\n"  \
387 <            +"                     Destination: "+str(self.jobDestination[job])
386 >            msg="Job  %s  Arguments:  %s\n"%(str(job+1),str_argu)
387 >            msg+="\t  Destination: %s "%(str(self.jobDestination[id]))
388              common.logger.debug(5,msg)
389 <            #common.logger.debug(5,"Job "+str(job)+" Destination: "+str(self.jobDestination[job]))
390 <        common._db.updateJob_(listID,listField)## new BL--DS
391 <        ## Pay Attention Here....DS--BL
392 <        self.argsList = (len(jobParams[1])+1)
389 >        # write xml
390 >        if len(listDictions):
391 >            if exist==False: self.CreateXML()
392 >            self.addEntry(listDictions)
393 >            self.addXMLfile()
394 >        common._db.updateJob_(listID,listField)
395 >        self.zipTarFile()
396 >        return
397 >      
398 >    def addXMLfile(self):
399  
400 +        import tarfile
401 +       # try:
402 +        print self.argsFile
403 +        tar = tarfile.open(self.tarNameWithPath, "a")
404 +        tar.add(self.argsFile, os.path.basename(self.argsFile))
405 +        tar.close()
406 +       ## except:
407 +       #     pass
408 +
409 +  
410 +    def CreateXML(self):
411 +        """
412 +        """
413 +        result = IMProvNode( self.rootArgsFilename )
414 +        outfile = file( self.argsFile, 'w').write(str(result))
415 +        return
416 +
417 +    def addEntry(self, listDictions):
418 +        """
419 +        _addEntry_
420 +
421 +        add an entry to the xml file
422 +        """
423 +        from IMProv.IMProvLoader import loadIMProvFile
424 +        ## load xml
425 +        improvDoc = loadIMProvFile(self.argsFile)
426 +        entrname= 'Job'
427 +        for dictions in listDictions:
428 +           report = IMProvNode(entrname , None, **dictions)
429 +           improvDoc.addNode(report)
430 +        outfile = file( self.argsFile, 'w').write(str(improvDoc))
431          return
678 #
679 #    def getJobTypeArguments(self, nj, sched):
680 #        result = ''
681 #        jobs=[]
682 #        jobs.append(nj)
683 #        for i in common._db.queryJob('arguments',jobs):##  BL--DS
684 #            result=result+str(i)+" "
685 #        return result
432  
433      def numberOfJobs(self):
434 <        # Fabio
689 <        return self.total_number_of_jobs
434 >        return self.dict['njobs']
435  
436      def getTarBall(self, exe):
437          """
438          Return the TarBall with lib and exe
439          """
440 <
441 <        # if it exist, just return it
442 <        #
698 <        # Marco. Let's start to use relative path for Boss XML files
699 <        #
700 <        self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
701 <        if os.path.exists(self.tgzNameWithPath):
702 <            return self.tgzNameWithPath
440 >        self.tarNameWithPath = common.work_space.pathForTgz()+self.tar_name
441 >        if os.path.exists(self.tarNameWithPath):
442 >            return self.tarNameWithPath
443  
444          # Prepare a tar gzipped file with user binaries.
445          self.buildTar_(exe)
446  
447 <        return string.strip(self.tgzNameWithPath)
447 >        return string.strip(self.tarNameWithPath)
448  
449      def buildTar_(self, executable):
450  
451          # First of all declare the user Scram area
452          swArea = self.scram.getSWArea_()
713        #print "swArea = ", swArea
714        # swVersion = self.scram.getSWVersion()
715        # print "swVersion = ", swVersion
453          swReleaseTop = self.scram.getReleaseTop_()
717        #print "swReleaseTop = ", swReleaseTop
454  
455          ## check if working area is release top
456          if swReleaseTop == '' or swArea == swReleaseTop:
457 +            common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
458              return
459  
460          import tarfile
461          try: # create tar ball
462 <            tar = tarfile.open(self.tgzNameWithPath, "w:gz")
462 >            #tar = tarfile.open(self.tgzNameWithPath, "w:gz")
463 >            tar = tarfile.open(self.tarNameWithPath, "w")
464              ## First find the executable
465              if (self.executable != ''):
466                  exeWithPath = self.scram.findFile_(executable)
# Line 746 | Line 484 | class Cmssw(JobType):
484                      pass
485  
486              ## Now get the libraries: only those in local working area
487 +            tar.dereference=True
488              libDir = 'lib'
489              lib = swArea+'/' +libDir
490              common.logger.debug(5,"lib "+lib+" to be tarred")
# Line 757 | Line 496 | class Cmssw(JobType):
496              module = swArea + '/' + moduleDir
497              if os.path.isdir(module):
498                  tar.add(module,moduleDir)
499 +            tar.dereference=False
500  
501              ## Now check if any data dir(s) is present
502 <            swAreaLen=len(swArea)
503 <            for root, dirs, files in os.walk(swArea):
504 <                if "data" in dirs:
505 <                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
506 <                    tar.add(root+"/data",root[swAreaLen:]+"/data")
502 >            self.dataExist = False
503 >            todo_list = [(i, i) for i in  os.listdir(swArea+"/src")]
504 >            while len(todo_list):
505 >                entry, name = todo_list.pop()
506 >                if name.startswith('crab_0_') or  name.startswith('.') or name == 'CVS':
507 >                    continue
508 >                if os.path.isdir(swArea+"/src/"+entry):
509 >                    entryPath = entry + '/'
510 >                    todo_list += [(entryPath + i, i) for i in  os.listdir(swArea+"/src/"+entry)]
511 >                    if name == 'data':
512 >                        self.dataExist=True
513 >                        common.logger.debug(5,"data "+entry+" to be tarred")
514 >                        tar.add(swArea+"/src/"+entry,"src/"+entry)
515 >                    pass
516 >                pass
517 >
518 >            ### CMSSW ParameterSet
519 >            if not self.pset is None:
520 >                cfg_file = common.work_space.jobDir()+self.configFilename()
521 >                tar.add(cfg_file,self.configFilename())
522  
523  
524              ## Add ProdCommon dir to tar
525 <            prodcommonDir = 'ProdCommon'
526 <            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
527 <            if os.path.isdir(prodcommonPath):
528 <                tar.add(prodcommonPath,prodcommonDir)
525 >            prodcommonDir = './'
526 >            prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
527 >            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \
528 >                           'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage', \
529 >                           'WMCore/__init__.py','WMCore/Algorithms']
530 >            for file in neededStuff:
531 >                tar.add(prodcommonPath+file,prodcommonDir+file)
532 >
533 >            ##### ML stuff
534 >            ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
535 >            path=os.environ['CRABDIR'] + '/python/'
536 >            for file in ML_file_list:
537 >                tar.add(path+file,file)
538 >
539 >            ##### Utils
540 >            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
541 >            for file in Utils_file_list:
542 >                tar.add(path+file,file)
543 >
544 >            ##### AdditionalFiles
545 >            tar.dereference=True
546 >            for file in self.additional_inbox_files:
547 >                tar.add(file,string.split(file,'/')[-1])
548 >            tar.dereference=False
549 >            common.logger.debug(5,"Files in "+self.tarNameWithPath+" : "+str(tar.getnames()))
550  
775            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
551              tar.close()
552 <        except :
553 <            raise CrabException('Could not create tar-ball')
552 >        except IOError, exc:
553 >            common.logger.write(str(exc))
554 >            raise CrabException('Could not create tar-ball '+self.tarNameWithPath)
555 >        except tarfile.TarError, exc:
556 >            common.logger.write(str(exc))
557 >            raise CrabException('Could not create tar-ball '+self.tarNameWithPath)
558 >  
559 >    def zipTarFile(self):  
560 >
561 >        cmd = "gzip -c %s > %s "%(self.tarNameWithPath,self.tgzNameWithPath)
562 >        res=runCommand(cmd)
563  
780        ## check for tarball size
564          tarballinfo = os.stat(self.tgzNameWithPath)
565          if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
566 <            raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
566 >            msg  = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) \
567 >               +'MB input sandbox limit \n'
568 >            msg += '      and not supported by the direct GRID submission system.\n'
569 >            msg += '      Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
570 >            msg += '      For further infos please see https://twiki.cern.ch/twiki/bin/view/CMS/CrabServer#CRABSERVER_for_Users'
571 >            raise CrabException(msg)
572  
573          ## create tar-ball with ML stuff
786        self.MLtgzfile =  common.work_space.pathForTgz()+'share/MLfiles.tgz'
787        try:
788            tar = tarfile.open(self.MLtgzfile, "w:gz")
789            path=os.environ['CRABDIR'] + '/python/'
790            #for file in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py','writeCfg.py']:
791            ### FEDE ####
792            for file in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']:
793            ###############
794                tar.add(path+file,file)
795            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
796            tar.close()
797        except :
798            raise CrabException('Could not create ML files tar-ball')
799
800        return
801
802    def additionalInputFileTgz(self):
803        """
804        Put all additional files into a tar ball and return its name
805        """
806        import tarfile
807        tarName=  common.work_space.pathForTgz()+'share/'+self.additional_tgz_name
808        tar = tarfile.open(tarName, "w:gz")
809        for file in self.additional_inbox_files:
810            tar.add(file,string.split(file,'/')[-1])
811        common.logger.debug(5,"Files added to "+self.additional_tgz_name+" : "+str(tar.getnames()))
812        tar.close()
813        return tarName
574  
575      def wsSetupEnvironment(self, nj=0):
576          """
577          Returns part of a job script which prepares
578          the execution environment for the job 'nj'.
579          """
580 +        # FUTURE: Drop support for .cfg when possible
581 +        if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
582 +            psetName = 'pset.py'
583 +        else:
584 +            psetName = 'pset.cfg'
585          # Prepare JobType-independent part
586          txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
587          txt += 'echo ">>> setup environment"\n'
588 <        txt += 'if [ $middleware == LCG ]; then \n'
588 >        txt += 'if [ $middleware == LCG ] || [ $middleware == CAF ] || [ $middleware == LSF ]; then \n'
589          txt += self.wsSetupCMSLCGEnvironment_()
590          txt += 'elif [ $middleware == OSG ]; then\n'
591          txt += '    WORKING_DIR=`/bin/mktemp  -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
592          txt += '    if [ ! $? == 0 ] ;then\n'
828        #txt += '        echo "SET_CMS_ENV 10016 ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
829        #txt += '        echo "JOB_EXIT_STATUS = 10016"\n'
830        #txt += '        echo "JobExitCode=10016" | tee -a $RUNTIME_AREA/$repo\n'
831        #txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
832        #txt += '        exit 1\n'
593          txt += '        echo "ERROR ==> OSG $WORKING_DIR could not be created on WN `hostname`"\n'
594          txt += '        job_exit_code=10016\n'
595          txt += '        func_exit\n'
# Line 840 | Line 600 | class Cmssw(JobType):
600          txt += '    cd $WORKING_DIR\n'
601          txt += '    echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
602          txt += self.wsSetupCMSOSGEnvironment_()
603 <        #txt += '    echo "### Set SCRAM ARCH to ' + self.executable_arch + ' ###"\n'
604 <        #txt += '    export SCRAM_ARCH='+self.executable_arch+'\n'
603 >        #Setup SGE Environment
604 >        txt += 'elif [ $middleware == SGE ]; then\n'
605 >        txt += self.wsSetupCMSLCGEnvironment_()
606 >
607 >        txt += 'elif [ $middleware == ARC ]; then\n'
608 >        txt += self.wsSetupCMSLCGEnvironment_()
609 >
610          txt += 'fi\n'
611  
612          # Prepare JobType-specific part
# Line 852 | Line 617 | class Cmssw(JobType):
617          txt += scram+' project CMSSW '+self.version+'\n'
618          txt += 'status=$?\n'
619          txt += 'if [ $status != 0 ] ; then\n'
855        #txt += '    echo "SET_EXE_ENV 10034 ==>ERROR CMSSW '+self.version+' not found on `hostname`" \n'
856        #txt += '    echo "JOB_EXIT_STATUS = 10034"\n'
857        #txt += '    echo "JobExitCode=10034" | tee -a $RUNTIME_AREA/$repo\n'
858        #txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
620          txt += '    echo "ERROR ==> CMSSW '+self.version+' not found on `hostname`" \n'
621          txt += '    job_exit_code=10034\n'
861        #txt += '    if [ $middleware == OSG ]; then \n'
862        #txt += '        cd $RUNTIME_AREA\n'
863        #txt += '        echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
864        #txt += '        echo ">>> Remove working directory: $WORKING_DIR"\n'
865        #txt += '        /bin/rm -rf $WORKING_DIR\n'
866        #txt += '        if [ -d $WORKING_DIR ] ;then\n'
867        #txt += '            echo "SET_CMS_ENV 10018 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW CMSSW_0_6_1 not found on `hostname`"\n'
868        #txt += '            echo "JOB_EXIT_STATUS = 10018"\n'
869        #txt += '            echo "JobExitCode=10018" | tee -a $RUNTIME_AREA/$repo\n'
870        #txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
871        #txt += '            echo "ERROR ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after CMSSW CMSSW_0_6_1 not found on `hostname`"\n'
872        #txt += '            job_exit_code=10017\n'
873        #txt += '        fi\n'
874        #txt += '    fi \n'
875        #txt += '    exit 1 \n'
622          txt += '    func_exit\n'
623          txt += 'fi \n'
624          txt += 'cd '+self.version+'\n'
625 <        ########## FEDE FOR DBS2 ######################
880 <        txt += 'SOFTWARE_DIR=`pwd`\n'
625 >        txt += 'SOFTWARE_DIR=`pwd`; export SOFTWARE_DIR\n'
626          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
882        ###############################################
883        ### needed grep for bug in scramv1 ###
627          txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
628 +        txt += 'if [ $? != 0 ] ; then\n'
629 +        txt += '    echo "ERROR ==> Problem with the command: "\n'
630 +        txt += '    echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n'
631 +        txt += '    job_exit_code=10034\n'
632 +        txt += '    func_exit\n'
633 +        txt += 'fi \n'
634          # Handle the arguments:
635          txt += "\n"
636          txt += "## number of arguments (first argument always jobnumber)\n"
637          txt += "\n"
889       # txt += "if [ $nargs -lt "+str(len(self.argsList[nj].split()))+" ]\n"
638          txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
639          txt += "then\n"
892        #txt += "    echo 'SET_EXE_ENV 1 ==> ERROR Too few arguments' +$nargs+ \n"
893        #txt += '    echo "JOB_EXIT_STATUS = 50113"\n'
894        #txt += '    echo "JobExitCode=50113" | tee -a $RUNTIME_AREA/$repo\n'
895        #txt += '    dumpStatus $RUNTIME_AREA/$repo\n'
640          txt += "    echo 'ERROR ==> Too few arguments' +$nargs+ \n"
641          txt += '    job_exit_code=50113\n'
898        #txt += '    if [ $middleware == OSG ]; then \n'
899        #txt += '        cd $RUNTIME_AREA\n'
900        #txt += '        echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
901        #txt += '        echo ">>> Remove working directory: $WORKING_DIR"\n'
902        #txt += '        /bin/rm -rf $WORKING_DIR\n'
903        #txt += '        if [ -d $WORKING_DIR ] ;then\n'
904        #txt += '            echo "SET_EXE_ENV 50114 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
905        #txt += '            echo "JOB_EXIT_STATUS = 50114"\n'
906        #txt += '            echo "JobExitCode=50114" | tee -a $RUNTIME_AREA/$repo\n'
907        #txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
908        #txt += '            echo "ERROR ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after Too few arguments for CRAB job wrapper"\n'
909        #txt += '            job_exit_code=10017\n'
910        #txt += '        fi\n'
911        #txt += '    fi\n'
912        #txt += "    exit 1\n"
642          txt += "    func_exit\n"
643          txt += "fi\n"
644          txt += "\n"
645  
646          # Prepare job-specific part
647          job = common.job_list[nj]
919        ### FEDE FOR DBS OUTPUT PUBLICATION
648          if (self.datasetPath):
649 +            self.primaryDataset = self.datasetPath.split("/")[1]
650 +            DataTier = self.datasetPath.split("/")[2]
651              txt += '\n'
652              txt += 'DatasetPath='+self.datasetPath+'\n'
653  
654 <            datasetpath_split = self.datasetPath.split("/")
655 <
926 <            txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
927 <            txt += 'DataTier='+datasetpath_split[2]+'\n'
654 >            txt += 'PrimaryDataset='+self.primaryDataset +'\n'
655 >            txt += 'DataTier='+DataTier+'\n'
656              txt += 'ApplicationFamily=cmsRun\n'
657  
658          else:
659 +            self.primaryDataset = 'null'
660              txt += 'DatasetPath=MCDataTier\n'
661              txt += 'PrimaryDataset=null\n'
662              txt += 'DataTier=null\n'
663              txt += 'ApplicationFamily=MCDataTier\n'
664 <        if self.pset != None:
664 >        if self.pset != None:
665              pset = os.path.basename(job.configFilename())
666              txt += '\n'
667              txt += 'cp  $RUNTIME_AREA/'+pset+' .\n'
668 <            if (self.datasetPath): # standard job
669 <                txt += 'InputFiles=${args[1]}; export InputFiles\n'
670 <                txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
671 <                txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
672 <                txt += 'echo "Inputfiles:<$InputFiles>"\n'
673 <                txt += 'echo "MaxEvents:<$MaxEvents>"\n'
674 <                txt += 'echo "SkipEvents:<$SkipEvents>"\n'
675 <            else:  # pythia like job
947 <                txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
948 <                txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
949 <                txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
950 <                txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
951 <                if (self.firstRun):
952 <                    txt += 'FirstRun=${args[1]}; export FirstRun\n'
953 <                    txt += 'echo "FirstRun: <$FirstRun>"\n'
954 <
955 <            txt += 'mv -f '+pset+' pset.cfg\n'
956 <
957 <        if len(self.additional_inbox_files) > 0:
958 <            txt += 'if [ -e $RUNTIME_AREA/'+self.additional_tgz_name+' ] ; then\n'
959 <            txt += '  tar xzvf $RUNTIME_AREA/'+self.additional_tgz_name+'\n'
960 <            txt += 'fi\n'
961 <            pass
668 >
669 >            txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
670 >            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
671 >            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
672 >            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
673 >
674 >            txt += 'mv -f ' + pset + ' ' + psetName + '\n'
675 >
676  
677          if self.pset != None:
678 +            # FUTURE: Can simply for 2_1_x and higher
679              txt += '\n'
680 <            txt += 'echo "***** cat pset.cfg *********"\n'
681 <            txt += 'cat pset.cfg\n'
682 <            txt += 'echo "****** end pset.cfg ********"\n'
683 <            txt += '\n'
684 <            txt += 'PSETHASH=`EdmConfigHash < pset.cfg` \n'
680 >            if self.debug_wrapper == 1:
681 >                txt += 'echo "***** cat ' + psetName + ' *********"\n'
682 >                txt += 'cat ' + psetName + '\n'
683 >                txt += 'echo "****** end ' + psetName + ' ********"\n'
684 >                txt += '\n'
685 >                txt += 'echo "***********************" \n'
686 >                txt += 'which edmConfigHash \n'
687 >                txt += 'echo "***********************" \n'
688 >            if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
689 >                txt += 'edmConfigHash ' + psetName + ' \n'
690 >                txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n'
691 >            else:
692 >                txt += 'PSETHASH=`edmConfigHash < ' + psetName + '` \n'
693              txt += 'echo "PSETHASH = $PSETHASH" \n'
694 +            #### FEDE temporary fix for noEdm files #####
695 +            txt += 'if [ -z "$PSETHASH" ]; then \n'
696 +            txt += '   export PSETHASH=null\n'
697 +            txt += 'fi \n'
698 +            #############################################
699              txt += '\n'
700          return txt
701 <    #### FEDE #####
701 >
702      def wsUntarSoftware(self, nj=0):
703          """
704          Put in the script the commands to build an executable
# Line 981 | Line 709 | class Cmssw(JobType):
709  
710          if os.path.isfile(self.tgzNameWithPath):
711              txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
712 <            txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
712 >            txt += 'tar zxvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
713 >            if  self.debug_wrapper==1 :
714 >                txt += 'ls -Al \n'
715              txt += 'untar_status=$? \n'
716              txt += 'if [ $untar_status -ne 0 ]; then \n'
717              txt += '   echo "ERROR ==> Untarring .tgz file failed"\n'
# Line 991 | Line 721 | class Cmssw(JobType):
721              txt += '   echo "Successful untar" \n'
722              txt += 'fi \n'
723              txt += '\n'
724 <            txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
724 >            txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
725              txt += 'if [ -z "$PYTHONPATH" ]; then\n'
726 <            txt += '   export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
726 >            txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
727              txt += 'else\n'
728 <            txt += '   export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
728 >            txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
729              txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
730              txt += 'fi\n'
731              txt += '\n'
# Line 1003 | Line 733 | class Cmssw(JobType):
733              pass
734  
735          return txt
736 <        
736 >
737      def wsBuildExe(self, nj=0):
738          """
739          Put in the script the commands to build an executable
# Line 1013 | Line 743 | class Cmssw(JobType):
743          txt = '\n#Written by cms_cmssw::wsBuildExe\n'
744          txt += 'echo ">>> moving CMSSW software directories in `pwd`" \n'
745  
746 <        txt += 'mv $RUNTIME_AREA/lib . \n'
747 <        txt += 'mv $RUNTIME_AREA/module . \n'
748 <        txt += 'mv $RUNTIME_AREA/ProdCommon . \n'
749 <        
750 <
751 <        #if os.path.isfile(self.tgzNameWithPath):
752 <        #    txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
753 <        #    txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
754 <        #    txt += 'untar_status=$? \n'
755 <        #    txt += 'if [ $untar_status -ne 0 ]; then \n'
756 <        #    txt += '   echo "ERROR ==> Untarring .tgz file failed"\n'
757 <        #    txt += '   job_exit_code=$untar_status\n'
758 <        #    txt += '   func_exit\n'
1029 <        #    txt += 'else \n'
1030 <        #    txt += '   echo "Successful untar" \n'
1031 <        #    txt += 'fi \n'
1032 <        #    txt += '\n'
1033 <        #    txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
746 >        txt += 'rm -r lib/ module/ \n'
747 >        txt += 'mv $RUNTIME_AREA/lib/ . \n'
748 >        txt += 'mv $RUNTIME_AREA/module/ . \n'
749 >        if self.dataExist == True:
750 >            txt += 'rm -r src/ \n'
751 >            txt += 'mv $RUNTIME_AREA/src/ . \n'
752 >        if len(self.additional_inbox_files)>0:
753 >            for file in self.additional_inbox_files:
754 >                txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n'
755 >        # txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
756 >        # txt += 'mv $RUNTIME_AREA/IMProv/ . \n'
757 >
758 >        txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
759          txt += 'if [ -z "$PYTHONPATH" ]; then\n'
760 <        txt += '   export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
760 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
761          txt += 'else\n'
762 <        txt += '   export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
762 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
763          txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
764          txt += 'fi\n'
765          txt += '\n'
766  
767          return txt
1043    ############################################################################
768  
1045    def modifySteeringCards(self, nj):
1046        """
1047        modify the card provided by the user,
1048        writing a new card into share dir
1049        """
769  
770      def executableName(self):
771 <        if self.scriptExe: #CarlosDaniele
771 >        if self.scriptExe:
772              return "sh "
773          else:
774              return self.executable
775  
776      def executableArgs(self):
777          # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
778 <        if self.scriptExe:#CarlosDaniele
779 <            return   self.scriptExe + " $NJob"
778 >        if self.scriptExe:
779 >            return self.scriptExe + " $NJob"
780          else:
1062            version_array = self.scram.getSWVersion().split('_')
1063            major = 0
1064            minor = 0
1065            try:
1066                major = int(version_array[1])
1067                minor = int(version_array[2])
1068            except:
1069                msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
1070                raise CrabException(msg)
1071
781              ex_args = ""
782 <
783 <            # Framework job report
784 <            if major >= 1 and minor >= 5 :
785 <                #ex_args += " -j " + self.fjrFileName
1077 <            ### FEDE it could be improved!!! ####    
1078 <                ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
1079 <            #######################################
1080 <            # Type of cfg file
1081 <            if major >= 2 :
1082 <                ex_args += " -p pset.pycfg"
782 >            ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
783 >            # Type of config file depends on CMSSW version
784 >            if self.CMSSW_major >= 2 :
785 >                ex_args += " -p pset.py"
786              else:
787                  ex_args += " -p pset.cfg"
788              return ex_args
# Line 1089 | Line 792 | class Cmssw(JobType):
792          Returns a list of filenames to be put in JDL input sandbox.
793          """
794          inp_box = []
1092        # # dict added to delete duplicate from input sandbox file list
1093        # seen = {}
1094        ## code
795          if os.path.isfile(self.tgzNameWithPath):
796              inp_box.append(self.tgzNameWithPath)
797 <        if os.path.isfile(self.MLtgzfile):
1098 <            inp_box.append(self.MLtgzfile)
1099 <        ## config
1100 <        if not self.pset is None:
1101 <            inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
1102 <        ## additional input files
1103 <        tgz = self.additionalInputFileTgz()
1104 <        inp_box.append(tgz)
1105 <        ## executable
1106 <        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
1107 <        inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper)
797 >        inp_box.append(common.work_space.jobDir() + self.scriptName)
798          return inp_box
799  
800      def outputSandbox(self, nj):
# Line 1116 | Line 806 | class Cmssw(JobType):
806          ## User Declared output files
807          for out in (self.output_file+self.output_file_sandbox):
808              n_out = nj + 1
809 <            out_box.append(self.numberFile_(out,str(n_out)))
809 >            out_box.append(numberFile(out,str(n_out)))
810          return out_box
811  
1122    def prepareSteeringCards(self):
1123        """
1124        Make initial modifications of the user's steering card file.
1125        """
1126        return
812  
813      def wsRenameOutput(self, nj):
814          """
# Line 1133 | Line 818 | class Cmssw(JobType):
818          txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
819          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
820          txt += 'echo ">>> current directory content:"\n'
821 <        txt += 'ls \n'
821 >        if self.debug_wrapper==1:
822 >            txt += 'ls -Al\n'
823          txt += '\n'
824  
1139        #txt += 'output_exit_status=0\n'
1140
1141        ### FEDE #######
1142        #for fileWithSuffix in (self.output_file_sandbox):
1143        #    output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
1144        #    txt += '\n'
1145        #    txt += '# check output file\n'
1146        #    txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
1147        #    txt += '    mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
1148        #    txt += '    ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
1149        #    txt += 'else\n'
1150        #    txt += '    echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
1151        #    txt += '    job_exit_code=60302\n'
1152        #    if common.scheduler.name().upper() == 'CONDOR_G':
1153        #        txt += '    if [ $middleware == OSG ]; then \n'
1154        #        txt += '        echo "prepare dummy output file"\n'
1155        #        txt += '        echo "Processing of job output failed" > $RUNTIME_AREA/'+output_file_num+'\n'
1156        #        txt += '    fi \n'
1157        #    txt += 'fi\n'
1158
825          for fileWithSuffix in (self.output_file):
826 <            output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
826 >            output_file_num = numberFile(fileWithSuffix, '$NJob')
827              txt += '\n'
828              txt += '# check output file\n'
829              txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
# Line 1168 | Line 834 | class Cmssw(JobType):
834                  txt += '    mv '+fileWithSuffix+' $RUNTIME_AREA/'+output_file_num+'\n'
835                  txt += '    ln -s $RUNTIME_AREA/'+output_file_num+' $RUNTIME_AREA/'+fileWithSuffix+'\n'
836              txt += 'else\n'
1171            #txt += '    exit_status=60302\n'
1172            #txt += '    echo "ERROR: Output file '+fileWithSuffix+' not found"\n'
1173            #txt += '    echo "JOB_EXIT_STATUS = $exit_status"\n'
1174            #txt += '    output_exit_status=$exit_status\n'
837              txt += '    job_exit_code=60302\n'
838              txt += '    echo "WARNING: Output file '+fileWithSuffix+' not found"\n'
839              if common.scheduler.name().upper() == 'CONDOR_G':
# Line 1182 | Line 844 | class Cmssw(JobType):
844              txt += 'fi\n'
845          file_list = []
846          for fileWithSuffix in (self.output_file):
847 <             file_list.append(self.numberFile_(fileWithSuffix, '$NJob'))
847 >             file_list.append(numberFile('$SOFTWARE_DIR/'+fileWithSuffix, '$NJob'))
848  
849 <        txt += 'file_list="'+string.join(file_list,' ')+'"\n'
849 >        txt += 'file_list="'+string.join(file_list,',')+'"\n'
850          txt += '\n'
851          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
852          txt += 'echo ">>> current directory content:"\n'
853 <        txt += 'ls \n'
853 >        if self.debug_wrapper==1:
854 >            txt += 'ls -Al\n'
855          txt += '\n'
856          txt += 'cd $RUNTIME_AREA\n'
857          txt += 'echo ">>> current directory (RUNTIME_AREA):  $RUNTIME_AREA"\n'
858          return txt
859  
1197    def numberFile_(self, file, txt):
1198        """
1199        append _'txt' before last extension of a file
1200        """
1201        p = string.split(file,".")
1202        # take away last extension
1203        name = p[0]
1204        for x in p[1:-1]:
1205            name=name+"."+x
1206        # add "_txt"
1207        if len(p)>1:
1208            ext = p[len(p)-1]
1209            result = name + '_' + txt + "." + ext
1210        else:
1211            result = name + '_' + txt
1212
1213        return result
1214
860      def getRequirements(self, nj=[]):
861          """
862          return job requirements to add to jdl files
# Line 1221 | Line 866 | class Cmssw(JobType):
866              req='Member("VO-cms-' + \
867                   self.version + \
868                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
869 <        ## SL add requirement for OS version only if SL4
1225 <        #reSL4 = re.compile( r'slc4' )
1226 <        if self.executable_arch: # and reSL4.search(self.executable_arch):
869 >        if self.executable_arch:
870              req+=' && Member("VO-cms-' + \
871                   self.executable_arch + \
872                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
873  
874          req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
875 <        if common.scheduler.name() == "glitecoll":
875 >        if ( common.scheduler.name() == "glitecoll" ) or ( common.scheduler.name() == "glite"):
876              req += ' && other.GlueCEStateStatus == "Production" '
877  
878          return req
879  
880      def configFilename(self):
881          """ return the config filename """
882 <        return self.name()+'.cfg'
882 >        # FUTURE: Can remove cfg mode for CMSSW >= 2_1_x
883 >        if (self.CMSSW_major >= 2 and self.CMSSW_minor >= 1) or (self.CMSSW_major >= 3):
884 >          return self.name()+'.py'
885 >        else:
886 >          return self.name()+'.cfg'
887  
888      def wsSetupCMSOSGEnvironment_(self):
889          """
# Line 1252 | Line 899 | class Cmssw(JobType):
899          txt += '      # Use $OSG_APP/cmssoft/cms/cmsset_default.sh to setup cms software\n'
900          txt += '        source $OSG_APP/cmssoft/cms/cmsset_default.sh '+self.version+'\n'
901          txt += '    else\n'
1255        #txt += '        echo "SET_CMS_ENV 10020 ==> ERROR $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1256        #txt += '        echo "JOB_EXIT_STATUS = 10020"\n'
1257        #txt += '        echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1258        #txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
902          txt += '        echo "ERROR ==> $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
903          txt += '        job_exit_code=10020\n'
1261        #txt += '        cd $RUNTIME_AREA\n'
1262        #txt += '        echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
1263        #txt += '        echo ">>> Remove working directory: $WORKING_DIR"\n'
1264        #txt += '        /bin/rm -rf $WORKING_DIR\n'
1265        #txt += '        if [ -d $WORKING_DIR ] ;then\n'
1266        #txt += '            echo "SET_CMS_ENV 10017 ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1267        #txt += '            echo "JOB_EXIT_STATUS = 10017"\n'
1268        #txt += '            echo "JobExitCode=10017" | tee -a $RUNTIME_AREA/$repo\n'
1269        #txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
1270        #txt += '            echo "ERROR ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after $OSG_APP/cmssoft/cms/cmsset_default.sh file not found"\n'
1271        #txt += '            job_exit_code=10017\n'
1272        #txt += '        fi\n'
1273        txt += '\n'
1274        #txt += '        exit 1\n'
904          txt += '        func_exit\n'
905          txt += '    fi\n'
906          txt += '\n'
# Line 1280 | Line 909 | class Cmssw(JobType):
909  
910          return txt
911  
1283    ### OLI_DANIELE
912      def wsSetupCMSLCGEnvironment_(self):
913          """
914          Returns part of a job script which is prepares
# Line 1292 | Line 920 | class Cmssw(JobType):
920          txt += '    export SCRAM_ARCH='+self.executable_arch+'\n'
921          txt += '    export BUILD_ARCH='+self.executable_arch+'\n'
922          txt += '    if [ ! $VO_CMS_SW_DIR ] ;then\n'
1295        #txt += '        echo "SET_CMS_ENV 10031 ==> ERROR CMS software dir not found on WN `hostname`"\n'
1296        #txt += '        echo "JOB_EXIT_STATUS = 10031" \n'
1297        #txt += '        echo "JobExitCode=10031" | tee -a $RUNTIME_AREA/$repo\n'
1298        #txt += '        dumpStatus $RUNTIME_AREA/$repo\n'
1299        #txt += '        exit 1\n'
923          txt += '        echo "ERROR ==> CMS software dir not found on WN `hostname`"\n'
924          txt += '        job_exit_code=10031\n'
925          txt += '        func_exit\n'
926          txt += '    else\n'
927          txt += '        echo "Sourcing environment... "\n'
928          txt += '        if [ ! -s $VO_CMS_SW_DIR/cmsset_default.sh ] ;then\n'
1306        #txt += '            echo "SET_CMS_ENV 10020 ==> ERROR cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
1307        #txt += '            echo "JOB_EXIT_STATUS = 10020"\n'
1308        #txt += '            echo "JobExitCode=10020" | tee -a $RUNTIME_AREA/$repo\n'
1309        #txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
1310        #txt += '            exit 1\n'
929          txt += '            echo "ERROR ==> cmsset_default.sh file not found into dir $VO_CMS_SW_DIR"\n'
930          txt += '            job_exit_code=10020\n'
931          txt += '            func_exit\n'
# Line 1316 | Line 934 | class Cmssw(JobType):
934          txt += '        source $VO_CMS_SW_DIR/cmsset_default.sh\n'
935          txt += '        result=$?\n'
936          txt += '        if [ $result -ne 0 ]; then\n'
1319        #txt += '            echo "SET_CMS_ENV 10032 ==> ERROR problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
1320        #txt += '            echo "JOB_EXIT_STATUS = 10032"\n'
1321        #txt += '            echo "JobExitCode=10032" | tee -a $RUNTIME_AREA/$repo\n'
1322        #txt += '            dumpStatus $RUNTIME_AREA/$repo\n'
1323        #txt += '            exit 1\n'
937          txt += '            echo "ERROR ==> problem sourcing $VO_CMS_SW_DIR/cmsset_default.sh"\n'
938          txt += '            job_exit_code=10032\n'
939          txt += '            func_exit\n'
# Line 1330 | Line 943 | class Cmssw(JobType):
943          txt += '    echo "==> setup cms environment ok"\n'
944          return txt
945  
946 <    ### FEDE FOR DBS OUTPUT PUBLICATION
1334 <    def modifyReport(self, nj):
946 >    def wsModifyReport(self, nj):
947          """
948          insert the part of the script that modifies the FrameworkJob Report
949          """
950  
951 <        txt = '\n#Written by cms_cmssw::modifyReport\n'
952 <        try:
953 <            publish_data = int(self.cfg_params['USER.publish_data'])
954 <        except KeyError:
955 <            publish_data = 0
956 <        if (publish_data == 1):
957 <            
958 <            txt += 'if [ $copy_exit_status -eq 0 ]; then\n'
959 <            txt += '    echo ">>> Modify Job Report:" \n'
960 <            txt += '    chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
961 <            #txt += '    if [ -z "$SE" ]; then\n'
962 <            #txt += '        SE="" \n'
963 <            #txt += '    fi \n'
964 <            #txt += '    if [ -z "$SE_PATH" ]; then\n'
965 <            #txt += '        SE_PATH="" \n'
966 <            #txt += '    fi \n'
967 <            txt += '    echo "SE = $SE"\n'
968 <            txt += '    echo "SE_PATH = $SE_PATH"\n'
969 <
970 <            processedDataset = self.cfg_params['USER.publish_data_name']
971 <            txt += '    ProcessedDataset='+processedDataset+'\n'
972 <            #txt += '    if [ "$SE_PATH" == "" ]; then\n'
973 <            #txt += '        FOR_LFN=/copy_problems/ \n'
974 <            #txt += '    else \n'
975 <            #txt += '        tmp=`echo $SE_PATH | awk -F \'store\' \'{print$2}\'` \n'
976 <            #txt += '        FOR_LFN=/store$tmp \n'
977 <            #txt += '    fi \n'
978 <            txt += '    tmp=`echo $SE_PATH | awk -F \'store\' \'{print$2}\'` \n'
979 <            txt += '    FOR_LFN=/store$tmp \n'
980 <            txt += '    echo "ProcessedDataset = $ProcessedDataset"\n'
981 <            txt += '    echo "FOR_LFN = $FOR_LFN" \n'
982 <            txt += '    echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
983 <            #txt += '    echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
984 <            #txt += '    $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n'
985 <            ### FEDE ####
986 <            txt += '    echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
987 <            txt += '    $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n'
988 <            ####################################
989 <            txt += '    modifyReport_result=$?\n'
1378 <            txt += '    if [ $modifyReport_result -ne 0 ]; then\n'
1379 <            txt += '        modifyReport_result=70500\n'
1380 <            txt += '        job_exit_code=$modifyReport_result\n'
1381 <            txt += '        echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n'
1382 <            txt += '        echo "WARNING: Problem with ModifyJobReport"\n'
1383 <            txt += '    else\n'
1384 <            ### FEDE #####
1385 <            #txt += '        mv NewFrameworkJobReport.xml crab_fjr_$NJob.xml\n'
1386 <            #######################
1387 <            txt += '        mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1388 <            txt += '    fi\n'
951 >        txt = ''
952 >        publish_data = int(self.cfg_params.get('USER.publish_data',0))
953 >        #if (publish_data == 1):
954 >        if (self.copy_data == 1):
955 >            txt = '\n#Written by cms_cmssw::wsModifyReport\n'
956 >            publish_data = int(self.cfg_params.get('USER.publish_data',0))
957 >
958 >
959 >            txt += 'if [ $StageOutExitStatus -eq 0 ]; then\n'
960 >            txt += '    FOR_LFN=$LFNBaseName\n'
961 >            txt += 'else\n'
962 >            txt += '    FOR_LFN=/copy_problems/ \n'
963 >            txt += 'fi\n'
964 >
965 >            txt += 'echo ">>> Modify Job Report:" \n'
966 >            txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
967 >            txt += 'echo "SE = $SE"\n'
968 >            txt += 'echo "SE_PATH = $SE_PATH"\n'
969 >            txt += 'echo "FOR_LFN = $FOR_LFN" \n'
970 >            txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
971 >
972 >
973 >            args = 'fjr $RUNTIME_AREA/crab_fjr_$NJob.xml n_job $NJob for_lfn $FOR_LFN PrimaryDataset $PrimaryDataset  ApplicationFamily $ApplicationFamily ApplicationName $executable cmssw_version $CMSSW_VERSION psethash $PSETHASH se_name $SE se_path $SE_PATH'
974 >            if (publish_data == 1):
975 >                processedDataset = self.cfg_params['USER.publish_data_name']
976 >                txt += 'ProcessedDataset='+processedDataset+'\n'
977 >                txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
978 >                args += ' UserProcessedDataset $USER-$ProcessedDataset-$PSETHASH'
979 >
980 >            txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'"\n'
981 >            txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'\n'
982 >            txt += 'modifyReport_result=$?\n'
983 >            txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
984 >            txt += '    modifyReport_result=70500\n'
985 >            txt += '    job_exit_code=$modifyReport_result\n'
986 >            txt += '    echo "ModifyReportResult=$modifyReport_result" | tee -a $RUNTIME_AREA/$repo\n'
987 >            txt += '    echo "WARNING: Problem with ModifyJobReport"\n'
988 >            txt += 'else\n'
989 >            txt += '    mv NewFrameworkJobReport.xml $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
990              txt += 'fi\n'
991          return txt
992  
993 <    def cleanEnv(self):
994 <        txt = '\n#Written by cms_cmssw::cleanEnv\n'
995 <        txt += 'if [ $middleware == OSG ]; then\n'
996 <        txt += '    cd $RUNTIME_AREA\n'
997 <        txt += '    echo ">>> current directory (RUNTIME_AREA): $RUNTIME_AREA"\n'
998 <        txt += '    echo ">>> Remove working directory: $WORKING_DIR"\n'
999 <        txt += '    /bin/rm -rf $WORKING_DIR\n'
1000 <        txt += '    if [ -d $WORKING_DIR ] ;then\n'
1001 <        txt += '        echo "ERROR ==> OSG $WORKING_DIR could not be deleted on WN `hostname` after cleanup of WN"\n'
1002 <        txt += '        job_exit_code=10017\n'
1003 <        txt += '        func_exit\n'
993 >    def wsParseFJR(self):
994 >        """
995 >        Parse the FrameworkJobReport to obtain useful infos
996 >        """
997 >        txt = '\n#Written by cms_cmssw::wsParseFJR\n'
998 >        txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
999 >        txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
1000 >        txt += '    if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
1001 >        txt += '        cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n'
1002 >        if self.debug_wrapper==1 :
1003 >            txt += '        echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
1004 >        txt += '        executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n'
1005 >        txt += '        if [ $executable_exit_status -eq 50115 ];then\n'
1006 >        txt += '            echo ">>> crab_fjr.xml contents: "\n'
1007 >        txt += '            cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1008 >        txt += '            echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
1009 >        txt += '        elif [ $executable_exit_status -eq -999 ];then\n'
1010 >        txt += '            echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n'
1011 >        txt += '        else\n'
1012 >        txt += '            echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
1013 >        txt += '        fi\n'
1014 >        txt += '    else\n'
1015 >        txt += '        echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1016          txt += '    fi\n'
1017 +          #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap
1018 +        txt += '    if [ $executable_exit_status -eq 0 ];then\n'
1019 +        txt += '        echo ">>> Executable succeded  $executable_exit_status"\n'
1020 +        ## This cannot more work given the changes on the Job argumentsJob  
1021 +        """
1022 +        if (self.datasetPath and not (self.dataset_pu or self.useParent==1)) :
1023 +          # VERIFY PROCESSED DATA
1024 +            txt += '        echo ">>> Verify list of processed files:"\n'
1025 +            txt += '        echo $InputFiles |tr -d \'\\\\\' |tr \',\' \'\\n\'|tr -d \'"\' > input-files.txt\n'
1026 +            txt += '        python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --lfn > processed-files.txt\n'
1027 +            txt += '        cat input-files.txt  | sort | uniq > tmp.txt\n'
1028 +            txt += '        mv tmp.txt input-files.txt\n'
1029 +            txt += '        echo "cat input-files.txt"\n'
1030 +            txt += '        echo "----------------------"\n'
1031 +            txt += '        cat input-files.txt\n'
1032 +            txt += '        cat processed-files.txt | sort | uniq > tmp.txt\n'
1033 +            txt += '        mv tmp.txt processed-files.txt\n'
1034 +            txt += '        echo "----------------------"\n'
1035 +            txt += '        echo "cat processed-files.txt"\n'
1036 +            txt += '        echo "----------------------"\n'
1037 +            txt += '        cat processed-files.txt\n'
1038 +            txt += '        echo "----------------------"\n'
1039 +            txt += '        diff -qbB input-files.txt processed-files.txt\n'
1040 +            txt += '        fileverify_status=$?\n'
1041 +            txt += '        if [ $fileverify_status -ne 0 ]; then\n'
1042 +            txt += '            executable_exit_status=30001\n'
1043 +            txt += '            echo "ERROR ==> not all input files processed"\n'
1044 +            txt += '            echo "      ==> list of processed files from crab_fjr.xml differs from list in pset.cfg"\n'
1045 +            txt += '            echo "      ==> diff input-files.txt processed-files.txt"\n'
1046 +            txt += '        fi\n'
1047 +        """
1048 +        txt += '    fi\n'
1049 +        txt += 'else\n'
1050 +        txt += '    echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1051          txt += 'fi\n'
1052          txt += '\n'
1053 +        txt += 'if [ $executable_exit_status -ne 0 ] && [ $executable_exit_status -ne 50115 ] && [ $executable_exit_status -ne 50117 ] && [ $executable_exit_status -ne 30001 ];then\n'
1054 +        txt += '    echo ">>> Executable failed  $executable_exit_status"\n'
1055 +        txt += '    echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1056 +        txt += '    echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1057 +        txt += '    job_exit_code=$executable_exit_status\n'
1058 +        txt += '    func_exit\n'
1059 +        txt += 'fi\n\n'
1060 +        txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1061 +        txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1062 +        txt += 'job_exit_code=$executable_exit_status\n'
1063 +
1064          return txt
1065  
1066      def setParam_(self, param, value):
# Line 1411 | Line 1069 | class Cmssw(JobType):
1069      def getParams(self):
1070          return self._params
1071  
1072 <    def uniquelist(self, old):
1415 <        """
1416 <        remove duplicates from a list
1417 <        """
1418 <        nd={}
1419 <        for e in old:
1420 <            nd[e]=0
1421 <        return nd.keys()
1422 <
1423 <    def outList(self):
1072 >    def outList(self,list=False):
1073          """
1074          check the dimension of the output files
1075          """
1076          txt = ''
1077          txt += 'echo ">>> list of expected files on output sandbox"\n'
1078          listOutFiles = []
1079 <        stdout = 'CMSSW_$NJob.stdout'
1079 >        stdout = 'CMSSW_$NJob.stdout'
1080          stderr = 'CMSSW_$NJob.stderr'
1081 +        if len(self.output_file) <= 0:
1082 +            msg ="WARNING: no output files name have been defined!!\n"
1083 +            msg+="\tno output files will be reported back/staged\n"
1084 +            common.logger.message(msg)
1085          if (self.return_data == 1):
1086              for file in (self.output_file+self.output_file_sandbox):
1087 <                listOutFiles.append(self.numberFile_(file, '$NJob'))
1087 >                listOutFiles.append(numberFile(file, '$NJob'))
1088              listOutFiles.append(stdout)
1089              listOutFiles.append(stderr)
1090          else:
1091              for file in (self.output_file_sandbox):
1092 <                listOutFiles.append(self.numberFile_(file, '$NJob'))
1092 >                listOutFiles.append(numberFile(file, '$NJob'))
1093              listOutFiles.append(stdout)
1094              listOutFiles.append(stderr)
1095          txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n'
1096          txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n'
1097          txt += 'export filesToCheck\n'
1098 <        return txt
1098 >
1099 >        if list : return self.output_file
1100 >        return txt

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines