ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyData.py
Revision: 1.30
Committed: Thu Aug 23 13:21:41 2012 UTC (12 years, 8 months ago) by belforte
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_9_1, CRAB_2_9_1_pre2, CRAB_2_9_1_pre1, CRAB_2_9_0, CRAB_2_9_0_pre2, CRAB_2_9_0_pre1, CRAB_2_8_8, CRAB_2_8_8_pre1, CRAB_2_8_7_patch3, CRAB_2_8_7_patch2, CRAB_2_8_7_patch1, CRAB_2_8_7, CRAB_2_8_7_pre2, CRAB_2_8_7_pre1, CRAB_2_8_6, CRAB_2_8_6_pre1, CRAB_2_8_5_patch3, CRAB_2_8_5_patch2, CRAB_2_8_5_patch1, CRAB_2_8_5, CRAB_2_8_5_pre5, CRAB_2_8_5_pre4, CRAB_2_8_5_pre3, CRAB_2_8_4_patch3, CRAB_2_8_5_pre2, CRAB_2_8_4_patch2, CRAB_2_8_5_pre1, CRAB_2_8_4_patch1, CRAB_2_8_4, CRAB_2_8_4_pre5, CRAB_2_8_4_pre4, CRAB_2_8_4_pre3, CRAB_2_8_4_pre2, CRAB_2_8_4_pre1, CRAB_2_8_3, CRAB_2_8_3_pre4, CRAB_2_8_3_pre3, CRAB_2_8_3_pre2, CRAB_2_8_3_pre1, HEAD
Changes since 1.29: +3 -3 lines
Log Message:
rename scheduler rcondor to remoteglidein

File Contents

# Content
# CVS keyword strings ($Id$ / $Revision$) identifying the revision of this file.
__revision__ = "$Id: CopyData.py,v 1.29 2012/07/11 15:53:30 belforte Exp $"
__version__ = "$Revision: 1.29 $"
3
4 from Actor import *
5 from crab_util import *
6 from crab_exceptions import *
7 import common
8 import string
9
class CopyData(Actor):
    """
    Copy the output files of finished jobs, previously staged out to a
    storage element (USER.copy_data=1), either into a local directory
    (default) or to another SE given by -dest_se or -dest_endpoint.
    """

    # Transfer protocol passed to cmscp, keyed by upper-cased scheduler name.
    _PROTOCOLS = {
        'CAF': 'rfio',
        'LSF': 'rfio',
        'CONDOR_G': 'srmv2',
        'GLITE': 'srm-lcg',
        'GLIDEIN': 'srm-lcg',
        'CONDOR': 'srmv2',
        'REMOTEGLIDEIN': 'srm-lcg',
        'SGE': 'srmv2',
        'ARC': 'srmv2',
    }

    # Wrapper exit codes of Cleared jobs whose outputs are considered copyable.
    _COPYABLE_EXIT_CODES = (0, 60308)

    def __init__(self, cfg_params, nj_list, StatusObj):
        """
        constructor

        cfg_params : configuration mapping (reads USER.copy_data,
                     CRAB.dest_se, CRAB.dest_endpoint, USER.outputdir).
        nj_list    : job selection later passed to common._db.getTask().
        StatusObj  : optional; if given, query(display=False) is called on it
                     to update the local DB.

        Raises CrabException if USER.copy_data is not enabled, if both
        CRAB.dest_se and CRAB.dest_endpoint are given, or if the current
        scheduler has no known copy protocol.
        """
        common.logger.debug("CopyData __init__() : \n")

        # Normalise to str: the option may be absent (int default 0) or come
        # from the config as a string; both must mean "not stored to SE".
        if str(cfg_params.get('USER.copy_data', 0)).strip() == '0':
            msg = 'Cannot copy output if it has not \n'
            msg += '\tbeen stored to SE via USER.copy_data=1'
            raise CrabException(msg)

        self.cfg_params = cfg_params
        self.nj_list = nj_list

        ### default is the copy local
        self.dest_se = cfg_params.get("CRAB.dest_se", 'local')
        self.dest_endpoint = cfg_params.get("CRAB.dest_endpoint", 'local')

        if self.dest_se != 'local' and self.dest_endpoint != 'local':
            msg = 'You can specify only a parameter with CopyData option \n'
            msg += '1) The dest_se in the case of official CMS SE (i.e -dest_se=T2_IT_Legnaro)\n'
            msg += '2) The complete endpoint for not official SE (i.e -dest_endpoint=srm://<se_name>:<port>/xxx/yyy/)\n'
            msg += '3) if you do not specify parameters, the output will be copied in your UI under crab_working_dir/res \n'
            raise CrabException(msg)

        # 1 = copy into a local directory, 0 = copy to a remote SE/endpoint.
        if self.dest_se == 'local' and self.dest_endpoint == 'local':
            self.copy_local = 1
        else:
            self.copy_local = 0

        # update local DB
        if StatusObj:  # this is to avoid a really strange segv
            StatusObj.query(display=False)

        scheduler_name = common.scheduler.name().upper()
        try:
            self.protocol = self._PROTOCOLS[scheduler_name]
        except KeyError:
            raise CrabException('CopyData: no copy protocol known for scheduler %s'
                                % scheduler_name)

        common.logger.debug("Protocol = %s; Destination SE = %s; Destination Endpoint = %s."
                            % (self.protocol, self.dest_se, self.dest_endpoint))

    def run(self):
        """
        default is the copy of output to the local dir res in crabDir
        """
        results = self.copy()
        self.parseResults(results)
        return

    def copy(self):
        """
        1) local copy: it is the default. The output will be copied under crab_working_dir/res dir
        2) copy to a remote SE specifying -dest_se (official CMS remote SE)
           or -dest_endpoint (not official, needed the complete endpoint)

        Returns the merged results dictionaries of every cmscp transfer
        (one transfer per source endpoint).
        """
        results = {}

        lfn, to_copy = self.checkAvailableList()

        if self.copy_local == 1:
            outputDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
            common.logger.info("Copy file locally.\n\tOutput dir: %s" % outputDir)
            dest = {"destinationDir": outputDir}
        else:
            if self.dest_se != 'local':
                from PhEDExDatasvcInfo import PhEDExDatasvcInfo
                phedexCfg = {'storage_element': self.dest_se}
                stageout = PhEDExDatasvcInfo(config=phedexCfg)
                self.endpoint = stageout.getStageoutPFN()

                # Drop the leading /store/temp/ or /store/ from the source LFN
                # before appending it to the destination stage-out PFN.
                if lfn.startswith("/store/temp/"):
                    tmp = lfn.replace("/store/temp/", "/", 1)
                elif lfn.startswith("/store/"):
                    tmp = lfn.replace("/store/", "/", 1)
                else:
                    tmp = lfn

                common.logger.debug("Source LFN = %s" % lfn)
                common.logger.debug("tmp LFN = %s" % tmp)
                self.endpoint = self.endpoint + tmp
            else:
                self.endpoint = self.dest_endpoint

            common.logger.info("Copy file to remote SE.\n\tEndpoint: %s" % self.endpoint)

            dest = {"destination": self.endpoint}

        # One cmscp transfer per source endpoint.
        for source in to_copy.keys():
            cmscpConfig = {
                "source": source,
                "inputFileList": to_copy[source],
                "protocol": self.protocol,
                #"debug":'1'
            }
            cmscpConfig.update(dest)

            common.logger.debug("Source = %s" % source)
            common.logger.debug("Files = %s" % to_copy[source])
            common.logger.debug("CmscpConfig = %s" % str(cmscpConfig))

            results.update(self.performCopy(cmscpConfig))
        return results

    def checkAvailableList(self):
        '''
        check if asked list of jobs
        already produce output to move

        Returns a tuple (lfn, to_copy):
          lfn     -- directory part of the first output LFN of the last
                     copyable job seen
          to_copy -- {source endpoint: comma-separated output file basenames}

        Raises CrabException("No files to copy") if no job has copyable output.
        '''
        import os

        common.logger.debug("CopyData in checkAvailableList() ")

        task = common._db.getTask(self.nj_list)
        files_by_endpoint = {}
        lfn = ''

        # loop over jobs
        for job in task.jobs:
            running = job.runningJob

            if running['state'] != 'Cleared':
                common.logger.info("Not possible copy outputs of Job # %s : Status is %s" \
                    % (str(job['jobId']), str(running['statusScheduler'])))
                continue

            if running['wrapperReturnCode'] not in self._COPYABLE_EXIT_CODES:
                common.logger.info("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
                    % (str(job['jobId']), str(running['wrapperReturnCode'])))
                continue

            id_job = job['jobId']
            common.logger.log(10-1, "job_id = %s" % str(id_job))
            endpoints = running['storage']
            output_files = running['lfn']

            # storage[i] is the endpoint holding lfn[i]; report a mismatch
            # instead of failing with an IndexError.
            if len(endpoints) != len(output_files):
                common.logger.info("Job # %s : %d storage endpoints but %d output files"
                                   % (str(id_job), len(endpoints), len(output_files)))

            for endpoint, output_file in zip(endpoints, output_files):
                files_by_endpoint.setdefault(endpoint, []).append(os.path.basename(output_file))

            ### the lfn should be the same for every file, the only difference could be the temp dir
            ### to test if this is ok as solution
            if output_files:
                lfn = os.path.dirname(output_files[0])

        to_copy = dict([(endpoint, ','.join(names))
                        for endpoint, names in files_by_endpoint.items()])

        if len(to_copy) == 0:
            raise CrabException("No files to copy")

        common.logger.debug(" to_copy = %s" % str(to_copy))
        return lfn, to_copy

    def performCopy(self, dict):
        """
        call the cmscp class and do the copy

        `dict` is the cmscp configuration (the name shadows the builtin but
        is kept so keyword callers keep working). The cmscp instance is kept
        in self.doCopy because parseResults() uses it to write the JSON
        report. Returns the results of cmscp.run().
        """
        import time
        from cmscp import cmscp
        self.doCopy = cmscp(dict)

        common.logger.info("Starting copy...")

        start = time.time()
        results = self.doCopy.run()
        stop = time.time()

        common.logger.debug("CopyLocal Time: " + str(stop - start))

        return results

    def parseResults(self, results):
        '''
        take the results dictionary and
        print the results

        Writes the JSON report through the last cmscp instance, then logs one
        success/failure line per file (entries with an empty key are skipped).
        '''
        self.doCopy.writeJsonFile(results)
        for file_name, report in results.items():
            if not file_name:
                continue
            txt = 'success'
            if report['erCode'] != '0':
                txt = 'failed'
            msg = 'Copy %s for file: %s ' % (txt, file_name)
            if txt == 'failed':
                msg += '\n\tCopy failed because : %s' % report['reason']
            common.logger.info(msg)
        return