ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyData.py
Revision: 1.5
Committed: Wed Nov 26 15:11:08 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +7 -4 lines
Log Message:
Added protection for multiple dots in the filename; improved the printout.

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 from crab_exceptions import *
4 from crab_logger import Logger
5 import common
6 import string
7
class CopyData(Actor):
    """
    Actor that copies the output files of finished jobs, previously staged
    out to a storage element, to a local destination by means of cmscp.
    """

    def __init__(self, cfg_params, nj_list, StatusObj):
        """
        constructor

        cfg_params -- configuration mapping; the keys read by this class are
                      'USER.copy_data', 'CMSSW.output_file' and
                      'USER.outputdir'
        nj_list    -- job selection, handed as-is to common._db.getTask()
        StatusObj  -- optional status object; when given it is queried once
                      (display=False) before anything else is done

        Raises CrabException when USER.copy_data is '0' or not set at all.
        """
        self.cfg_params = cfg_params
        self.nj_list = nj_list
        # The default is the integer 0 while configured values are strings,
        # so normalise to a string before comparing: a missing key must be
        # treated exactly like an explicit '0'.
        if str(cfg_params.get('USER.copy_data', 0)).strip() == '0':
            msg = 'Cannot copy output locally if it has not \n'
            msg += '\tbeen stored to SE via USER.copy_data=1'
            raise CrabException(msg)

        # update local DB
        if StatusObj:  # this is to avoid a really strange segv
            StatusObj.query(display=False)

        self.destinationTier = None
        self.destinationDir = None
        self.destinationTURL = None
        # No explicit target can be supplied yet, so the destination is
        # always resolved later on, in copyLocal().
        target = None
        if target:
            self._setDestination(target)

    def _setDestination(self, target):
        """
        split an explicit target into destination tier / directory / TURL

        '<tier>://<dir>' sets destinationTier and destinationDir,
        'T1_...:', 'T2_...:' or 'T3_...:' sets destinationTier only,
        anything else is taken verbatim as destinationTURL.
        NOTE(review): the intended target syntax is not documented in this
        file -- confirm before wiring a real target in.
        """
        # Use 'in' rather than str.find() as a condition: find() returns -1
        # (truthy) when nothing is found and 0 (falsy) for a match at the
        # very beginning of the string.
        if '://' in target:
            self.destinationTier, self.destinationDir = target.split('://', 1)
        elif ':' in target and target[:3] in ('T1_', 'T2_', 'T3_'):
            self.destinationTier = target.split(':')[0]
        else:
            self.destinationTURL = target

    def run(self):
        """
        copy the output files to the local destination and log the outcome
        of every single copy
        """
        results = self.copyLocal()
        self.parseResults(results)

        return

    def copyLocal(self):
        """
        prepare to copy the pre staged output to local

        Builds the cmscp configuration (source endpoint, destination, list
        of files) and returns whatever performCopy() returns.
        Raises CrabException when CMSSW.output_file is missing or empty, or
        when no job has output to copy (see checkAvailableList).
        """
        # FIXME DS. we'll use the proper DB info once filled..
        # output files to be returned via sandbox or copied to SE
        tmp = self.cfg_params.get('CMSSW.output_file', None)
        # split(',') also handles the single-file case; empty entries (for
        # instance from a trailing comma) are dropped.
        output_file = [x.strip() for x in (tmp or '').split(',') if x.strip()]
        if not output_file:
            raise CrabException("No output file defined in CMSSW.output_file")

        InfileList = self.checkAvailableList(output_file)

        from PhEDExDatasvcInfo import PhEDExDatasvcInfo

        stageout = PhEDExDatasvcInfo(self.cfg_params)
        # only the endpoint is needed here; the remaining values are unused
        endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()

        if not self.destinationTURL:
            self.destinationTURL = self.cfg_params.get('USER.outputdir', common.work_space.resDir())

        cmscpConfig = {
            "source": endpoint,
            "destinationDir": self.destinationTURL,
            "inputFileList": InfileList,
            "protocol": 'srm-lcg',
            "option": '-b -D srmv2 -t 2400 --verbose'
        }

        results = self.performCopy(cmscpConfig)

        return results

    def copyRemote(self):
        """
        prepare to copy from SE1 to SE2

        Not implemented yet: always returns the placeholder string
        'work in progress'.
        """
        # TODO: to be implemented. Sketch of the planned logic:
        #   - if a destination tier is set, resolve the output directory
        #     through PhEDEx (using destinationDir when it is given);
        #   - otherwise use destinationTURL as output directory;
        #   - fill a cmscpConfig with source, inputFileList, protocol and
        #     option, then hand it to performCopy().
        results = 'work in progress'
        return results

    def checkAvailableList(self, output_file):
        """
        check if asked list of jobs
        already produce output to move

        output_file -- list of output file names as configured
        Returns a comma separated string with one '<name>_<jobId>.<ext>'
        entry per output file of every job whose status is 'E' or 'UE' and
        whose wrapper return code is 0; the other jobs are reported through
        the logger and skipped.
        Raises CrabException("No files to copy") when nothing qualifies.
        """
        task = common._db.getTask(self.nj_list)
        available = []
        # loop over jobs
        for job in task.jobs:
            id_job = job['jobId']
            if job.runningJob['status'] not in ['E', 'UE']:
                common.logger.message("Not possible copy outputs of Job # %s : Status is %s" \
                    % (str(id_job), str(job.runningJob['statusScheduler'])))
            elif job.runningJob['wrapperReturnCode'] != 0:
                common.logger.message("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
                    % (str(id_job), str(job.runningJob['wrapperReturnCode'])))
            else:
                for of in output_file:
                    available.append(self._numberedName(of, id_job))

        if not available:
            raise CrabException("No files to copy")

        return ','.join(available)

    def _numberedName(self, name, id_job):
        """
        insert '_<id_job>' in front of the last extension of name,
        e.g. 'out.root' -> 'out_3.root', 'a.b.root' -> 'a.b_3.root'
        """
        # Split on the LAST dot only, so that an extension string showing up
        # earlier in the name does not truncate the name.
        parts = name.rsplit('.', 1)
        if len(parts) == 1:
            # NOTE(review): assumes a name without extension is staged out
            # as '<name>_<jobId>' -- confirm against the job wrapper.
            return '%s_%s' % (name, id_job)
        return '%s_%s.%s' % (parts[0], id_job, parts[1])

    def performCopy(self, dict):
        """
        call the cmscp class and do the copy

        dict -- cmscp configuration dictionary (the parameter name shadows
                the builtin but is kept for backward compatibility)
        Returns the value returned by cmscp.run(); the elapsed wall-clock
        time is sent to the logger.
        """
        # explicit import: do not rely on a star import to provide 'time'
        import time
        from cmscp import cmscp

        doCopy = cmscp(dict)

        start = time.time()
        results = doCopy.run()
        stop = time.time()
        elapsed = str(stop - start)

        common.logger.debug(1, "CopyLocal Time: " + elapsed)
        common.logger.write("CopyLocal time :" + elapsed)

        return results

    def parseResults(self, results):
        """
        take the results dictionary and
        print the results

        results -- mapping file name -> dictionary providing 'erCode' and,
                   for failed copies, 'reason'
        Entries with an empty file name are skipped.
        """
        for fname, outcome in results.items():
            if not fname:
                continue
            # compare as string so that an integer 0 counts as success too
            failed = str(outcome['erCode']) != '0'
            txt = 'success'
            if failed:
                txt = 'failed'
            msg = 'Copy %s for file: %s \n' % (txt, fname)
            if failed:
                msg += 'Copy failed because : %s' % outcome['reason']
            common.logger.message(msg)
        return