ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SchedulerEdg.py
(Generate patch)

Comparing COMP/CRAB/python/SchedulerEdg.py (file contents):
Revision 1.111 by spiga, Tue Dec 19 16:06:10 2006 UTC vs.
Revision 1.120 by fanzago, Mon Jun 25 13:54:32 2007 UTC

# Line 5 | Line 5 | from crab_util import *
5   from EdgConfig import *
6   import common
7  
8 < import os, sys, time, gzip
8 > import os, sys, time
9  
10   class SchedulerEdg(Scheduler):
11      def __init__(self):
# Line 81 | Line 81 | class SchedulerEdg(Scheduler):
81  
82          if ( int(self.return_data) == 0 and int(self.copy_data) == 0 ):
83             msg = 'Warning: return_data = 0 and copy_data = 0 ==> your exe output will be lost\n'
84 <           msg = msg + 'Please modify return_data and copy_data value in your crab.cfg file\n'
84 >           msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
85             raise CrabException(msg)
86  
87 +        if ( int(self.return_data) == 1 and int(self.copy_data) == 1 ):
88 +           msg = 'Warning: return_data = 1 and copy_data = 1\n'
89 +           msg = msg + 'Please modify return_data or copy_data value in your crab.cfg file\n'
90 +           raise CrabException(msg)
91 +
92 +        ########### FEDE FOR DBS2 ##############################
93 +        try:
94 +            self.publish_data = cfg_params["USER.publish_data"]
95 +            if int(self.publish_data) == 1:
96 +                try:
97 +                    self.publish_data_name = cfg_params['USER.publish_data_name']
98 +                except KeyError:
99 +                    msg = "Error. The [USER] section does not have 'publish_data_name'"
100 +                    common.logger.message(msg)
101 +                    raise CrabException(msg)
102 +        except KeyError: self.publish_data = 0
103 +
104 +        if ( int(self.copy_data) == 0 and int(self.publish_data) == 1 ):
105 +           msg = 'Warning: publish_data = 1 must be used with copy_data = 1\n'
106 +           msg = msg + 'Please modify copy_data value in your crab.cfg file\n'
107 +           common.logger.message(msg)
108 +           raise CrabException(msg)
109 +        #################################################
110          try:
111              self.lfc_host = cfg_params['EDG.lfc_host']
112          except KeyError:
# Line 324 | Line 347 | class SchedulerEdg(Scheduler):
347          
348          txt += '\n\n'
349  
350 <        if int(self.copy_data) == 1:
351 <           if self.SE:
352 <              txt += 'export SE='+self.SE+'\n'
353 <              txt += 'echo "SE = $SE"\n'
354 <           if self.SE_PATH:
355 <              if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
356 <              txt += 'export SE_PATH='+self.SE_PATH+'\n'
357 <              txt += 'echo "SE_PATH = $SE_PATH"\n'
350 > #        if int(self.copy_data) == 1:
351 > #           if self.SE:
352 > #              txt += 'export SE='+self.SE+'\n'
353 > #              txt += 'echo "SE = $SE"\n'
354 > #           if self.SE_PATH:
355 > #              if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
356 > #              txt += 'export SE_PATH='+self.SE_PATH+'\n'
357 > #              txt += 'echo "SE_PATH = $SE_PATH"\n'
358  
359          txt += 'export VO='+self.VO+'\n'
360          ### add some line for LFC catalog setting
# Line 450 | Line 473 | class SchedulerEdg(Scheduler):
473          to copy produced output into a storage element.
474          """
475          txt = ''
476 +
477 +        ##### FEDE MOVED FROM SET_ENVIRONMENT ##############
478 +        
479 +        SE_PATH=''
480          if int(self.copy_data) == 1:
481 +           if self.SE:
482 +              txt += 'export SE='+self.SE+'\n'
483 +              txt += 'echo "SE = $SE"\n'
484 +           if self.SE_PATH:
485 +              if ( self.SE_PATH[-1] != '/' ) : self.SE_PATH = self.SE_PATH + '/'
486 +              SE_PATH=self.SE_PATH
487 +              ####### FEDE FOR DBS2
488 +              if int(self.publish_data) == 1:
489 +                  txt += '### publish_data = 1 so the SE path where to copy the output is: \n'
490 +                  txt += 'subject=`voms-proxy-info -subject | awk -F\'CN\' \'{print $2$3$4}\' | tr -d \'=/ \'` \n'
491 +                  txt += 'echo "subject = $subject" \n'
492 +                  
493 +                  path_add = '${subject}/'+ self.publish_data_name +'_${PSETHASH}/'
494 +                  SE_PATH = SE_PATH + path_add
495 +
496 +              txt += 'export SE_PATH='+SE_PATH+'\n'
497 +              txt += 'echo "SE_PATH = $SE_PATH"\n'
498 +
499 +        ##########################################################  
500 +
501 +        #if int(self.copy_data) == 1:
502             txt += '#\n'
503             txt += '#   Copy output to SE = $SE\n'
504             txt += '#\n'
# Line 483 | Line 531 | class SchedulerEdg(Scheduler):
531             txt += '            echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
532             txt += '            echo "srmcp failed, attempting lcg-cp."\n'
533             if common.logger.debugLevel() >= 5:
534 <               txt += '            echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
535 <               txt += '            exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
534 >               ########### FEDE CHANGES TO WRITE IN SRM LNL.INFN.IT #################
535 >               #txt += '            echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
536 >               #txt += '            exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
537 >               txt += '            echo "lcg-cp --vo $VO -t 2400 --verbose file://`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
538 >               txt += '            exitstring=`lcg-cp --vo $VO -t 2400 --verbose file://\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
539             else:              
540 <               txt += '            echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
541 <               txt += '            exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
540 >               #txt += '            echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file gsiftp://${SE}${SE_PATH}$out_file"\n'
541 >               #txt += '            exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file gsiftp://${SE}${SE_PATH}$out_file 2>&1`\n'
542 >               txt += '            echo "lcg-cp --vo $VO -t 2400 file://`pwd`/$out_file srm://${SE}:8443${SE_PATH}$out_file"\n'
543 >               txt += '            exitstring=`lcg-cp --vo $VO -t 2400 file://\`pwd\`/$out_file srm://${SE}:8443${SE_PATH}$out_file 2>&1`\n'
544             txt += '            copy_exit_status=$?\n'
545             txt += '            echo "COPY_EXIT_STATUS for lcg-cp = $copy_exit_status"\n'
546             txt += '            echo "STAGE_OUT = $copy_exit_status"\n'
# Line 497 | Line 550 | class SchedulerEdg(Scheduler):
550             txt += '               echo "StageOutExitStatus = 198" | tee -a $RUNTIME_AREA/$repo\n'
551             txt += '               echo "StageOutExitStatusReason = $exitstring" | tee -a $RUNTIME_AREA/$repo\n'
552             txt += '               echo "srmcp and lcg-cp and failed!"\n'
553 +           txt += '               SE=""\n'
554 +           txt += '               echo "SE = $SE"\n'
555 +           txt += '               SE_PATH=""\n'
556 +           txt += '               echo "SE_PATH = $SE_PATH"\n'
557             txt += '            else\n'
558             txt += '               echo "StageOutSE = $SE" | tee -a $RUNTIME_AREA/$repo\n'
559             txt += '               echo "StageOutCatalog = " | tee -a $RUNTIME_AREA/$repo\n'
# Line 512 | Line 569 | class SchedulerEdg(Scheduler):
569             txt += '            echo "srmcp succeeded"\n'
570             txt += '         fi\n'
571             txt += '     done\n'
572 +           txt += '     exit_status=$copy_exit_status\n'
573          return txt
574  
575      def wsRegisterOutput(self):
# Line 593 | Line 651 | class SchedulerEdg(Scheduler):
651          cmd_out = runCommand(cmd)
652          return cmd_out
653  
596    ##### FEDE ######        
654      def findSites_(self, n):
655          itr4 =[]
656          sites = common.jobDB.destination(n)
# Line 624 | Line 681 | class SchedulerEdg(Scheduler):
681          jbt = job.type()
682          
683          inp_sandbox = jbt.inputSandbox(index)
684 <        out_sandbox = jbt.outputSandbox(index)
684 >        #out_sandbox = jbt.outputSandbox(index)
685          """
686          [end] FIX-ME
687          """
# Line 640 | Line 697 | class SchedulerEdg(Scheduler):
697          dir = string.split(common.work_space.topDir(), '/')
698          taskName = dir[len(dir)-2]
699    
643        to_writeReq = ''
700          to_write = ''
701  
702          req=' '
# Line 700 | Line 756 | class SchedulerEdg(Scheduler):
756          taskName = dir[len(dir)-2]
757  
758          xml.write(str(title))
759 <        xml.write('<task name="' +str(taskName)+'" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache">\n')
759 >        #xml.write('<task name="' +str(taskName)+'" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache">\n')
760 >
761 >        #xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + '" task_info="' + os.path.expandvars('X509_USER_PROXY') + '">\n')
762 >        xml.write('<task name="' +str(taskName)+ '" sub_path="' +common.work_space.pathForTgz() + 'share/.boss_cache"' + ' task_info="' + os.environ["X509_USER_PROXY"] + '">\n')
763          xml.write(jt_string)
764          
765          if (to_write != ''):
# Line 727 | Line 786 | class SchedulerEdg(Scheduler):
786          indy: here itr4
787          '''
788          
789 <
790 <        xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
789 >        xml.write('<chain name="' +str(taskName)+'__ITR1_" scheduler="'+str(self.schedulerName)+'">\n')
790 >       # xml.write('<chain scheduler="'+str(self.schedulerName)+'">\n')
791          xml.write(jt_string)
792  
793          #executable
# Line 752 | Line 811 | class SchedulerEdg(Scheduler):
811                  pass
812              pass
813  
814 <        if (not jbt.additional_inbox_files == []):
815 <            inp_box = inp_box + ','
816 <            for addFile in jbt.additional_inbox_files:
817 <                addFile = os.path.abspath(addFile)
818 <                inp_box = inp_box+''+addFile+','
819 <                pass
814 > #        if (not jbt.additional_inbox_files == []):
815 > #            inp_box = inp_box + ','
816 > #            for addFile in jbt.additional_inbox_files:
817 > #                #addFile = os.path.abspath(addFile)
818 > #                inp_box = inp_box+''+addFile+','
819 > #                pass
820  
821          if inp_box[-1] == ',' : inp_box = inp_box[:-1]
822          inp_box = '<infiles> <![CDATA[\n' + inp_box + '\n]]> </infiles>\n'
# Line 825 | Line 884 | class SchedulerEdg(Scheduler):
884          if (self.proxyValid): return
885  
886          ### Just return if asked to do so
887 <        if (self.dontCheckProxy):
887 >        if (self.dontCheckProxy==1):
888              self.proxyValid=1
889              return
890  
832        timeleft = -999
891          minTimeLeft=10*3600 # in seconds
892  
893          minTimeLeftServer = 100 # in hours
# Line 878 | Line 936 | class SchedulerEdg(Scheduler):
936              reTime = re.compile( r'timeleft: (\d+)' )
937              #print "<"+str(reTime.search( cmd_out ).group(1))+">"
938              if reTime.match( cmd_out ):
939 <                time = reTime.search( line ).group(1)
939 >                time = reTime.search( cmd_out ).group(1)
940                  if time < minTimeLeftServer:
941                      renewProxy = 1
942                      common.logger.message('No credential delegation will expire in '+time+' hours: renew it')

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines