Skip to content

Commit 50966b2

Browse files
committed
introduce runtime struct and move context to merger
1 parent d476537 commit 50966b2

File tree

3 files changed

+151
-108
lines changed

3 files changed

+151
-108
lines changed

merger.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,29 @@
11
package pipe
22

33
import (
4+
"context"
45
"sync"
56
)
67

78
type (
89
// errorMerger allows to listen to multiple error channels.
910
errorMerger struct {
11+
ctx context.Context
1012
wg sync.WaitGroup
1113
errorChan chan error
1214
}
1315
)
1416

17+
func newErrorMerger(ctx context.Context) *errorMerger {
18+
return &errorMerger{
19+
ctx: ctx,
20+
errorChan: make(chan error, 1),
21+
}
22+
}
23+
1524
// add error channels from all components into one.
16-
func (m *errorMerger) add(errc <-chan error) {
25+
func (m *errorMerger) add(e executor) {
26+
errc := start(m.ctx, e)
1727
// function to wait for error channel
1828
m.wg.Add(1)
1929
go m.listen(errc)

pipe.go

+115-107
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,19 @@ import (
1414
type (
1515
// Pipe is a graph formed with multiple lines of bound DSP components.
1616
Pipe struct {
17-
ctx context.Context
1817
mctx mutable.Context
1918
bufferSize int
2019
// async lines have runner per component
2120
// sync lines always wrapped in multiLineExecutor
2221
mutationsChan chan []mutable.Mutation
23-
pusher mutable.Pusher
24-
executors map[mutable.Context]executor
25-
errorMerger
26-
routes []*route
22+
runtime
23+
}
24+
25+
runtime struct {
26+
merger *errorMerger
27+
routes []*route
28+
executors map[mutable.Context]executor
29+
pusher mutable.Pusher
2730
}
2831

2932
// Source is a source of signal data. Optinaly, mutability can be
@@ -83,6 +86,22 @@ type (
8386
FlushFunc func(ctx context.Context) error
8487
)
8588

89+
// Run executes the pipe in a single goroutine, sequentially.
90+
func Run(ctx context.Context, bufferSize int, lines ...Line) error {
91+
e := multiLineExecutor{}
92+
mctx := mutable.Mutable()
93+
for i, l := range lines {
94+
l.Context = mctx
95+
r, err := l.route(bufferSize)
96+
if err != nil {
97+
return err
98+
}
99+
r.connect(bufferSize)
100+
e.executors = append(e.executors, r.executor(nil, i))
101+
}
102+
return run(ctx, &e)
103+
}
104+
86105
// New returns a new Pipe that binds multiple lines using the provided
87106
// buffer size.
88107
func New(bufferSize int, lines ...Line) (*Pipe, error) {
@@ -102,45 +121,93 @@ func New(bufferSize int, lines ...Line) (*Pipe, error) {
102121
mctx: mutable.Mutable(),
103122
mutationsChan: make(chan []mutable.Mutation, 1),
104123
bufferSize: bufferSize,
105-
routes: routes,
106-
pusher: mutable.NewPusher(),
124+
runtime: newRuntime(routes),
107125
}, nil
108126
}
109127

110-
// Run executes the pipe in a single goroutine, sequentially.
111-
func Run(ctx context.Context, bufferSize int, lines ...Line) error {
112-
e := multiLineExecutor{}
113-
mctx := mutable.Mutable()
114-
for i, l := range lines {
115-
l.Context = mctx
116-
r, err := l.route(bufferSize)
117-
if err != nil {
118-
return err
128+
func newRuntime(routes []*route) runtime {
129+
rt := runtime{
130+
routes: routes,
131+
pusher: mutable.NewPusher(),
132+
executors: make(map[mutable.Context]executor),
133+
}
134+
for idx := range rt.routes {
135+
if routes[idx].context.IsMutable() {
136+
rt.registerSyncRoute(idx)
137+
continue
119138
}
120-
r.connect(bufferSize)
121-
e.executors = append(e.executors, r.executor(nil, i))
139+
rt.registerAsyncRoute(idx)
122140
}
123-
return run(ctx, &e)
141+
return rt
142+
}
143+
144+
// adds executors from the route
145+
func (rt *runtime) addRoute(r *route) int {
146+
idx := len(rt.routes)
147+
rt.routes = append(rt.routes, r)
148+
return idx
149+
}
150+
151+
// add route to multiline executor
152+
func (rt *runtime) registerSyncRoute(idx int) {
153+
// add to existing multiline executor
154+
r := rt.routes[idx]
155+
if e, ok := rt.executors[r.context]; ok {
156+
mle := e.(*multiLineExecutor)
157+
mle.executors = append(mle.executors, r.executor(mle.Destination, idx))
158+
return
159+
}
160+
161+
// new multiline executor
162+
d := mutable.NewDestination()
163+
rt.pusher.AddDestination(r.context, d)
164+
e := multiLineExecutor{
165+
Context: r.context,
166+
Destination: d,
167+
executors: []*lineExecutor{r.executor(d, idx)},
168+
}
169+
rt.executors[r.context] = &e
170+
}
171+
172+
func (rt *runtime) registerAsyncRoute(idx int) {
173+
d := mutable.NewDestination()
174+
r := rt.routes[idx]
175+
r.source.dest = d
176+
rt.pusher.AddDestination(r.source.Context, d)
177+
rt.executors[r.source.Context] = r.source
178+
for i := range r.processors {
179+
rt.pusher.AddDestination(r.processors[i].Context, d)
180+
rt.executors[r.processors[i].Context] = r.processors[i]
181+
}
182+
rt.pusher.AddDestination(r.sink.Context, d)
183+
rt.executors[r.sink.Context] = r.sink
184+
}
185+
186+
func (rt *runtime) startAsyncRoute(idx int) {
187+
r := rt.routes[idx]
188+
// start all executors
189+
rt.merger.add(r.source)
190+
for _, proc := range r.processors {
191+
rt.merger.add(proc)
192+
}
193+
rt.merger.add(r.sink)
124194
}
125195

126196
// Start starts the pipe execution.
127197
func (p *Pipe) Start(ctx context.Context, initializers ...mutable.Mutation) <-chan error {
128198
// cancel is required to stop the pipe in case of error
129199
ctx, cancelFn := context.WithCancel(ctx)
130-
p.ctx = ctx
131-
p.errorMerger.errorChan = make(chan error, 1)
132-
p.executors = make(map[mutable.Context]executor)
133-
for i, r := range p.routes {
134-
p.addExecutors(r, i)
200+
p.runtime.merger = newErrorMerger(ctx)
201+
for _, r := range p.routes {
135202
r.connect(p.bufferSize)
136203
}
137204
// push initializers before start
138205
p.pusher.Put(initializers...)
139206
p.pusher.Push(ctx)
140207
for _, e := range p.executors {
141-
p.errorMerger.add(start(ctx, e))
208+
p.merger.add(e)
142209
}
143-
go p.errorMerger.wait()
210+
go p.merger.wait()
144211
errc := make(chan error, 1)
145212
go p.start(ctx, errc, cancelFn)
146213
return errc
@@ -160,12 +227,12 @@ func (p *Pipe) start(ctx context.Context, errc chan error, cancelFn context.Canc
160227
}
161228
}
162229
p.pusher.Push(ctx)
163-
case err, ok := <-p.errorMerger.errorChan:
230+
case err, ok := <-p.merger.errorChan:
164231
// merger has buffer of one error, if more errors happen, they
165232
// will be ignored.
166233
if ok {
167234
cancelFn()
168-
p.errorMerger.drain()
235+
p.merger.drain()
169236
errc <- err
170237
}
171238
return
@@ -191,87 +258,47 @@ func Wait(errc <-chan error) error {
191258

192259
// AddLine creates the line for provied route and adds it to the pipe.
193260
func (p *Pipe) AddLine(l Line) <-chan struct{} {
194-
if p.ctx == nil {
261+
if p.merger == nil {
195262
panic("pipe isn't running")
196263
}
197-
ctx, cancelFn := context.WithCancel(p.ctx)
264+
ctx, cancelFn := context.WithCancel(p.merger.ctx)
198265
p.Push(p.mctx.Mutate(func() error {
199266
r, err := l.route(p.bufferSize)
200267
if err != nil {
201268
return fmt.Errorf("error adding line: %w", err)
202269
}
203-
routeIdx := len(p.routes)
204-
p.routes = append(p.routes, r)
270+
271+
idx := p.runtime.addRoute(r)
205272
// connect all fittings
206273
r.connect(p.bufferSize)
207274

208275
// async line
209276
if !l.Context.IsMutable() {
210-
p.addExecutors(r, routeIdx)
211-
// start all executors
212-
p.errorMerger.add(start(p.ctx, r.source))
213-
for _, proc := range r.processors {
214-
p.errorMerger.add(start(p.ctx, proc))
215-
}
216-
p.errorMerger.add(start(p.ctx, r.sink))
277+
p.runtime.registerAsyncRoute(idx)
278+
p.runtime.startAsyncRoute(idx)
217279
cancelFn()
218280
return nil
219281
}
282+
220283
// sync add to existing goroutine
221284
if e, ok := p.executors[l.Context]; ok {
222-
p.pusher.Put(e.(*multiLineExecutor).addRoute(p.ctx, r, routeIdx, cancelFn))
285+
p.pusher.Put(e.(*multiLineExecutor).addRoute(p.merger.ctx, r, idx, cancelFn))
223286
return nil
224287
}
225288
// sync new goroutine
226-
e := p.newMultilineExecutor(r, routeIdx)
227-
p.executors[r.context] = e
228-
p.errorMerger.add(start(p.ctx, e))
289+
p.runtime.registerSyncRoute(idx)
290+
p.runtime.merger.add(p.runtime.executors[r.context])
229291
cancelFn()
230292
return nil
231293
}))
232294
return ctx.Done()
233295
}
234296

235-
func (p *Pipe) addExecutors(r *route, routeIdx int) {
236-
if r.context.IsMutable() {
237-
if e, ok := p.executors[r.context]; ok {
238-
mle := e.(*multiLineExecutor)
239-
mle.executors = append(mle.executors, r.executor(mle.Destination, routeIdx))
240-
return
241-
}
242-
p.newMultilineExecutor(r, routeIdx)
243-
return
244-
}
245-
246-
d := mutable.NewDestination()
247-
r.source.dest = d
248-
p.pusher.AddDestination(r.source.Context, d)
249-
p.executors[r.source.Context] = r.source
250-
for i := range r.processors {
251-
p.pusher.AddDestination(r.processors[i].Context, d)
252-
p.executors[r.processors[i].Context] = r.processors[i]
253-
}
254-
p.pusher.AddDestination(r.sink.Context, d)
255-
p.executors[r.sink.Context] = r.sink
256-
}
257-
258-
func (p *Pipe) newMultilineExecutor(r *route, routeIdx int) *multiLineExecutor {
259-
d := mutable.NewDestination()
260-
p.pusher.AddDestination(r.context, d)
261-
e := multiLineExecutor{
262-
Context: r.context,
263-
Destination: d,
264-
executors: []*lineExecutor{r.executor(d, routeIdx)},
265-
}
266-
p.executors[r.context] = &e
267-
return &e
268-
}
269-
270297
func (p *Pipe) InsertProcessor(line, pos int, procAlloc ProcessorAllocatorFunc) <-chan struct{} {
271-
if p.ctx == nil {
298+
if p.merger == nil {
272299
panic("pipe isn't running")
273300
}
274-
ctx, cancelFn := context.WithCancel(p.ctx)
301+
ctx, cancelFn := context.WithCancel(p.merger.ctx)
275302
p.Push(p.mctx.Mutate(func() error {
276303
r := p.routes[line]
277304
// allocate and connect
@@ -283,32 +310,14 @@ func (p *Pipe) InsertProcessor(line, pos int, procAlloc ProcessorAllocatorFunc)
283310
return err
284311
}
285312

313+
// insert proc to a sync line
286314
if r.context.IsMutable() {
287315
proc.connect(p.bufferSize, fitting.Sync, prevOut)
288-
mle := p.executors[r.context].(*multiLineExecutor)
289-
p.pusher.Put(mle.Context.Mutate(func() error {
290-
defer cancelFn()
291-
pos++ // slice of executors includes source and sink
292-
for _, e := range mle.executors {
293-
if e.route != line {
294-
continue
295-
}
296-
297-
inserter := e.executors[pos].(interface{ insert(out) })
298-
inserter.insert(proc.out)
299-
300-
err := proc.startHook(p.ctx)
301-
if err != nil {
302-
return fmt.Errorf("error starting processor: %w", err)
303-
}
304-
e.executors = append(e.executors, nil)
305-
copy(e.executors[pos+1:], e.executors[pos:])
306-
e.executors[pos] = &proc
307-
e.started++
308-
break
309-
}
310-
return nil
311-
}))
316+
mle, ok := p.executors[r.context].(*multiLineExecutor)
317+
if !ok {
318+
panic("add processor to not running line")
319+
}
320+
p.pusher.Put(mle.insertProcessor(p.merger.ctx, line, pos+1, proc, cancelFn))
312321
return nil
313322
}
314323

@@ -317,7 +326,6 @@ func (p *Pipe) InsertProcessor(line, pos int, procAlloc ProcessorAllocatorFunc)
317326
// get ready for start
318327
p.executors[mctx] = &proc
319328
p.pusher.AddDestination(mctx, r.source.dest)
320-
321329
// add to route after this function returns
322330
defer func() {
323331
r.processors = append(r.processors, nil)
@@ -327,7 +335,7 @@ func (p *Pipe) InsertProcessor(line, pos int, procAlloc ProcessorAllocatorFunc)
327335
if pos == len(r.processors) {
328336
p.pusher.Put(r.sink.Mutate((func() error {
329337
r.sink.in.insert(proc.out)
330-
p.errorMerger.add(start(p.ctx, &proc))
338+
p.merger.add(&proc)
331339
cancelFn()
332340
return nil
333341
})))
@@ -336,7 +344,7 @@ func (p *Pipe) InsertProcessor(line, pos int, procAlloc ProcessorAllocatorFunc)
336344
nextProc := r.processors[pos]
337345
p.pusher.Put(nextProc.Mutate((func() error {
338346
nextProc.in.insert(proc.out)
339-
p.errorMerger.add(start(p.ctx, &proc))
347+
p.merger.add(&proc)
340348
cancelFn()
341349
return nil
342350
})))

0 commit comments

Comments
 (0)