Skip to content

Commit

Permalink
fix: fix-up context handling (#305)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Sep 8, 2021
1 parent c1c90d0 commit 9a0d7cf
Show file tree
Hide file tree
Showing 25 changed files with 92 additions and 89 deletions.
11 changes: 6 additions & 5 deletions examples/301-http-pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,15 @@
* Due to the lack of state, do not use HTTP sources and sinks to connect steps.
""")
.annotate("dataflow.argoproj.io/needs", "dataflow-103-http-main-source-default-secret.yaml")
.step(
(cron('*/3 * * * * *')
.cat('cron')
.http('https://http-main/sources/default', insecureSkipVerify=True, headers=[{'name': "Authorization", "value": "Bearer my-bearer-token"}])
))
.step(
(http(serviceName='http-main')
.cat('main')
.log()
))
.step(
(cron('*/3 * * * * *')
.cat('cron')
.http('https://http-main/sources/default', insecureSkipVerify=True,
headers=[{'name': "Authorization", "value": "Bearer my-bearer-token"}])
))
.save())
14 changes: 7 additions & 7 deletions examples/301-http-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ metadata:
name: 301-http
spec:
steps:
- cat: {}
name: main
sinks:
- log: {}
sources:
- http:
serviceName: http-main
- cat: {}
name: cron
sinks:
Expand All @@ -25,10 +32,3 @@ spec:
sources:
- cron:
schedule: '*/3 * * * * *'
- cat: {}
name: main
sinks:
- log: {}
sources:
- http:
serviceName: http-main
4 changes: 2 additions & 2 deletions runner/sidecar/in.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

func connectIn(ctx context.Context, sink func([]byte) error) (func(context.Context, []byte) error, error) {
func connectIn(ctx context.Context, sink func(context.Context, []byte) error) (func(context.Context, []byte) error, error) {
inFlight := promauto.NewGauge(prometheus.GaugeOpts{
Subsystem: "input",
Name: "inflight",
Expand Down Expand Up @@ -81,7 +81,7 @@ func connectIn(ctx context.Context, sink func([]byte) error) (func(context.Conte
return fmt.Errorf("failed to send to main: %q %q", resp.Status, body)
}
if resp.StatusCode == 201 {
return sink(body)
return sink(ctx, body)
}
}
return nil
Expand Down
14 changes: 7 additions & 7 deletions runner/sidecar/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ import (
runtimeutil "k8s.io/apimachinery/pkg/util/runtime"
)

func connectOut(toSinks func([]byte) error) {
connectOutFIFO(toSinks)
connectOutHTTP(toSinks)
func connectOut(ctx context.Context, sink func(context.Context, []byte) error) {
connectOutFIFO(ctx, sink)
connectOutHTTP(sink)
}

func connectOutHTTP(f func([]byte) error) {
func connectOutHTTP(sink func(context.Context, []byte) error) {
logger.Info("HTTP out interface configured")
v, err := ioutil.ReadFile(dfv1.PathAuthorization)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func connectOutHTTP(f func([]byte) error) {
_, _ = w.Write([]byte(err.Error()))
return
}
if err := f(data); err != nil {
if err := sink(r.Context(), data); err != nil {
logger.Error(err, "failed to send message from main to sink")
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
Expand All @@ -47,7 +47,7 @@ func connectOutHTTP(f func([]byte) error) {
})
}

func connectOutFIFO(f func([]byte) error) {
func connectOutFIFO(ctx context.Context, sink func(context.Context, []byte) error) {
logger.Info("FIFO out interface configured")
go func() {
defer runtimeutil.HandleCrash()
Expand All @@ -63,7 +63,7 @@ func connectOutFIFO(f func([]byte) error) {
logger.Info("opened output FIFO")
scanner := bufio.NewScanner(fifo)
for scanner.Scan() {
if err := f(scanner.Bytes()); err != nil {
if err := sink(ctx, scanner.Bytes()); err != nil {
return fmt.Errorf("failed to send message from main to sink: %w", err)
}
}
Expand Down
8 changes: 4 additions & 4 deletions runner/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Exec(ctx context.Context) error {

addStopHook(patchStepStatusHook)

toSinks, err := connectSinks(ctx)
sink, err := connectSinks(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -146,7 +146,7 @@ func Exec(ctx context.Context) error {
w.WriteHeader(204)
})

connectOut(toSinks)
connectOut(ctx, sink)

server := &http.Server{Addr: "localhost:3569"}
addStopHook(func(ctx context.Context) error {
Expand Down Expand Up @@ -175,12 +175,12 @@ func Exec(ctx context.Context) error {
logger.Info("HTTPS server shutdown")
}()

toMain, err := connectIn(ctx, toSinks)
process, err := connectIn(ctx, sink)
if err != nil {
return err
}

if err := connectSources(ctx, toMain); err != nil {
if err := connectSources(ctx, process); err != nil {
return err
}

Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sink/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, x dfv1.DBS
}, nil
}

func (d dbSink) Sink(msg []byte) error {
func (d dbSink) Sink(ctx context.Context, msg []byte) error {
tx, err := d.db.Begin()
if err != nil {
return fmt.Errorf("failed to start a transaction: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/sink/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, x dfv1.HTT
}, nil
}

func (h httpSink) Sink(msg []byte) error {
req, err := http.NewRequest("POST", h.url, bytes.NewBuffer(msg))
func (h httpSink) Sink(ctx context.Context, msg []byte) error {
req, err := http.NewRequestWithContext(ctx, "POST", h.url, bytes.NewBuffer(msg))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion runner/sidecar/sink/interface.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package sink

import "context"

type Interface interface {
Sink(msg []byte) error
Sink(ctx context.Context, msg []byte) error
}
10 changes: 5 additions & 5 deletions runner/sidecar/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ import (
var logger = sharedutil.NewLogger()

type producer interface {
SendMessage(msg *sarama.ProducerMessage) error
SendMessage(ctx context.Context, msg *sarama.ProducerMessage) error
io.Closer
}

type asyncProducer struct{ sarama.AsyncProducer }

func (s asyncProducer) SendMessage(msg *sarama.ProducerMessage) error {
func (s asyncProducer) SendMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
s.AsyncProducer.Input() <- msg
return nil
}

type syncProducer struct{ sarama.SyncProducer }

func (s syncProducer) SendMessage(msg *sarama.ProducerMessage) error {
func (s syncProducer) SendMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
_, _, err := s.SyncProducer.SendMessage(msg)
return err
}
Expand Down Expand Up @@ -103,8 +103,8 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, x dfv1.Kaf
}
}

func (h kafkaSink) Sink(msg []byte) error {
return h.producer.SendMessage(&sarama.ProducerMessage{Value: sarama.ByteEncoder(msg), Topic: h.topic})
func (h kafkaSink) Sink(ctx context.Context, msg []byte) error {
return h.producer.SendMessage(ctx, &sarama.ProducerMessage{Value: sarama.ByteEncoder(msg), Topic: h.topic})
}

func (h kafkaSink) Close() error {
Expand Down
4 changes: 3 additions & 1 deletion runner/sidecar/sink/log/log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package logsink

import (
"context"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
Expand All @@ -16,7 +18,7 @@ func New(x dfv1.Log) sink.Interface {
return logSink{truncate: x.Truncate}
}

func (s logSink) Sink(msg []byte) error {
func (s logSink) Sink(ctx context.Context, msg []byte) error {
text := string(msg)
if s.truncate != nil && len(text) > int(*s.truncate) {
text = text[0:*s.truncate]
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/sink/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func New(ctx context.Context, secretInterface v1.SecretInterface, x dfv1.S3Sink)
return s3Sink{client: s3.New(options), bucket: x.Bucket}, nil
}

func (h s3Sink) Sink(msg []byte) error {
func (h s3Sink) Sink(ctx context.Context, msg []byte) error {
m := &message{}
if err := json.Unmarshal(msg, m); err != nil {
return err
Expand All @@ -82,7 +82,7 @@ func (h s3Sink) Sink(msg []byte) error {
if err != nil {
return fmt.Errorf("failed to open %q: %w", m.Path, err)
}
_, err = h.client.PutObject(context.Background(), &s3.PutObjectInput{
_, err = h.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: &h.bucket,
Key: &m.Key,
Body: f,
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sink/stan/stan.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, namespace,
}, nil
}

func (s stanSink) Sink(msg []byte) error {
func (s stanSink) Sink(ctx context.Context, msg []byte) error {
return s.conn.Publish(s.subject, msg)
}

Expand Down
4 changes: 3 additions & 1 deletion runner/sidecar/sink/volume/volume.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package volume

import (
"context"

"github.com/argoproj-labs/argo-dataflow/runner/sidecar/sink"
)

Expand All @@ -10,6 +12,6 @@ func New() (sink.Interface, error) {
return volumeSink{}, nil
}

func (h volumeSink) Sink([]byte) error {
func (h volumeSink) Sink(ctx context.Context, msg []byte) error {
return nil
}
6 changes: 3 additions & 3 deletions runner/sidecar/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/paulbellamy/ratecounter"
)

func connectSinks(ctx context.Context) (func([]byte) error, error) {
func connectSinks(ctx context.Context) (func(context.Context, []byte) error, error) {
sinks := map[string]sink.Interface{}
rateCounters := map[string]*ratecounter.RateCounter{}
for _, sink := range step.Spec.Sinks {
Expand Down Expand Up @@ -79,14 +79,14 @@ func connectSinks(ctx context.Context) (func([]byte) error, error) {
}
}

return func(msg []byte) error {
return func(ctx context.Context, msg []byte) error {
for sinkName, f := range sinks {
counter := rateCounters[sinkName]
counter.Incr(1)
withLock(func() {
step.Status.SinkStatues.IncrTotal(sinkName, replica, rateToResourceQuantity(counter), uint64(len(msg)))
})
if err := f.Sink(msg); err != nil {
if err := f.Sink(ctx, msg); err != nil {
withLock(func() { step.Status.SinkStatues.IncrErrors(sinkName, replica) })
return err
}
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/source/cron/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type cronSource struct {
crn *cron.Cron
}

func New(x dfv1.Cron, f source.Func) (source.Interface, error) {
func New(ctx context.Context, x dfv1.Cron, process source.Process) (source.Interface, error) {
crn := cron.New(
cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),
cron.WithChain(cron.Recover(logger)),
Expand All @@ -32,7 +32,7 @@ func New(x dfv1.Cron, f source.Func) (source.Interface, error) {

_, err := crn.AddFunc(x.Schedule, func() {
msg := []byte(time.Now().Format(x.Layout))
if err := f(context.Background(), msg); err != nil {
if err := process(ctx, msg); err != nil {
logger.Error(err, "failed to process message")
}
})
Expand Down
4 changes: 2 additions & 2 deletions runner/sidecar/source/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type dbSource struct {
db *sql.DB
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterName, namespace, pipelineName, stepName string, replica int, sourceName string, x dfv1.DBSource, f source.Func) (source.Interface, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterName, namespace, pipelineName, stepName, sourceName string, x dfv1.DBSource, process source.Process) (source.Interface, error) {
dataSource, err := getDataSource(ctx, secretInterface, x)
if err != nil {
return nil, fmt.Errorf("failed to find data source: %w", err)
Expand Down Expand Up @@ -74,7 +74,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, clusterNam
if err != nil {
return fmt.Errorf("failed to marshal to json: %w", err)
}
if err := f(ctx, jsonData); err != nil {
if err := process(ctx, jsonData); err != nil {
return fmt.Errorf("failed to process data: %w", err)
}
offset = fmt.Sprintf("%v", d[x.OffsetColumn])
Expand Down
5 changes: 2 additions & 3 deletions runner/sidecar/source/http/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package http

import (
"context"
"io/ioutil"
"net/http"

Expand All @@ -12,7 +11,7 @@ type httpSource struct {
ready bool
}

func New(sourceName, authorization string, f source.Func) source.Interface {
func New(sourceName, authorization string, process source.Process) source.Interface {
h := &httpSource{ready: true}
http.HandleFunc("/sources/"+sourceName, func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get("Authorization") != authorization {
Expand All @@ -30,7 +29,7 @@ func New(sourceName, authorization string, f source.Func) source.Interface {
_, _ = w.Write([]byte(err.Error()))
return
}
if err := f(context.Background(), msg); err != nil {
if err := process(r.Context(), msg); err != nil {
w.WriteHeader(500)
_, _ = w.Write([]byte(err.Error()))
} else {
Expand Down
12 changes: 6 additions & 6 deletions runner/sidecar/source/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
)

type handler struct {
f source.Func
process source.Process
i int
manualCommit bool
}
Expand All @@ -25,14 +25,14 @@ func (handler) Cleanup(_ sarama.ConsumerGroupSession) error {
func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
logger.Info("starting consuming claim", "partition", claim.Partition())
for msg := range claim.Messages() {
if err := h.f(sess.Context(), msg.Value); err != nil {
if err := h.process(sess.Context(), msg.Value); err != nil {
logger.Error(err, "failed to process message")
} else {
sess.MarkMessage(msg, "")
}
h.i++
if h.manualCommit && h.i%dfv1.CommitN == 0 {
sess.Commit()
h.i++
if h.manualCommit && h.i%dfv1.CommitN == 0 {
sess.Commit()
}
}
}
return nil
Expand Down
Loading

0 comments on commit 9a0d7cf

Please sign in to comment.