Commit 9392281

pipeline work
1 parent 525de68 commit 9392281

File tree

4 files changed: +149 -68 lines changed


Diff for: external/predictor/python/seldon/fileutil.py

+91 -34
@@ -12,73 +12,58 @@
 
 class FileUtil:
 
+    def __init__(self, key = None, secret = None):
+        self.key = key
+        self.secret = secret
+
     def stream_decompress(self,stream):
         dec = zlib.decompressobj(16+zlib.MAX_WBITS) # same as gzip module
         for chunk in stream:
             rv = dec.decompress(chunk)
             if rv:
                 yield rv
 
-    def stream_text(self,k,cl):
+    def stream_text(self,k,fn):
         unfinished = ""
         for data in k:
             data = unfinished + data
             lines = data.split("\n");
             unfinished = lines.pop()
             for line in lines:
-                cl.process(line)
+                fn(line)
 
-    def stream_gzip(self,k,cl):
+    def stream_gzip(self,k,fn):
         unfinished = ""
         for data in self.stream_decompress(k):
             data = unfinished + data
             lines = data.split("\n");
             unfinished = lines.pop()
             for line in lines:
-                cl.process(line)
+                fn(line)
 
-'''
-Local File Stream
-'''
-class LocalFileUtil(FileUtil):
-
     def getFolders(self,baseFolder,startDay,numDays):
         folders = []
         for day in range(startDay-numDays+1,startDay+1):
             folders.append(baseFolder+str(day)+"/*")
         return folders
 
 
-    def stream(self,folders,cl):
+    def stream_local(self,folders,fn):
         for folder in folders:
             for f in glob.glob(folder):
                 k = open(f,"r")
                 if f.endswith(".gz"):
-                    self.stream_gzip(k,cl)
+                    self.stream_gzip(k,fn)
                 else:
-                    self.stream_text(k,cl)
+                    self.stream_text(k,fn)
 
-
-
-    def copy(self,fromPath,toPath):
+    def copy_local(self,fromPath,toPath):
         print "copy ",fromPath,"to",toPath
         dir = os.path.dirname(toPath)
-        if not os.path.exists(dir):
+        if len(dir) > 0 and not os.path.exists(dir):
            os.makedirs(dir)
         copyfile(fromPath,toPath)
 
-'''
-AWS S3 File Stream
-'''
-class S3FileUtil(FileUtil):
-
-    def __init__(self, key = None, secret = None):
-        self.key = key
-        self.secret = secret
-        if key:
-            self.conn = boto.connect_s3(key,secret)
-        else:
-            self.conn = boto.connect_s3()
 
     def getGlob(self,startDay,numDays):
         g = "{" + str(startDay)
@@ -87,16 +72,24 @@ def getGlob(self,startDay,numDays):
         g += "}"
         return g
 
-    def stream(self,bucket,prefix,cl):
+    def stream_s3(self,bucket,prefix,fn):
+        if self.key:
+            self.conn = boto.connect_s3(self.key,self.secret)
+        else:
+            self.conn = boto.connect_s3()
         b = self.conn.get_bucket(bucket)
         for k in b.list(prefix=prefix):
             print k.name
             if k.name.endswith(".gz"):
-                self.stream_gzip(k,cl)
+                self.stream_gzip(k,fn)
             else:
-                self.stream_text(k,cl)
+                self.stream_text(k,fn)
 
-    def copy(self,fromPath,bucket,path):
+    def copy_s3(self,fromPath,bucket,path):
+        if self.key:
+            self.conn = boto.connect_s3(self.key,self.secret)
+        else:
+            self.conn = boto.connect_s3()
         print fromPath, bucket, path
         b = self.conn.get_bucket(bucket)
         source_size = os.stat(fromPath).st_size
@@ -115,5 +108,69 @@ def copy(self,fromPath,bucket,path):
         # Finish the upload
         print "completing transfer to s3"
         mp.complete_upload()
-        # k = b.new_key(path)
-        # k.set_contents_from_filename(fromPath)
+
+    def download_s3(self,bucket,s3path,localPath):
+        if self.key:
+            self.conn = boto.connect_s3(self.key,self.secret)
+        else:
+            self.conn = boto.connect_s3()
+        print bucket, s3path, localPath
+        b = self.conn.get_bucket(bucket)
+        key = b.get_key(s3path)
+        key.get_contents_to_filename(localPath)
+
+    def stream(self,inputPath,fn):
+        if inputPath.startswith("s3n://"):
+            isS3 = True
+            inputPath = inputPath[6:]
+        elif inputPath.startswith("s3://"):
+            isS3 = True
+            inputPath = inputPath[5:]
+        else:
+            isS3 = False
+        if isS3:
+            print "AWS S3 input path ",inputPath
+            parts = inputPath.split('/')
+            bucket = parts[0]
+            prefix = inputPath[len(bucket)+1:]
+            self.stream_s3(bucket,prefix,fn)
+        else:
+            folders = [inputPath+"*"]
+            print "local input folders: ",folders
+            self.stream_local(folders,fn)
+
+    def upload(self,path,outputPath):
+        if outputPath.startswith("s3n://"):
+            noSchemePath = outputPath[6:]
+            isS3 = True
+        elif outputPath.startswith("s3://"):
+            noSchemePath = outputPath[5:]
+            isS3 = True
+        else:
+            isS3 = False
+        if isS3:
+            parts = noSchemePath.split('/')
+            bucket = parts[0]
+            opath = noSchemePath[len(bucket)+1:]
+            self.copy_s3(path,bucket,opath)
+        else:
+            self.copy_local(path,outputPath)
+
+    def download(self,fromPath,toPath):
+        if fromPath.startswith("s3n://"):
+            isS3 = True
+            fromPath = fromPath[6:]
+        elif fromPath.startswith("s3://"):
+            isS3 = True
+            fromPath = fromPath[5:]
+        else:
+            isS3 = False
+        if isS3:
+            print "AWS S3 input path ",fromPath
+            parts = fromPath.split('/')
+            bucket = parts[0]
+            prefix = fromPath[len(bucket)+1:]
+            self.download_s3(bucket,prefix,toPath)
+        else:
+            self.copy_local(fromPath,toPath)
+
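With this change the local/S3 dispatch lives inside FileUtil itself: callers pass a full path (local, s3:// or s3n://) plus a per-line callback, and stream/upload/download pick the right transport. A minimal usage sketch, not part of the commit; the bucket, paths and credentials below are made up, and boto must be able to reach S3:

    from seldon.fileutil import FileUtil

    lines = []

    fu = FileUtil(key="MY_AWS_KEY", secret="MY_AWS_SECRET")  # hypothetical credentials; omit both for local-only use
    fu.stream("s3://some-bucket/client1/features/20/", lines.append)   # any callable taking one line works as fn
    fu.upload("./model", "s3n://some-bucket/client1/vw/20/model")      # local file -> S3 (or local -> local)
    fu.download("s3://some-bucket/client1/vw/20/model", "./model.copy")
    print "lines streamed:", len(lines)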
+46
@@ -0,0 +1,47 @@
+import json
+import seldon.fileutil as fu
+
+class Feature_transform(object):
+
+    def upload(self,fromPath,toPath):
+        fu.FileUtil().upload(fromPath,toPath)
+
+    def download(self,fromPath,toPath):
+        fu.FileUtil().download(fromPath,toPath)
+
+    def save(self,folder):
+        print "no model to save"
+
+    def load(self,folder):
+        print "no model to load"
+
+
+class Pipeline(object):
+
+    def __init__(self,models_folder="./models"):
+        self.pipeline = []
+        self.models_folder = models_folder
+        self.objs = []
+
+    def add(self,feature_transform):
+        self.pipeline.append(feature_transform)
+
+    def process(self,line):
+        j = json.loads(line)
+        self.objs.append(j)
+
+    def getFeatures(self,location):
+        fu.FileUtil().stream(location,self.process)
+
+    def transform(self,featureLocation):
+        self.getFeatures(featureLocation)
+        for ft in self.pipeline:
+            ft.load(self.models_folder)
+            self.objs = ft.transform(self.objs)
+
+    def fit_transform(self,featureLocation):
+        self.getFeatures(featureLocation)
+        for ft in self.pipeline:
+            ft.fit(self.objs)
+            self.objs = ft.transform(self.objs)
+            ft.save(self.models_folder)
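The new Pipeline gathers JSON feature objects from a location (via FileUtil.stream) and then runs each registered Feature_transform over them in order, loading or saving per-transform state under models_folder. A rough usage sketch, not part of the commit; the module name seldon.pipeline, the AddConstant transform and the feature path are assumptions for illustration:

    import seldon.pipeline as pl   # assumed module name; the new file's path is not shown in this view

    class AddConstant(pl.Feature_transform):
        # toy transform: stamps a constant field onto every feature object
        def fit(self,objs):
            pass
        def transform(self,objs):
            for j in objs:
                j["constant"] = 1
            return objs

    p = pl.Pipeline(models_folder="./models")
    p.add(AddConstant())
    p.fit_transform("s3://some-bucket/client1/features/20/")   # hypothetical feature location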

Diff for: external/predictor/python/seldon/vw.py

+9 -31
@@ -115,49 +115,27 @@ def train(self,client,conf):
         self.create_vw(conf)
         self.features = conf.get('features',{})
         self.fns = conf.get('namespaces',{})
+        # stream data into vw
         inputPath = conf["inputPath"] + "/" + client + "/features/" + str(conf['day']) + "/"
         print "inputPath->",inputPath
         if inputPath.startswith("s3n://"):
             isS3 = True
-            inputPath = inputPath[6:]
         elif inputPath.startswith("s3://"):
             isS3 = True
-            inputPath = inputPath[5:]
         else:
             isS3 = False
-        if isS3:
-            fileUtil = S3FileUtil(self.awsKey,self.awsSecret)
-            print "AWS S3 input path ",inputPath
-            parts = inputPath.split('/')
-            bucket = parts[0]
-            prefix = inputPath[len(bucket)+1:]
-            fileUtil.stream(bucket,prefix,self)
-        else:
-            fileUtil = LocalFileUtil()
-            folders = [inputPath+"*"]
-            print "local input folders: ",folders
-            fileUtil.stream(folders,self)
+        fileUtil = FileUtil(key=self.awsKey,secret=self.awsSecret)
+        fileUtil.stream(inputPath,self.process)
+        # save vw model
         self.vw2.save_model("./model")
         self.vw2.close()
         print "lines processed ",self.numLinesProcessed
-        # push model to output path on s3 or local
+        # copy models to final location
         outputPath = conf["outputPath"] + "/" + client + "/vw/" + str(conf["day"])
         print "outputPath->",outputPath
-        if outputPath.startswith("s3n://"):
-            isS3 = True
-        else:
-            isS3 = False
-        if isS3:
-            noSchemePath = outputPath[6:]
-            parts = noSchemePath.split('/')
-            bucket = parts[0]
-            path = noSchemePath[len(bucket)+1:]
-            fileUtil = S3FileUtil(self.awsKey,self.awsSecret)
-            fileUtil.copy("./model",bucket,path+"/model")
-            fileUtil.copy("./model.readable",bucket,path+"/model.readable")
-        else:
-            fileUtil = LocalFileUtil()
-            fileUtil.copy("./model",outputPath+"/model")
-            fileUtil.copy("./model.readable",outputPath+"/model.readable")
+
+        fileUtil.upload("./model",outputPath+"/model")
+        fileUtil.upload("./model.readable",outputPath+"/model.readable")
+
         if "activate" in conf and conf["activate"]:
             self.activateModel(client,str(outputPath))
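train() now builds one FileUtil and reuses it for both streaming the input features and uploading the finished models, so the S3-vs-local branching above disappears. The settings still come from the conf dict; a hypothetical conf using only the keys visible in this hunk (values are placeholders):

    conf = {
        "inputPath": "s3://some-bucket/seldon",    # features are read from <inputPath>/<client>/features/<day>/
        "outputPath": "s3://some-bucket/seldon",   # models are written to <outputPath>/<client>/vw/<day>
        "day": 20,
        "features": {},      # passed through to self.features
        "namespaces": {},    # passed through to self.fns
        "activate": True     # if set, activateModel is called on the uploaded model
    }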

Diff for: scripts/zookeeper/set-client-config.py

+3 -3
@@ -35,11 +35,11 @@ def activateModel(args,folder,zk):
 
 for line in sys.stdin:
     line = line.rstrip()
-    parts = line.split('\t')
-    if len(parts) == 3 and not line.startswith("#"):
+    parts = line.split()
+    if not line.startswith("#"):
         clients = parts[0].split(',')
         node = parts[1]
-        value = parts[2]
+        value = " ".join(parts[2:])
         print "--------------------------"
         print parts[0],node,"->",value
         for client in clients:
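Splitting on arbitrary whitespace and rejoining parts[2:] means the value column may itself contain spaces, for example a JSON blob. A hypothetical stdin line in the new format; the client names, node path and value are made up:

    # clients          zookeeper-node       value (may contain spaces)
    client1,client2    /config/some_node    {"made_up": "json value"}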
