Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 177 additions & 36 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/CyCoreSystems/kubetemplate"
"github.com/CyCoreSystems/netdiscover/discover"
"github.com/rotisserie/eris"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
Expand All @@ -35,12 +37,25 @@ var defaultMinReloadInterval = 5 * time.Second
// Service maintains an Asterisk configuration set
type Service struct {

// Kubernetes client interface
kc *kubernetes.Clientset

// Discoverer is the engine which should be used for network discovery
Discoverer discover.Discoverer

// Secret is the password which should be used for internal administrative authentication
Secret string

// SourceFile is the source zip file in the filesystem
// This can be a mounted secret, or a just a zip file embeded in the container
SourceFile string

// SourceSecret indicates the source zip file is stored in a secret
SourceSecret string

// The default namespace
DefaultNamespace string

// CustomRoot is the directory which contains the tree of custom configuration templates
CustomRoot string

Expand All @@ -67,9 +82,20 @@ func main() {
}
disc := getDiscoverer(cloud)

source := "/source/asterisk-config.zip"
sourceFile := "/source/asterisk-config.zip"
if os.Getenv("SOURCE") != "" {
source = os.Getenv("SOURCE")
sourceFile = os.Getenv("SOURCE")
}

secretSourceName := ""
if os.Getenv("SECRET_SOURCE_NAME") != "" {
secretSourceName = os.Getenv("SECRET_SOURCE_NAME")
sourceFile = ""
}

defaultNamespace := ""
if os.Getenv("POD_NAMESPACE") != "" {
defaultNamespace = os.Getenv("POD_NAMESPACE")
}

defaultsRoot := "/defaults"
Expand Down Expand Up @@ -107,25 +133,24 @@ func main() {
log.Println("failed to get secret:", err)
os.Exit(1)
}
// this is set so the default template can use it
os.Setenv("ARI_AUTOSECRET", secret)
}

// Try to extract the source
if err := extractSource(source, customRoot); err != nil {
log.Printf("failed to load source from %s: %s\n", source, err.Error())
}

var shortDeaths int
var t time.Time
for shortDeaths < maxShortDeaths {

svc := &Service{
Discoverer: disc,
Secret: secret,
CustomRoot: customRoot,
DefaultsRoot: defaultsRoot,
ExportRoot: exportRoot,
Modules: modules,
Discoverer: disc,
Secret: secret,
SourceFile: sourceFile,
SourceSecret: secretSourceName,
DefaultNamespace: defaultNamespace,
CustomRoot: customRoot,
DefaultsRoot: defaultsRoot,
ExportRoot: exportRoot,
Modules: modules,
}

t = time.Now()
Expand All @@ -134,6 +159,8 @@ func main() {
log.Println("service exited:", err)
if time.Since(t) < minRuntime {
shortDeaths++
log.Println("short death", shortDeaths)
time.Sleep(60 * time.Second)
} else {
shortDeaths = 0
}
Expand All @@ -143,6 +170,46 @@ func main() {
os.Exit(1)
}

func (s *Service) secretWatcher(ctx context.Context, reloader *reloader) error {
for {
// this only whatches for changes to the source secret itself
// and we won't receive any events for other secrets in the namespace
watcher, _ := s.kc.CoreV1().Secrets(s.DefaultNamespace).
Watch(context.Background(), metav1.ListOptions{
FieldSelector: "metadata.name=" + s.SourceSecret,
})

c := watcher.ResultChan()
for c != nil {
select {
case <-ctx.Done():
return nil
case event, ok := <-c:
if !ok {
fmt.Printf("watcher unexpected event, reloading\n")
c = nil
break
}
if event.Object == nil {
fmt.Printf("watcher event object is nil, ignoring\n")
c = nil
break
}

secret := event.Object.(*corev1.Secret)
fmt.Printf("%v secret with name %s\n", event.Type, secret.Name)

// Run the full render cycle to react to config change
if err := s.renderFull(); err != nil {
return eris.Wrap(err, "failed to re-render configuration")
}

reloader.Reload()
}
}
}
}

// Run executes the Service
func (s *Service) Run() error {
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -155,31 +222,26 @@ func (s *Service) Run() error {
return fmt.Errorf("failed to create in-cluster Kubernetes config: %w", err)
}

kc, err := kubernetes.NewForConfig(kconfig)
s.kc, err = kubernetes.NewForConfig(kconfig)
if err != nil {
return eris.Wrap(err, "failed to create Kubernetes client")
}

s.engine, err = kubetemplate.NewEngine(kc, s.Discoverer, 10*time.Minute)
s.engine, err = kubetemplate.NewEngine(s.kc, s.Discoverer, 10*time.Minute)
if err != nil {
return eris.Wrap(err, "failed to create templating engine")
}

defer s.engine.Close()

// Learn the templates first
if err := s.learnTemplates(); err != nil {
return eris.Wrap(err, "failed to learn defaults")
}

// Execute first template render
if err := s.renderTemplates(); err != nil {
return eris.Wrap(err, "failed to render configuration")
// Run the initial full render cycle
if err := s.renderFull(); err != nil {
return eris.Wrap(err, "failed to run the initial render")
}

// Write out render flag file to signal completion
if err := ioutil.WriteFile(path.Join(s.ExportRoot, renderFlagFilename), []byte("complete"), 0666); err != nil {
return eris.Wrap(err, "failed to write render flag file")
// When the SourceSecret is defined, we will watch for changes to re-render
if s.SourceSecret != "" {
go s.secretWatcher(ctx, r)
}

r.Reload()
Expand All @@ -189,8 +251,9 @@ func (s *Service) Run() error {

log.Println("change detected")

if err := s.renderTemplates(); err != nil {
return eris.Wrap(err, "failed to render configuration")
// Run the full render cycle to react to config change
if err := s.renderFull(); err != nil {
return eris.Wrap(err, "failed to re-render configuration")
}

r.Reload()
Expand All @@ -200,21 +263,21 @@ func (s *Service) Run() error {
}

func (s *Service) learnTemplates() error {
if err := render(s.engine, true, s.DefaultsRoot, s.ExportRoot); err != nil {
if err := render(s.engine, true, s.DefaultsRoot, s.ExportRoot, s.DefaultNamespace); err != nil {
return eris.Wrap(err, "failed to learn defaults")
}
if err := render(s.engine, true, s.DefaultsRoot, s.ExportRoot); err != nil {
if err := render(s.engine, true, s.CustomRoot, s.ExportRoot, s.DefaultNamespace); err != nil {
return eris.Wrap(err, "failed to learn templates")
}

return nil
}

func (s *Service) renderTemplates() error {
if err := render(s.engine, false, s.DefaultsRoot, s.ExportRoot); err != nil {
if err := render(s.engine, false, s.DefaultsRoot, s.ExportRoot, s.DefaultNamespace); err != nil {
return eris.Wrap(err, "failed to render defaults")
}
if err := render(s.engine, false, s.DefaultsRoot, s.ExportRoot); err != nil {
if err := render(s.engine, false, s.CustomRoot, s.ExportRoot, s.DefaultNamespace); err != nil {
return eris.Wrap(err, "failed to render templates")
}

Expand Down Expand Up @@ -258,7 +321,7 @@ func getOrCreateSecret(exportRoot string) (string, error) {
return secret, nil
}

func render(e kubetemplate.Engine, learn bool, customRoot string, exportRoot string) error {
func render(e kubetemplate.Engine, learn bool, customRoot string, exportRoot string, defaultNamespace string) error {
var fileCount int

err := filepath.Walk(customRoot, func(fn string, info os.FileInfo, err error) error {
Expand Down Expand Up @@ -295,9 +358,9 @@ func render(e kubetemplate.Engine, learn bool, customRoot string, exportRoot str

if isTemplate {
if learn {
return e.Learn(in, os.Getenv("POD_NAMESPACE"))
return e.Learn(in, defaultNamespace)
}
return e.Render(out, in, os.Getenv("POD_NAMESPACE"))
return e.Render(out, in, defaultNamespace)
}

_, err = io.Copy(out, in)
Expand All @@ -314,6 +377,82 @@ func render(e kubetemplate.Engine, learn bool, customRoot string, exportRoot str
return nil
}

// Reads secret data and writes the asterisk zip file
func (s *Service) writeSecretFile() error {
secret, err := s.kc.CoreV1().Secrets(s.DefaultNamespace).Get(context.Background(), s.SourceSecret, metav1.GetOptions{})
if err != nil {
return eris.Wrap(err, "failure during source secret fetch")
}

key := "asterisk-config.zip"
data, ok := secret.Data[key]
if !ok {
return eris.Wrap(err, "failure during source secret fetch")
}

if err := ioutil.WriteFile("/asterisk-config.zip", data, 0666); err != nil {
return eris.Wrap(err, "failed to write secret data")
}

return nil
}

func (s *Service) renderFull() error {
if s.SourceSecret != "" {
// Read the zip from the Secret
if err := s.writeSecretFile(); err != nil {
return eris.Wrap(err, "failed to extract secret")
}

s.SourceFile = "/asterisk-config.zip"
}

// Need to make sure that the ExportRoot is clean before rendering the
// template because if we extract something to it, and later the secret
// changes, the new zip might not have some files that it had before, and
// we end up with inconsistent state. Removing everything does not affect
// Asterisk as the files will only be considered when reloading modules.
if err := clearDir(s.ExportRoot); err != nil {
return eris.Wrap(err, "failed to cleanup export directory")
}

// Extract the source file
if err := extractSource(s.SourceFile, s.CustomRoot); err != nil {
return eris.Wrap(err, "failed extract source from SourceFile")
}

// Learn the templates first
if err := s.learnTemplates(); err != nil {
return eris.Wrap(err, "failed to learn templates")
}

// Render the templates
if err := s.renderTemplates(); err != nil {
return eris.Wrap(err, "failed to render configuration")
}

// Write out render flag file to signal completion
if err := ioutil.WriteFile(path.Join(s.ExportRoot, renderFlagFilename), []byte("complete"), 0666); err != nil {
return eris.Wrap(err, "failed to write render flag file")
}

return nil
}

func clearDir(dir string) error {
files, err := filepath.Glob(filepath.Join(dir, "*"))
if err != nil {
return err
}
for _, file := range files {
err = os.RemoveAll(file)
if err != nil {
return err
}
}
return nil
}

func waitAsterisk(username, secret string) error {
r, err := http.NewRequest("GET", "http://127.0.0.1:8088/ari/asterisk/variable?variable=ASTERISK_CONFIG_SYSTEM_READY", nil)
if err != nil {
Expand All @@ -328,10 +467,11 @@ func waitAsterisk(username, secret string) error {
resp := new(response)

for {
time.Sleep(time.Second / 2)
time.Sleep(time.Second)

ret, err := http.DefaultClient.Do(r)
if err != nil {
log.Println("error calling asterisk:", err)
continue
}

Expand All @@ -342,6 +482,7 @@ func waitAsterisk(username, secret string) error {
}
if resp.Value != "1" {
// not yet ready
log.Println("asterisk not ready:", resp.Value, ret.StatusCode)
continue
}

Expand Down