ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/SubmitterServer.py
(Generate patch)

Comparing COMP/CRAB/python/SubmitterServer.py (file contents):
Revision 1.82 by spiga, Wed Jul 22 17:55:18 2009 UTC vs.
Revision 1.83 by ewv, Mon Aug 17 18:42:22 2009 UTC

# Line 3 | Line 3 | from crab_util import *
3   import common
4   from ApmonIf import ApmonIf
5  
6 < import os, errno, time, sys, re
6 > import os, errno, time, sys, re
7   import commands, traceback
8   import zlib
9  
10   from Submitter import Submitter
11 < from ServerCommunicator import ServerCommunicator
11 > from ServerCommunicator import ServerCommunicator
12  
13   from ProdCommon.Storage.SEAPI.SElement import SElement
14   from ProdCommon.Storage.SEAPI.SBinterface import SBinterface
15   from ProdCommon.Storage.SEAPI.Exceptions import *
16  
17 <
17 >
18   class SubmitterServer( Submitter ):
19      def __init__(self, cfg_params, parsed_range, val):
20          self.srvCfg = {}
21          self.cfg_params = cfg_params
22          self.submitRange = []
23 <        self.credentialType = 'Proxy'
24 <        self.copyTout= ' -t 600 '    
23 >        self.credentialType = 'Proxy'
24 >        self.copyTout= ' -t 600 '
25          if common.scheduler.name().upper() in ['LSF', 'CAF']:
26 <            self.credentialType = 'Token'
27 <            self.copyTout= ' '    
26 >            self.credentialType = 'Token'
27 >            self.copyTout= ' '
28 >
29 >        Submitter.__init__(self, cfg_params, parsed_range, val)
30  
29        Submitter.__init__(self, cfg_params, parsed_range, val)      
30    
31          # init client server params...
32 <        CliServerParams(self)      
32 >        CliServerParams(self)
33  
34          # path fix
35          if self.storage_path[0]!='/':
36              self.storage_path = '/'+self.storage_path
37  
38          self.taskuuid = str(common._db.queryTask('name'))
39 <
40 <      
39 >        self.limitJobs = False
40 >
41          return
42  
43      def run(self):
# Line 51 | Line 51 | class SubmitterServer( Submitter ):
51          self.BuildJobList()
52  
53          self.submitRange = self.nj_list
54 <    
55 <        check = self.checkIfCreate()
54 >
55 >        check = self.checkIfCreate()
56  
57          if check == 0 :
58  
59              self.remotedir = os.path.join(self.storage_path, self.taskuuid)
60              self.manageCredential()
61 <            
62 <            # check if it is the first submission  
63 <            isFirstSubmission =  common._db.checkIfNeverSubmittedBefore()
61 >
62 >            # check if it is the first submission
63 >            isFirstSubmission =  common._db.checkIfNeverSubmittedBefore()
64  
65              # standard submission to the server
66              self.performSubmission(isFirstSubmission)
67 <        
67 >
68              stop = time.time()
69              common.logger.debug("Submission Time: "+str(stop - start))
70  
71 <            msg = 'Total of %d jobs submitted'%len(self.submitRange)
71 >            msg = 'Total of %d jobs submitted'%len(self.submitRange)
72              common.logger.info(msg)
73 <
73 >
74          return
75  
76      def moveISB_SEAPI(self):
# Line 78 | Line 78 | class SubmitterServer( Submitter ):
78          common.logger.debug("Task name: " + self.taskuuid)
79          isblist = common._db.queryTask('globalSandbox').split(',')
80          common.logger.debug("List of ISB files: " +str(isblist) )
81 <        
81 >
82          # init SE interface
83          common.logger.info("Starting sending the project to the storage "+str(self.storage_name)+"...")
84 <        try:  
84 >        try:
85              seEl = SElement(self.storage_name, self.storage_proto, self.storage_port)
86          except Exception, ex:
87              common.logger.debug(str(ex))
88              msg = "ERROR : Unable to create SE destination interface \n"
89              msg +="Project "+ self.taskuuid +" not Submitted \n"
90              raise CrabException(msg)
91 <          
92 <        try:  
91 >
92 >        try:
93              loc = SElement("localhost", "local")
94          except Exception, ex:
95              common.logger.debug(str(ex))
# Line 100 | Line 100 | class SubmitterServer( Submitter ):
100  
101          ### it should not be there... To move into SE API. DS
102  
103 <        # create remote dir for gsiftp
103 >        # create remote dir for gsiftp
104          if self.storage_proto in ['gridftp','rfio']:
105              try:
106 <                action = SBinterface( seEl )  
106 >                action = SBinterface( seEl )
107                  action.createDir( self.remotedir )
108              except AlreadyExistsException, ex:
109 <                msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
109 >                msg = "Project %s already exist on the Storage Element \n"%self.taskuuid
110                  msg +='\t%s'%str(ex)
111                  common.logger.debug(msg)
112              except OperationException, ex:
# Line 124 | Line 124 | class SubmitterServer( Submitter ):
124          sbi = SBinterface( loc, seEl )
125  
126          for filetocopy in isblist:
127 <            source = os.path.abspath(filetocopy)
127 >            source = os.path.abspath(filetocopy)
128              dest = os.path.join(self.remotedir, os.path.basename(filetocopy))
129              common.logger.debug("Sending "+ os.path.basename(filetocopy) +" to "+ self.storage_name)
130              try:
# Line 146 | Line 146 | class SubmitterServer( Submitter ):
146          return
147  
148  
149 <    def manageCredential(self):
149 >    def manageCredential(self):
150          """
151 <        Prepare configuration and Call credential API
151 >        Prepare configuration and Call credential API
152          """
153          common.logger.info("Registering credential to the server : %s"%self.server_name)
154 <        # only for temporary back-comp.
155 <        if  self.credentialType == 'Proxy':
154 >        # only for temporary back-comp.
155 >        if  self.credentialType == 'Proxy':
156               # for proxy all works as before....
157               self.moveProxy()
158 <             # myProxyMoveProxy() # check within the API ( Proxy.py )
158 >             # myProxyMoveProxy() # check within the API ( Proxy.py )
159          else:
160               from ProdCommon.Credential.CredentialAPI import CredentialAPI
161               myproxyserver = self.cfg_params.get('GRID.proxy_server', 'myproxy.cern.ch')
# Line 168 | Line 168 | class SubmitterServer( Submitter ):
168                            'logger'     : common.logger() \
169                            }
170               try:
171 <                 CredAPI =  CredentialAPI( configAPI )            
172 <             except Exception, err :
171 >                 CredAPI =  CredentialAPI( configAPI )
172 >             except Exception, err :
173                   common.logger.debug("Configuring Credential API: " +str(traceback.format_exc()))
174                   raise CrabException("ERROR: Unable to configure Credential Client API  %s\n"%str(err))
175               if not CredAPI.checkCredential(Time=12) :
# Line 180 | Line 180 | class SubmitterServer( Submitter ):
180                      raise CrabException(str(ex))
181  
182               try:
183 <                 dict = CredAPI.registerCredential()
183 >                 dict = CredAPI.registerCredential()
184               except Exception, err:
185                   common.logger.debug("Registering Credentials : " +str(traceback.format_exc()))
186                   raise CrabException("ERROR: Unable to register %s delegating server: %s\n"%(self.credentialType,self.server_name ))
# Line 191 | Line 191 | class SubmitterServer( Submitter ):
191      # TO REMOVE
192      def moveProxy( self ):
193          WorkDirName = os.path.basename(os.path.split(common.work_space.topDir())[0])
194 <        ## Temporary... to remove soon  
194 >        ## Temporary... to remove soon
195          common.scheduler.checkProxy(minTime=100)
196          try:
197              common.logger.debug("Registering a valid proxy to the server:")
# Line 224 | Line 224 | class SubmitterServer( Submitter ):
224          self.cfg_params['CRAB.se_remote_dir'] = self.remotedir
225  
226          if firstSubmission==True:
227 <
228 <            TotJob = common._db.nJobs()
227 >
228 >            TotJob = common._db.nJobs()
229              # move the sandbox
230 <            self.moveISB_SEAPI()
230 >            self.moveISB_SEAPI()
231  
232              # first time submit
233              try:
# Line 241 | Line 241 | class SubmitterServer( Submitter ):
241                  msg += str(e)
242                  raise CrabException(msg)
243  
244 <            # TODO fix not needed first field
244 >            # TODO fix not needed first field
245              subOutcome = csCommunicator.submitNewTask(self.taskuuid, taskXML, self.submitRange,TotJob)
246          else:
247              # subsequent submissions and resubmit
# Line 256 | Line 256 | class SubmitterServer( Submitter ):
256          if subOutcome != 0:
257              msg = "ClientServer ERROR: %d raised during the communication.\n"%subOutcome
258              self.stateChange( self.submitRange, "Created" )
259 <            common.logger.debug(msg)
259 >            common.logger.debug(msg)
260              raise CrabException('ERROR Jobs NOT submitted.')
261  
262          del csCommunicator

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines