Skip to content
This repository has been archived by the owner on Oct 17, 2024. It is now read-only.

Commit

Permalink
Support Elasticsearch 8 (#132)
Browse files Browse the repository at this point in the history
Some fixes for ES consumer. Mostly supporting ESv8

- Switch to supporting ESv8
- Remove code that pretends to support more than the single ES version
- Rename elasticsearch_c -> elasticsearch to tidy up the name
- Allow passing of ES cluster urls via flags (as well as existing env variable)
- Disable xpack security for ES since it's only for demo purposes
    Given this is for a POC/example/e2e tests, we don't need to support the
    security features. Later it'd be ideal to e2e test all this, but that's
    for another day.
  • Loading branch information
steakunderscore authored Jul 4, 2022
1 parent 1eb1ffe commit effd97d
Show file tree
Hide file tree
Showing 13 changed files with 95 additions and 113 deletions.
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
subinclude("//third_party/defs:docker")

go_binary(
name = "elasticsearch_c",
name = "elasticsearch",
srcs = [
"main.go",
],
static = True,
deps = [
"//api/proto:v1",
"//consumers",
"//third_party/go:elastic_go-elasticsearch_v7",
"//third_party/go:elastic_go-elasticsearch_v8",
"//third_party/go:protobuf",
],
)

go_test(
name = "elasticsearch_c_test",
name = "elasticsearch_test",
srcs = [
"main.go",
"main_test.go",
],
deps = [
"//api/proto:v1",
"//consumers",
"//third_party/go:elastic_go-elasticsearch_v7",
"//third_party/go:elastic_go-elasticsearch_v8",
"//third_party/go:gogo_protobuf",
"//third_party/go:protobuf",
"//third_party/go:stretchr_testify",
Expand All @@ -33,7 +33,7 @@ go_test(
docker_image(
name = "dracon-consumer-elasticsearch",
srcs = [
":elasticsearch_c",
":elasticsearch",
],
base_image = "//build/docker:dracon-base-go",
image = "dracon-consumer-elasticsearch",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM //build/docker:dracon-base-go

COPY elasticsearch_c /consume
COPY elasticsearch /consume

ENTRYPOINT ["/consume"]
102 changes: 45 additions & 57 deletions consumers/elasticsearch_c/main.go → consumers/elasticsearch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,27 @@ import (
"flag"
"fmt"
"log"
"strings"
"time"

// TODO(hjenkins): Support multiple versions of ES
// elasticsearchv5 "github.com/elastic/go-elasticsearch/v5"
// elasticsearchv6 "github.com/elastic/go-elasticsearch/v6"
v1 "github.com/thought-machine/dracon/api/proto/v1"
"github.com/thought-machine/dracon/api/proto/v1"
"github.com/thought-machine/dracon/consumers"

elasticsearchv7 "github.com/elastic/go-elasticsearch/v7"
// TODO: Support multiple versions of ES
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/golang/protobuf/ptypes"
"github.com/thought-machine/dracon/consumers"
)

var (
esURL string
esUrls string
esAddrs []string
esIndex string
basicAuthUser string
basicAuthPass string
)

func init() {
flag.StringVar(&esUrls, "es-urls", "", "[OPTIONAL] URLs to connect to elasticsearch comma seperated. Can also use env variable ELASTICSEARCH_URL")
flag.StringVar(&esIndex, "es-index", "", "the index in elasticsearch to push results to")
flag.StringVar(&basicAuthUser, "basic-auth-user", "", "[OPTIONAL] the basic auth username")
flag.StringVar(&basicAuthPass, "basic-auth-pass", "", "[OPTIONAL] the basic auth password")
Expand All @@ -39,15 +40,21 @@ func parseFlags() error {
if len(esIndex) < 1 {
return fmt.Errorf("es-index is undefined")
}
if len(esUrls) > 0 {
for _, u := range strings.Split(esUrls, ",") {
esAddrs = append(esAddrs, strings.TrimSpace(u))
}
}
return nil
}

func main() {
if err := consumers.ParseFlags(); err != nil {
if err := parseFlags(); err != nil {
log.Fatal(err)
}

if err := getESClient(); err != nil {
es, err := getESClient()
if err != nil {
log.Fatal("could not contact remote Elasticsearch: ", err)
}

Expand All @@ -65,7 +72,11 @@ func main() {
if err != nil {
log.Fatal("Could not parse raw issue", err)
}
esPush(b)
res, err := es.Index(esIndex, bytes.NewBuffer(b))
log.Printf("%+v", res)
if err != nil {
log.Fatal("Could not push raw issue", err)
}
}
}
} else {
Expand All @@ -81,7 +92,11 @@ func main() {
if err != nil {
log.Fatal("Could not parse enriched issue", err)
}
esPush(b)
res, err := es.Index(esIndex, bytes.NewBuffer(b))
log.Printf("%+v", res)
if err != nil {
log.Fatal("Could not push enriched issue", err)
}
}
}
}
Expand Down Expand Up @@ -190,21 +205,23 @@ type esDocument struct {
CVE string `json:"cve"`
}

var esClient interface{}

func getESClient() error {
var es *elasticsearchv7.Client
func getESClient() (*elasticsearch.Client, error) {
var es *elasticsearch.Client
var err error = nil
var esConfig elasticsearch.Config = elasticsearch.Config{}

if basicAuthUser != "" && basicAuthPass != "" {
es, err = elasticsearchv7.NewClient(elasticsearchv7.Config{
Username: basicAuthUser,
Password: basicAuthPass,
})
} else {
es, err = elasticsearchv7.NewDefaultClient()
esConfig.Username = basicAuthUser
esConfig.Password = basicAuthPass
}

if len(esAddrs) >= 0 {
esConfig.Addresses = esAddrs
}

es, err = elasticsearch.NewClient(esConfig)
if err != nil {
return err
return nil, err
}
type esInfo struct {
Version struct {
Expand All @@ -214,46 +231,17 @@ func getESClient() error {

res, err := es.Info()
if err != nil {
return err
return nil, err
}
var info esInfo
if err := json.NewDecoder(res.Body).Decode(&info); err != nil {
return err
return nil, err
}
switch info.Version.Number[0] {
// case '5':
// esClient, err = elasticsearchv5.NewDefaultClient()
// case '6':
// esClient, err = elasticsearchv6.NewDefaultClient()
case '7':
if basicAuthUser != "" && basicAuthPass != "" {
esClient, err = elasticsearchv7.NewClient(elasticsearchv7.Config{
Username: basicAuthUser,
Password: basicAuthPass,
})
} else {
esClient, err = elasticsearchv7.NewDefaultClient()
}

default:
err = fmt.Errorf("unsupported version %s", info.Version.Number)
}
return err
}

func esPush(b []byte) error {
var err error
var res interface{}
switch x := esClient.(type) {
// case *elasticsearchv5.Client:
// res, err = x.Index(esIndex, bytes.NewBuffer(b), x.Index.WithDocumentType("doc"))
// case *elasticsearchv6.Client:
// res, err = x.Index(esIndex, bytes.NewBuffer(b), x.Index.WithDocumentType("doc"))
case *elasticsearchv7.Client:
res, err = x.Index(esIndex, bytes.NewBuffer(b))
case '8':
// noop - we support this version
default:
err = fmt.Errorf("unsupported client %T", esClient)
err = fmt.Errorf("unsupported ES Server version %s", info.Version.Number)
}
log.Printf("%+v", res)
return err
return es, err
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ package main
import (
"bytes"
"encoding/json"
v1 "github.com/thought-machine/dracon/api/proto/v1"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

v1 "github.com/thought-machine/dracon/api/proto/v1"

"github.com/stretchr/testify/assert"
)

var (
want = "OK"
info = `{"Version":{"Number":"7.1.23"}}`
info = `{"Version":{"Number":"8.1.0"}}`
scanUUID = "test-uuid"
scanStartTime, _ = time.Parse("2006-01-02T15:04:05.000Z", "2020-04-13 11:51:53+01:00")

Expand Down Expand Up @@ -47,9 +48,9 @@ func TestEsPushBasicAuth(t *testing.T) {
esStub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.WriteHeader(200)
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(http.StatusOK)
if r.Method == "GET" {

uname, pass, ok := r.BasicAuth()
assert.Equal(t, uname, "foo")
assert.Equal(t, pass, "bar")
Expand All @@ -76,14 +77,16 @@ func TestEsPushBasicAuth(t *testing.T) {
// basic auth ops
basicAuthUser = "foo"
basicAuthPass = "bar"
assert.Nil(t, getESClient())
esPush(esIn)
client, err := getESClient()
assert.Nil(t, err)
client.Index(esIndex, bytes.NewBuffer(esIn))
}
func TestEsPush(t *testing.T) {
esStub := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buf := new(bytes.Buffer)
buf.ReadFrom(r.Body)
w.WriteHeader(200)
w.Header().Set("X-Elastic-Product", "Elasticsearch")
w.WriteHeader(http.StatusOK)
if r.Method == "GET" {
w.Write([]byte(info))
} else if r.Method == "POST" {
Expand All @@ -95,6 +98,7 @@ func TestEsPush(t *testing.T) {
}))
defer esStub.Close()
os.Setenv("ELASTICSEARCH_URL", esStub.URL)
assert.Nil(t, getESClient())
esPush(esIn)
client, err := getESClient()
assert.Nil(t, err)
client.Index(esIndex, bytes.NewBuffer(esIn))
}
2 changes: 1 addition & 1 deletion examples/pipelines/golang-project/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ kustomized_config(
"pipeline-run.yaml",
],
images = [
"//consumers/elasticsearch_c:dracon-consumer-elasticsearch",
"//consumers/elasticsearch:dracon-consumer-elasticsearch",
"//cmd/enricher:dracon-enricher",
"//source/git:dracon-source-git",
"//producers/golang_gosec:dracon-producer-gosec",
Expand Down
5 changes: 2 additions & 3 deletions examples/pipelines/golang-project/elasticsearch-consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ spec:
# run elasticsearch consumer
- name: run-elasticsearch-consumer
image: index.docker.io/thoughtmachine/dracon-consumer-elasticsearch:latest
env: [
{name: ELASTICSEARCH_URL, value: http://elasticsearch.dracon.svc:9200}
]
env: []
command: ["/consume"]
args: [
"-in", "{{.ConsumerSourcePath}}",
"-es-urls", "http://elasticsearch.dracon.svc:9200",
"-es-index", "dracon"
]
2 changes: 1 addition & 1 deletion examples/pipelines/mixed-lang-project/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ kustomized_config(
"spotbugs-producer.yaml",
],
images = [
"//consumers/elasticsearch_c:dracon-consumer-elasticsearch",
"//consumers/elasticsearch:dracon-consumer-elasticsearch",
"//cmd/enricher:dracon-enricher",
"//source/git:dracon-source-git",
"//producers/golang_gosec:dracon-producer-gosec",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ spec:
# run elasticsearch consumer
- name: run-elasticsearch-consumer
image: index.docker.io/thoughtmachine/dracon-consumer-elasticsearch:latest
env: [
{name: ELASTICSEARCH_URL, value: http://elasticsearch.dracon.svc:9200}
]
env: []
command: ["/consume"]
args: [
"-in", "{{.ConsumerSourcePath}}",
"-es-urls", "http://elasticsearch.dracon.svc:9200",
"-es-index", "dracon"
]
2 changes: 1 addition & 1 deletion examples/pipelines/python-project/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ kustomized_config(
"pipeline-run.yaml",
],
images = [
"//consumers/elasticsearch_c:dracon-consumer-elasticsearch",
"//consumers/elasticsearch:dracon-consumer-elasticsearch",
"//cmd/enricher:dracon-enricher",
"//source/git:dracon-source-git",
"//producers/python_bandit:dracon-producer-bandit",
Expand Down
5 changes: 2 additions & 3 deletions examples/pipelines/python-project/elasticsearch-consumer.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ spec:
- name: run-elasticsearch-consumer
image: index.docker.io/thoughtmachine/dracon-consumer-elasticsearch:latest
imagePullPolicy: Never
env: [
{name: ELASTICSEARCH_URL, value: http://elasticsearch.dracon.svc:9200}
]
env: []
command: ["/consume"]
args: [
"-in", "{{.ConsumerSourcePath}}",
"-es-urls", "http://elasticsearch.dracon.svc:9200",
"-es-index", "dracon"
]
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ spec:
spec:
containers:
- name: elasticsearch
image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0
image: docker.elastic.co/elasticsearch/elasticsearch:8.2.3
env:
- name: discovery.type
value: single-node
- name: xpack.security.enabled
value: "false"
ports:
- containerPort: 9200
name: client
Expand Down
2 changes: 1 addition & 1 deletion scripts/development/k8s/elasticsearch-kibana/kibana.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ spec:
spec:
containers:
- name: kibana
image: docker.elastic.co/kibana/kibana:7.6.0
image: docker.elastic.co/kibana/kibana:8.2.3
ports:
- containerPort: 5601
name: webinterface
Expand Down
Loading

0 comments on commit effd97d

Please sign in to comment.