Skip to content

Commit

Permalink
Merge pull request #2851 from gravitl/master
Browse files Browse the repository at this point in the history
master
  • Loading branch information
abhishek9686 authored Mar 5, 2024
2 parents 6f61f8a + d390f44 commit 8227e78
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 10 deletions.
7 changes: 7 additions & 0 deletions controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"context"
"fmt"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/gravitl/netmaker/logger"
m "github.com/gravitl/netmaker/migrate"
"github.com/gravitl/netmaker/servercfg"
)

Expand Down Expand Up @@ -62,6 +64,11 @@ func HandleRESTRequests(wg *sync.WaitGroup, ctx context.Context) {
logger.Log(0, err.Error())
}
}()
if os.Getenv("MIGRATE_EMQX") == "true" {
logger.Log(0, "migrating emqx...")
time.Sleep(time.Second * 2)
m.MigrateEmqx()
}
logger.Log(0, "REST Server successfully started on port ", port, " (REST)")

// Block main routine until a signal is received
Expand Down
11 changes: 11 additions & 0 deletions controllers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,17 @@ func updateNode(w http.ResponseWriter, r *http.Request) {

}
relayUpdate := logic.RelayUpdates(&currentNode, newNode)
if relayUpdate && newNode.IsRelay {
err = logic.ValidateRelay(models.RelayRequest{
NodeID: newNode.ID.String(),
NetID: newNode.Network,
RelayedNodes: newNode.RelayedNodes,
}, true)
if err != nil {
logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
return
}
}
_, err = logic.GetHost(newNode.HostID.String())
if err != nil {
logger.Log(0, r.Header.Get("user"),
Expand Down
2 changes: 0 additions & 2 deletions logic/acls/nodeacls/modify.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,12 @@ func RemoveNodeACL(networkID NetworkID, nodeID NodeID) (acls.ACLContainer, error
if err != nil {
return nil, err
}
acls.AclMutex.Lock()
for currentNodeID := range currentNetworkACL {
if NodeID(currentNodeID) != nodeID {
currentNetworkACL[currentNodeID].Remove(acls.AclID(nodeID))
}
}
delete(currentNetworkACL, acls.AclID(nodeID))
acls.AclMutex.Unlock()
return currentNetworkACL.Save(acls.ContainerID(networkID))
}

Expand Down
4 changes: 3 additions & 1 deletion logic/acls/nodeacls/retrieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ func AreNodesAllowed(networkID NetworkID, node1, node2 NodeID) bool {
}
var allowed bool
acls.AclMutex.RLock()
allowed = currentNetworkACL[acls.AclID(node1)].IsAllowed(acls.AclID(node2)) && currentNetworkACL[acls.AclID(node2)].IsAllowed(acls.AclID(node1))
currNetworkACLNode1 := currentNetworkACL[acls.AclID(node1)]
currNetworkACLNode2 := currentNetworkACL[acls.AclID(node2)]
acls.AclMutex.RUnlock()
allowed = currNetworkACLNode1.IsAllowed(acls.AclID(node2)) && currNetworkACLNode2.IsAllowed(acls.AclID(node1))
return allowed
}

Expand Down
1 change: 0 additions & 1 deletion logic/hosts.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,6 @@ func DissasociateNodeFromHost(n *models.Node, h *models.Host) error {
if err := DeleteNodeByID(n); err != nil {
return err
}

return UpsertHost(h)
}

Expand Down
3 changes: 0 additions & 3 deletions logic/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
func DeleteNode(node *models.Node, purge bool) error {
alreadyDeleted := node.PendingDelete || node.Action == models.NODE_DELETE
node.Action = models.NODE_DELETE

//delete ext clients if node is ingress gw
if node.IsIngressGateway {
if err := DeleteGatewayExtClients(node.ID.String(), node.Network); err != nil {
Expand Down Expand Up @@ -235,7 +234,6 @@ func DeleteNode(node *models.Node, purge bool) error {
if node.IsInternetGateway {
UnsetInternetGw(node)
}

if !purge && !alreadyDeleted {
newnode := *node
newnode.PendingDelete = true
Expand Down Expand Up @@ -281,7 +279,6 @@ func GetNodeByHostRef(hostid, network string) (node models.Node, err error) {
func DeleteNodeByID(node *models.Node) error {
var err error
var key = node.ID.String()

if err = database.DeleteRecord(database.NODES_TABLE_NAME, key); err != nil {
if !database.IsEmptyRecord(err) {
return err
Expand Down
4 changes: 4 additions & 0 deletions logic/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,7 @@ var SetRelayedNodes = func(setRelayed bool, relay string, relayed []string) []mo
var RelayUpdates = func(currentNode, newNode *models.Node) bool {
return false
}

var ValidateRelay = func(relay models.RelayRequest, update bool) error {
return nil
}
19 changes: 19 additions & 0 deletions migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"log"
"time"

"golang.org/x/exp/slog"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/logic/acls"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/mq"
"github.com/gravitl/netmaker/servercfg"
)

Expand All @@ -22,6 +24,7 @@ func Run() {
updateHosts()
updateNodes()
updateAcls()

}

func assignSuperAdmin() {
Expand Down Expand Up @@ -292,3 +295,19 @@ func updateAcls() {
slog.Info(fmt.Sprintf("(migration) successfully saved new acls for network: %s", network.NetID))
}
}

func MigrateEmqx() {

err := mq.SendPullSYN()
if err != nil {
logger.Log(0, "failed to send pull syn to clients", "error", err.Error())

}
time.Sleep(time.Second * 3)
slog.Info("proceeding to kicking out clients from emqx")
err = mq.KickOutClients()
if err != nil {
logger.Log(2, "failed to migrate emqx: ", "kickout-error", err.Error())
}

}
131 changes: 131 additions & 0 deletions mq/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package mq

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"time"

mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gravitl/netmaker/logger"
"github.com/gravitl/netmaker/logic"
"github.com/gravitl/netmaker/models"
"github.com/gravitl/netmaker/servercfg"
"golang.org/x/exp/slog"
)

func setupmqtt_old() (mqtt.Client, error) {

opts := mqtt.NewClientOptions()
opts.AddBroker(os.Getenv("OLD_BROKER_ENDPOINT"))
id := logic.RandomString(23)
opts.ClientID = id
opts.SetUsername(os.Getenv("OLD_MQ_USERNAME"))
opts.SetPassword(os.Getenv("OLD_MQ_PASSWORD"))
opts.SetAutoReconnect(true)
opts.SetConnectRetry(true)
opts.SetConnectRetryInterval(time.Second << 2)
opts.SetKeepAlive(time.Minute)
opts.SetWriteTimeout(time.Minute)
mqclient := mqtt.NewClient(opts)

var connecterr error
if token := mqclient.Connect(); !token.WaitTimeout(30*time.Second) || token.Error() != nil {
if token.Error() == nil {
connecterr = errors.New("connect timeout")
} else {
connecterr = token.Error()
}
slog.Error("unable to connect to broker", "server", os.Getenv("OLD_BROKER_ENDPOINT"), "error", connecterr)
}
return mqclient, nil
}

func getEmqxAuthTokenOld() (string, error) {
payload, err := json.Marshal(&emqxLogin{
Username: os.Getenv("OLD_MQ_USERNAME"),
Password: os.Getenv("OLD_MQ_PASSWORD"),
})
if err != nil {
return "", err
}
resp, err := http.Post(os.Getenv("OLD_EMQX_REST_ENDPOINT")+"/api/v5/login", "application/json", bytes.NewReader(payload))
if err != nil {
return "", err
}
msg, err := io.ReadAll(resp.Body)
if err != nil {
return "", err
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("error during EMQX login %v", string(msg))
}
var loginResp emqxLoginResponse
if err := json.Unmarshal(msg, &loginResp); err != nil {
return "", err
}
return loginResp.Token, nil
}

func SendPullSYN() error {
mqclient, err := setupmqtt_old()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
return err
}
for _, host := range hosts {
host := host
hostUpdate := models.HostUpdate{
Action: models.RequestPull,
Host: host,
}
msg, _ := json.Marshal(hostUpdate)
encrypted, encryptErr := encryptMsg(&host, msg)
if encryptErr != nil {
continue
}
logger.Log(0, "sending pull syn to", host.Name)
mqclient.Publish(fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), 0, true, encrypted)
}
return nil
}

func KickOutClients() error {
authToken, err := getEmqxAuthTokenOld()
if err != nil {
return err
}
hosts, err := logic.GetAllHosts()
if err != nil {
slog.Error("failed to migrate emqx: ", "error", err)
return err
}

for _, host := range hosts {
url := fmt.Sprintf("%s/api/v5/clients/%s", os.Getenv("OLD_EMQX_REST_ENDPOINT"), host.ID.String())
client := &http.Client{}
req, err := http.NewRequest(http.MethodDelete, url, nil)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "error", err)
continue
}
req.Header.Add("Authorization", "Bearer "+authToken)
res, err := client.Do(req)
if err != nil {
slog.Error("failed to kick out client:", "client", host.ID.String(), "req-error", err)
continue
}
if res.StatusCode != http.StatusNoContent {
slog.Error("failed to kick out client:", "client", host.ID.String(), "status-code", res.StatusCode)
}
res.Body.Close()
}
return nil
}
1 change: 1 addition & 0 deletions mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func SetupMQTT() {
log.Fatal(err)
}
} else {
emqx.DeleteEmqxUser(servercfg.GetMqUserName())
if err := emqx.CreateEmqxUserforServer(); err != nil {
log.Fatal(err)
}
Expand Down
1 change: 1 addition & 0 deletions pro/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ func InitPro() {
logic.UpdateRelayed = proLogic.UpdateRelayed
logic.SetRelayedNodes = proLogic.SetRelayedNodes
logic.RelayUpdates = proLogic.RelayUpdates
logic.ValidateRelay = proLogic.ValidateRelay
logic.GetTrialEndDate = getTrialEndDate
logic.SetDefaultGw = proLogic.SetDefaultGw
logic.SetDefaultGwForRelayedUpdate = proLogic.SetDefaultGwForRelayedUpdate
Expand Down
9 changes: 6 additions & 3 deletions pro/logic/relays.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func CreateRelay(relay models.RelayRequest) ([]models.Node, models.Node, error)
if host.OS != "linux" {
return returnnodes, models.Node{}, fmt.Errorf("only linux machines can be relay nodes")
}
err = ValidateRelay(relay)
err = ValidateRelay(relay, false)
if err != nil {
return returnnodes, models.Node{}, err
}
Expand Down Expand Up @@ -101,14 +101,14 @@ func SetRelayedNodes(setRelayed bool, relay string, relayed []string) []models.N
// }

// ValidateRelay - checks if relay is valid
func ValidateRelay(relay models.RelayRequest) error {
func ValidateRelay(relay models.RelayRequest, update bool) error {
var err error

node, err := logic.GetNodeByID(relay.NodeID)
if err != nil {
return err
}
if node.IsRelay {
if !update && node.IsRelay {
return errors.New("node is already acting as a relay")
}
for _, relayedNodeID := range relay.RelayedNodes {
Expand All @@ -119,6 +119,9 @@ func ValidateRelay(relay models.RelayRequest) error {
if relayedNode.IsIngressGateway {
return errors.New("cannot relay an ingress gateway (" + relayedNodeID + ")")
}
if relayedNode.IsInternetGateway {
return errors.New("cannot relay an internet gateway (" + relayedNodeID + ")")
}
}
return err
}
Expand Down

0 comments on commit 8227e78

Please sign in to comment.