ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/cms_cmssw.py
(Generate patch)

Comparing COMP/CRAB/python/cms_cmssw.py (file contents):
Revision 1.178 by spiga, Sun Apr 20 09:34:40 2008 UTC vs.
Revision 1.365 by spiga, Tue Nov 9 21:10:07 2010 UTC

# Line 1 | Line 1
1 +
2 + __revision__ = "$Id$"
3 + __version__ = "$Revision$"
4 +
5   from JobType import JobType
2 from crab_logger import Logger
6   from crab_exceptions import *
7   from crab_util import *
5 from BlackWhiteListParser import BlackWhiteListParser
8   import common
9 + import re
10   import Scram
11 < from LFNBaseName import *
11 > from Splitter import JobSplitter
12 > from Downloader import Downloader
13 > try:
14 >    import json
15 > except:
16 >    import simplejson as json
17  
18 + from IMProv.IMProvNode import IMProvNode
19 + from IMProv.IMProvLoader import loadIMProvFile
20   import os, string, glob
21 + from xml.dom import pulldom
22  
23   class Cmssw(JobType):
24 <    def __init__(self, cfg_params, ncjobs):
24 >    def __init__(self, cfg_params, ncjobs,skip_blocks, isNew):
25          JobType.__init__(self, 'CMSSW')
26 <        common.logger.debug(3,'CMSSW::__init__')
27 <
28 <        self.argsList = []
29 <
26 >        common.logger.debug('CMSSW::__init__')
27 >        self.skip_blocks = skip_blocks
28 >        self.argsList = 2
29 >        self.NumEvents=0
30          self._params = {}
31          self.cfg_params = cfg_params
32 <        # init BlackWhiteListParser
33 <        self.blackWhiteListParser = BlackWhiteListParser(cfg_params)
32 >        ### FEDE FOR MULTI ###
33 >        self.var_filter=''
34  
35 <        self.MaxTarBallSize = float(self.cfg_params.get('EDG.maxtarballsize',9.5))
35 >        ### Temporary patch to automatically skip the ISB size check:
36 >        self.server = self.cfg_params.get('CRAB.server_name',None) or \
37 >                      self.cfg_params.get('CRAB.use_server',0)
38 >        self.local  = common.scheduler.name().upper() in ['LSF','CAF','CONDOR','SGE','PBS']
39 >        size = 9.5
40 >        if self.server :
41 >            size = 1000
42 >        elif self.local:
43 >            size = 9999999
44 >        self.MaxTarBallSize = float(self.cfg_params.get('GRID.maxtarballsize',size))
45  
46          # number of jobs requested to be created, limit obj splitting
47          self.ncjobs = ncjobs
48  
29        log = common.logger
30
49          self.scram = Scram.Scram(cfg_params)
50          self.additional_inbox_files = []
51          self.scriptExe = ''
52          self.executable = ''
53          self.executable_arch = self.scram.getArch()
54          self.tgz_name = 'default.tgz'
37        self.additional_tgz_name = 'additional.tgz'
55          self.scriptName = 'CMSSW.sh'
56 <        self.pset = ''      #scrip use case Da
57 <        self.datasetPath = '' #scrip use case Da
56 >        self.pset = ''
57 >        self.datasetPath = ''
58  
59 +        self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
60          # set FJR file name
61          self.fjrFileName = 'crab_fjr.xml'
62  
63          self.version = self.scram.getSWVersion()
64 +        common.logger.log(10-1,"CMSSW version is: "+str(self.version))
65 +        version_array = self.version.split('_')
66 +        self.CMSSW_major = 0
67 +        self.CMSSW_minor = 0
68 +        self.CMSSW_patch = 0
69 +        try:
70 +            self.CMSSW_major = int(version_array[1])
71 +            self.CMSSW_minor = int(version_array[2])
72 +            self.CMSSW_patch = int(version_array[3])
73 +        except:
74 +            msg = "Cannot parse CMSSW version string: " + self.version + " for major and minor release number!"
75 +            raise CrabException(msg)
76  
77 <        #
78 <        # Try to block creation in case of arch/version mismatch
79 <        #
80 <
81 < #        a = string.split(self.version, "_")
82 < #
83 < #        if int(a[1]) == 1 and (int(a[2]) < 5 and self.executable_arch.find('slc4') == 0):
84 < #            msg = "Warning: You are using %s version of CMSSW  with %s architecture. \n--> Did you compile your libraries with SLC3? Otherwise you can find some problems running on SLC4 Grid nodes.\n"%(self.version, self.executable_arch)
85 < #            common.logger.message(msg)
56 < #        if int(a[1]) == 1 and (int(a[2]) >= 5 and self.executable_arch.find('slc3') == 0):
57 < #            msg = "Error: CMS does not support %s with %s architecture"%(self.version, self.executable_arch)
58 < #            raise CrabException(msg)
59 < #
77 >        if self.CMSSW_major < 2 or (self.CMSSW_major == 2 and self.CMSSW_minor < 1):
78 >            msg = "CRAB supports CMSSW >= 2_1_x only. Use an older CRAB version."
79 >            raise CrabException(msg)
80 >            """
81 >            As CMSSW versions are dropped we can drop more code:
82 >            2.x dropped: drop check for lumi range setting
83 >            """
84 >        self.checkCMSSWVersion()
85 >        ### collect Data cards
86  
87 +        ### Temporary: added to remove input file control in the case of PU
88 +        self.dataset_pu = cfg_params.get('CMSSW.dataset_pu', None)
89  
90 <        ### collect Data cards
90 >        tmp =  cfg_params['CMSSW.datasetpath']
91 >        common.logger.log(10-1, "CMSSW::CMSSW(): datasetPath = "+tmp)
92  
93 <        if not cfg_params.has_key('CMSSW.datasetpath'):
93 >        if tmp =='':
94              msg = "Error: datasetpath not defined "
95              raise CrabException(msg)
96 <        tmp =  cfg_params['CMSSW.datasetpath']
68 <        log.debug(6, "CMSSW::CMSSW(): datasetPath = "+tmp)
69 <        if string.lower(tmp)=='none':
96 >        elif string.lower(tmp)=='none':
97              self.datasetPath = None
98              self.selectNoInput = 1
99 +            self.primaryDataset = 'null'
100          else:
101              self.datasetPath = tmp
102              self.selectNoInput = 0
103 +            ll = len(self.datasetPath.split("/"))
104 +            if (ll < 4):
105 +                msg = 'Your datasetpath has an invalid format ' + self.datasetPath + '\n'
106 +                msg += 'Expected a path in format /PRIMARY/PROCESSED/TIER1-TIER2 or /PRIMARY/PROCESSED/TIER/METHOD for ADS'
107 +                raise CrabException(msg)
108 +            self.primaryDataset = self.datasetPath.split("/")[1]
109 +            self.dataTier = self.datasetPath.split("/")[2]
110  
111 <        self.dataTiers = []
111 >        # Analysis dataset is primary/processed/tier/definition
112 >        self.ads = False
113 >        if self.datasetPath:
114 >            self.ads = len(self.datasetPath.split("/")) > 4
115 >        self.lumiMask = self.cfg_params.get('CMSSW.lumi_mask',None)
116 >        self.lumiParams = self.cfg_params.get('CMSSW.total_number_of_lumis',None) or \
117 >                          self.cfg_params.get('CMSSW.lumis_per_job',None)
118 >
119 >        # FUTURE: Can remove this check
120 >        if self.ads and self.CMSSW_major < 3:
121 >            common.logger.info('Warning: Analysis dataset support is incomplete in CMSSW 2_x.')
122 >            common.logger.info('  Only file level, not lumi level, granularity is supported.')
123 >
124 >        self.debugWrap=''
125 >        self.debug_wrapper = int(cfg_params.get('USER.debug_wrapper',0))
126 >        if self.debug_wrapper == 1: self.debugWrap='--debug'
127  
128          ## now the application
129 +        self.managedGenerators = ['madgraph', 'comphep', 'lhe']
130 +        self.generator = cfg_params.get('CMSSW.generator','pythia').lower()
131          self.executable = cfg_params.get('CMSSW.executable','cmsRun')
132 <        log.debug(6, "CMSSW::CMSSW(): executable = "+self.executable)
132 >        common.logger.log(10-1, "CMSSW::CMSSW(): executable = "+self.executable)
133  
134          if not cfg_params.has_key('CMSSW.pset'):
135              raise CrabException("PSet file missing. Cannot run cmsRun ")
136          self.pset = cfg_params['CMSSW.pset']
137 <        log.debug(6, "Cmssw::Cmssw(): PSet file = "+self.pset)
137 >        common.logger.log(10-1, "Cmssw::Cmssw(): PSet file = "+self.pset)
138          if self.pset.lower() != 'none' :
139              if (not os.path.exists(self.pset)):
140                  raise CrabException("User defined PSet file "+self.pset+" does not exist")
# Line 97 | Line 149 | class Cmssw(JobType):
149          self.output_file_sandbox.append(self.fjrFileName)
150  
151          # other output files to be returned via sandbox or copied to SE
152 +        outfileflag = False
153          self.output_file = []
154          tmp = cfg_params.get('CMSSW.output_file',None)
155          if tmp :
156 <            tmpOutFiles = string.split(tmp,',')
157 <            log.debug(7, 'cmssw::cmssw(): output files '+str(tmpOutFiles))
105 <            for tmp in tmpOutFiles:
106 <                tmp=string.strip(tmp)
107 <                self.output_file.append(tmp)
108 <                pass
109 <        else:
110 <            log.message("No output file defined: only stdout/err and the CRAB Framework Job Report will be available\n")
111 <        pass
156 >            self.output_file = [x.strip() for x in tmp.split(',')]
157 >            outfileflag = True #output found
158  
113        # script_exe file as additional file in inputSandbox
159          self.scriptExe = cfg_params.get('USER.script_exe',None)
160          if self.scriptExe :
161              if not os.path.isfile(self.scriptExe):
# Line 118 | Line 163 | class Cmssw(JobType):
163                  raise CrabException(msg)
164              self.additional_inbox_files.append(string.strip(self.scriptExe))
165  
166 <        #CarlosDaniele
166 >        self.AdditionalArgs = cfg_params.get('USER.script_arguments',None)
167 >        if self.AdditionalArgs : self.AdditionalArgs = string.replace(self.AdditionalArgs,',',' ')
168 >
169          if self.datasetPath == None and self.pset == None and self.scriptExe == '' :
170              msg ="Error. script_exe  not defined"
171              raise CrabException(msg)
172  
173 +        # use parent files...
174 +        self.useParent = int(self.cfg_params.get('CMSSW.use_parent',0))
175 +
176          ## additional input files
177          if cfg_params.has_key('USER.additional_input_files'):
178              tmpAddFiles = string.split(cfg_params['USER.additional_input_files'],',')
# Line 141 | Line 191 | class Cmssw(JobType):
191                      if not os.path.exists(file):
192                          raise CrabException("Additional input file not found: "+file)
193                      pass
144                    # fname = string.split(file, '/')[-1]
145                    # storedFile = common.work_space.pathForTgz()+'share/'+fname
146                    # shutil.copyfile(file, storedFile)
194                      self.additional_inbox_files.append(string.strip(file))
195                  pass
196              pass
197 <            common.logger.debug(5,"Additional input files: "+str(self.additional_inbox_files))
197 >            common.logger.debug("Additional input files: "+str(self.additional_inbox_files))
198          pass
199  
153        ## Events per job
154        if cfg_params.has_key('CMSSW.events_per_job'):
155            self.eventsPerJob =int( cfg_params['CMSSW.events_per_job'])
156            self.selectEventsPerJob = 1
157        else:
158            self.eventsPerJob = -1
159            self.selectEventsPerJob = 0
160
161        ## number of jobs
162        if cfg_params.has_key('CMSSW.number_of_jobs'):
163            self.theNumberOfJobs =int( cfg_params['CMSSW.number_of_jobs'])
164            self.selectNumberOfJobs = 1
165        else:
166            self.theNumberOfJobs = 0
167            self.selectNumberOfJobs = 0
168
169        if cfg_params.has_key('CMSSW.total_number_of_events'):
170            self.total_number_of_events = int(cfg_params['CMSSW.total_number_of_events'])
171            self.selectTotalNumberEvents = 1
172        else:
173            self.total_number_of_events = 0
174            self.selectTotalNumberEvents = 0
175
176        if self.pset != None: #CarlosDaniele
177             if ( (self.selectTotalNumberEvents + self.selectEventsPerJob + self.selectNumberOfJobs) != 2 ):
178                 msg = 'Must define exactly two of total_number_of_events, events_per_job, or number_of_jobs.'
179                 raise CrabException(msg)
180        else:
181             if (self.selectNumberOfJobs == 0):
182                 msg = 'Must specify  number_of_jobs.'
183                 raise CrabException(msg)
200  
201          ## New method of dealing with seeds
202          self.incrementSeeds = []
# Line 196 | Line 212 | class Cmssw(JobType):
212                  tmp.strip()
213                  self.incrementSeeds.append(tmp)
214  
215 <        ## Old method of dealing with seeds
200 <        ## FUTURE: This is for old CMSSW and old CRAB. Can throw exceptions after a couple of CRAB releases and then
201 <        ## remove
202 <        self.sourceSeed = cfg_params.get('CMSSW.pythia_seed',None)
203 <        if self.sourceSeed:
204 <            print "pythia_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
205 <            self.incrementSeeds.append('sourceSeed')
206 <
207 <        self.sourceSeedVtx = cfg_params.get('CMSSW.vtx_seed',None)
208 <        if self.sourceSeedVtx:
209 <            print "vtx_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
210 <            self.incrementSeeds.append('VtxSmeared')
211 <
212 <        self.sourceSeedG4 = cfg_params.get('CMSSW.g4_seed',None)
213 <        if self.sourceSeedG4:
214 <            print "g4_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
215 <            self.incrementSeeds.append('g4SimHits')
216 <
217 <        self.sourceSeedMix = cfg_params.get('CMSSW.mix_seed',None)
218 <        if self.sourceSeedMix:
219 <            print "mix_seed is a deprecated parameter. Use preserve_seeds or increment_seeds in the future.\n","Added to increment_seeds."
220 <            self.incrementSeeds.append('mix')
221 <
222 <        self.firstRun = cfg_params.get('CMSSW.first_run',None)
223 <
224 <        if self.pset != None: #CarlosDaniele
225 <            import PsetManipulator as pp
226 <            PsetEdit = pp.PsetManipulator(self.pset) #Daniele Pset
227 <
228 <        # Copy/return
229 <
215 >        # Copy/return/publish
216          self.copy_data = int(cfg_params.get('USER.copy_data',0))
217          self.return_data = int(cfg_params.get('USER.return_data',0))
218 +        self.publish_data = int(cfg_params.get('USER.publish_data',0))
219 +        if (self.publish_data == 1):
220 +            if not cfg_params.has_key('USER.publish_data_name'):
221 +                raise CrabException('Cannot publish output data, because you did not specify USER.publish_data_name parameter in the crab.cfg file')
222 +            else:
223 +                self.processedDataset = cfg_params['USER.publish_data_name']
224  
225 +        self.conf = {}
226 +        self.conf['pubdata'] = None
227 +        # number of jobs requested to be created, limit obj splitting DD
228          #DBSDLS-start
229          ## Initialize the variables that are extracted from DBS/DLS and needed in other places of the code
230          self.maxEvents=0  # max events available   ( --> check the requested nb. of evts in Creator.py)
# Line 238 | Line 233 | class Cmssw(JobType):
233          ## Perform the data location and discovery (based on DBS/DLS)
234          ## SL: Don't if NONE is specified as input (pythia use case)
235          blockSites = {}
236 <        if self.datasetPath:
237 <            blockSites = self.DataDiscoveryAndLocation(cfg_params)
238 <        #DBSDLS-end
236 > #wmbs
237 >        self.automation = int(self.cfg_params.get('WMBS.automation',0))
238 >        if self.automation == 0:
239 >            if self.datasetPath:
240 >                blockSites = self.DataDiscoveryAndLocation(cfg_params)
241 >            #DBSDLS-end
242 >            self.conf['blockSites']=blockSites
243 >
244 >            ## Select Splitting
245 >            splitByRun = int(cfg_params.get('CMSSW.split_by_run',0))
246 >
247 >            if self.selectNoInput:
248 >                if self.pset == None:
249 >                    self.algo = 'ForScript'
250 >                else:
251 >                    self.algo = 'NoInput'
252 >                    self.conf['managedGenerators']=self.managedGenerators
253 >                    self.conf['generator']=self.generator
254 >            elif self.ads or self.lumiMask or self.lumiParams:
255 >                self.algo = 'LumiBased'
256 >                if splitByRun:
257 >                    msg = "Cannot combine split by run with lumi_mask, ADS, " \
258 >                          "or lumis_per_job. Use split by lumi mode instead."
259 >                    raise CrabException(msg)
260  
261 <        self.tgzNameWithPath = self.getTarBall(self.executable)
261 >            elif splitByRun ==1:
262 >                self.algo = 'RunBased'
263 >            else:
264 >                self.algo = 'EventBased'
265 >            common.logger.debug("Job splitting method: %s" % self.algo)
266 >
267 >            splitter = JobSplitter(self.cfg_params,self.conf)
268 >            self.dict = splitter.Algos()[self.algo]()
269 >
270 >        self.argsFile= '%s/arguments.xml'%common.work_space.shareDir()
271 >        self.rootArgsFilename= 'arguments'
272 >        # modify Pset only the first time
273 >        if isNew:
274 >            if self.pset != None: self.ModifyPset()
275 >
276 >            ## Prepare inputSandbox TarBall (only the first time)
277 >            self.tarNameWithPath = self.getTarBall(self.executable)
278 >
279 >
280 >    def ModifyPset(self):
281 >        import PsetManipulator as pp
282 >
283 >        # If pycfg_params set, fake out the config script
284 >        # to make it think it was called with those args
285 >        pycfg_params = self.cfg_params.get('CMSSW.pycfg_params',None)
286 >        if pycfg_params:
287 >            trueArgv = sys.argv
288 >            sys.argv = [self.pset]
289 >            sys.argv.extend(pycfg_params.split(' '))
290 >        PsetEdit = pp.PsetManipulator(self.pset)
291 >        if pycfg_params: # Restore original sys.argv
292 >            sys.argv = trueArgv
293 >
294 >        try:
295 >            # Add FrameworkJobReport to parameter-set, set max events.
296 >            # Reset later for data jobs by writeCFG which does all modifications
297 >            PsetEdit.maxEvent(1)
298 >            PsetEdit.skipEvent(0)
299 >            PsetEdit.psetWriter(self.configFilename())
300 >            ## If present, add TFileService to output files
301 >            if not int(self.cfg_params.get('CMSSW.skip_tfileservice_output',0)):
302 >                tfsOutput = PsetEdit.getTFileService()
303 >                if tfsOutput:
304 >                    if tfsOutput in self.output_file:
305 >                        common.logger.debug("Output from TFileService "+tfsOutput+" already in output files")
306 >                    else:
307 >                        outfileflag = True #output found
308 >                        self.output_file.append(tfsOutput)
309 >                        common.logger.info("Adding "+tfsOutput+" (from TFileService) to list of output files")
310 >                    pass
311 >                pass
312  
313 <        ## Select Splitting
314 <        if self.selectNoInput:
315 <            if self.pset == None: #CarlosDaniele
316 <                self.jobSplittingForScript()
313 >            # If requested, add PoolOutputModule to output files
314 >            ### FEDE FOR MULTI ###
315 >            #edmOutput = PsetEdit.getPoolOutputModule()
316 >            edmOutputDict = PsetEdit.getPoolOutputModule()
317 >            common.logger.debug("(test) edmOutputDict = "+str(edmOutputDict))
318 >            filter_dict = {}
319 >            for key in edmOutputDict.keys():
320 >                filter_dict[key]=edmOutputDict[key]['dataset']
321 >            common.logger.debug("(test) filter_dict for multi =  "+str(filter_dict))
322 >
323 >            #### in CMSSW.sh: export var_filter
324 >
325 >            self.var_filter = json.dumps(filter_dict)
326 >            common.logger.debug("(test) var_filter for multi =  "+self.var_filter)
327 >
328 >            edmOutput = edmOutputDict.keys()
329 >            if int(self.cfg_params.get('CMSSW.get_edm_output',0)):
330 >                if edmOutput:
331 >                    for outputFile in edmOutput:
332 >                        if outputFile in self.output_file:
333 >                            common.logger.debug("Output from PoolOutputModule "+outputFile+" already in output files")
334 >                        else:
335 >                            self.output_file.append(outputFile)
336 >                            common.logger.info("Adding "+outputFile+" (from PoolOutputModule) to list of output files")
337 >            # not requested, check anyhow to avoid accidental T2 overload
338              else:
339 <                self.jobSplittingNoInput()
340 <        else:
341 <            self.jobSplittingByBlocks(blockSites)
339 >                if edmOutput:
340 >                    missedFiles = []
341 >                    for outputFile in edmOutput:
342 >                        if outputFile not in self.output_file:
343 >                            missedFiles.append(outputFile)
344 >                    if missedFiles:
345 >                        msg  = "ERROR: PoolOutputModule(s) are present in your ParameterSet %s \n"%self.pset
346 >                        msg += "    but the file(s) produced ( %s ) are not in the list of output files\n" % ', '.join(missedFiles)
347 >                        msg += "WARNING: please remove them. If you want to keep them, add the file(s) to output_files or use CMSSW.get_edm_output = 1\n"
348 >                        if int(self.cfg_params.get('CMSSW.ignore_edm_output',0)):
349 >                            msg += "    CMSSW.ignore_edm_output==1 : Hope you know what you are doing...\n"
350 >                            common.logger.info(msg)
351 >                        else :
352 >                            raise CrabException(msg)
353  
354 <        # modify Pset
355 <        if self.pset != None: #CarlosDaniele
356 <            try:
357 <                # Add FrameworkJobReport to parameter-set, set max events.
358 <                # Reset later for data jobs by writeCFG which does all modifications
359 <                PsetEdit.addCrabFJR(self.fjrFileName)
360 <                PsetEdit.maxEvent(self.eventsPerJob)
361 <                PsetEdit.psetWriter(self.configFilename())
362 <            except:
363 <                msg='Error while manipuliating ParameterSet: exiting...'
354 >            if (PsetEdit.getBadFilesSetting()):
355 >                msg = "WARNING: You have set skipBadFiles to True. This will continue processing on some errors and you may not be notified."
356 >                common.logger.info(msg)
357 >
358 >        except CrabException, msg:
359 >            common.logger.info(str(msg))
360 >            msg='Error while manipulating ParameterSet (see previous message, if any): exiting...'
361 >            raise CrabException(msg)
362 >
363 >        valid = re.compile('^[\w\.\-]+$')
364 >        for fileName in self.output_file:
365 >            if not valid.match(fileName):
366 >                msg = "The file %s may only contain alphanumeric characters and -, _, ." % fileName
367                  raise CrabException(msg)
368  
369 +
370      def DataDiscoveryAndLocation(self, cfg_params):
371  
372          import DataDiscovery
373          import DataLocation
374 <        common.logger.debug(10,"CMSSW::DataDiscoveryAndLocation()")
374 >        common.logger.log(10-1,"CMSSW::DataDiscoveryAndLocation()")
375  
376          datasetPath=self.datasetPath
377  
378          ## Contact the DBS
379 <        common.logger.message("Contacting Data Discovery Services ...")
379 >        common.logger.info("Contacting Data Discovery Services ...")
380          try:
381 <            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params)
381 >            self.pubdata=DataDiscovery.DataDiscovery(datasetPath, cfg_params,self.skip_blocks)
382              self.pubdata.fetchDBSInfo()
383  
384          except DataDiscovery.NotExistingDatasetError, ex :
# Line 290 | Line 392 | class Cmssw(JobType):
392              raise CrabException(msg)
393  
394          self.filesbyblock=self.pubdata.getFiles()
395 <        self.eventsbyblock=self.pubdata.getEventsPerBlock()
294 <        self.eventsbyfile=self.pubdata.getEventsPerFile()
395 >        self.conf['pubdata']=self.pubdata
396  
397          ## get max number of events
398 <        self.maxEvents=self.pubdata.getMaxEvents() ##  self.maxEvents used in Creator.py
398 >        self.maxEvents=self.pubdata.getMaxEvents()
399  
400          ## Contact the DLS and build a list of sites hosting the fileblocks
401          try:
402              dataloc=DataLocation.DataLocation(self.filesbyblock.keys(),cfg_params)
403              dataloc.fetchDLSInfo()
404 +
405          except DataLocation.DataLocationError , ex:
406              msg = 'ERROR ***: failed Data Location in DLS \n %s '%ex.getErrorMessage()
407              raise CrabException(msg)
408  
409  
410 <        sites = dataloc.getSites()
410 >        unsorted_sites = dataloc.getSites()
411 >        sites = self.filesbyblock.fromkeys(self.filesbyblock,'')
412 >        for lfn in self.filesbyblock.keys():
413 >            if unsorted_sites.has_key(lfn):
414 >                sites[lfn]=unsorted_sites[lfn]
415 >            else:
416 >                sites[lfn]=[]
417 >
418 >        if len(sites)==0:
419 >            msg = 'ERROR ***: no location for any of the blocks of this dataset: \n\t %s \n'%datasetPath
420 >            msg += "\tMaybe the dataset is located only at T1's (or at T0), where analysis jobs are not allowed\n"
421 >            msg += "\tPlease check DataDiscovery page https://cmsweb.cern.ch/dbs_discovery/\n"
422 >            raise CrabException(msg)
423 >
424          allSites = []
425          listSites = sites.values()
426          for listSite in listSites:
427              for oneSite in listSite:
428                  allSites.append(oneSite)
429 <        allSites = self.uniquelist(allSites)
429 >        [allSites.append(it) for it in allSites if not allSites.count(it)]
430  
316        # screen output
317        common.logger.message("Requested dataset: " + datasetPath + " has " + str(self.maxEvents) + " events in " + str(len(self.filesbyblock.keys())) + " blocks.\n")
318
319        return sites
320
321  # to Be Removed  DS -- BL
322  #  def setArgsList(self, argsList):
323  #      self.argsList = argsList
324
325    def jobSplittingByBlocks(self, blockSites):
326        """
327        Perform job splitting. Jobs run over an integer number of files
328        and no more than one block.
329        ARGUMENT: blockSites: dictionary with blocks as keys and list of host sites as values
330        REQUIRES: self.selectTotalNumberEvents, self.selectEventsPerJob, self.selectNumberofJobs,
331                  self.total_number_of_events, self.eventsPerJob, self.theNumberOfJobs,
332                  self.maxEvents, self.filesbyblock
333        SETS: self.jobDestination - Site destination(s) for each job (a list of lists)
334              self.total_number_of_jobs - Total # of jobs
335              self.list_of_args - File(s) job will run on (a list of lists)
336        """
337
338        # ---- Handle the possible job splitting configurations ---- #
339        if (self.selectTotalNumberEvents):
340            totalEventsRequested = self.total_number_of_events
341        if (self.selectEventsPerJob):
342            eventsPerJobRequested = self.eventsPerJob
343            if (self.selectNumberOfJobs):
344                totalEventsRequested = self.theNumberOfJobs * self.eventsPerJob
345
346        # If user requested all the events in the dataset
347        if (totalEventsRequested == -1):
348            eventsRemaining=self.maxEvents
349        # If user requested more events than are in the dataset
350        elif (totalEventsRequested > self.maxEvents):
351            eventsRemaining = self.maxEvents
352            common.logger.message("Requested "+str(self.total_number_of_events)+ " events, but only "+str(self.maxEvents)+" events are available.")
353        # If user requested less events than are in the dataset
354        else:
355            eventsRemaining = totalEventsRequested
356
357        # If user requested more events per job than are in the dataset
358        if (self.selectEventsPerJob and eventsPerJobRequested > self.maxEvents):
359            eventsPerJobRequested = self.maxEvents
360
361        # For user info at end
362        totalEventCount = 0
363
364        if (self.selectTotalNumberEvents and self.selectNumberOfJobs):
365            eventsPerJobRequested = int(eventsRemaining/self.theNumberOfJobs)
366
367        if (self.selectNumberOfJobs):
368            common.logger.message("May not create the exact number_of_jobs requested.")
369
370        if ( self.ncjobs == 'all' ) :
371            totalNumberOfJobs = 999999999
372        else :
373            totalNumberOfJobs = self.ncjobs
374
375        blocks = blockSites.keys()
376        blockCount = 0
377        # Backup variable in case self.maxEvents counted events in a non-included block
378        numBlocksInDataset = len(blocks)
379
380        jobCount = 0
381        list_of_lists = []
382
383        # list tracking which jobs are in which jobs belong to which block
384        jobsOfBlock = {}
385
386        # ---- Iterate over the blocks in the dataset until ---- #
387        # ---- we've met the requested total # of events    ---- #
388        while ( (eventsRemaining > 0) and (blockCount < numBlocksInDataset) and (jobCount < totalNumberOfJobs)):
389            block = blocks[blockCount]
390            blockCount += 1
391            if block not in jobsOfBlock.keys() :
392                jobsOfBlock[block] = []
393
394            if self.eventsbyblock.has_key(block) :
395                numEventsInBlock = self.eventsbyblock[block]
396                common.logger.debug(5,'Events in Block File '+str(numEventsInBlock))
397
398                files = self.filesbyblock[block]
399                numFilesInBlock = len(files)
400                if (numFilesInBlock <= 0):
401                    continue
402                fileCount = 0
403
404                # ---- New block => New job ---- #
405                parString = ""
406                # counter for number of events in files currently worked on
407                filesEventCount = 0
408                # flag if next while loop should touch new file
409                newFile = 1
410                # job event counter
411                jobSkipEventCount = 0
412
413                # ---- Iterate over the files in the block until we've met the requested ---- #
414                # ---- total # of events or we've gone over all the files in this block  ---- #
415                while ( (eventsRemaining > 0) and (fileCount < numFilesInBlock) and (jobCount < totalNumberOfJobs) ):
416                    file = files[fileCount]
417                    if newFile :
418                        try:
419                            numEventsInFile = self.eventsbyfile[file]
420                            common.logger.debug(6, "File "+str(file)+" has "+str(numEventsInFile)+" events")
421                            # increase filesEventCount
422                            filesEventCount += numEventsInFile
423                            # Add file to current job
424                            parString += '\\\"' + file + '\\\"\,'
425                            newFile = 0
426                        except KeyError:
427                            common.logger.message("File "+str(file)+" has unknown number of events: skipping")
428
429                    eventsPerJobRequested = min(eventsPerJobRequested, eventsRemaining)
430                    # if less events in file remain than eventsPerJobRequested
431                    if ( filesEventCount - jobSkipEventCount < eventsPerJobRequested):
432                        # if last file in block
433                        if ( fileCount == numFilesInBlock-1 ) :
434                            # end job using last file, use remaining events in block
435                            # close job and touch new file
436                            fullString = parString[:-2]
437                            list_of_lists.append([fullString,str(-1),str(jobSkipEventCount)])
438                            common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(filesEventCount - jobSkipEventCount)+" events (last file in block).")
439                            self.jobDestination.append(blockSites[block])
440                            common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
441                            # fill jobs of block dictionary
442                            jobsOfBlock[block].append(jobCount+1)
443                            # reset counter
444                            jobCount = jobCount + 1
445                            totalEventCount = totalEventCount + filesEventCount - jobSkipEventCount
446                            eventsRemaining = eventsRemaining - filesEventCount + jobSkipEventCount
447                            jobSkipEventCount = 0
448                            # reset file
449                            parString = ""
450                            filesEventCount = 0
451                            newFile = 1
452                            fileCount += 1
453                        else :
454                            # go to next file
455                            newFile = 1
456                            fileCount += 1
457                    # if events in file equal to eventsPerJobRequested
458                    elif ( filesEventCount - jobSkipEventCount == eventsPerJobRequested ) :
459                        # close job and touch new file
460                        fullString = parString[:-2]
461                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
462                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
463                        self.jobDestination.append(blockSites[block])
464                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
465                        jobsOfBlock[block].append(jobCount+1)
466                        # reset counter
467                        jobCount = jobCount + 1
468                        totalEventCount = totalEventCount + eventsPerJobRequested
469                        eventsRemaining = eventsRemaining - eventsPerJobRequested
470                        jobSkipEventCount = 0
471                        # reset file
472                        parString = ""
473                        filesEventCount = 0
474                        newFile = 1
475                        fileCount += 1
476
477                    # if more events in file remain than eventsPerJobRequested
478                    else :
479                        # close job but don't touch new file
480                        fullString = parString[:-2]
481                        list_of_lists.append([fullString,str(eventsPerJobRequested),str(jobSkipEventCount)])
482                        common.logger.debug(3,"Job "+str(jobCount+1)+" can run over "+str(eventsPerJobRequested)+" events.")
483                        self.jobDestination.append(blockSites[block])
484                        common.logger.debug(5,"Job "+str(jobCount+1)+" Destination: "+str(self.jobDestination[jobCount]))
485                        jobsOfBlock[block].append(jobCount+1)
486                        # increase counter
487                        jobCount = jobCount + 1
488                        totalEventCount = totalEventCount + eventsPerJobRequested
489                        eventsRemaining = eventsRemaining - eventsPerJobRequested
490                        # calculate skip events for last file
491                        # use filesEventCount (contains several files), jobSkipEventCount and eventsPerJobRequest
492                        jobSkipEventCount = eventsPerJobRequested - (filesEventCount - jobSkipEventCount - self.eventsbyfile[file])
493                        # remove all but the last file
494                        filesEventCount = self.eventsbyfile[file]
495                        parString = '\\\"' + file + '\\\"\,'
496                    pass # END if
497                pass # END while (iterate over files in the block)
498        pass # END while (iterate over blocks in the dataset)
499        self.ncjobs = self.total_number_of_jobs = jobCount
500        if (eventsRemaining > 0 and jobCount < totalNumberOfJobs ):
501            common.logger.message("Could not run on all requested events because some blocks not hosted at allowed sites.")
502        common.logger.message(str(jobCount)+" job(s) can run on "+str(totalEventCount)+" events.\n")
431  
432          # screen output
433 <        screenOutput = "List of jobs and available destination sites:\n\n"
434 <
435 <        # keep trace of block with no sites to print a warning at the end
436 <        noSiteBlock = []
437 <        bloskNoSite = []
438 <
439 <        blockCounter = 0
512 <        for block in blocks:
513 <            if block in jobsOfBlock.keys() :
514 <                blockCounter += 1
515 <                screenOutput += "Block %5i: jobs %20s: sites: %s\n" % (blockCounter,spanRanges(jobsOfBlock[block]),
516 <                    ','.join(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)))
517 <                if len(self.blackWhiteListParser.checkWhiteList(self.blackWhiteListParser.checkBlackList(blockSites[block],block),block)) == 0:
518 <                    noSiteBlock.append( spanRanges(jobsOfBlock[block]) )
519 <                    bloskNoSite.append( blockCounter )
520 <
521 <        common.logger.message(screenOutput)
522 <        if len(noSiteBlock) > 0 and len(bloskNoSite) > 0:
523 <            msg = 'WARNING: No sites are hosting any part of data for block:\n                '
524 <            virgola = ""
525 <            if len(bloskNoSite) > 1:
526 <                virgola = ","
527 <            for block in bloskNoSite:
528 <                msg += ' ' + str(block) + virgola
529 <            msg += '\n               Related jobs:\n                 '
530 <            virgola = ""
531 <            if len(noSiteBlock) > 1:
532 <                virgola = ","
533 <            for range_jobs in noSiteBlock:
534 <                msg += str(range_jobs) + virgola
535 <            msg += '\n               will not be submitted and this block of data can not be analyzed!\n'
536 <            if self.cfg_params.has_key('EDG.se_white_list'):
537 <                msg += 'WARNING: SE White List: '+self.cfg_params['EDG.se_white_list']+'\n'
538 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
539 <                msg += 'Please check if the dataset is available at this site!)\n'
540 <            if self.cfg_params.has_key('EDG.ce_white_list'):
541 <                msg += 'WARNING: CE White List: '+self.cfg_params['EDG.ce_white_list']+'\n'
542 <                msg += '(Hint: By whitelisting you force the job to run at this particular site(s).\n'
543 <                msg += 'Please check if the dataset is available at this site!)\n'
544 <
545 <            common.logger.message(msg)
546 <
547 <        self.list_of_args = list_of_lists
548 <        return
549 <
550 <    def jobSplittingNoInput(self):
551 <        """
552 <        Perform job splitting based on number of event per job
553 <        """
554 <        common.logger.debug(5,'Splitting per events')
555 <
556 <        if (self.selectEventsPerJob):
557 <            common.logger.message('Required '+str(self.eventsPerJob)+' events per job ')
558 <        if (self.selectNumberOfJobs):
559 <            common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
560 <        if (self.selectTotalNumberEvents):
561 <            common.logger.message('Required '+str(self.total_number_of_events)+' events in total ')
562 <
563 <        if (self.total_number_of_events < 0):
564 <            msg='Cannot split jobs per Events with "-1" as total number of events'
565 <            raise CrabException(msg)
566 <
567 <        if (self.selectEventsPerJob):
568 <            if (self.selectTotalNumberEvents):
569 <                self.total_number_of_jobs = int(self.total_number_of_events/self.eventsPerJob)
570 <            elif(self.selectNumberOfJobs) :
571 <                self.total_number_of_jobs =self.theNumberOfJobs
572 <                self.total_number_of_events =int(self.theNumberOfJobs*self.eventsPerJob)
573 <
574 <        elif (self.selectNumberOfJobs) :
575 <            self.total_number_of_jobs = self.theNumberOfJobs
576 <            self.eventsPerJob = int(self.total_number_of_events/self.total_number_of_jobs)
577 <
578 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
579 <
580 <        # is there any remainder?
581 <        check = int(self.total_number_of_events) - (int(self.total_number_of_jobs)*self.eventsPerJob)
582 <
583 <        common.logger.debug(5,'Check  '+str(check))
584 <
585 <        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created, each for '+str(self.eventsPerJob)+' for a total of '+str(self.total_number_of_jobs*self.eventsPerJob)+' events')
586 <        if check > 0:
587 <            common.logger.message('Warning: asked '+str(self.total_number_of_events)+' but can do only '+str(int(self.total_number_of_jobs)*self.eventsPerJob))
588 <
589 <        # argument is seed number.$i
590 <        self.list_of_args = []
591 <        for i in range(self.total_number_of_jobs):
592 <            ## Since there is no input, any site is good
593 <            self.jobDestination.append([""]) #must be empty to write correctly the xml
594 <            args=[]
595 <            if (self.firstRun):
596 <                ## pythia first run
597 <                args.append(str(self.firstRun)+str(i))
598 <            self.list_of_args.append(args)
599 <
600 <        return
601 <
602 <
603 <    def jobSplittingForScript(self):#CarlosDaniele
604 <        """
605 <        Perform job splitting based on number of job
606 <        """
607 <        common.logger.debug(5,'Splitting per job')
608 <        common.logger.message('Required '+str(self.theNumberOfJobs)+' jobs in total ')
609 <
610 <        self.total_number_of_jobs = self.theNumberOfJobs
433 >        if self.ads or self.lumiMask:
434 >            common.logger.info("Requested (A)DS %s has %s block(s)." %
435 >                               (datasetPath, len(self.filesbyblock.keys())))
436 >        else:
437 >            common.logger.info("Requested dataset: " + datasetPath + \
438 >                " has " + str(self.maxEvents) + " events in " + \
439 >                str(len(self.filesbyblock.keys())) + " blocks.\n")
440  
441 <        common.logger.debug(5,'N jobs  '+str(self.total_number_of_jobs))
441 >        return sites
442  
614        common.logger.message(str(self.total_number_of_jobs)+' jobs can be created')
443  
444 <        # argument is seed number.$i
617 <        self.list_of_args = []
618 <        for i in range(self.total_number_of_jobs):
619 <            ## Since there is no input, any site is good
620 <           # self.jobDestination.append(["Any"])
621 <            self.jobDestination.append([""])
622 <            ## no random seed
623 <            self.list_of_args.append([str(i)])
624 <        return
444 >    def split(self, jobParams,firstJobID):
445  
446 <    def split(self, jobParams):
446 >        jobParams = self.dict['args']
447 >        njobs = self.dict['njobs']
448 >        self.jobDestination = self.dict['jobDestination']
449 >
450 >        if njobs == 0:
451 >            raise CrabException("Asked to split zero jobs: aborting")
452 >        if not self.server and not self.local and njobs > 500:
453 >            raise CrabException("The CRAB client will not submit more than 500 jobs. You must use the server mode.")
454  
628        #### Fabio
629        njobs = self.total_number_of_jobs
630        arglist = self.list_of_args
455          # create the empty structure
456          for i in range(njobs):
457              jobParams.append("")
458  
459          listID=[]
460          listField=[]
461 <        for job in range(njobs):
462 <            jobParams[job] = arglist[job]
461 >        listDictions=[]
462 >        exist= os.path.exists(self.argsFile)
463 >        for id in range(njobs):
464 >            job = id + int(firstJobID)
465              listID.append(job+1)
466              job_ToSave ={}
467              concString = ' '
468              argu=''
469 <            if len(jobParams[job]):
470 <                argu +=   concString.join(jobParams[job] )
471 <            job_ToSave['arguments']= str(job+1)+' '+argu## new BL--DS
472 <            job_ToSave['dlsDestination']= self.jobDestination[job]## new BL--DS
473 <            #common._db.updateJob_(job,job_ToSave)## new BL--DS
469 >            str_argu = str(job+1)
470 >            if len(jobParams[id]):
471 >                argu = {'JobID': job+1}
472 >                for i in range(len(jobParams[id])):
473 >                    argu[self.dict['params'][i]]=jobParams[id][i]
474 >                    if len(jobParams[id])==1: self.NumEvents = jobParams[id][i]
475 >                # just for debug
476 >                str_argu += concString.join(jobParams[id])
477 >            if argu != '': listDictions.append(argu)
478 >            job_ToSave['arguments']= '%d %d'%( (job+1), 0)
479 >            job_ToSave['dlsDestination']= self.jobDestination[id]
480              listField.append(job_ToSave)
481 <            msg="Job "+str(job)+" Arguments:   "+str(job+1)+" "+argu+"\n"  \
482 <            +"                     Destination: "+str(self.jobDestination[job])
483 <            common.logger.debug(5,msg)
484 <            #common.logger.debug(5,"Job "+str(job)+" Destination: "+str(self.jobDestination[job]))
485 <        common._db.updateJob_(listID,listField)## new BL--DS
486 <        ## Pay Attention Here....DS--BL
487 <        self.argsList = (len(jobParams[1])+1)
481 >            from ProdCommon.SiteDB.CmsSiteMapper import CmsSEMap
482 >            cms_se = CmsSEMap()
483 >            msg="Job  %s  Arguments:  %s\n"%(str(job+1),str_argu)
484 >            msg+="\t  Destination: %s "%(str(self.jobDestination[id]))
485 >            SEDestination = [cms_se[dest] for dest in self.jobDestination[id]]
486 >            msg+="\t  CMSDestination: %s "%(str(SEDestination))
487 >            common.logger.log(10-1,msg)
488 >        # write xml
489 >        if len(listDictions):
490 >            if exist==False: self.CreateXML()
491 >            self.addEntry(listDictions)
492 >        common._db.updateJob_(listID,listField)
493 >        return
494 >
495 >    def CreateXML(self):
496 >        """
497 >        """
498 >        result = IMProvNode( self.rootArgsFilename )
499 >        outfile = file( self.argsFile, 'w').write(str(result))
500 >        return
501  
502 +    def addEntry(self, listDictions):
503 +        """
504 +        _addEntry_
505 +
506 +        add an entry to the xml file
507 +        """
508 +        ## load xml
509 +        improvDoc = loadIMProvFile(self.argsFile)
510 +        entrname= 'Job'
511 +        for dictions in listDictions:
512 +           report = IMProvNode(entrname , None, **dictions)
513 +           improvDoc.addNode(report)
514 +        outfile = file( self.argsFile, 'w').write(str(improvDoc))
515          return
516  
517      def numberOfJobs(self):
518 <        # Fabio
519 <        return self.total_number_of_jobs
518 > #wmbs
519 >        if self.automation==0:
520 >           return self.dict['njobs']
521 >        else:
522 >           return None
523  
524      def getTarBall(self, exe):
525          """
526          Return the TarBall with lib and exe
527          """
528 <
668 <        # if it exist, just return it
669 <        #
670 <        # Marco. Let's start to use relative path for Boss XML files
671 <        #
672 <        self.tgzNameWithPath = common.work_space.pathForTgz()+'share/'+self.tgz_name
528 >        self.tgzNameWithPath = common.work_space.pathForTgz()+self.tgz_name
529          if os.path.exists(self.tgzNameWithPath):
530              return self.tgzNameWithPath
531  
# Line 682 | Line 538 | class Cmssw(JobType):
538  
539          # First of all declare the user Scram area
540          swArea = self.scram.getSWArea_()
685        #print "swArea = ", swArea
686        # swVersion = self.scram.getSWVersion()
687        # print "swVersion = ", swVersion
541          swReleaseTop = self.scram.getReleaseTop_()
689        #print "swReleaseTop = ", swReleaseTop
542  
543          ## check if working area is release top
544          if swReleaseTop == '' or swArea == swReleaseTop:
545 <            common.logger.debug(3,"swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
545 >            common.logger.debug("swArea = "+swArea+" swReleaseTop ="+swReleaseTop)
546              return
547  
548          import tarfile
# Line 705 | Line 557 | class Cmssw(JobType):
557                  ## then check if it's private or not
558                  if exeWithPath.find(swReleaseTop) == -1:
559                      # the exe is private, so we must ship
560 <                    common.logger.debug(5,"Exe "+exeWithPath+" to be tarred")
560 >                    common.logger.debug("Exe "+exeWithPath+" to be tarred")
561                      path = swArea+'/'
562                      # distinguish case when script is in user project area or given by full path somewhere else
563                      if exeWithPath.find(path) >= 0 :
# Line 719 | Line 571 | class Cmssw(JobType):
571                      pass
572  
573              ## Now get the libraries: only those in local working area
574 +            tar.dereference=True
575              libDir = 'lib'
576              lib = swArea+'/' +libDir
577 <            common.logger.debug(5,"lib "+lib+" to be tarred")
577 >            common.logger.debug("lib "+lib+" to be tarred")
578              if os.path.exists(lib):
579                  tar.add(lib,libDir)
580  
# Line 730 | Line 583 | class Cmssw(JobType):
583              module = swArea + '/' + moduleDir
584              if os.path.isdir(module):
585                  tar.add(module,moduleDir)
586 +            tar.dereference=False
587  
588              ## Now check if any data dir(s) is present
589 <            swAreaLen=len(swArea)
590 <            for root, dirs, files in os.walk(swArea):
591 <                if "data" in dirs:
592 <                    common.logger.debug(5,"data "+root+"/data"+" to be tarred")
593 <                    tar.add(root+"/data",root[swAreaLen:]+"/data")
589 >            self.dataExist = False
590 >            todo_list = [(i, i) for i in  os.listdir(swArea+"/src")]
591 >            while len(todo_list):
592 >                entry, name = todo_list.pop()
593 >                if name.startswith('crab_0_') or  name.startswith('.') or name == 'CVS':
594 >                    continue
595 >                if os.path.isdir(swArea+"/src/"+entry):
596 >                    entryPath = entry + '/'
597 >                    todo_list += [(entryPath + i, i) for i in  os.listdir(swArea+"/src/"+entry)]
598 >                    if name == 'data':
599 >                        self.dataExist=True
600 >                        common.logger.debug("data "+entry+" to be tarred")
601 >                        tar.add(swArea+"/src/"+entry,"src/"+entry)
602 >                    pass
603 >                pass
604 >
605 >            ### CMSSW ParameterSet
606 >            if not self.pset is None:
607 >                cfg_file = common.work_space.jobDir()+self.configFilename()
608 >                pickleFile = common.work_space.jobDir()+self.configFilename() + '.pkl'
609 >                tar.add(cfg_file,self.configFilename())
610 >                tar.add(pickleFile,self.configFilename() + '.pkl')
611  
612 +            try:
613 +                crab_cfg_file = common.work_space.shareDir()+'/crab.cfg'
614 +                tar.add(crab_cfg_file,'crab.cfg')
615 +            except:
616 +                pass
617  
618              ## Add ProdCommon dir to tar
619 <            prodcommonDir = 'ProdCommon'
620 <            prodcommonPath = os.environ['CRABDIR'] + '/' + 'ProdCommon'
621 <            if os.path.isdir(prodcommonPath):
622 <                tar.add(prodcommonPath,prodcommonDir)
619 >            prodcommonDir = './'
620 >            prodcommonPath = os.environ['CRABDIR'] + '/' + 'external/'
621 >            neededStuff = ['ProdCommon/__init__.py','ProdCommon/FwkJobRep', 'ProdCommon/CMSConfigTools', \
622 >                           'ProdCommon/Core', 'ProdCommon/MCPayloads', 'IMProv', 'ProdCommon/Storage', \
623 >                           'WMCore/__init__.py','WMCore/Algorithms']
624 >            for file in neededStuff:
625 >                tar.add(prodcommonPath+file,prodcommonDir+file)
626 >
627 >            ##### ML stuff
628 >            ML_file_list=['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py']
629 >            path=os.environ['CRABDIR'] + '/python/'
630 >            for file in ML_file_list:
631 >                tar.add(path+file,file)
632 >
633 >            ##### Utils
634 >            Utils_file_list=['parseCrabFjr.py','writeCfg.py', 'fillCrabFjr.py','cmscp.py']
635 >            for file in Utils_file_list:
636 >                tar.add(path+file,file)
637 >
638 >            ##### AdditionalFiles
639 >            tar.dereference=True
640 >            for file in self.additional_inbox_files:
641 >                tar.add(file,string.split(file,'/')[-1])
642 >            tar.dereference=False
643 >            common.logger.log(10-1,"Files in "+self.tgzNameWithPath+" : "+str(tar.getnames()))
644  
748            common.logger.debug(5,"Files added to "+self.tgzNameWithPath+" : "+str(tar.getnames()))
645              tar.close()
646 <        except :
647 <            raise CrabException('Could not create tar-ball')
646 >        except IOError, exc:
647 >            msg = 'Could not create tar-ball %s \n'%self.tgzNameWithPath
648 >            msg += str(exc)
649 >            raise CrabException(msg)
650 >        except tarfile.TarError, exc:
651 >            msg = 'Could not create tar-ball %s \n'%self.tgzNameWithPath
652 >            msg += str(exc)
653 >            raise CrabException(msg)
654  
753        ## check for tarball size
655          tarballinfo = os.stat(self.tgzNameWithPath)
656          if ( tarballinfo.st_size > self.MaxTarBallSize*1024*1024 ) :
657 <            raise CrabException('Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + str(self.MaxTarBallSize) + ' MB input sandbox limit and not supported by the used GRID submission system. Please make sure that no unnecessary files are in all data directories in your local CMSSW project area as they are automatically packed into the input sandbox.')
657 >            if not self.server:
658 >                msg  = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' + \
659 >                         str(self.MaxTarBallSize) +'MB input sandbox limit \n'
660 >                msg += '      and not supported by the direct GRID submission system.\n'
661 >                msg += '      Please use the CRAB server mode by setting server_name=<NAME> in section [CRAB] of your crab.cfg.\n'
662 >                msg += '      For further infos please see https://twiki.cern.ch/twiki/bin/view/CMS/SWGuideCrabServerForUsers#Server_available_for_users'
663 >            else:
664 >                msg  = 'Input sandbox size of ' + str(float(tarballinfo.st_size)/1024.0/1024.0) + ' MB is larger than the allowed ' +  \
665 >                        str(self.MaxTarBallSize) +'MB input sandbox limit in the server.'
666 >            raise CrabException(msg)
667  
668          ## create tar-ball with ML stuff
759        self.MLtgzfile =  common.work_space.pathForTgz()+'share/MLfiles.tgz'
760        try:
761            tar = tarfile.open(self.MLtgzfile, "w:gz")
762            path=os.environ['CRABDIR'] + '/python/'
763            for file in ['report.py', 'DashboardAPI.py', 'Logger.py', 'ProcInfo.py', 'apmon.py', 'parseCrabFjr.py','writeCfg.py', 'JobReportErrorCode.py']:
764                tar.add(path+file,file)
765            common.logger.debug(5,"Files added to "+self.MLtgzfile+" : "+str(tar.getnames()))
766            tar.close()
767        except :
768            raise CrabException('Could not create ML files tar-ball')
769
770        return
771
772    def additionalInputFileTgz(self):
773        """
774        Put all additional files into a tar ball and return its name
775        """
776        import tarfile
777        tarName=  common.work_space.pathForTgz()+'share/'+self.additional_tgz_name
778        tar = tarfile.open(tarName, "w:gz")
779        for file in self.additional_inbox_files:
780            tar.add(file,string.split(file,'/')[-1])
781        common.logger.debug(5,"Files added to "+self.additional_tgz_name+" : "+str(tar.getnames()))
782        tar.close()
783        return tarName
669  
670      def wsSetupEnvironment(self, nj=0):
671          """
672          Returns part of a job script which prepares
673          the execution environment for the job 'nj'.
674          """
675 +        psetName = 'pset.py'
676 +
677          # Prepare JobType-independent part
678          txt = '\n#Written by cms_cmssw::wsSetupEnvironment\n'
679          txt += 'echo ">>> setup environment"\n'
680 <        txt += 'if [ $middleware == LCG ]; then \n'
680 >        txt += 'echo "set SCRAM ARCH to ' + self.executable_arch + '"\n'
681 >        txt += 'export SCRAM_ARCH=' + self.executable_arch + '\n'
682 >        txt += 'echo "SCRAM_ARCH = $SCRAM_ARCH"\n'
683 >        txt += 'if [ $middleware == LCG ] || [ $middleware == CAF ] || [ $middleware == LSF ]; then \n'
684          txt += self.wsSetupCMSLCGEnvironment_()
685          txt += 'elif [ $middleware == OSG ]; then\n'
686          txt += '    WORKING_DIR=`/bin/mktemp  -d $OSG_WN_TMP/cms_XXXXXXXXXXXX`\n'
# Line 805 | Line 695 | class Cmssw(JobType):
695          txt += '    cd $WORKING_DIR\n'
696          txt += '    echo ">>> current directory (WORKING_DIR): $WORKING_DIR"\n'
697          txt += self.wsSetupCMSOSGEnvironment_()
698 <        #txt += '    echo "### Set SCRAM ARCH to ' + self.executable_arch + ' ###"\n'
699 <        #txt += '    export SCRAM_ARCH='+self.executable_arch+'\n'
698 >        #Setup SGE Environment
699 >        txt += 'elif [ $middleware == SGE ]; then\n'
700 >        txt += self.wsSetupCMSLCGEnvironment_()
701 >
702 >        txt += 'elif [ $middleware == ARC ]; then\n'
703 >        txt += self.wsSetupCMSLCGEnvironment_()
704 >
705 >        #Setup PBS Environment
706 >        txt += 'elif [ $middleware == PBS ]; then\n'
707 >        txt += self.wsSetupCMSLCGEnvironment_()
708 >
709          txt += 'fi\n'
710  
711          # Prepare JobType-specific part
# Line 822 | Line 721 | class Cmssw(JobType):
721          txt += '    func_exit\n'
722          txt += 'fi \n'
723          txt += 'cd '+self.version+'\n'
724 <        ########## FEDE FOR DBS2 ######################
826 <        txt += 'SOFTWARE_DIR=`pwd`\n'
724 >        txt += 'SOFTWARE_DIR=`pwd`; export SOFTWARE_DIR\n'
725          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
828        ###############################################
829        ### needed grep for bug in scramv1 ###
726          txt += 'eval `'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME`\n'
727 +        txt += 'if [ $? != 0 ] ; then\n'
728 +        txt += '    echo "ERROR ==> Problem with the command: "\n'
729 +        txt += '    echo "eval \`'+scram+' runtime -sh | grep -v SCRAMRT_LSB_JOBNAME \` at `hostname`"\n'
730 +        txt += '    job_exit_code=10034\n'
731 +        txt += '    func_exit\n'
732 +        txt += 'fi \n'
733          # Handle the arguments:
734          txt += "\n"
735 <        txt += "## number of arguments (first argument always jobnumber)\n"
735 >        txt += "## number of arguments (first argument always jobnumber, the second is the resubmission number)\n"
736          txt += "\n"
737          txt += "if [ $nargs -lt "+str(self.argsList)+" ]\n"
738          txt += "then\n"
# Line 842 | Line 744 | class Cmssw(JobType):
744  
745          # Prepare job-specific part
746          job = common.job_list[nj]
845        ### FEDE FOR DBS OUTPUT PUBLICATION
747          if (self.datasetPath):
748              txt += '\n'
749              txt += 'DatasetPath='+self.datasetPath+'\n'
750  
751 <            datasetpath_split = self.datasetPath.split("/")
752 <
852 <            txt += 'PrimaryDataset='+datasetpath_split[1]+'\n'
853 <            txt += 'DataTier='+datasetpath_split[2]+'\n'
751 >            txt += 'PrimaryDataset='+self.primaryDataset +'\n'
752 >            txt += 'DataTier='+self.dataTier+'\n'
753              txt += 'ApplicationFamily=cmsRun\n'
754  
755          else:
# Line 860 | Line 759 | class Cmssw(JobType):
759              txt += 'ApplicationFamily=MCDataTier\n'
760          if self.pset != None:
761              pset = os.path.basename(job.configFilename())
762 +            pkl  = os.path.basename(job.configFilename()) + '.pkl'
763              txt += '\n'
764              txt += 'cp  $RUNTIME_AREA/'+pset+' .\n'
765 <            if (self.datasetPath): # standard job
866 <                txt += 'InputFiles=${args[1]}; export InputFiles\n'
867 <                txt += 'MaxEvents=${args[2]}; export MaxEvents\n'
868 <                txt += 'SkipEvents=${args[3]}; export SkipEvents\n'
869 <                txt += 'echo "Inputfiles:<$InputFiles>"\n'
870 <                txt += 'echo "MaxEvents:<$MaxEvents>"\n'
871 <                txt += 'echo "SkipEvents:<$SkipEvents>"\n'
872 <            else:  # pythia like job
873 <                txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
874 <                txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
875 <                txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
876 <                txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
877 <                if (self.firstRun):
878 <                    txt += 'FirstRun=${args[1]}; export FirstRun\n'
879 <                    txt += 'echo "FirstRun: <$FirstRun>"\n'
880 <
881 <            txt += 'mv -f '+pset+' pset.cfg\n'
882 <
883 <        if len(self.additional_inbox_files) > 0:
884 <            txt += 'if [ -e $RUNTIME_AREA/'+self.additional_tgz_name+' ] ; then\n'
885 <            txt += '  tar xzvf $RUNTIME_AREA/'+self.additional_tgz_name+'\n'
886 <            txt += 'fi\n'
887 <            pass
765 >            txt += 'cp  $RUNTIME_AREA/'+pkl+' .\n'
766  
767 <        if self.pset != None:
768 <            txt += '\n'
769 <            txt += 'echo "***** cat pset.cfg *********"\n'
770 <            txt += 'cat pset.cfg\n'
771 <            txt += 'echo "****** end pset.cfg ********"\n'
772 <            txt += '\n'
773 <            txt += 'PSETHASH=`EdmConfigHash < pset.cfg` \n'
774 <            txt += 'echo "PSETHASH = $PSETHASH" \n'
767 >            txt += 'PreserveSeeds='  + ','.join(self.preserveSeeds)  + '; export PreserveSeeds\n'
768 >            txt += 'IncrementSeeds=' + ','.join(self.incrementSeeds) + '; export IncrementSeeds\n'
769 >            txt += 'echo "PreserveSeeds: <$PreserveSeeds>"\n'
770 >            txt += 'echo "IncrementSeeds:<$IncrementSeeds>"\n'
771 >
772 >            txt += 'mv -f ' + pset + ' ' + psetName + '\n'
773 >            if self.var_filter:
774 >                #print "self.var_filter = ",self.var_filter
775 >                txt += "export var_filter="+"'"+self.var_filter+"'\n"
776 >                txt += 'echo $var_filter'
777 >        else:
778              txt += '\n'
779 +            if self.AdditionalArgs: txt += 'export AdditionalArgs=\"%s\"\n'%(self.AdditionalArgs)
780 +            if int(self.NumEvents) != 0: txt += 'export MaxEvents=%s\n'%str(self.NumEvents)
781          return txt
782  
783      def wsUntarSoftware(self, nj=0):
# Line 906 | Line 789 | class Cmssw(JobType):
789          txt = '\n#Written by cms_cmssw::wsUntarSoftware\n'
790  
791          if os.path.isfile(self.tgzNameWithPath):
792 <            txt += 'echo ">>> tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
793 <            txt += 'tar xzvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
792 >            txt += 'echo ">>> tar xzf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+' :" \n'
793 >            if  self.debug_wrapper==1 :
794 >                txt += 'tar zxvf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
795 >                txt += 'ls -Al \n'
796 >            else:
797 >                txt += 'tar zxf $RUNTIME_AREA/'+os.path.basename(self.tgzNameWithPath)+'\n'
798              txt += 'untar_status=$? \n'
799              txt += 'if [ $untar_status -ne 0 ]; then \n'
800              txt += '   echo "ERROR ==> Untarring .tgz file failed"\n'
# Line 917 | Line 804 | class Cmssw(JobType):
804              txt += '   echo "Successful untar" \n'
805              txt += 'fi \n'
806              txt += '\n'
807 <            txt += 'echo ">>> Include ProdCommon in PYTHONPATH:"\n'
807 >            txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
808              txt += 'if [ -z "$PYTHONPATH" ]; then\n'
809 <            txt += '   export PYTHONPATH=$RUNTIME_AREA/ProdCommon\n'
809 >            txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
810              txt += 'else\n'
811 <            txt += '   export PYTHONPATH=$RUNTIME_AREA/ProdCommon:${PYTHONPATH}\n'
811 >            txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
812              txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
813              txt += 'fi\n'
814              txt += '\n'
# Line 942 | Line 829 | class Cmssw(JobType):
829          txt += 'rm -r lib/ module/ \n'
830          txt += 'mv $RUNTIME_AREA/lib/ . \n'
831          txt += 'mv $RUNTIME_AREA/module/ . \n'
832 <        txt += 'mv $RUNTIME_AREA/ProdCommon/ . \n'
832 >        if self.dataExist == True:
833 >            txt += 'rm -r src/ \n'
834 >            txt += 'mv $RUNTIME_AREA/src/ . \n'
835 >        if len(self.additional_inbox_files)>0:
836 >            for file in self.additional_inbox_files:
837 >                txt += 'mv $RUNTIME_AREA/'+os.path.basename(file)+' . \n'
838  
839 +        txt += 'echo ">>> Include $RUNTIME_AREA in PYTHONPATH:"\n'
840          txt += 'if [ -z "$PYTHONPATH" ]; then\n'
841 <        txt += '   export PYTHONPATH=$SOFTWARE_DIR/ProdCommon\n'
841 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/\n'
842          txt += 'else\n'
843 <        txt += '   export PYTHONPATH=$SOFTWARE_DIR/ProdCommon:${PYTHONPATH}\n'
843 >        txt += '   export PYTHONPATH=$RUNTIME_AREA/:${PYTHONPATH}\n'
844          txt += 'echo "PYTHONPATH=$PYTHONPATH"\n'
845          txt += 'fi\n'
846          txt += '\n'
847  
848 +        if self.pset != None:
849 +            psetName = 'pset.py'
850 +
851 +            txt += '\n'
852 +            if self.debug_wrapper == 1:
853 +                txt += 'echo "***** cat ' + psetName + ' *********"\n'
854 +                txt += 'cat ' + psetName + '\n'
855 +                txt += 'echo "****** end ' + psetName + ' ********"\n'
856 +                txt += '\n'
857 +                txt += 'echo "***********************" \n'
858 +                txt += 'which edmConfigHash \n'
859 +                txt += 'echo "***********************" \n'
860 +            txt += 'edmConfigHash ' + psetName + ' \n'
861 +            txt += 'PSETHASH=`edmConfigHash ' + psetName + '` \n'
862 +            txt += 'echo "PSETHASH = $PSETHASH" \n'
863 +            #### FEDE temporary fix for noEdm files #####
864 +            txt += 'if [ -z "$PSETHASH" ]; then \n'
865 +            txt += '   export PSETHASH=null\n'
866 +            txt += 'fi \n'
867 +            #############################################
868 +            txt += '\n'
869          return txt
870  
957    def modifySteeringCards(self, nj):
958        """
959        modify the card provided by the user,
960        writing a new card into share dir
961        """
871  
872      def executableName(self):
873 <        if self.scriptExe: #CarlosDaniele
873 >        if self.scriptExe:
874              return "sh "
875          else:
876              return self.executable
877  
878      def executableArgs(self):
879 <        # FUTURE: This function tests the CMSSW version. Can be simplified as we drop support for old versions
880 <        if self.scriptExe:#CarlosDaniele
972 <            return   self.scriptExe + " $NJob"
879 >        if self.scriptExe:
880 >            return self.scriptExe + " $NJob $AdditionalArgs"
881          else:
882 <            version_array = self.scram.getSWVersion().split('_')
975 <            major = 0
976 <            minor = 0
977 <            try:
978 <                major = int(version_array[1])
979 <                minor = int(version_array[2])
980 <            except:
981 <                msg = "Cannot parse CMSSW version string: " + "_".join(version_array) + " for major and minor release number!"
982 <                raise CrabException(msg)
983 <
984 <            ex_args = ""
985 <            # FUTURE: This tests the CMSSW version. Can remove code as versions deprecated
986 <            # Framework job report
987 <            if major >= 1 and minor >= 5 :
988 <                ex_args += " -j $RUNTIME_AREA/crab_fjr_$NJob.xml"
989 <            # Type of cfg file
990 <            if major >= 2 :
991 <                ex_args += " -p pset.py"
992 <            else:
993 <                ex_args += " -p pset.cfg"
994 <            return ex_args
882 >            return " -j $RUNTIME_AREA/crab_fjr_$NJob.xml -p pset.py"
883  
884      def inputSandbox(self, nj):
885          """
886          Returns a list of filenames to be put in JDL input sandbox.
887          """
888          inp_box = []
1001        # # dict added to delete duplicate from input sandbox file list
1002        # seen = {}
1003        ## code
889          if os.path.isfile(self.tgzNameWithPath):
890              inp_box.append(self.tgzNameWithPath)
891 <        if os.path.isfile(self.MLtgzfile):
892 <            inp_box.append(self.MLtgzfile)
893 <        ## config
1009 <        if not self.pset is None:
1010 <            inp_box.append(common.work_space.pathForTgz() + 'job/' + self.configFilename())
1011 <        ## additional input files
1012 <        tgz = self.additionalInputFileTgz()
1013 <        inp_box.append(tgz)
1014 <        ## executable
1015 <        wrapper = os.path.basename(str(common._db.queryTask('scriptName')))
1016 <        inp_box.append(common.work_space.pathForTgz() +'job/'+ wrapper)
891 >        if os.path.isfile(self.argsFile):
892 >            inp_box.append(self.argsFile)
893 >        inp_box.append(common.work_space.jobDir() + self.scriptName)
894          return inp_box
895  
896      def outputSandbox(self, nj):
# Line 1025 | Line 902 | class Cmssw(JobType):
902          ## User Declared output files
903          for out in (self.output_file+self.output_file_sandbox):
904              n_out = nj + 1
905 <            out_box.append(self.numberFile_(out,str(n_out)))
905 >            out_box.append(numberFile(out,str(n_out)))
906          return out_box
907  
1031    def prepareSteeringCards(self):
1032        """
1033        Make initial modifications of the user's steering card file.
1034        """
1035        return
908  
909      def wsRenameOutput(self, nj):
910          """
# Line 1042 | Line 914 | class Cmssw(JobType):
914          txt = '\n#Written by cms_cmssw::wsRenameOutput\n'
915          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
916          txt += 'echo ">>> current directory content:"\n'
917 <        txt += 'ls \n'
917 >        if self.debug_wrapper==1:
918 >            txt += 'ls -Al\n'
919          txt += '\n'
920  
921          for fileWithSuffix in (self.output_file):
922 <            output_file_num = self.numberFile_(fileWithSuffix, '$NJob')
922 >            output_file_num = numberFile(fileWithSuffix, '$OutUniqueID')
923              txt += '\n'
924              txt += '# check output file\n'
925              txt += 'if [ -e ./'+fileWithSuffix+' ] ; then\n'
# Line 1067 | Line 940 | class Cmssw(JobType):
940              txt += 'fi\n'
941          file_list = []
942          for fileWithSuffix in (self.output_file):
943 <             file_list.append(self.numberFile_(fileWithSuffix, '$NJob'))
943 >             file_list.append(numberFile('$SOFTWARE_DIR/'+fileWithSuffix, '$OutUniqueID'))
944  
945 <        txt += 'file_list="'+string.join(file_list,' ')+'"\n'
945 >        txt += 'file_list="'+string.join(file_list,',')+'"\n'
946          txt += '\n'
947          txt += 'echo ">>> current directory (SOFTWARE_DIR): $SOFTWARE_DIR" \n'
948          txt += 'echo ">>> current directory content:"\n'
949 <        txt += 'ls \n'
949 >        if self.debug_wrapper==1:
950 >            txt += 'ls -Al\n'
951          txt += '\n'
952          txt += 'cd $RUNTIME_AREA\n'
953          txt += 'echo ">>> current directory (RUNTIME_AREA):  $RUNTIME_AREA"\n'
954          return txt
955  
1082    def numberFile_(self, file, txt):
1083        """
1084        append _'txt' before last extension of a file
1085        """
1086        p = string.split(file,".")
1087        # take away last extension
1088        name = p[0]
1089        for x in p[1:-1]:
1090            name=name+"."+x
1091        # add "_txt"
1092        if len(p)>1:
1093            ext = p[len(p)-1]
1094            result = name + '_' + txt + "." + ext
1095        else:
1096            result = name + '_' + txt
1097
1098        return result
1099
956      def getRequirements(self, nj=[]):
957          """
958          return job requirements to add to jdl files
# Line 1106 | Line 962 | class Cmssw(JobType):
962              req='Member("VO-cms-' + \
963                   self.version + \
964                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
965 <        ## SL add requirement for OS version only if SL4
1110 <        #reSL4 = re.compile( r'slc4' )
1111 <        if self.executable_arch: # and reSL4.search(self.executable_arch):
965 >        if self.executable_arch:
966              req+=' && Member("VO-cms-' + \
967                   self.executable_arch + \
968                   '", other.GlueHostApplicationSoftwareRunTimeEnvironment)'
969  
970          req = req + ' && (other.GlueHostNetworkAdapterOutboundIP)'
971 <        if common.scheduler.name() == "glitecoll":
972 <            req += ' && other.GlueCEStateStatus == "Production" '
971 >        if ( common.scheduler.name() in ["glite"] ):
972 >            ## 25-Jun-2009 SL: patch to use Cream enabled WMS
973 >            if ( self.cfg_params.get('GRID.use_cream',None) ):
974 >                req += ' && (other.GlueCEStateStatus == "Production" || other.GlueCEStateStatus == "Special")'
975 >            else:
976 >                req += ' && other.GlueCEStateStatus == "Production" '
977  
978          return req
979  
980      def configFilename(self):
981          """ return the config filename """
982 <        return self.name()+'.cfg'
982 >        return self.name()+'.py'
983  
984      def wsSetupCMSOSGEnvironment_(self):
985          """
# Line 1147 | Line 1005 | class Cmssw(JobType):
1005  
1006          return txt
1007  
1150    ### OLI_DANIELE
1008      def wsSetupCMSLCGEnvironment_(self):
1009          """
1010          Returns part of a job script which is prepares
# Line 1182 | Line 1039 | class Cmssw(JobType):
1039          txt += '    echo "==> setup cms environment ok"\n'
1040          return txt
1041  
1042 <    ### FEDE FOR DBS OUTPUT PUBLICATION
1186 <    def modifyReport(self, nj):
1042 >    def wsModifyReport(self, nj):
1043          """
1044          insert the part of the script that modifies the FrameworkJob Report
1045          """
1046  
1047 <        txt = '\n#Written by cms_cmssw::modifyReport\n'
1048 <        publish_data = int(self.cfg_params.get('USER.publish_data',0))
1049 <        if (publish_data == 1):
1194 <            processedDataset = self.cfg_params['USER.publish_data_name']
1195 <            LFNBaseName = LFNBase(processedDataset)
1047 >        txt = ''
1048 >        if (self.copy_data == 1):
1049 >            txt = '\n#Written by cms_cmssw::wsModifyReport\n'
1050  
1197            txt += 'if [ $copy_exit_status -eq 0 ]; then\n'
1198            txt += '    FOR_LFN=%s_${PSETHASH}/\n'%(LFNBaseName)
1199            txt += 'else\n'
1200            txt += '    FOR_LFN=/copy_problems/ \n'
1201            txt += '    SE=""\n'
1202            txt += '    SE_PATH=""\n'
1203            txt += 'fi\n'
1204            
1051              txt += 'echo ">>> Modify Job Report:" \n'
1052 <            txt += 'chmod a+x $SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1207 <            txt += 'ProcessedDataset='+processedDataset+'\n'
1208 <            txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
1209 <            txt += 'echo "SE = $SE"\n'
1210 <            txt += 'echo "SE_PATH = $SE_PATH"\n'
1211 <            txt += 'echo "FOR_LFN = $FOR_LFN" \n'
1052 >            txt += 'chmod a+x $RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py\n'
1053              txt += 'echo "CMSSW_VERSION = $CMSSW_VERSION"\n\n'
1054 <            txt += 'echo "$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH"\n'
1055 <            txt += '$SOFTWARE_DIR/ProdCommon/ProdCommon/FwkJobRep/ModifyJobReport.py $RUNTIME_AREA/crab_fjr_$NJob.xml $NJob $FOR_LFN $PrimaryDataset $DataTier $ProcessedDataset $ApplicationFamily $executable $CMSSW_VERSION $PSETHASH $SE $SE_PATH\n'
1054 >
1055 >            args = 'fjr $RUNTIME_AREA/crab_fjr_$NJob.xml json $RUNTIME_AREA/resultCopyFile n_job $OutUniqueID PrimaryDataset $PrimaryDataset  ApplicationFamily $ApplicationFamily ApplicationName $executable cmssw_version $CMSSW_VERSION psethash $PSETHASH'
1056 >
1057 >            if (self.publish_data == 1):
1058 >                txt += 'ProcessedDataset='+self.processedDataset+'\n'
1059 >                txt += 'echo "ProcessedDataset = $ProcessedDataset"\n'
1060 >                args += ' UserProcessedDataset $USER-$ProcessedDataset-$PSETHASH'
1061 >
1062 >            txt += 'echo "$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'"\n'
1063 >            txt += '$RUNTIME_AREA/ProdCommon/FwkJobRep/ModifyJobReport.py '+str(args)+'\n'
1064              txt += 'modifyReport_result=$?\n'
1065              txt += 'if [ $modifyReport_result -ne 0 ]; then\n'
1066              txt += '    modifyReport_result=70500\n'
# Line 1223 | Line 1072 | class Cmssw(JobType):
1072              txt += 'fi\n'
1073          return txt
1074  
1075 +    def wsParseFJR(self):
1076 +        """
1077 +        Parse the FrameworkJobReport to obtain useful infos
1078 +        """
1079 +        txt = '\n#Written by cms_cmssw::wsParseFJR\n'
1080 +        txt += 'echo ">>> Parse FrameworkJobReport crab_fjr.xml"\n'
1081 +        txt += 'if [ -s $RUNTIME_AREA/crab_fjr_$NJob.xml ]; then\n'
1082 +        txt += '    if [ -s $RUNTIME_AREA/parseCrabFjr.py ]; then\n'
1083 +        txt += '        cmd_out=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --dashboard $MonitorID,$MonitorJobID '+self.debugWrap+'`\n'
1084 +        if self.debug_wrapper==1 :
1085 +            txt += '        echo "Result of parsing the FrameworkJobReport crab_fjr.xml: $cmd_out"\n'
1086 +        txt += '        executable_exit_status=`python $RUNTIME_AREA/parseCrabFjr.py --input $RUNTIME_AREA/crab_fjr_$NJob.xml --exitcode`\n'
1087 +        txt += '        if [ $executable_exit_status -eq 50115 ];then\n'
1088 +        txt += '            echo ">>> crab_fjr.xml contents: "\n'
1089 +        txt += '            cat $RUNTIME_AREA/crab_fjr_$NJob.xml\n'
1090 +        txt += '            echo "Wrong FrameworkJobReport --> does not contain useful info. ExitStatus: $executable_exit_status"\n'
1091 +        txt += '        elif [ $executable_exit_status -eq -999 ];then\n'
1092 +        txt += '            echo "ExitStatus from FrameworkJobReport not available. not available. Using exit code of executable from command line."\n'
1093 +        txt += '        else\n'
1094 +        txt += '            echo "Extracted ExitStatus from FrameworkJobReport parsing output: $executable_exit_status"\n'
1095 +        txt += '        fi\n'
1096 +        txt += '    else\n'
1097 +        txt += '        echo "CRAB python script to parse CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1098 +        txt += '    fi\n'
1099 +          #### Patch to check input data reading for CMSSW16x Hopefully we-ll remove it asap
1100 +        txt += '    if [ $executable_exit_status -eq 0 ];then\n'
1101 +        txt += '        echo ">>> Executable succeded  $executable_exit_status"\n'
1102 +        txt += '    fi\n'
1103 +        txt += 'else\n'
1104 +        txt += '    echo "CRAB FrameworkJobReport crab_fjr.xml is not available, using exit code of executable from command line."\n'
1105 +        txt += 'fi\n'
1106 +        txt += '\n'
1107 +        txt += 'if [ $executable_exit_status -ne 0 ];then\n'
1108 +        txt += '    echo ">>> Executable failed  $executable_exit_status"\n'
1109 +        txt += '    echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1110 +        txt += '    echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1111 +        txt += '    job_exit_code=$executable_exit_status\n'
1112 +        txt += '    func_exit\n'
1113 +        txt += 'fi\n\n'
1114 +        txt += 'echo "ExeExitCode=$executable_exit_status" | tee -a $RUNTIME_AREA/$repo\n'
1115 +        txt += 'echo "EXECUTABLE_EXIT_STATUS = $executable_exit_status"\n'
1116 +        txt += 'job_exit_code=$executable_exit_status\n'
1117 +
1118 +        return txt
1119 +
1120      def setParam_(self, param, value):
1121          self._params[param] = value
1122  
1123      def getParams(self):
1124          return self._params
1125  
1126 <    def uniquelist(self, old):
1233 <        """
1234 <        remove duplicates from a list
1235 <        """
1236 <        nd={}
1237 <        for e in old:
1238 <            nd[e]=0
1239 <        return nd.keys()
1240 <
1241 <    def outList(self):
1126 >    def outList(self,list=False):
1127          """
1128          check the dimension of the output files
1129          """
# Line 1247 | Line 1132 | class Cmssw(JobType):
1132          listOutFiles = []
1133          stdout = 'CMSSW_$NJob.stdout'
1134          stderr = 'CMSSW_$NJob.stderr'
1135 +        if len(self.output_file) <= 0:
1136 +            msg ="WARNING: no output files name have been defined!!\n"
1137 +            msg+="\tno output files will be reported back/staged\n"
1138 +            common.logger.info(msg)
1139 +
1140          if (self.return_data == 1):
1141 <            for file in (self.output_file+self.output_file_sandbox):
1142 <                listOutFiles.append(self.numberFile_(file, '$NJob'))
1143 <            listOutFiles.append(stdout)
1144 <            listOutFiles.append(stderr)
1145 <        else:
1146 <            for file in (self.output_file_sandbox):
1147 <                listOutFiles.append(self.numberFile_(file, '$NJob'))
1258 <            listOutFiles.append(stdout)
1259 <            listOutFiles.append(stderr)
1141 >            for file in (self.output_file):
1142 >                listOutFiles.append(numberFile(file, '$OutUniqueID'))
1143 >        for file in (self.output_file_sandbox):
1144 >            listOutFiles.append(numberFile(file, '$NJob'))
1145 >        listOutFiles.append(stdout)
1146 >        listOutFiles.append(stderr)
1147 >
1148          txt += 'echo "output files: '+string.join(listOutFiles,' ')+'"\n'
1149          txt += 'filesToCheck="'+string.join(listOutFiles,' ')+'"\n'
1150          txt += 'export filesToCheck\n'
1151 +        taskinfo={}
1152 +        taskinfo['outfileBasename'] = self.output_file
1153 +        common._db.updateTask_(taskinfo)
1154 +
1155 +        if list : return self.output_file
1156          return txt
1157 +
1158 +    def checkCMSSWVersion(self, url = "https://cmstags.cern.ch/cgi-bin/CmsTC/", fileName = "ReleasesXML"):
1159 +        """
1160 +        compare current CMSSW release and arch with allowed releases
1161 +        """
1162 +
1163 +        downloader = Downloader(url)
1164 +        goodRelease = False
1165 +
1166 +        try:
1167 +            result = downloader.config(fileName)
1168 +        except:
1169 +            common.logger.info("ERROR: Problem reading file of allowed CMSSW releases.")
1170 +
1171 +        try:
1172 +            events = pulldom.parseString(result)
1173 +
1174 +            arch     = None
1175 +            release  = None
1176 +            relType  = None
1177 +            relState = None
1178 +            for (event, node) in events:
1179 +                if event == pulldom.START_ELEMENT:
1180 +                    if node.tagName == 'architecture':
1181 +                        arch = node.attributes.getNamedItem('name').nodeValue
1182 +                    if node.tagName == 'project':
1183 +                        relType = node.attributes.getNamedItem('type').nodeValue
1184 +                        relState = node.attributes.getNamedItem('state').nodeValue
1185 +                        if relType == 'Production' and relState == 'Announced':
1186 +                            release = node.attributes.getNamedItem('label').nodeValue
1187 +                if self.executable_arch == arch and self.version == release:
1188 +                    goodRelease = True
1189 +                    return goodRelease
1190 +
1191 +            if not goodRelease:
1192 +                msg = "WARNING: %s on %s is not a supported release. " % \
1193 +                        (self.version, self.executable_arch)
1194 +                msg += "Submission may fail."
1195 +                common.logger.info(msg)
1196 +        except:
1197 +            common.logger.info("Problems parsing file of allowed CMSSW releases.")
1198 +
1199 +        return goodRelease
1200 +

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines