ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyData.py
Revision: 1.5
Committed: Wed Nov 26 15:11:08 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
Changes since 1.4: +7 -4 lines
Log Message:
added protection for multiple dot in the filename. better print out

File Contents

# User Rev Content
1 spiga 1.1 from Actor import *
2     from crab_util import *
3     from crab_exceptions import *
4     from crab_logger import Logger
5     import common
6     import string
7    
8     class CopyData(Actor):
9     def __init__(self, cfg_params, nj_list, StatusObj):
10     """
11     constructor
12     """
13     self.cfg_params = cfg_params
14     self.nj_list = nj_list
15     if (cfg_params.get('USER.copy_data',0) == '0') :
16 spiga 1.5 msg = 'Cannot copy output locally if it has not \n'
17     msg += '\tbeen stored to SE via USER.copy_data=1'
18     raise CrabException(msg)
19 spiga 1.3
20 spiga 1.1 # update local DB
21 spiga 1.3 if StatusObj:# this is to avoid a really strange segv
22     StatusObj.query(display=False)
23 spiga 1.1
24     self.destinationTier = None
25     self.destinationDir= None
26     self.destinationTURL = None
27     target =None
28     if target:
29     if target.find('://'):
30     self.destinationTier,self.destinationDir = target.split('://')
31     elif target.find(':'):
32     if (target.find('T2_') + target.find('T1_') + target.find('T3_') >= -1) :
33     self.destinationTier=target.split(":")[0]
34     else:
35     self.destinationTURL = target
36     else:
37     self.destinationTURL = target
38     else:
39     pass
40    
41    
42     def run(self):
43    
44     results = self.copyLocal()
45     self.parseResults( results )
46    
47     return
48    
49     def copyLocal(self):
50     """
51     prepare to copy the pre staged output to local
52     """
53 spiga 1.4 # FIXME DS. we'll use the proper DB info once filled..
54     # output files to be returned via sandbox or copied to SE
55     output_file = []
56     tmp = self.cfg_params.get('CMSSW.output_file',None)
57     if tmp.find(',') >= 0:
58     [output_file.append(x.strip()) for x in tmp.split(',')]
59     else: output_file.append( tmp.strip() )
60    
61     InfileList = self.checkAvailableList(output_file)
62 spiga 1.2
63 spiga 1.1 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
64    
65     stageout = PhEDExDatasvcInfo(self.cfg_params)
66     endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
67    
68     if not self.destinationTURL:
69     self.destinationTURL = self.cfg_params.get('USER.outputdir' ,common.work_space.resDir())
70    
71     cmscpConfig = {
72     "source": endpoint,
73     "destinationDir": self.destinationTURL,
74     "inputFileList": InfileList,
75     "protocol": 'srm-lcg',
76     "option": '-b -D srmv2 -t 2400 --verbose'
77     }
78    
79     results = self.performCopy(cmscpConfig)
80    
81     return results
82    
83     def copyRemote(self):
84     """
85     prepare to copy from SE1 o SE2
86     """
87     results = 'work in progress'
88     '''
89     if self.destinationTier :
90     if (self.destinationDir)
91     self.outDir = Phedex(self.destinationTier, self.destinationDir)
92     else:
93     self.uno, self.lfn ... = Phedex(self.destinationTier)
94     self.outDir= self.uno + self.lfn
95     else:
96     self.outDir = self.destinationTURL
97     pass
98    
99    
100     # to be implemeted
101     cmscpConfig = {
102     "source": '',
103     "inputFileList":'',
104     "protocol":'',
105     "option":''
106     }
107    
108     results = self.performCopy(cmscpConfig)
109     '''
110     return results
111    
112 spiga 1.4 def checkAvailableList(self, output_file):
113 spiga 1.1 '''
114     check if asked list of jobs
115     already produce output to move
116     '''
117    
118     # loop over jobs
119     task=common._db.getTask(self.nj_list)
120     allMatch={}
121     InfileList = ''
122     for job in task.jobs:
123     id_job = job['jobId']
124     if ( job.runningJob['status'] in ['E','UE'] and job.runningJob[ 'wrapperReturnCode'] == 0):
125 spiga 1.4 for of in output_file:
126 spiga 1.5 b=of.split('.')[-1:]
127     b = b[0]
128     a=of.split('.%s'%b)[0]
129 spiga 1.1 InfileList += '%s_%s.%s%s'%(a,id_job,b,',')
130     elif ( job.runningJob['status'] in ['E','UE'] and job.runningJob['wrapperReturnCode'] != 0):
131     common.logger.message("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
132     %(str(job['jobId']),str(job.runningJob['wrapperReturnCode'])))
133     else:
134     common.logger.message("Not possible copy outputs of Job # %s : Status is %s" \
135     %(str(job['jobId']),str(job.runningJob['statusScheduler'])))
136     pass
137    
138     if (InfileList == '') :
139 spiga 1.5 raise CrabException("No files to copy")
140 spiga 1.1
141     return InfileList[:-1]
142    
143    
144     def performCopy(self, dict):
145     """
146     call the cmscp class and do the copy
147     """
148     from cmscp import cmscp
149    
150     doCopy = cmscp(dict)
151    
152     start = time.time()
153     results = doCopy.run()
154     stop = time.time()
155    
156     common.logger.debug(1, "CopyLocal Time: "+str(stop - start))
157     common.logger.write("CopyLocal time :"+str(stop - start))
158    
159     return results
160    
161     def parseResults(self,results):
162     '''
163     take the results dictionary and
164     print the results
165     '''
166    
167     for file, dict in results.items() :
168     if file:
169     txt = 'success'
170     if dict['erCode'] != '0': txt = 'failed'
171     msg = 'Copy %s for file: %s \n'%(txt,file)
172     if txt == 'failed': msg += 'Copy failed because : %s'%dict['reason']
173     common.logger.message( msg )
174     return