Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancements and testing #8

Merged
merged 23 commits into from
Feb 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name: CI

on:
push:
branches: [ master ]
pull_request:
branches: [ master ]

jobs:
ci:
name: ci
runs-on: ubuntu-latest

steps:
- name: Checkout
uses: actions/checkout@v2

- name: Install Go
uses: actions/setup-go@v1
with:
go-version: '1.15'

- name: Build
run: |
go build -v

- name: Test
run: |
go test -v ./...
131 changes: 131 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package client

import (
"fmt"

"github.com/sirupsen/logrus"

"go.ligato.io/vpp-probe/providers"
"go.ligato.io/vpp-probe/vpp"
)

// Client is a client for managing providers and instances.
type Client struct {
providers []providers.Provider
instances []*vpp.Instance
}

// NewClient returns a new client using given options.
func NewClient(opt ...Opt) (*Client, error) {
c := &Client{}
for _, o := range opt {
if err := o(c); err != nil {
return nil, err
}
}
return c, nil
}

// Close releases used resources.
func (c *Client) Close() error {
for _, instance := range c.instances {
handler := instance.Handler()
if err := handler.Close(); err != nil {
logrus.Debugf("closing handler %v failed: %v", handler.ID(), err)
}
}
return nil
}

// GetProviders returns all providers.
func (c *Client) GetProviders() []providers.Provider {
return c.providers
}

// Instances returns list of VPP instances.
func (c *Client) Instances() []*vpp.Instance {
return c.instances
}

// AddProvider adds provider to the client or returns error if the provided
// was already added.
func (c *Client) AddProvider(provider providers.Provider) error {
if provider == nil {
panic("provider is nil")
}

// check duplicate
for _, p := range c.providers {
if p.Name() == provider.Name() {
return fmt.Errorf("provider '%v' already added", p)
}
}

c.providers = append(c.providers, provider)

return nil
}

// DiscoverInstances discovers running VPP instances via probe provider and
// updates the list of instances with active instances from discovery.
func (c *Client) DiscoverInstances(queryParams ...map[string]string) error {
if len(c.providers) == 0 {
return fmt.Errorf("no providers available")
}

instanceChan := make(chan []*vpp.Instance)

for _, p := range c.providers {
go func(provider providers.Provider) {
instances, err := DiscoverInstances(provider, queryParams...)
if err != nil {
logrus.Warnf("provider %q discover error: %v", provider.Name(), err)
}
instanceChan <- instances
}(p)
}

var instanceList []*vpp.Instance

for range c.providers {
instances := <-instanceChan
if len(instances) > 0 {
instanceList = append(instanceList, instances...)
}
}

c.instances = instanceList
if len(c.instances) == 0 {
return fmt.Errorf("no instances discovered")
}

return nil
}

// DiscoverInstances discovers running VPP instances using provider and
// returns the list of instances or error if provider query fails.
func DiscoverInstances(provider providers.Provider, queryParams ...map[string]string) ([]*vpp.Instance, error) {
handlers, err := provider.Query(queryParams...)
if err != nil {
return nil, err
}

var instances []*vpp.Instance

// TODO
// - run this in parallel (with throttle) to make it faster
// - persist failed handlers to skip in the next run

for _, handler := range handlers {
inst, err := vpp.NewInstance(handler)
if err != nil {
logrus.WithField("instance", handler.ID()).
Debugf("vpp instance init failed: %v", err)
continue
}

instances = append(instances, inst)
}

return instances, nil
}
3 changes: 3 additions & 0 deletions client/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package client

type Opt func(*Client) error
231 changes: 231 additions & 0 deletions cmd/cli.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package cmd

import (
"fmt"
"io"
"strings"

"github.com/docker/cli/cli/streams"
"github.com/moby/term"
"github.com/sirupsen/logrus"

"go.ligato.io/vpp-probe/client"
"go.ligato.io/vpp-probe/providers"
"go.ligato.io/vpp-probe/providers/docker"
"go.ligato.io/vpp-probe/providers/kube"
"go.ligato.io/vpp-probe/providers/local"
)

type Cli interface {
Initialize(opts ProbeOptions) error
Client() *client.Client
Queries() []map[string]string

Out() io.Writer
Err() io.Writer
In() io.ReadCloser
Apply(...CliOption) error
}

type ProbeCli struct {
queries []map[string]string
client *client.Client

in io.ReadCloser
out io.Writer
err io.Writer
}

func NewProbeCli(opt ...CliOption) (*ProbeCli, error) {
cli := new(ProbeCli)
if err := cli.Apply(opt...); err != nil {
return nil, err
}
if cli.out == nil || cli.in == nil || cli.err == nil {
stdin, stdout, stderr := term.StdStreams()
if cli.in == nil {
cli.in = streams.NewIn(stdin)
}
if cli.out == nil {
cli.out = streams.NewOut(stdout)
}
if cli.err == nil {
cli.err = stderr
}
}
return cli, nil
}

func (cli *ProbeCli) Initialize(opts ProbeOptions) (err error) {
cli.client, err = initClient(opts)
if err != nil {
return fmt.Errorf("controller setup error: %w", err)
}

cli.queries = parseQueries(opts.Queries)

return nil
}

func (cli *ProbeCli) Apply(opt ...CliOption) error {
for _, o := range opt {
if err := o(cli); err != nil {
return err
}
}
return nil
}

func (cli *ProbeCli) Client() *client.Client {
return cli.client
}

func (cli *ProbeCli) Queries() []map[string]string {
return cli.queries
}

func (cli *ProbeCli) Out() io.Writer {
return cli.out
}

func (cli *ProbeCli) Err() io.Writer {
return cli.err
}

func (cli *ProbeCli) In() io.ReadCloser {
return cli.in
}

func parseQueries(queries []string) []map[string]string {
const (
queryParamSeparator = ";"
paramKeyValSeparator = "="
)
var queryParams []map[string]string
for _, q := range queries {
params := strings.Split(q, queryParamSeparator)
qp := map[string]string{}
for _, p := range params {
if i := strings.Index(p, paramKeyValSeparator); i > 0 {
key := p[:i]
val := p[i+1:]
qp[key] = val
} else {
qp[p] = ""
}
}
queryParams = append(queryParams, qp)
}
return queryParams
}

func initClient(opts ProbeOptions) (*client.Client, error) {
env := resolveEnv(opts)

logrus.Debugf("resolved env: %v", env)

probeClient, err := client.NewClient()
if err != nil {
return nil, err
}

pvds, err := setupProviders(env, opts)
if err != nil {
return nil, err
}

logrus.Debugf("adding %v providers", len(pvds))

for _, provider := range pvds {
if err := probeClient.AddProvider(provider); err != nil {
logrus.Warnf("add provider failed: %v", err)
continue
}
logrus.Debugf("%v provider %v connected", provider.Env(), provider.Name())
}

return probeClient, nil
}

func setupProviders(env providers.Env, opt ProbeOptions) ([]providers.Provider, error) {
switch env {
case providers.Local:
prov, err := setupLocalEnv(opt)
if err != nil {
return nil, err
}
return []providers.Provider{prov}, nil
case providers.Kube:
provs, err := setupKubeEnv(opt.Kube.Kubeconfig, opt.Kube.Context)
if err != nil {
return nil, err
}
return provs, nil
case providers.Docker:
return setupDockerEnv(opt)
default:
return nil, fmt.Errorf("unknown env: %q", env)
}
}

func resolveEnv(opts ProbeOptions) providers.Env {
if opts.Env != "" {
return providers.Env(opts.Env)
}
if opts.Docker.Host != "" {
return providers.Docker
}
if opts.Kube.Kubeconfig != "" || opts.Kube.Context != "" {
return providers.Kube
}
return providers.Local
}

func setupDockerEnv(opt ProbeOptions) ([]providers.Provider, error) {
provider, err := docker.NewProvider(opt.Docker.Host)
if err != nil {
return nil, err
}
return []providers.Provider{provider}, nil
}

func setupLocalEnv(opt ProbeOptions) (providers.Provider, error) {
cfg := local.DefaultConfig()
if opt.Local.APISocket != "" {
cfg.BinapiAddr = opt.Local.APISocket
}
if opt.Local.StatsSocket != "" {
cfg.StatsAddr = opt.Local.StatsSocket
}
if opt.Local.CLISocket != "" {
cfg.CliAddr = opt.Local.CLISocket
}
return local.NewProvider(cfg), nil
}

func setupKubeEnv(kubeconfig, context string) ([]providers.Provider, error) {
var pvds []providers.Provider

isSeparator := func(c rune) bool {
switch c {
case ',', ';', ':':
return true
}
return false
}
contexts := strings.FieldsFunc(context, isSeparator)

if len(contexts) == 0 {
contexts = []string{""}
}

for _, ctx := range contexts {
provider, err := kube.NewProvider(kubeconfig, ctx)
if err != nil {
return nil, err
}
pvds = append(pvds, provider)
}

return pvds, nil
}
Loading