ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyData.py
Revision: 1.4
Committed: Tue Nov 11 18:30:15 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: CRAB_2_4_3_pre3, CRAB_2_4_3_pre2, CRAB_2_4_3_pre1, CRAB_2_4_2
Changes since 1.3: +11 -12 lines
Log Message:
minor fix and improvements

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 from crab_exceptions import *
4 from crab_logger import Logger
5 import common
6 import string
7
8 class CopyData(Actor):
9 def __init__(self, cfg_params, nj_list, StatusObj):
10 """
11 constructor
12 """
13 self.cfg_params = cfg_params
14 self.nj_list = nj_list
15 if (cfg_params.get('USER.copy_data',0) == '0') :
16 raise CrabException("Cannot copy output locally if it has not \
17 been stored to SE via USER.copy_data=1")
18
19 # update local DB
20 if StatusObj:# this is to avoid a really strange segv
21 StatusObj.query(display=False)
22
23 self.destinationTier = None
24 self.destinationDir= None
25 self.destinationTURL = None
26 target =None
27 if target:
28 if target.find('://'):
29 self.destinationTier,self.destinationDir = target.split('://')
30 elif target.find(':'):
31 if (target.find('T2_') + target.find('T1_') + target.find('T3_') >= -1) :
32 self.destinationTier=target.split(":")[0]
33 else:
34 self.destinationTURL = target
35 else:
36 self.destinationTURL = target
37 else:
38 pass
39
40
41 def run(self):
42
43 results = self.copyLocal()
44 self.parseResults( results )
45
46 return
47
48 def copyLocal(self):
49 """
50 prepare to copy the pre staged output to local
51 """
52 # FIXME DS. we'll use the proper DB info once filled..
53 # output files to be returned via sandbox or copied to SE
54 output_file = []
55 tmp = self.cfg_params.get('CMSSW.output_file',None)
56 if tmp.find(',') >= 0:
57 [output_file.append(x.strip()) for x in tmp.split(',')]
58 else: output_file.append( tmp.strip() )
59
60 InfileList = self.checkAvailableList(output_file)
61
62 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
63
64 stageout = PhEDExDatasvcInfo(self.cfg_params)
65 endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
66
67 if not self.destinationTURL:
68 self.destinationTURL = self.cfg_params.get('USER.outputdir' ,common.work_space.resDir())
69
70 cmscpConfig = {
71 "source": endpoint,
72 "destinationDir": self.destinationTURL,
73 "inputFileList": InfileList,
74 "protocol": 'srm-lcg',
75 "option": '-b -D srmv2 -t 2400 --verbose'
76 }
77
78 results = self.performCopy(cmscpConfig)
79
80 return results
81
82 def copyRemote(self):
83 """
84 prepare to copy from SE1 o SE2
85 """
86 results = 'work in progress'
87 '''
88 if self.destinationTier :
89 if (self.destinationDir)
90 self.outDir = Phedex(self.destinationTier, self.destinationDir)
91 else:
92 self.uno, self.lfn ... = Phedex(self.destinationTier)
93 self.outDir= self.uno + self.lfn
94 else:
95 self.outDir = self.destinationTURL
96 pass
97
98
99 # to be implemeted
100 cmscpConfig = {
101 "source": '',
102 "inputFileList":'',
103 "protocol":'',
104 "option":''
105 }
106
107 results = self.performCopy(cmscpConfig)
108 '''
109 return results
110
111 def checkAvailableList(self, output_file):
112 '''
113 check if asked list of jobs
114 already produce output to move
115 '''
116
117 # loop over jobs
118 task=common._db.getTask(self.nj_list)
119 allMatch={}
120 InfileList = ''
121 for job in task.jobs:
122 id_job = job['jobId']
123 if ( job.runningJob['status'] in ['E','UE'] and job.runningJob[ 'wrapperReturnCode'] == 0):
124 for of in output_file:
125 a,b=of.split('.')
126 InfileList += '%s_%s.%s%s'%(a,id_job,b,',')
127 elif ( job.runningJob['status'] in ['E','UE'] and job.runningJob['wrapperReturnCode'] != 0):
128 common.logger.message("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
129 %(str(job['jobId']),str(job.runningJob['wrapperReturnCode'])))
130 else:
131 common.logger.message("Not possible copy outputs of Job # %s : Status is %s" \
132 %(str(job['jobId']),str(job.runningJob['statusScheduler'])))
133 pass
134
135 if (InfileList == '') :
136 raise CrabException("No files to be copyed")
137
138 return InfileList[:-1]
139
140
141 def performCopy(self, dict):
142 """
143 call the cmscp class and do the copy
144 """
145 from cmscp import cmscp
146
147 doCopy = cmscp(dict)
148
149 start = time.time()
150 results = doCopy.run()
151 stop = time.time()
152
153 common.logger.debug(1, "CopyLocal Time: "+str(stop - start))
154 common.logger.write("CopyLocal time :"+str(stop - start))
155
156 return results
157
158 def parseResults(self,results):
159 '''
160 take the results dictionary and
161 print the results
162 '''
163
164 for file, dict in results.items() :
165 if file:
166 txt = 'success'
167 if dict['erCode'] != '0': txt = 'failed'
168 msg = 'Copy %s for file: %s \n'%(txt,file)
169 if txt == 'failed': msg += 'Copy failed because : %s'%dict['reason']
170 common.logger.message( msg )
171 return