ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyData.py
Revision: 1.28
Committed: Thu Oct 7 12:18:02 2010 UTC (14 years, 6 months ago) by fanzago
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_8_2_pre4, CRAB_2_8_2_pre3, CRAB_2_8_2_pre2, CRAB_2_8_2_pre1, CRAB_2_8_1, CRAB_2_8_0, CRAB_2_8_0_pre1, CRAB_2_7_10_pre3, CRAB_2_7_9_patch2_pre1, CRAB_2_7_10_pre2, CRAB_2_7_10_pre1, CRAB_2_7_9_patch1, CRAB_2_7_9, CRAB_2_7_9_pre5, CRAB_2_7_9_pre4, CRAB_2_7_9_pre3, CRAB_2_7_9_pre2, CRAB_2_7_8_patch2, CRAB_2_7_9_pre1, CRAB_2_7_8_patch2_pre1, CRAB_2_7_8_patch1, CRAB_2_7_8_patch1_pre1, CRAB_2_7_8, CRAB_2_7_8_pre3, CRAB_2_7_8_pre2, CRAB_2_7_8_dash3, CRAB_2_7_8_dash2, CRAB_2_7_8_dash, CRAB_2_7_7_patch1, CRAB_2_7_7_patch1_pre1, CRAB_2_7_8_pre1, CRAB_2_7_7, CRAB_2_7_7_pre2, CRAB_2_7_7_pre1, CRAB_2_7_6_patch1, CRAB_2_7_6, CRAB_2_7_6_pre1, CRAB_2_7_5_patch1, CRAB_2_7_5
Changes since 1.27: +3 -3 lines
Log Message:
removed debug level

File Contents

# User Rev Content
# Version-control keyword strings (the "$Id: ... $" / "$Revision: ... $" form);
# presumably expanded by CVS at check-in time -- do not edit by hand.
__revision__ = "$Id: CopyData.py,v 1.27 2010/07/02 15:34:37 fanzago Exp $"
__version__ = "$Revision: 1.27 $"
3 fanzago 1.26
import os
import string
import time

from Actor import *
from crab_util import *
from crab_exceptions import *
import common
9    
class CopyData(Actor):
    """
    Copy the output files of finished jobs away from the storage they were
    written to: either into a local directory (the default) or to a remote
    storage element (SE) selected with CRAB.dest_se / CRAB.dest_endpoint.
    """

    # Copy protocol handed to cmscp, keyed by the upper-cased scheduler name.
    _PROTOCOL_BY_SCHEDULER = {
        'CAF': 'rfio',
        'LSF': 'rfio',
        'CONDOR_G': 'srmv2',
        'GLITE': 'srm-lcg',
        'GLIDEIN': 'srm-lcg',
        'CONDOR': 'srmv2',
        'SGE': 'srmv2',
        'ARC': 'srmv2',
    }

    def __init__(self, cfg_params, nj_list, StatusObj):
        """
        constructor

        cfg_params -- mapping of configuration keys ('USER.copy_data',
                      'CRAB.dest_se', 'CRAB.dest_endpoint', ...) to values
        nj_list    -- job selection passed to common._db.getTask()
        StatusObj  -- status action; when not empty its query() is called
                      to refresh the local DB before copying

        Raises CrabException when USER.copy_data is '0' or missing, or when
        both CRAB.dest_se and CRAB.dest_endpoint are given.
        Raises KeyError when the scheduler name has no known protocol.
        """
        common.logger.debug("CopyData __init__() : \n")

        # Compare as strings: the previous int default (0) never matched '0',
        # so a missing USER.copy_data silently passed this check.
        if str(cfg_params.get('USER.copy_data', '0')) == '0':
            msg = 'Cannot copy output if it has not \n'
            msg += '\tbeen stored to SE via USER.copy_data=1'
            raise CrabException(msg)

        self.cfg_params = cfg_params
        self.nj_list = nj_list

        ### default is the copy local
        self.copy_local = 1
        self.dest_se = cfg_params.get("CRAB.dest_se", 'local')
        self.dest_endpoint = cfg_params.get("CRAB.dest_endpoint", 'local')

        if self.dest_se != 'local' and self.dest_endpoint != 'local':
            msg = 'You can specify only a parameter with CopyData option \n'
            msg += '1) The dest_se in the case of official CMS SE (i.e -dest_se=T2_IT_Legnaro)\n'
            msg += '2) The complete endpoint for not official SE (i.e -dest_endpoint=srm://<se_name>:<port>/xxx/yyy/)\n'
            msg += '3) if you do not specify parameters, the output will be copied in your UI under crab_working_dir/res \n'
            raise CrabException(msg)

        # any explicit destination switches to the remote copy
        if self.dest_se != 'local' or self.dest_endpoint != 'local':
            self.copy_local = 0

        # update local DB
        if StatusObj:  # this is to avoid a really strange segv
            StatusObj.query(display=False)

        self.protocol = self._PROTOCOL_BY_SCHEDULER[common.scheduler.name().upper()]

        common.logger.debug("Protocol = %s; Destination SE = %s; Destination Endpoint = %s."%(self.protocol,self.dest_se,self.dest_endpoint))

    def run(self):
        """
        default is the copy of output to the local dir res in crabDir

        Performs the copy and then reports the outcome of every file.
        """
        results = self.copy()
        self.parseResults(results)
        return

    def copy(self):
        """
        1) local copy: it is the default. The output will be copied under crab_working_dir/res dir
        2) copy to a remote SE specifying -dest_se (official CMS remote SE)
           or -dest_endpoint (not official, needed the complete endpoint)

        Returns the merged results of performCopy() over all source endpoints.
        """
        lfn, to_copy = self.checkAvailableList()

        if self.copy_local == 1:
            outputDir = self.cfg_params.get('USER.outputdir', common.work_space.resDir())
            common.logger.info("Copy file locally.\n\tOutput dir: %s"%outputDir)
            dest = {"destinationDir": outputDir}
        else:
            if self.dest_se != 'local':
                from PhEDExDatasvcInfo import PhEDExDatasvcInfo
                phedexCfg = {'storage_element': self.dest_se}
                stageout = PhEDExDatasvcInfo(config=phedexCfg)
                self.endpoint = stageout.getStageoutPFN()

                tmp = self._stripStorePrefix(lfn)

                common.logger.debug("Source LFN = %s"%lfn)
                common.logger.debug("tmp LFN = %s"%tmp)
                self.endpoint = self.endpoint + tmp
            else:
                self.endpoint = self.dest_endpoint

            common.logger.info("Copy file to remote SE.\n\tEndpoint: %s"%self.endpoint)

            dest = {"destination": self.endpoint}

        results = {}
        # one cmscp invocation per source endpoint
        for source, files in to_copy.items():
            cmscpConfig = {
                "source": source,
                "inputFileList": files,
                "protocol": self.protocol,
            }
            cmscpConfig.update(dest)

            common.logger.debug("Source = %s"%source)
            common.logger.debug("Files = %s"%files)
            common.logger.debug("CmscpConfig = %s"%str(cmscpConfig))

            results.update(self.performCopy(cmscpConfig))
        return results

    def _stripStorePrefix(self, lfn):
        """
        Return lfn with a leading "/store/temp/" or "/store/" replaced by "/".
        Any other lfn is returned unchanged.
        """
        # "/store/temp/" must be tested first: it also starts with "/store/"
        for prefix in ("/store/temp/", "/store/"):
            if str(lfn).startswith(prefix):
                return lfn.replace(prefix, "/", 1)
        return lfn

    def checkAvailableList(self):
        '''
        check if asked list of jobs
        already produce output to move

        returns the tuple (lfn, to_copy):
          lfn     -- directory part of the first output file of the last
                     job that contributed files ('' if there was none)
          to_copy -- dictionary {endpoint: comma separated file names}

        Raises CrabException when no job has files to copy.
        '''
        common.logger.debug("CopyData in checkAvailableList() ")

        task = common._db.getTask(self.nj_list)
        to_copy = {}
        lfn = ''

        # loop over jobs
        for job in task.jobs:
            runningJob = job.runningJob

            if runningJob['state'] != 'Cleared':
                common.logger.info("Not possible copy outputs of Job # %s : Status is %s" \
                                   %(str(job['jobId']),str(runningJob['statusScheduler'])))
                continue

            # Besides 0 the wrapper code 60308 is accepted as well.
            # NOTE(review): presumably 60308 marks outputs that were still
            # stored somewhere (e.g. a temp area) -- confirm with the CRAB
            # exit code list.
            if runningJob['wrapperReturnCode'] not in (0, 60308):
                common.logger.info("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
                                   %(str(job['jobId']),str(runningJob['wrapperReturnCode'])))
                continue

            common.logger.log(10-1,"job_id = %s"%str(job['jobId']))
            endpoints = runningJob['storage']
            output_files = runningJob['lfn']
            for i, endpoint in enumerate(endpoints):
                output_file_name = os.path.basename(output_files[i])
                if endpoint in to_copy:
                    to_copy[endpoint] = to_copy[endpoint] + ',' + output_file_name
                else:
                    to_copy[endpoint] = output_file_name

            ### the lfn should be the same for every file, the only difference could be the temp dir
            ### to test if this is ok as solution
            if endpoints:
                lfn = os.path.dirname(output_files[0])

        if not to_copy:
            raise CrabException("No files to copy")

        common.logger.debug(" to_copy = %s"%str(to_copy))
        return lfn, to_copy

    def performCopy(self, dict):
        """
        call the cmscp class and do the copy

        dict -- cmscp configuration (source, inputFileList, protocol and
                destination / destinationDir)

        The cmscp instance is kept in self.doCopy because parseResults()
        uses it afterwards. Returns whatever cmscp.run() returns.
        """
        from cmscp import cmscp
        self.doCopy = cmscp(dict)

        common.logger.info("Starting copy...")

        start = time.time()
        results = self.doCopy.run()
        stop = time.time()

        common.logger.debug("CopyLocal Time: "+str(stop - start))

        return results

    def parseResults(self, results):
        '''
        take the results dictionary and
        print the results

        results -- dictionary {file name: outcome}, where outcome provides
                   'erCode' ('0' means success) and 'reason'

        Relies on self.doCopy, i.e. performCopy() must have run before.
        '''
        self.doCopy.writeJsonFile(results)
        for fileName, outcome in results.items():
            if not fileName:
                continue
            txt = 'success'
            if outcome['erCode'] != '0':
                txt = 'failed'
            msg = 'Copy %s for file: %s '%(txt,fileName)
            if txt == 'failed':
                msg += '\n\tCopy failed because : %s'%outcome['reason']
            common.logger.info( msg )
        return