Skip to content

Commit 3d25ea1

Browse files
authored
Merge pull request #1987 from hunchback/exporters-store-refactor
exporters: split storer and writer functions
2 parents 24fbea8 + 14620ba commit 3d25ea1

10 files changed

+204
-266
lines changed

contrib/exporters/allinone/allinone.yml.default

+9-8
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,20 @@ pipeline:
1818
sa:
1919
# exclude_started_flows: true
2020
store:
21+
type: buffered
22+
buffered:
23+
object_prefix: logs
24+
bucket: bucket
25+
max_flows_per_object: 6000
26+
max_seconds_per_object: 60
27+
max_seconds_per_stream: 86400
28+
max_flow_array_size: 100000
29+
write:
2130
type: s3
2231
s3:
23-
# -- client parames --
2432
endpoint: http://127.0.0.1:9000
2533
region: local
26-
bucket: bucket
2734
access_key: user
2835
secret_key: password
2936
# api_key: key
3037
# iam_endpoint: https://iam.cloud.ibm.com/identity/token
31-
object_prefix: logs
32-
# -- bulk store params --
33-
# max_flows_per_object: 6000
34-
# max_seconds_per_object: 60
35-
# max_seconds_per_stream: 86400
36-
# max_flow_array_size: 100000

contrib/exporters/core/Makefile

+4-3
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@ SRCS := \
55
filter.go \
66
pipeline.go \
77
main.go \
8-
s3_client.go \
9-
store_stdout.go \
10-
store_s3.go \
8+
store_buffered.go \
9+
store_direct.go \
1110
subscriber.go \
1211
transform.go \
12+
write_s3.go \
13+
write_stdout.go \
1314

1415
.PHONY: all
1516
all:

contrib/exporters/core/pipeline.go

+19-2
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,11 @@ type Storer interface {
6565
SetPipeline(p *Pipeline)
6666
}
6767

68+
// Writer allows uploading objects to an object storage service
69+
type Writer interface {
70+
Write(bucket, objectKey, data, contentType, contentEncoding string, metadata map[string]*string) error
71+
}
72+
6873
// Handler used for creating a phase handler from configuration
6974
type Handler = func(cfg *viper.Viper) (interface{}, error)
7075

@@ -79,6 +84,7 @@ var (
7984
EncoderHandlers HandlersMap
8085
CompressorHandlers HandlersMap
8186
StorerHandlers HandlersMap
87+
WriterHandlers HandlersMap
8288
)
8389

8490
// Register associates a handler with its' label
@@ -119,8 +125,12 @@ func init() {
119125
CompressorHandlers.Register("gzip", NewCompressGzip, false)
120126

121127
StorerHandlers = make(HandlersMap)
122-
StorerHandlers.Register("stdout", NewStoreStdout, true)
123-
StorerHandlers.Register("s3", NewStoreS3, false)
128+
StorerHandlers.Register("buffered", NewStoreBuffered, true)
129+
StorerHandlers.Register("direct", NewStoreDirect, false)
130+
131+
WriterHandlers = make(HandlersMap)
132+
WriterHandlers.Register("s3", NewWriteS3, true)
133+
WriterHandlers.Register("stdout", NewWriteStdout, false)
124134
}
125135

126136
// Pipeline manager
@@ -133,6 +143,7 @@ type Pipeline struct {
133143
Encoder Encoder
134144
Compressor Compressor
135145
Storer Storer
146+
Writer Writer
136147
}
137148

138149
// NewPipeline defines the pipeline elements
@@ -167,13 +178,19 @@ func NewPipeline(cfg *viper.Viper) (*Pipeline, error) {
167178
return nil, err
168179
}
169180

181+
writer, err := WriterHandlers.Init(cfg, "write")
182+
if err != nil {
183+
return nil, err
184+
}
185+
170186
p := &Pipeline{
171187
Transformer: transformer.(Transformer),
172188
Classifier: classifier.(Classifier),
173189
Filterer: filterer.(Filterer),
174190
Encoder: encoder.(Encoder),
175191
Compressor: compressor.(Compressor),
176192
Storer: storer.(Storer),
193+
Writer: writer.(Writer),
177194
}
178195
storer.(Storer).SetPipeline(p)
179196

contrib/exporters/core/s3_client.go

-166
This file was deleted.

0 commit comments

Comments
 (0)