ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyData.py
Revision: 1.30
Committed: Thu Aug 23 13:21:41 2012 UTC (12 years, 8 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, HEAD
Changes since 1.29: +3 -3 lines
Log Message:
rename scheduler rcondor to remoteglidein

File Contents

# User Rev Content
1 belforte 1.30 __revision__ = "$Id: CopyData.py,v 1.29 2012/07/11 15:53:30 belforte Exp $"
2     __version__ = "$Revision: 1.29 $"
3 fanzago 1.26
4 spiga 1.1 from Actor import *
5     from crab_util import *
6     from crab_exceptions import *
7     import common
8     import string
9    
class CopyData(Actor):
    """
    Actor that copies the output files of cleared jobs.

    The files are copied either to a local directory (the default) or to
    a remote storage element, selected through CRAB.dest_se or
    CRAB.dest_endpoint. The transfer itself is delegated to cmscp.
    """

    # Copy protocol handed to cmscp, keyed by upper-cased scheduler name.
    _PROTOCOL_BY_SCHEDULER = {
        'CAF': 'rfio',
        'LSF': 'rfio',
        'CONDOR_G': 'srmv2',
        'GLITE': 'srm-lcg',
        'GLIDEIN': 'srm-lcg',
        'CONDOR': 'srmv2',
        'REMOTEGLIDEIN': 'srm-lcg',
        'SGE': 'srmv2',
        'ARC': 'srmv2',
    }

    def __init__(self, cfg_params, nj_list, StatusObj):
        """
        Validate the configuration and select destination and protocol.

        cfg_params -- configuration mapping; reads USER.copy_data,
                      CRAB.dest_se and CRAB.dest_endpoint here.
        nj_list    -- job selection later passed to common._db.getTask().
        StatusObj  -- optional object whose query() is called to update
                      the local DB; skipped when it evaluates to false.

        Raises CrabException when USER.copy_data is 0 or unset, when both
        dest_se and dest_endpoint are given, or when the current
        scheduler has no known copy protocol.
        """
        common.logger.debug("CopyData __init__() : \n")

        # The default is the integer 0 while configured values are
        # strings: normalise with str() so that a missing key is
        # rejected exactly like an explicit '0'.
        if str(cfg_params.get('USER.copy_data', 0)) == '0':
            msg = 'Cannot copy output if it has not \n'
            msg += '\tbeen stored to SE via USER.copy_data=1'
            raise CrabException(msg)

        self.cfg_params = cfg_params
        self.nj_list = nj_list

        self.dest_se = cfg_params.get("CRAB.dest_se", 'local')
        self.dest_endpoint = cfg_params.get("CRAB.dest_endpoint", 'local')

        if self.dest_se != 'local' and self.dest_endpoint != 'local':
            msg = 'You can specify only a parameter with CopyData option \n'
            msg += '1) The dest_se in the case of official CMS SE (i.e -dest_se=T2_IT_Legnaro)\n'
            msg += '2) The complete endpoint for not official SE (i.e -dest_endpoint=srm://<se_name>:<port>/xxx/yyy/)\n'
            msg += '3) if you do not specify parameters, the output will be copied in your UI under crab_working_dir/res \n'
            raise CrabException(msg)

        ### default is the copy local; any explicit destination disables it
        self.copy_local = 1
        if self.dest_se != 'local' or self.dest_endpoint != 'local':
            self.copy_local = 0

        # update local DB
        if StatusObj:  # this is to avoid a really strange segv
            StatusObj.query(display=False)

        scheduler = common.scheduler.name().upper()
        try:
            self.protocol = self._PROTOCOL_BY_SCHEDULER[scheduler]
        except KeyError:
            # Report an unsupported scheduler as a CRAB error instead of
            # leaking a bare KeyError to the caller.
            raise CrabException('CopyData: no copy protocol defined for scheduler %s' % scheduler)

        common.logger.debug("Protocol = %s; Destination SE = %s; Destination Endpoint = %s."%(self.protocol,self.dest_se,self.dest_endpoint))

    def run(self):
        """
        Copy the outputs and report the outcome of every file.

        default is the copy of output to the local dir res in crabDir
        """
        results = self.copy()
        self.parseResults(results)
        return

    def _stripStorePrefix(self, lfn):
        """
        Return lfn without a leading "/store/temp" or "/store" component;
        any other lfn is returned unchanged.
        """
        for prefix in ("/store/temp/", "/store/"):
            if lfn.startswith(prefix):
                return lfn.replace(prefix, "/", 1)
        return lfn

    def copy(self):
        """
        Run one cmscp copy per source endpoint and merge the results.

        1) local copy: it is the default. The output will be copied
           under USER.outputdir, or the working-dir res dir if unset.
        2) copy to a remote SE specifying -dest_se (official CMS remote
           SE) or -dest_endpoint (not official, needed the complete
           endpoint).

        Returns the dictionary obtained by merging what performCopy()
        returns for every source endpoint.
        """
        results = {}

        lfn, to_copy = self.checkAvailableList()

        if self.copy_local == 1:
            outputDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
            common.logger.info("Copy file locally.\n\tOutput dir: %s"%outputDir)
            dest = {"destinationDir": outputDir}
        else:
            if self.dest_se != 'local':
                from PhEDExDatasvcInfo import PhEDExDatasvcInfo
                stageout = PhEDExDatasvcInfo(config={'storage_element': self.dest_se})
                self.endpoint = stageout.getStageoutPFN()

                # The stage-out PFN is extended with the source LFN
                # directory, minus its /store(/temp) prefix.
                tmp = self._stripStorePrefix(lfn)

                common.logger.debug("Source LFN = %s"%lfn)
                common.logger.debug("tmp LFN = %s"%tmp)
                self.endpoint = self.endpoint + tmp
            else:
                self.endpoint = self.dest_endpoint

            common.logger.info("Copy file to remote SE.\n\tEndpoint: %s"%self.endpoint)

            dest = {"destination": self.endpoint}

        for source, fileList in to_copy.items():
            cmscpConfig = {
                "source": source,
                "inputFileList": fileList,
                "protocol": self.protocol,
            }
            cmscpConfig.update(dest)

            common.logger.debug("Source = %s"%source)
            common.logger.debug("Files = %s"%fileList)
            common.logger.debug("CmscpConfig = %s"%str(cmscpConfig))

            results.update(self.performCopy(cmscpConfig))
        return results

    def checkAvailableList(self):
        """
        Collect the output files of the requested jobs that can be copied.

        A job qualifies when its state is 'Cleared' and its wrapper
        return code is 0 or 60308 (presumably a stage-out specific code
        that still leaves the output retrievable -- TODO confirm).

        Returns (lfn, to_copy): lfn is the directory part of the first
        output file of the last qualifying job ('' if none had files),
        to_copy maps each storage endpoint to a comma separated string
        of file base names.

        Raises CrabException when there is nothing to copy.
        """
        common.logger.debug("CopyData in checkAvailableList() ")

        task = common._db.getTask(self.nj_list)
        filesByEndpoint = {}
        lfn = ''

        # loop over jobs
        for job in task.jobs:
            state = job.runningJob['state']
            exitCode = job.runningJob['wrapperReturnCode']

            if state == 'Cleared' and exitCode in (0, 60308):
                common.logger.log(10-1,"job_id = %s"%str(job['jobId']))
                endpoints = job.runningJob['storage']
                output_files = job.runningJob['lfn']
                # zip() pairs each endpoint with its file and stops at
                # the shorter list, so a length mismatch cannot raise.
                for endpoint, output_file in zip(endpoints, output_files):
                    filesByEndpoint.setdefault(endpoint, []).append(os.path.basename(output_file))
                ### the lfn should be the same for every file, the only difference could be the temp dir
                # Guarded: a cleared job without recorded output files
                # must not abort the copy of the other jobs.
                if output_files:
                    lfn = os.path.dirname(output_files[0])
            elif state == 'Cleared':
                common.logger.info("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
                                   %(str(job['jobId']),str(exitCode)))
            else:
                common.logger.info("Not possible copy outputs of Job # %s : Status is %s" \
                                   %(str(job['jobId']),str(job.runningJob['statusScheduler'])))

        if not filesByEndpoint:
            raise CrabException("No files to copy")

        to_copy = {}
        for endpoint, names in filesByEndpoint.items():
            to_copy[endpoint] = ','.join(names)

        common.logger.debug(" to_copy = %s"%str(to_copy))
        return lfn, to_copy

    def performCopy(self, dict):
        """
        call the cmscp class and do the copy

        dict -- cmscp configuration built by copy().

        Returns whatever cmscp.run() returns. The cmscp instance is kept
        in self.doCopy because parseResults() uses it afterwards.
        """
        from cmscp import cmscp
        self.doCopy = cmscp(dict)

        common.logger.info("Starting copy...")

        start = time.time()
        results = self.doCopy.run()
        stop = time.time()

        common.logger.debug("CopyLocal Time: "+str(stop - start))

        return results

    def parseResults(self, results):
        """
        Write the results to the JSON file and log the outcome per file.

        results -- dictionary mapping a file name to a dictionary with
                   at least 'erCode' ('0' means success) and, on
                   failure, 'reason'. Entries with an empty key are
                   skipped.

        Requires performCopy() to have run, since it uses self.doCopy.
        """
        self.doCopy.writeJsonFile(results)
        for fileName, outcome in results.items():
            if not fileName:
                continue
            failed = outcome['erCode'] != '0'
            if failed:
                txt = 'failed'
            else:
                txt = 'success'
            msg = 'Copy %s for file: %s '%(txt,fileName)
            if failed:
                msg += '\n\tCopy failed because : %s'%outcome['reason']
            common.logger.info( msg )
        return