ViewVC Help
View File | Revision Log | Show Annotations | Root Listing
root/cvsroot/COMP/CRAB/python/CopyLocal.py
Revision: 1.4
Committed: Tue Nov 11 13:42:15 2008 UTC (16 years, 5 months ago) by spiga
Content type: text/x-python
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +0 -0 lines
State: FILE REMOVED
Log Message:
renamed as copyData

File Contents

# Content
1 from Actor import *
2 from crab_util import *
3 from crab_exceptions import *
4 from crab_logger import Logger
5 import common
6 import string
7
8 class CopyLocal(Actor):
9 def __init__(self, cfg_params, nj_list):
10 """
11 constructor
12 """
13 self.cfg_params = cfg_params
14 self.nj_list = nj_list
15 if (cfg_params.get('USER.copy_data',0) == '0') :
16 raise CrabException("Cannot copy output locally if it has not \
17 been stored to SE via USER.copy_data=1")
18
19 def run(self):
20
21 results = self.copyLocal()
22
23 self.parseResults( results )
24
25 return
26
27 def copyLocal(self):
28 """
29 prepare to copy the pre staged output to local
30 """
31 from PhEDExDatasvcInfo import PhEDExDatasvcInfo
32
33 stageout = PhEDExDatasvcInfo(self.cfg_params)
34 endpoint, lfn, SE, SE_PATH, user = stageout.getEndpoint()
35
36 # FIXME DS. we'll use the proper DB info once filled..
37 # output files to be returned via sandbox or copied to SE
38 self.output_file = []
39 tmp = self.cfg_params.get('CMSSW.output_file',None)
40 if tmp.find(',') >= 0:
41 [self.output_file.append(x.strip()) for x in tmp.split(',')]
42 else: self.output_file.append( tmp.strip() )
43
44 # loop over jobs
45 task=common._db.getTask(self.nj_list)
46 allMatch={}
47 InfileList = ''
48 for job in task.jobs:
49 id_job = job['jobId']
50 if ( job.runningJob['status'] in ['E','UE'] and job.runningJob[ 'wrapperReturnCode'] == 0):
51 for of in self.output_file:
52 a,b=of.split('.')
53 InfileList = '%s_%s.%s%s'%(a,id_job,b,',')
54 elif ( job.runningJob['status'] in ['E','UE'] and job.runningJob['wrapperReturnCode'] != 0):
55 common.logger.message("Not possible copy outputs of Job # %s : Wrapper Exit Code is %s" \
56 %(str(job['jobId']),str(job.runningJob['wrapperReturnCode'])))
57 else:
58 common.logger.message("Not possible copy outputs of Job # %s : Status is %s" \
59 %(str(job['jobId']),str(job.runningJob['statusScheduler'])))
60 pass
61
62 if (InfileList == '') :
63 raise CrabException("No files to be copyed")
64
65 self.outDir = self.cfg_params.get('USER.outputdir' ,common.work_space.resDir())
66 print InfileList
67 cmscpConfig = {
68 "source": endpoint,
69 "destinationDir": self.outDir,
70 "inputFileList": InfileList[:-1],
71 "protocol": 'srm-lcg',
72 "option": '-b -D srmv2 -t 2400 --verbose'
73 }
74
75 results = self.performCopy(cmscpConfig)
76
77 return results
78
79 def copyRemote(self):
80 """
81 prepare to copy from local to SE
82 """
83 results = 'still to be implemented'
84
85 # to be implemeted
86 cmscpConfig = {
87 "source": '',
88 "inputFileList":'',
89 "protocol":'',
90 "option":''
91 }
92
93 # results = self.performCopy(cmscpConfig)
94
95 return results
96
97 def performCopy(self, dict):
98 """
99 call the cmscp class and do the copy
100 """
101 from cmscp import cmscp
102
103 doCopy = cmscp(dict)
104
105 start = time.time()
106 results = doCopy.run()
107 stop = time.time()
108
109 common.logger.debug(1, "CopyLocal Time: "+str(stop - start))
110 common.logger.write("CopyLocal time :"+str(stop - start))
111
112 return results
113
114 def parseResults(self,results):
115 '''
116 take the results dictionary and
117 print the results
118 '''
119 for file, dict in results :
120 if file:
121 txt = 'success'
122 if dict['erCode'] != 'failed':
123 msg = 'Copy %s for file: %s \n'%(txt,file)
124 if txt == 'failed': msg += 'Copy failed because : %s'%dict['reason']
125 common.logger.message( msg )
126 return