Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

master #2851

Merged
merged 28 commits into from
Mar 5, 2024
Merged

master #2851

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eb28faf
add emqx migration func
abhishek9686 Feb 28, 2024
4630925
send pull syn over old mq for emqx migration
abhishek9686 Feb 29, 2024
01592b2
add additional logging
abhishek9686 Feb 29, 2024
089df3d
Merge pull request #2834 from gravitl/release-v0.23.0
abhishek9686 Feb 29, 2024
be0a588
use kick out clients
abhishek9686 Mar 1, 2024
0259c87
migrate emqx
abhishek9686 Mar 1, 2024
8e240ff
migrate emqx
abhishek9686 Mar 1, 2024
83cdc1a
migrate emqx
abhishek9686 Mar 1, 2024
81af1d9
migrate emqx
abhishek9686 Mar 1, 2024
2e3b640
migrate emqx
abhishek9686 Mar 1, 2024
d69d0ed
validate relay req for inet gws
abhishek9686 Mar 2, 2024
b4f788f
add debug logs to removing host from network
abhishek9686 Mar 4, 2024
e3b6b06
add debug logs to removing host from network
abhishek9686 Mar 4, 2024
1b4eb1b
add debug logs to removing host from network
abhishek9686 Mar 4, 2024
0dae814
add debug logs to removing host from network
abhishek9686 Mar 4, 2024
f165f5f
add debug logs to removing host from network
abhishek9686 Mar 4, 2024
c25a732
fix acl mutex lock
abhishek9686 Mar 4, 2024
8e05807
remove debug logs
abhishek9686 Mar 4, 2024
8ac864f
add update check for validate relay
abhishek9686 Mar 4, 2024
778d025
Merge branch 'develop' of https://github.com/gravitl/netmaker into AC…
abhishek9686 Mar 4, 2024
077ec49
Merge pull request #2838 from gravitl/NET-940-fix
abhishek9686 Mar 4, 2024
c45f7bf
Merge pull request #2839 from gravitl/NET-1047
abhishek9686 Mar 4, 2024
e846881
Merge pull request #2843 from gravitl/ACC-468
abhishek9686 Mar 4, 2024
37e877c
Merge pull request #2844 from gravitl/release-v0.23.0
abhishek9686 Mar 4, 2024
a5dff67
delete server user on start up to re-initialize
abhishek9686 Mar 5, 2024
96f6493
increase verbosity on mq log
abhishek9686 Mar 5, 2024
e7eb40a
Merge pull request #2849 from gravitl/ACC-468
abhishek9686 Mar 5, 2024
d390f44
Merge pull request #2850 from gravitl/release-v0.23.0
abhishek9686 Mar 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gravitl/netmaker/logger"
m "github.com/gravitl/netmaker/migrate"
"github.com/gravitl/netmaker/servercfg"
)

Expand Down Expand Up @@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) {
logger.Log(0, err.Error())
}
}()
if os.Getenv("MIGRATE_EMQX") == "true" {
logger.Log(0, "migrating emqx...")
time.Sleep(time.Second * 2)
m.MigrateEmqx()
}
logger.Log(0, "REST Server successfully started on port ", port, " (REST)")

// Block main routine until a signal is received
Expand Down
11 changes: 11 additions & 0 deletions controllers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,17 @@ func updateNode(w http.ResponseWriter, r *http.Request) {

}
relayUpdate := logic.RelayUpdates(&currentNode, newNode)
if relayUpdate && newNode.IsRelay {
err = logic.ValidateRelay(models.RelayRequest{
NodeID: newNode.ID.String(),
NetID: newNode.Network,
RelayedNodes: newNode.RelayedNodes,
}, true)
if err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}
}
_, err = logic.GetHost(newNode.HostID.String())
if err != nil {
logger.Log(0, r.Header.Get("user"),
Expand Down
2 changes: 0 additions & 2 deletions logic/acls/nodeacls/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,12 @@ func RemoveNodeACL(networkID NetworkID, nodeID NodeID) (acls.ACLContainer, error
if err != nil {
return nil, err
}
acls.AclMutex.Lock()
for currentNodeID := range currentNetworkACL {
if NodeID(currentNodeID) != nodeID {
currentNetworkACL[currentNodeID].Remove(acls.AclID(nodeID))
}
}
delete(currentNetworkACL, acls.AclID(nodeID))
acls.AclMutex.Unlock()
return currentNetworkACL.Save(acls.ContainerID(networkID))
}

Expand Down
4 changes: 3 additions & 1 deletion logic/acls/nodeacls/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
}
var allowed bool
acls.AclMutex.RLock()
allowed = currentNetworkACL[acls.AclID(node1)].IsAllowed(acls.AclID(node2)) && currentNetworkACL[acls.AclID(node2)].IsAllowed(acls.AclID(node1))
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
acls.AclMutex.RUnlock()
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
return allowed
}

Expand Down
1 change: 0 additions & 1 deletion logic/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ func DissasociateNodeFromHost(n *models.Node, h *models.Host) error {
if err := DeleteNodeByID(n); err != nil {
return err
}

return UpsertHost(h)
}

Expand Down
3 changes: 0 additions & 3 deletions logic/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
func DeleteNode(node *models.Node, purge bool) error {
alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
node.Action = models.NODE_DELETE

//delete ext clients if node is ingress gw
if node.IsIngressGateway {
if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
Expand Down Expand Up @@ -235,7 +234,6 @@ func DeleteNode(node *models.Node, purge bool) error {
if node.IsInternetGateway {
UnsetInternetGw(node)
}

if !purge && !alreadyDeleted {
newnode := *node
newnode.PendingDelete = true
Expand Down Expand Up @@ -281,7 +279,6 @@ func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
func DeleteNodeByID(node *models.Node) error {
var err error
var key = node.ID.String()

if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
if !database.IsEmptyRecord(err) {
return err
Expand Down
4 changes: 4 additions & 0 deletions logic/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ var SetRelayedNodes = func(setRelayed bool, relay string, relayed []string) []mo
var RelayUpdates = func(currentNode, newNode *models.Node) bool {
return false
}

var ValidateRelay = func(relay models.RelayRequest, update bool) error {
return nil
}
19 changes: 19 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"

"golang.org/x/exp/slog"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/logic/acls"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg"
)

Expand All @@ -22,6 +24,7 @@ func Run() {
updateHosts()
updateNodes()
updateAcls()

}

func assignSuperAdmin() {
Expand Down Expand Up @@ -292,3 +295,19 @@ func updateAcls() {
slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
}
}

func MigrateEmqx() {

err := mq.SendPullSYN()
if err != nil {
logger.Log(0, "failed to send pull syn to clients", "error", err.Error())

}
time.Sleep(time.Second * 3)
slog.Info("proceeding to kicking out clients from emqx")
err = mq.KickOutClients()
if err != nil {
logger.Log(2, "failed to migrate emqx: ", "kickout-error", err.Error())
}

}
131 changes: 131 additions & 0 deletions mq/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package mq

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)

func setupmqtt_old() (mqtt.Client, error) {

opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
id := logic.RandomString(23)
opts.ClientID = id
opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2)
opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute)
mqclient := mqtt.NewClient(opts)

var connecterr error
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
if token.Error() == nil {
connecterr = errors.New("connect timeout")
} else {
connecterr = token.Error()
}
slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
}
return mqclient, nil
}

func getEmqxAuthTokenOld() (string, error) {
payload, err := json.Marshal(&emqxLogin{
Username: os.Getenv("OLD_MQ_USERNAME"),
Password: os.Getenv("OLD_MQ_PASSWORD"),
})
if err != nil {
return "", err
}
resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
if err != nil {
return "", err
}
msg, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error during EMQX login %v", string(msg))
}
var loginResp emqxLoginResponse
if err := json.Unmarshal(msg, &loginResp); err != nil {
return "", err
}
return loginResp.Token, nil
}

func SendPullSYN() error {
mqclient, err := setupmqtt_old()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
return err
}
for _, host := range hosts {
host := host
hostUpdate := models.HostUpdate{
Action: models.RequestPull,
Host: host,
}
msg, _ := json.Marshal(hostUpdate)
encrypted, encryptErr := encryptMsg(&host, msg)
if encryptErr != nil {
continue
}
logger.Log(0, "sending pull syn to", host.Name)
mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
}
return nil
}

func KickOutClients() error {
authToken, err := getEmqxAuthTokenOld()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
slog.Error("failed to migrate emqx: ", "error", err)
return err
}

for _, host := range hosts {
url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
client := &http.Client{}
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
continue
}
req.Header.Add("Authorization", "Bearer "+authToken)
res, err := client.Do(req)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
continue
}
if res.StatusCode != http.StatusNoContent {
slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
}
res.Body.Close()
}
return nil
}
1 change: 1 addition & 0 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func SetupMQTT() {
log.Fatal(err)
}
} else {
emqx.DeleteEmqxUser(servercfg.GetMqUserName())
if err := emqx.CreateEmqxUserforServer(); err != nil {
log.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pro/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func InitPro() {
logic.UpdateRelayed = proLogic.UpdateRelayed
logic.SetRelayedNodes = proLogic.SetRelayedNodes
logic.RelayUpdates = proLogic.RelayUpdates
logic.ValidateRelay = proLogic.ValidateRelay
logic.GetTrialEndDate = getTrialEndDate
logic.SetDefaultGw = proLogic.SetDefaultGw
logic.SetDefaultGwForRelayedUpdate = proLogic.SetDefaultGwForRelayedUpdate
Expand Down
9 changes: 6 additions & 3 deletions pro/logic/relays.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error)
if host.OS != "linux" {
return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
}
err = ValidateRelay(relay)
err = ValidateRelay(relay, false)
if err != nil {
return returnnodes, models.Node{}, err
}
Expand Down Expand Up @@ -101,14 +101,14 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
// }

// ValidateRelay - checks if relay is valid
func ValidateRelay(relay models.RelayRequest) error {
func ValidateRelay(relay models.RelayRequest, update bool) error {
var err error

node, err := logic.GetNodeByID(relay.NodeID)
if err != nil {
return err
}
if node.IsRelay {
if !update && node.IsRelay {
return errors.New("node is already acting as a relay")
}
for _, relayedNodeID := range relay.RelayedNodes {
Expand All @@ -119,6 +119,9 @@ func ValidateRelay(relay models.RelayRequest) error {
if relayedNode.IsIngressGateway {
return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")")
}
if relayedNode.IsInternetGateway {
return errors.New("cannot relay an internet gateway (" + relayedNodeID + ")")
}
}
return err
}
Expand Down
Loading