Skip to content

Commit

Permalink
Add logstash to serverless provider (#1646)
Browse files Browse the repository at this point in the history
  • Loading branch information
bhapas authored Feb 8, 2024
1 parent 8a66d05 commit 31dedaa
Show file tree
Hide file tree
Showing 12 changed files with 439 additions and 114 deletions.
27 changes: 21 additions & 6 deletions internal/certs/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func LoadCA(certFile, keyFile string) (*Issuer, error) {
}

func newCA(parent *Issuer) (*Issuer, error) {
cert, err := New(true, parent)
cert, err := New(true, false, parent)
if err != nil {
return nil, err
}
Expand All @@ -115,13 +115,19 @@ func (i *Issuer) IssueIntermediate() (*Issuer, error) {
// Issue issues a certificate with the given options. This certificate
// can be used to configure a TLS server.
func (i *Issuer) Issue(opts ...Option) (*Certificate, error) {
return New(false, i, opts...)
return New(false, false, i, opts...)
}

// IssueClient issues a certificate with the given options. This certificate
// can be used to configure a TLS client.
func (i *Issuer) IssueClient(opts ...Option) (*Certificate, error) {
return New(false, true, i, opts...)
}

// NewSelfSignedCert issues a self-signed certificate with the given options.
// This certificate can be used to configure a TLS server.
func NewSelfSignedCert(opts ...Option) (*Certificate, error) {
return New(false, nil, opts...)
return New(false, false, nil, opts...)
}

// Option is a function that can modify a certificate template. To be used
Expand All @@ -140,7 +146,7 @@ func WithName(name string) Option {

// New is the main helper to create a certificate, it is recommended to
// use the more specific ones for specific use cases.
func New(isCA bool, issuer *Issuer, opts ...Option) (*Certificate, error) {
func New(isCA, isClient bool, issuer *Issuer, opts ...Option) (*Certificate, error) {
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, fmt.Errorf("failed to generate key: %w", err)
Expand Down Expand Up @@ -172,6 +178,15 @@ func New(isCA bool, issuer *Issuer, opts ...Option) (*Certificate, error) {
} else {
template.Subject.CommonName = "intermediate elastic-package CA"
}
// If the requester is a client we set clientAuth instead
} else if isClient {
template.ExtKeyUsage = []x509.ExtKeyUsage{
x509.ExtKeyUsageClientAuth,
}

// Include local hostname and ips as alternates in service certificates.
template.DNSNames = []string{"localhost"}
template.IPAddresses = []net.IP{net.ParseIP("127.0.0.1"), net.ParseIP("::1")}
} else {
template.ExtKeyUsage = []x509.ExtKeyUsage{
// Required for Chrome in OSX to show the "Proceed anyway" link.
Expand Down Expand Up @@ -313,8 +328,8 @@ func checkExpectedCertUsage(cert *x509.Certificate) error {
if !cert.IsCA {
// Required for Chrome in OSX to show the "Proceed anyway" link.
// https://stackoverflow.com/a/64309893/28855
if !containsExtKeyUsage(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth) {
return fmt.Errorf("missing server auth key usage in certificate")
if !(containsExtKeyUsage(cert.ExtKeyUsage, x509.ExtKeyUsageServerAuth) || containsExtKeyUsage(cert.ExtKeyUsage, x509.ExtKeyUsageClientAuth)) {
return fmt.Errorf("missing either of server/client auth key usage in certificate")
}
}

Expand Down
53 changes: 53 additions & 0 deletions internal/kibana/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ import (
"time"
)

type FleetOutput struct {
ID string `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Hosts []string `json:"hosts,omitempty"`
Type string `json:"type,omitempty"`
SSL *AgentSSL `json:"ssl,omitempty"`
}

type AgentSSL struct {
CertificateAuthorities []string `json:"certificate_authorities,omitempty"`
Certificate string `json:"certificate,omitempty"`
Key string `json:"key,omitempty"`
}

// DefaultFleetServerURL returns the default Fleet server configured in Kibana
func (c *Client) DefaultFleetServerURL() (string, error) {
path := fmt.Sprintf("%s/fleet_server_hosts", FleetAPI)
Expand Down Expand Up @@ -45,6 +59,45 @@ func (c *Client) DefaultFleetServerURL() (string, error) {
return "", errors.New("could not find the fleet server URL for this project")
}

// UpdateFleetOutput updates an existing output to fleet
// For example, to update ssl certificates etc.,
func (c *Client) UpdateFleetOutput(fo FleetOutput, outputId string) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.put(fmt.Sprintf("%s/outputs/%s", FleetAPI, outputId), reqBody)
if err != nil {
return fmt.Errorf("could not update fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not update fleet output; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}

// AddFleetOutput adds an additional output to fleet eg., logstash
func (c *Client) AddFleetOutput(fo FleetOutput) error {
reqBody, err := json.Marshal(fo)
if err != nil {
return fmt.Errorf("could not convert fleetOutput (request) to JSON: %w", err)
}

statusCode, respBody, err := c.post(fmt.Sprintf("%s/outputs", FleetAPI), reqBody)
if err != nil {
return fmt.Errorf("could not create fleet output: %w", err)
}

if statusCode != http.StatusOK {
return fmt.Errorf("could not add fleet output; API status code = %d; response body = %s", statusCode, respBody)
}

return nil
}

func (c *Client) SetAgentLogLevel(agentID, level string) error {
path := fmt.Sprintf("%s/agents/%s/actions", FleetAPI, agentID)

Expand Down
59 changes: 58 additions & 1 deletion internal/serverless/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,22 @@ import (
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"github.com/elastic/elastic-package/internal/elasticsearch"
"github.com/elastic/elastic-package/internal/kibana"
"github.com/elastic/elastic-package/internal/logger"
"github.com/elastic/elastic-package/internal/profile"
"github.com/elastic/elastic-package/internal/registry"
)

const (
FleetLogstashOutput = "fleet-logstash-output"
)

// Project represents a serverless project
type Project struct {
url string
Expand Down Expand Up @@ -131,6 +138,54 @@ func (p *Project) DefaultFleetServerURL(kibanaClient *kibana.Client) (string, er
return fleetURL, nil
}

func (p *Project) AddLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error {
logstashFleetOutput := kibana.FleetOutput{
Name: "logstash-output",
ID: FleetLogstashOutput,
Type: "logstash",
Hosts: []string{"logstash:5044"},
}

if err := kibanaClient.AddFleetOutput(logstashFleetOutput); err != nil {
return fmt.Errorf("failed to add logstash fleet output: %w", err)
}

return nil
}

func (p *Project) UpdateLogstashFleetOutput(profile *profile.Profile, kibanaClient *kibana.Client) error {
certsDir := filepath.Join(profile.ProfilePath, "certs", "elastic-agent")

caFile, err := os.ReadFile(filepath.Join(certsDir, "ca-cert.pem"))
if err != nil {
return fmt.Errorf("failed to read ca certificate: %w", err)
}

certFile, err := os.ReadFile(filepath.Join(certsDir, "cert.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate: %w", err)
}

keyFile, err := os.ReadFile(filepath.Join(certsDir, "key.pem"))
if err != nil {
return fmt.Errorf("failed to read client certificate private key: %w", err)
}

logstashFleetOutput := kibana.FleetOutput{
SSL: &kibana.AgentSSL{
CertificateAuthorities: []string{string(caFile)},
Certificate: string(certFile),
Key: string(keyFile),
},
}

if err := kibanaClient.UpdateFleetOutput(logstashFleetOutput, FleetLogstashOutput); err != nil {
return fmt.Errorf("failed to update logstash fleet output: %w", err)
}

return nil
}

func (p *Project) getESHealth(ctx context.Context, elasticsearchClient *elasticsearch.Client) error {
return elasticsearchClient.CheckHealth(ctx)
}
Expand Down Expand Up @@ -177,7 +232,7 @@ func (p *Project) getFleetHealth(ctx context.Context) error {
return nil
}

func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client) error {
func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Client, outputId string) error {
systemPackages, err := registry.Production.Revisions("system", registry.SearchOptions{
KibanaVersion: strings.TrimSuffix(stackVersion, kibana.SNAPSHOT_SUFFIX),
})
Expand All @@ -195,7 +250,9 @@ func (p *Project) CreateAgentPolicy(stackVersion string, kibanaClient *kibana.Cl
Description: "Policy created by elastic-package",
Namespace: "default",
MonitoringEnabled: []string{"logs", "metrics"},
DataOutputID: outputId,
}

newPolicy, err := kibanaClient.CreatePolicy(policy)
if err != nil {
return fmt.Errorf("error while creating agent policy: %w", err)
Expand Down
59 changes: 59 additions & 0 deletions internal/stack/_static/serverless-docker-compose.yml.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
version: '2.3'
services:
elastic-agent:
image: "{{ fact "agent_image" }}"
healthcheck:
test: "elastic-agent status"
timeout: 2s
start_period: 360s
retries: 180
interval: 5s
hostname: docker-fleet-agent
env_file: "./elastic-agent.env"
volumes:
- type: bind
source: ../../../tmp/service_logs/
target: /tmp/service_logs/
# Mount service_logs under /run too as a testing workaround for the journald input (see elastic-package#1235).
- type: bind
source: ../../../tmp/service_logs/
target: /run/service_logs/
- "../certs/ca-cert.pem:/etc/ssl/certs/elastic-package.pem"

elastic-agent_is_ready:
image: tianon/true
depends_on:
elastic-agent:
condition: service_healthy

{{ $logstash_enabled := fact "logstash_enabled" }}
{{ if eq $logstash_enabled "true" }}
logstash:
image: "{{ fact "logstash_image" }}"
healthcheck:
test: bin/logstash -t
interval: 60s
timeout: 50s
retries: 5
# logstash expects the key in pkcs8 format. Hence converting the key.pem to pkcs8 format using openssl.
# Also logstash-filter-elastic_integration plugin is installed by default to run ingest pipelines in logstash.
# elastic-package#1637 made improvements to enable logstash stats through port 9600.
command: bash -c 'openssl pkcs8 -inform PEM -in /usr/share/logstash/config/certs/key.pem -topk8 -nocrypt -outform PEM -out /usr/share/logstash/config/certs/logstash.pkcs8.key && chmod 777 /usr/share/logstash/config/certs/logstash.pkcs8.key && if [[ ! $(bin/logstash-plugin list) == *"logstash-filter-elastic_integration"* ]]; then echo "Missing plugin logstash-filter-elastic_integration, installing now" && bin/logstash-plugin install logstash-filter-elastic_integration; fi && bin/logstash -f /usr/share/logstash/pipeline/logstash.conf'
volumes:
- "../certs/logstash:/usr/share/logstash/config/certs"
- "./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro"
ports:
- "127.0.0.1:5044:5044"
- "127.0.0.1:9600:9600"
environment:
- xpack.monitoring.enabled=false
- ELASTIC_USER={{ fact "username" }}
- ELASTIC_PASSWORD={{ fact "password" }}
- ELASTIC_HOSTS={{ fact "elasticsearch_host" }}

logstash_is_ready:
image: tianon/true
depends_on:
logstash:
condition: service_healthy
{{ end }}
26 changes: 0 additions & 26 deletions internal/stack/_static/serverless-elastic-agent.yml.tmpl

This file was deleted.

32 changes: 32 additions & 0 deletions internal/stack/_static/serverless-logstash.conf.tmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
input {
elastic_agent {
port => 5044
ssl_enabled => true
ssl_certificate_authorities => ["/usr/share/logstash/config/certs/ca-cert.pem"]
ssl_certificate => "/usr/share/logstash/config/certs/cert.pem"
ssl_key => "/usr/share/logstash/config/certs/logstash.pkcs8.key"
}
}


filter {
elastic_integration {
remove_field => ['@version']
hosts => ["{{ fact "elasticsearch_host" }}"]
username => '{{ fact "username" }}'
password => '{{ fact "password" }}'
ssl_enabled => true
ssl_verification_mode => "full"
}
}


output {
elasticsearch {
hosts => ["{{ fact "elasticsearch_host" }}"]
user => '{{ fact "username" }}'
password => '{{ fact "password" }}'
ssl_enabled => true
data_stream => "true"
}
}
Loading

0 comments on commit 31dedaa

Please sign in to comment.