-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi.go
More file actions
62 lines (56 loc) · 1.16 KB
/
api.go
File metadata and controls
62 lines (56 loc) · 1.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main
import (
"context"
"github.com/moby/moby/api/types/events"
"github.com/moby/moby/client"
"github.com/samber/ro"
)
// APIClient bundles a Docker API client with an observable view of its
// event stream and a function that shuts both down.
type APIClient struct {
// Embedded Docker client; its methods are promoted onto APIClient, except
// for any promoted Close method, which is shadowed by the Close field below.
client.APIClient
// EventsOba is the observable that emits events.Message values.
EventsOba ro.Observable[events.Message]
// Close releases the client. As set by NewAPIClient it cancels the context
// used for the event stream and then closes the underlying client.
Close func() error
}
// NewAPIClient creates a Docker client from opt and exposes its event stream
// as an observable.
//
// The event stream is opened immediately with a context derived from ctx; the
// returned APIClient's Close function cancels that context and then closes the
// underlying client. An error is returned only when the client itself cannot
// be constructed.
//
// NOTE(review): the subscribe callback starts a goroutine that reads from the
// single pair of channels opened here, so the observable is presumably meant
// for one subscriber; confirm before subscribing more than once.
func NewAPIClient(ctx context.Context, opt ...client.Opt) (*APIClient, error) {
	cli, err := client.New(opt...)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithCancel(ctx)
	eventsResult := cli.Events(ctx, client.EventsListOptions{})

	oba := ro.NewObservable(func(obs ro.Observer[events.Message]) ro.Teardown {
		go func() {
			for {
				// Wait on both channels at once so that an error (including
				// the one produced by cancelling ctx) is noticed even while
				// no messages are arriving.
				select {
				case err, ok := <-eventsResult.Err:
					if !ok || err == nil {
						// A closed error channel carries no error to report;
						// treat it as the end of the stream.
						obs.Complete()
						return
					}
					obs.Error(err)
					return
				case msg, ok := <-eventsResult.Messages:
					if !ok {
						// Stream finished: complete exactly once and stop,
						// rather than spinning on the closed channel.
						obs.Complete()
						return
					}
					obs.Next(msg)
				}
			}
		}()
		return nil
	})

	// Cancel the event stream first so the reader goroutine can exit, then
	// release the client's resources.
	clz := func() error {
		cancel()
		return cli.Close()
	}

	return &APIClient{APIClient: cli, EventsOba: oba, Close: clz}, nil
}
// connectApiClient replaces the App's current API client with a fresh one
// built from opt and a.ctx, then subscribes a.bus to the new client's event
// observable. Any existing client is closed first and its Close error is
// discarded. If creating the new client fails, that error is returned.
//
// TODO: optimize.
func (a *App) connectApiClient(opt ...client.Opt) (err error) {
	if prev := a.cli; prev != nil {
		_ = prev.Close()
	}

	if a.cli, err = NewAPIClient(a.ctx, opt...); err != nil {
		return err
	}

	a.cli.EventsOba.Subscribe(a.bus)
	return nil
}