From eb28faf669a91229a4af0b3b365e11fbd99d062e Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Wed, 28 Feb 2024 17:57:25 +0700 Subject: [PATCH 1/9] add emqx migration func --- migrate/migrate.go | 21 +++++++++++++++ mq/migrate.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+) create mode 100644 mq/migrate.go diff --git a/migrate/migrate.go b/migrate/migrate.go index ce4cc8407..28c0e524a 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "os" "golang.org/x/exp/slog" @@ -12,6 +13,7 @@ import ( "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/logic/acls" "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/mq" "github.com/gravitl/netmaker/servercfg" ) @@ -22,6 +24,9 @@ func Run() { updateHosts() updateNodes() updateAcls() + if os.Getenv("MIGRATE_EMQX") == "true" { + migrateEmqx() + } } func assignSuperAdmin() { @@ -292,3 +297,19 @@ func updateAcls() { slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID)) } } + +func migrateEmqx() { + hosts, err := logic.GetAllHosts() + if err != nil { + slog.Error("failed to migrate emqx: ", "error", err) + return + } + clientIDs := []string{} + for _, host := range hosts { + clientIDs = append(clientIDs, host.ID.String()) + } + err = mq.KickOutClients(clientIDs) + if err != nil { + slog.Error("failed to migrate emqx: ", "kickout-error", err) + } +} diff --git a/mq/migrate.go b/mq/migrate.go new file mode 100644 index 000000000..1a29e25f0 --- /dev/null +++ b/mq/migrate.go @@ -0,0 +1,65 @@ +package mq + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + + "golang.org/x/exp/slog" +) + +func getEmqxAuthTokenOld() (string, error) { + payload, err := json.Marshal(&emqxLogin{ + Username: os.Getenv("OLD_MQ_USERNAME"), + Password: os.Getenv("OLD_MQ_PASSWORD"), + }) + if err != nil { + return "", err + } + resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload)) + if err != nil { + return "", err + } + msg, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("error during EMQX login %v", string(msg)) + } + var loginResp emqxLoginResponse + if err := json.Unmarshal(msg, &loginResp); err != nil { + return "", err + } + return loginResp.Token, nil +} + +func KickOutClients(clientIDs []string) error { + authToken, err := getEmqxAuthTokenOld() + if err != nil { + return err + } + for _, clientID := range clientIDs { + url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), clientID) + client := &http.Client{} + req, err := http.NewRequest(http.MethodDelete, url, nil) + if err != nil { + slog.Error("failed to kick out client:", "client", clientID, "error", err) + continue + } + req.Header.Add("Authorization", "Bearer "+authToken) + res, err := client.Do(req) + if err != nil { + slog.Error("failed to kick out client:", "client", clientID, "req-error", err) + continue + } + if res.StatusCode != http.StatusNoContent { + slog.Error("failed to kick out client:", "client", clientID, "status-code", res.StatusCode) + } + res.Body.Close() + } + return nil +} From 463092518208ebcff3bb92fb6aa69cc681d0ada5 Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Thu, 29 Feb 2024 13:30:51 +0700 Subject: [PATCH 2/9] send pull syn over old mq for emqx migration --- migrate/migrate.go | 20 ++++++------ mq/migrate.go | 76 ++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 79 insertions(+), 17 deletions(-) diff --git a/migrate/migrate.go b/migrate/migrate.go index 28c0e524a..d2aa038a6 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -299,17 +299,15 @@ func updateAcls() { } func migrateEmqx() { - hosts, err := logic.GetAllHosts() - if err != nil { - slog.Error("failed to migrate emqx: ", "error", err) - return - } - clientIDs := []string{} - for _, host := range hosts { - clientIDs = append(clientIDs, host.ID.String()) - } - err = mq.KickOutClients(clientIDs) + + err := mq.SendPullSYN() if err != nil { - slog.Error("failed to migrate emqx: ", "kickout-error", err) + slog.Error("failed to send pull syn to clients", "error", err) + slog.Info("proceeding to kicking out clients from emqx") + err := mq.KickOutClients() + if err != nil { + slog.Error("failed to migrate emqx: ", "kickout-error", err) + } } + } diff --git a/mq/migrate.go b/mq/migrate.go index 1a29e25f0..f518ecdf0 100644 --- a/mq/migrate.go +++ b/mq/migrate.go @@ -3,14 +3,47 @@ package mq import ( "bytes" "encoding/json" + "errors" "fmt" "io" "net/http" "os" + "time" + mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/logic" + "github.com/gravitl/netmaker/models" + "github.com/gravitl/netmaker/servercfg" "golang.org/x/exp/slog" ) +func setupmqtt_old() (mqtt.Client, error) { + + opts := mqtt.NewClientOptions() + opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT")) + id := logic.RandomString(23) + opts.ClientID = id + opts.SetUsername(os.Getenv("OLD_MQ_USERNAME")) + opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD")) + opts.SetAutoReconnect(true) + opts.SetConnectRetry(true) + opts.SetConnectRetryInterval(time.Second << 2) + opts.SetKeepAlive(time.Minute) + opts.SetWriteTimeout(time.Minute) + mqclient := mqtt.NewClient(opts) + + var connecterr error + if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil { + if token.Error() == nil { + connecterr = errors.New("connect timeout") + } else { + connecterr = token.Error() + } + slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr) + } + return mqclient, nil +} + func getEmqxAuthTokenOld() (string, error) { payload, err := json.Marshal(&emqxLogin{ Username: os.Getenv("OLD_MQ_USERNAME"), @@ -37,27 +70,58 @@ func getEmqxAuthTokenOld() (string, error) { return loginResp.Token, nil } -func KickOutClients(clientIDs []string) error { +func SendPullSYN() error { + mqclient, err := setupmqtt_old() + if err != nil { + return err + } + hosts, err := logic.GetAllHosts() + if err != nil { + return err + } + for _, host := range hosts { + host := host + hostUpdate := models.HostUpdate{ + Action: models.RequestPull, + Host: host, + } + msg, _ := json.Marshal(hostUpdate) + encrypted, encryptErr := encryptMsg(&host, msg) + if encryptErr != nil { + continue + } + mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted) + } + return nil +} + +func KickOutClients() error { authToken, err := getEmqxAuthTokenOld() if err != nil { return err } - for _, clientID := range clientIDs { - url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), clientID) + hosts, err := logic.GetAllHosts() + if err != nil { + slog.Error("failed to migrate emqx: ", "error", err) + return err + } + + for _, host := range hosts { + url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String()) client := &http.Client{} req, err := http.NewRequest(http.MethodDelete, url, nil) if err != nil { - slog.Error("failed to kick out client:", "client", clientID, "error", err) + slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err) continue } req.Header.Add("Authorization", "Bearer "+authToken) res, err := client.Do(req) if err != nil { - slog.Error("failed to kick out client:", "client", clientID, "req-error", err) + slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err) continue } if res.StatusCode != http.StatusNoContent { - slog.Error("failed to kick out client:", "client", clientID, "status-code", res.StatusCode) + slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode) } res.Body.Close() } From 01592b2ecc2ef43f3c171d6611e31a631027f245 Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Thu, 29 Feb 2024 19:58:18 +0700 Subject: [PATCH 3/9] add additional logging --- migrate/migrate.go | 5 +++-- mq/migrate.go | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/migrate/migrate.go b/migrate/migrate.go index d2aa038a6..f2a201f86 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -25,6 +25,7 @@ func Run() { updateNodes() updateAcls() if os.Getenv("MIGRATE_EMQX") == "true" { + logger.Log(0, "migrating emqx...") migrateEmqx() } } @@ -302,11 +303,11 @@ func migrateEmqx() { err := mq.SendPullSYN() if err != nil { - slog.Error("failed to send pull syn to clients", "error", err) + logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) slog.Info("proceeding to kicking out clients from emqx") err := mq.KickOutClients() if err != nil { - slog.Error("failed to migrate emqx: ", "kickout-error", err) + logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) } } diff --git a/mq/migrate.go b/mq/migrate.go index f518ecdf0..068d785f0 100644 --- a/mq/migrate.go +++ b/mq/migrate.go @@ -11,6 +11,7 @@ import ( "time" mqtt "github.com/eclipse/paho.mqtt.golang" + "github.com/gravitl/netmaker/logger" "github.com/gravitl/netmaker/logic" "github.com/gravitl/netmaker/models" "github.com/gravitl/netmaker/servercfg" @@ -90,6 +91,7 @@ func SendPullSYN() error { if encryptErr != nil { continue } + logger.Log(0, "sending pull syn to", host.Name) mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted) } return nil From be0a5885ea4b3d5b7752c6ebfb7d5cc1cfbdcc1e Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Fri, 1 Mar 2024 12:58:00 +0700 Subject: [PATCH 4/9] use kick out clients --- migrate/migrate.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/migrate/migrate.go b/migrate/migrate.go index f2a201f86..02076638e 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -301,14 +301,14 @@ func updateAcls() { func migrateEmqx() { - err := mq.SendPullSYN() + // err := mq.SendPullSYN() + // if err != nil { + // logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) + slog.Info("proceeding to kicking out clients from emqx") + err := mq.KickOutClients() if err != nil { - logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) - slog.Info("proceeding to kicking out clients from emqx") - err := mq.KickOutClients() - if err != nil { - logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) - } + logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) } + //} } From 0259c87f874736fd201acd517e15f4cd927c99a0 Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Fri, 1 Mar 2024 13:08:25 +0700 Subject: [PATCH 5/9] migrate emqx --- main.go | 4 ++++ migrate/migrate.go | 8 ++------ 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 014711abf..83880beb3 100644 --- a/main.go +++ b/main.go @@ -48,6 +48,10 @@ func main() { defer stop() var waitGroup sync.WaitGroup startControllers(&waitGroup, ctx) // start the api endpoint and mq and stun + if os.Getenv("MIGRATE_EMQX") == "true" { + logger.Log(0, "migrating emqx...") + migrate.MigrateEmqx() + } startHooks() <-ctx.Done() waitGroup.Wait() diff --git a/migrate/migrate.go b/migrate/migrate.go index 02076638e..e3b861bea 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "log" - "os" "golang.org/x/exp/slog" @@ -24,10 +23,7 @@ func Run() { updateHosts() updateNodes() updateAcls() - if os.Getenv("MIGRATE_EMQX") == "true" { - logger.Log(0, "migrating emqx...") - migrateEmqx() - } + } func assignSuperAdmin() { @@ -299,7 +295,7 @@ func updateAcls() { } } -func migrateEmqx() { +func MigrateEmqx() { // err := mq.SendPullSYN() // if err != nil { From 8e240ff701bc0ac782f4286da42956e5353064a4 Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Fri, 1 Mar 2024 13:35:59 +0700 Subject: [PATCH 6/9] migrate emqx --- controllers/controller.go | 6 ++++++ main.go | 4 ---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/controllers/controller.go b/controllers/controller.go index d80093beb..798a61a02 100644 --- a/controllers/controller.go +++ b/controllers/controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "os" "strings" "sync" "time" @@ -11,6 +12,7 @@ import ( "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/gravitl/netmaker/logger" + m "github.com/gravitl/netmaker/migrate" "github.com/gravitl/netmaker/servercfg" ) @@ -62,6 +64,10 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) { logger.Log(0, err.Error()) } }() + if os.Getenv("MIGRATE_EMQX") == "true" { + logger.Log(0, "migrating emqx...") + m.MigrateEmqx() + } logger.Log(0, "REST Server successfully started on port ", port, " (REST)") // Block main routine until a signal is received diff --git a/main.go b/main.go index 83880beb3..014711abf 100644 --- a/main.go +++ b/main.go @@ -48,10 +48,6 @@ func main() { defer stop() var waitGroup sync.WaitGroup startControllers(&waitGroup, ctx) // start the api endpoint and mq and stun - if os.Getenv("MIGRATE_EMQX") == "true" { - logger.Log(0, "migrating emqx...") - migrate.MigrateEmqx() - } startHooks() <-ctx.Done() waitGroup.Wait() From 83cdc1abf656b295ca89e2b2123816bc5b48d72d Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Fri, 1 Mar 2024 14:26:07 +0700 Subject: [PATCH 7/9] migrate emqx --- controllers/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/controllers/controller.go b/controllers/controller.go index 798a61a02..4ce41e47c 100644 --- a/controllers/controller.go +++ b/controllers/controller.go @@ -66,6 +66,7 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) { }() if os.Getenv("MIGRATE_EMQX") == "true" { logger.Log(0, "migrating emqx...") + time.Sleep(time.Second * 2) m.MigrateEmqx() } logger.Log(0, "REST Server successfully started on port ", port, " (REST)") From 81af1d92c080c904605ffcd8854966797a96285f Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Fri, 1 Mar 2024 14:46:32 +0700 Subject: [PATCH 8/9] migrate emqx --- migrate/migrate.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/migrate/migrate.go b/migrate/migrate.go index e3b861bea..0fa5601a3 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -297,14 +297,14 @@ func updateAcls() { func MigrateEmqx() { - // err := mq.SendPullSYN() - // if err != nil { - // logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) - slog.Info("proceeding to kicking out clients from emqx") - err := mq.KickOutClients() + err := mq.SendPullSYN() if err != nil { - logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) + logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) + slog.Info("proceeding to kicking out clients from emqx") + err := mq.KickOutClients() + if err != nil { + logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) + } } - //} } From 2e3b640a9a37d5aa1024b31b586391a2d726914c Mon Sep 17 00:00:00 2001 From: abhishek9686 Date: Fri, 1 Mar 2024 14:59:47 +0700 Subject: [PATCH 9/9] migrate emqx --- migrate/migrate.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/migrate/migrate.go b/migrate/migrate.go index 0fa5601a3..0dd8ba5cb 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "log" + "time" "golang.org/x/exp/slog" @@ -300,11 +301,13 @@ func MigrateEmqx() { err := mq.SendPullSYN() if err != nil { logger.Log(0, "failed to send pull syn to clients", "error", err.Error()) - slog.Info("proceeding to kicking out clients from emqx") - err := mq.KickOutClients() - if err != nil { - logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) - } + + } + time.Sleep(time.Second * 3) + slog.Info("proceeding to kicking out clients from emqx") + err = mq.KickOutClients() + if err != nil { + logger.Log(0, "failed to migrate emqx: ", "kickout-error", err.Error()) } }