From 57a7b09f299b88a1535ba33fd989bfa286fbd9ae Mon Sep 17 00:00:00 2001 From: rawdaGastan Date: Tue, 2 Jul 2024 13:52:17 +0300 Subject: [PATCH 1/5] update farmerbot with the new golang one --- internal/provider/resource_scheduler.go | 10 +- internal/provider/scheduler/farmer_bot.go | 141 +++++--------------- internal/provider/scheduler/request_test.go | 10 +- internal/provider/scheduler/scheduler.go | 22 +-- 4 files changed, 61 insertions(+), 122 deletions(-) diff --git a/internal/provider/resource_scheduler.go b/internal/provider/resource_scheduler.go index 8d6df191..449d9896 100644 --- a/internal/provider/resource_scheduler.go +++ b/internal/provider/resource_scheduler.go @@ -12,6 +12,7 @@ import ( "github.com/pkg/errors" "github.com/threefoldtech/terraform-provider-grid/internal/provider/scheduler" "github.com/threefoldtech/tfgrid-sdk-go/grid-client/deployer" + "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer" "github.com/threefoldtech/zos/pkg/gridtypes" ) @@ -132,7 +133,7 @@ func parseRequests(d *schema.ResourceData, assignment map[string]uint32) []sched reqs = append(reqs, scheduler.Request{ Name: mp["name"].(string), - FarmId: uint32(mp["farm_id"].(int)), + FarmID: uint32(mp["farm_id"].(int)), PublicConfig: mp["public_config"].(bool), PublicIpsCount: uint32(mp["public_ips_count"].(int)), Certified: mp["certified"].(bool), @@ -158,7 +159,12 @@ func schedule(ctx context.Context, d *schema.ResourceData, meta interface{}) dia assignment := parseAssignment(d) reqs := parseRequests(d, assignment) - scheduler := scheduler.NewScheduler(tfPluginClient.GridProxyClient, uint64(tfPluginClient.TwinID), tfPluginClient.RMB) + rpcClient, ok := tfPluginClient.RMB.(*peer.RpcClient) + if !ok { + return diag.FromErr(fmt.Errorf("failed to cast rmb client into rpc client")) + } + + scheduler := scheduler.NewScheduler(tfPluginClient.GridProxyClient, uint64(tfPluginClient.TwinID), rpcClient) if err := scheduler.ProcessRequests(ctx, reqs, assignment); err != nil { return diag.FromErr(err) } diff --git a/internal/provider/scheduler/farmer_bot.go b/internal/provider/scheduler/farmer_bot.go index 09eaf400..20966152 100644 --- a/internal/provider/scheduler/farmer_bot.go +++ b/internal/provider/scheduler/farmer_bot.go @@ -4,73 +4,26 @@ import ( "context" "fmt" "log" - "strconv" - "strings" - "time" - "github.com/google/uuid" "github.com/pkg/errors" ) const ( FarmerBotVersionAction = "farmerbot.farmmanager.version" FarmerBotFindNodeAction = "farmerbot.nodemanager.findnode" - FarmerBotRMBFunction = "execute_job" ) -type FarmerBotAction struct { - Guid string `json:"guid"` - TwinID uint32 `json:"twinid"` - Action string `json:"action"` - Args FarmerBotArgs `json:"args"` - Result FarmerBotArgs `json:"result"` - State string `json:"state"` - Start uint64 `json:"start"` - End uint64 `json:"end"` - GracePeriod uint32 `json:"grace_period"` - Error string `json:"error"` - Timeout uint32 `json:"timeout"` - SourceTwinID uint32 `json:"src_twinid"` - SourceAction string `json:"src_action"` - Dependencies []string `json:"dependencies"` -} - -type FarmerBotArgs struct { - Args []Args `json:"args"` - Params []Params `json:"params"` -} - -type Args struct { - RequiredHRU *uint64 `json:"required_hru,omitempty"` - RequiredSRU *uint64 `json:"required_sru,omitempty"` - RequiredCRU *uint64 `json:"required_cru,omitempty"` - RequiredMRU *uint64 `json:"required_mru,omitempty"` - NodeExclude []uint32 `json:"node_exclude,omitempty"` - Dedicated *bool `json:"dedicated,omitempty"` - PublicConfig *bool `json:"public_config,omitempty"` - PublicIPs *uint32 `json:"public_ips"` - Certified *bool `json:"certified,omitempty"` -} - -type Params struct { - Key string `json:"key"` - Value interface{} `json:"value"` -} - func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool { - args := []Args{} - params := []Params{} - data := buildFarmerBotAction(farmID, uint32(s.twinID), args, params, FarmerBotVersionAction) - info, err := s.getFarmInfo(ctx, farmID) if err != nil { return false } dst := info.farmerTwinID - var output FarmerBotAction - err = s.rmbClient.Call(ctx, dst, FarmerBotRMBFunction, data, &output) + service := fmt.Sprintf("farmerbot-%d", farmID) + var version string + err = s.rmbClient.CallWithSession(ctx, info.farmerTwinID, &service, FarmerBotVersionAction, nil, &version) if err != nil { log.Printf("error while pinging farmerbot on farm %d with farmer twin %d. %s", farmID, dst, err.Error()) } @@ -79,94 +32,72 @@ func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool { } func (n *Scheduler) farmerBotSchedule(ctx context.Context, r *Request) (uint32, error) { - info, err := n.getFarmInfo(ctx, r.FarmId) + info, err := n.getFarmInfo(ctx, r.FarmID) if err != nil { - return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmId) + return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmID) } - params := buildFarmerBotParams(r) - args := buildFarmerBotArgs(r) - data := buildFarmerBotAction(info.farmerTwinID, uint32(n.twinID), args, params, FarmerBotFindNodeAction) - output := FarmerBotAction{} - err = n.rmbClient.Call(ctx, info.farmerTwinID, FarmerBotRMBFunction, data, &output) - if err != nil { - return 0, err - } - if len(output.Result.Params) < 1 { - return 0, fmt.Errorf("cannot find an eligible node on farm %d", r.FarmId) - } - nodeId, err := strconv.ParseUint(output.Result.Params[0].Value.(string), 10, 32) - if err != nil { + data := buildNodeOptions(r) + var nodeID uint32 + + service := fmt.Sprintf("farmerbot-%d", r.FarmID) + if err := n.rmbClient.CallWithSession(ctx, info.farmerTwinID, &service, FarmerBotFindNodeAction, data, &nodeID); err != nil { return 0, err } - log.Printf("got a node with id %d", nodeId) - return uint32(nodeId), nil + + log.Printf("got a node with id %d", nodeID) + return nodeID, nil } -func buildFarmerBotArgs(r *Request) []Args { - return []Args{} +type NodeFilterOption struct { + NodesExcluded []uint32 `json:"nodes_excluded,omitempty"` + Certified bool `json:"certified,omitempty"` + Dedicated bool `json:"dedicated,omitempty"` + PublicConfig bool `json:"public_config,omitempty"` + PublicIPs uint64 `json:"public_ips,omitempty"` + HRU uint64 `json:"hru,omitempty"` // in GB + SRU uint64 `json:"sru,omitempty"` // in GB + CRU uint64 `json:"cru,omitempty"` + MRU uint64 `json:"mru,omitempty"` // in GB } -func buildFarmerBotParams(r *Request) []Params { - params := []Params{} + +func buildNodeOptions(r *Request) NodeFilterOption { + options := NodeFilterOption{} if r.Capacity.HRU != 0 { - params = append(params, Params{Key: "required_hru", Value: r.Capacity.HRU}) + options.HRU = r.Capacity.HRU } if r.Capacity.SRU != 0 { - params = append(params, Params{Key: "required_sru", Value: r.Capacity.SRU}) + options.SRU = r.Capacity.SRU } if r.Capacity.MRU != 0 { - params = append(params, Params{Key: "required_mru", Value: r.Capacity.MRU}) + options.MRU = r.Capacity.MRU } if r.Capacity.CRU != 0 { - params = append(params, Params{Key: "required_cru", Value: r.Capacity.CRU}) + options.CRU = r.Capacity.CRU } if len(r.NodeExclude) != 0 { - value := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(r.NodeExclude)), ","), "") - params = append(params, Params{Key: "node_exclude", Value: value}) + options.NodesExcluded = append(options.NodesExcluded, r.NodeExclude...) } if r.Dedicated { - params = append(params, Params{Key: "dedicated", Value: r.Dedicated}) + options.Dedicated = r.Dedicated } if r.PublicConfig { - params = append(params, Params{Key: "public_config", Value: r.PublicConfig}) + options.PublicConfig = r.PublicConfig } if r.PublicIpsCount > 0 { - params = append(params, Params{Key: "public_ips", Value: r.PublicIpsCount}) + options.PublicIPs = uint64(r.PublicIpsCount) } if r.Certified { - params = append(params, Params{Key: "certified", Value: r.Certified}) + options.Certified = r.Certified } - return params -} -func buildFarmerBotAction(farmerTwinID uint32, sourceTwinID uint32, args []Args, params []Params, action string) FarmerBotAction { - return FarmerBotAction{ - Guid: uuid.NewString(), - TwinID: farmerTwinID, - Action: action, - Args: FarmerBotArgs{ - Args: args, - Params: params, - }, - Result: FarmerBotArgs{ - Args: []Args{}, - Params: []Params{}, - }, - State: "init", - Start: uint64(time.Now().Unix()), - End: 0, - GracePeriod: 0, - Error: "", - Timeout: 6000, - SourceTwinID: sourceTwinID, - Dependencies: []string{}, - } + return options } diff --git a/internal/provider/scheduler/request_test.go b/internal/provider/scheduler/request_test.go index dc631d7d..44c9e770 100644 --- a/internal/provider/scheduler/request_test.go +++ b/internal/provider/scheduler/request_test.go @@ -29,7 +29,7 @@ func TestFulfilsSuccess(t *testing.T) { SRU: 3, HRU: 3, }, - FarmId: 1, + FarmID: 1, PublicIpsCount: 1, PublicConfig: false, }, farm), true, "fullfil-success") @@ -53,7 +53,7 @@ func TestFulfilsFail(t *testing.T) { SRU: 3, HRU: 3, }, - FarmId: 1, + FarmID: 1, PublicIpsCount: 0, PublicConfig: false, } @@ -66,7 +66,7 @@ func TestFulfilsFail(t *testing.T) { "mru": func(r *Request) { r.Capacity.MRU = 4 }, "sru": func(r *Request) { r.Capacity.SRU = 9 }, "hru": func(r *Request) { r.Capacity.HRU = 4 }, - "farm_id": func(r *Request) { r.FarmId = 2 }, + "farm_id": func(r *Request) { r.FarmID = 2 }, "public_ips_count": func(r *Request) { r.PublicIpsCount = 3 }, "public_config": func(r *Request) { r.PublicConfig = true }, } @@ -86,7 +86,7 @@ func TestConstructFilter(t *testing.T) { HRU: 3, }, Name: "a", - FarmId: 1, + FarmID: 1, PublicIpsCount: 1, PublicConfig: false, Certified: true, @@ -99,7 +99,7 @@ func TestConstructFilter(t *testing.T) { assert.Equal(t, *con.FreeHRU, uint64(3), "construct-filter-hru") assert.Empty(t, con.Country, "construct-filter-country") assert.Empty(t, con.City, "construct-filter-city") - assert.Equal(t, con.FarmIDs, []uint64{uint64(r.FarmId)}, "construct-filter-farm-ids") + assert.Equal(t, con.FarmIDs, []uint64{uint64(r.FarmID)}, "construct-filter-farm-ids") assert.Equal(t, *con.FreeIPs, uint64(1), "construct-filter-free-ips") assert.Empty(t, con.IPv4, "construct-filter-ipv4") assert.Empty(t, con.IPv6, "construct-filter-ipv6") diff --git a/internal/provider/scheduler/scheduler.go b/internal/provider/scheduler/scheduler.go index f847dfe3..1c45e7a3 100644 --- a/internal/provider/scheduler/scheduler.go +++ b/internal/provider/scheduler/scheduler.go @@ -9,19 +9,22 @@ import ( "github.com/pkg/errors" proxy "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/client" proxyTypes "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types" - "github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go" ) // NoNodesFoundErr for empty nodes returned from scheduler var NoNodesFoundErr = errors.New("couldn't find a node satisfying the given requirements") +type rmbClient interface { + CallWithSession(ctx context.Context, twin uint32, session *string, fn string, data interface{}, result interface{}) error +} + // Scheduler struct for scheduling type Scheduler struct { nodes map[uint32]nodeInfo farms map[uint32]farmInfo twinID uint64 gridProxyClient proxy.Client - rmbClient rmb.Client + rmbClient rmbClient } // nodeInfo related to scheduling @@ -45,7 +48,7 @@ func (node *nodeInfo) fulfils(r *Request, farm farmInfo) bool { if r.Capacity.MRU > node.FreeCapacity.MRU || r.Capacity.HRU > node.FreeCapacity.HRU || r.Capacity.SRU > node.FreeCapacity.SRU || - (r.FarmId != 0 && node.Node.FarmID != int(r.FarmId)) || + (r.FarmID != 0 && node.Node.FarmID != int(r.FarmID)) || (r.PublicConfig && node.Node.PublicConfig.Domain == "") || (r.PublicIpsCount > uint32(farm.freeIPs)) || (r.Dedicated && !node.Node.Dedicated) || @@ -57,7 +60,7 @@ func (node *nodeInfo) fulfils(r *Request, farm farmInfo) bool { } // NewScheduler generates a new scheduler -func NewScheduler(gridProxyClient proxy.Client, twinID uint64, rmbClient rmb.Client) Scheduler { +func NewScheduler(gridProxyClient proxy.Client, twinID uint64, rmbClient rmbClient) Scheduler { return Scheduler{ nodes: map[uint32]nodeInfo{}, gridProxyClient: gridProxyClient, @@ -137,12 +140,11 @@ func (n *Scheduler) addNodes(nodes []proxyTypes.Node) { // Schedule makes sure there's at least one node that satisfies the given request func (n *Scheduler) Schedule(ctx context.Context, r *Request) (uint32, error) { - // TODO: update farmerbot - // if r.FarmId != 0 { - // if n.hasFarmerBot(ctx, r.FarmId) { - // return n.farmerBotSchedule(ctx, r) - // } - // } + if r.FarmID != 0 { + if n.hasFarmerBot(ctx, r.FarmID) { + return n.farmerBotSchedule(ctx, r) + } + } return n.gridProxySchedule(ctx, r) } From a4c3a501006672ca443380aadcc35371785e26ff Mon Sep 17 00:00:00 2001 From: rawdaGastan Date: Tue, 2 Jul 2024 13:52:34 +0300 Subject: [PATCH 2/5] fix tests --- internal/provider/scheduler/scheduler_test.go | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/internal/provider/scheduler/scheduler_test.go b/internal/provider/scheduler/scheduler_test.go index c4b50920..717aa76d 100644 --- a/internal/provider/scheduler/scheduler_test.go +++ b/internal/provider/scheduler/scheduler_test.go @@ -3,7 +3,6 @@ package scheduler import ( "context" "fmt" - "strconv" "testing" "github.com/pkg/errors" @@ -21,9 +20,8 @@ type RMBClientMock struct { hasFarmerBot bool } -func (r *RMBClientMock) Call(ctx context.Context, twin uint32, fn string, data interface{}, result interface{}) error { - d := data.(FarmerBotAction) - switch d.Action { +func (r *RMBClientMock) CallWithSession(ctx context.Context, twin uint32, session *string, fn string, data interface{}, result interface{}) error { + switch fn { case FarmerBotVersionAction: if r.hasFarmerBot { return nil @@ -31,15 +29,13 @@ func (r *RMBClientMock) Call(ctx context.Context, twin uint32, fn string, data i return errors.New("this farm does not have a farmer bot") case FarmerBotFindNodeAction: if r.nodeID == 0 { - d.Error = "could not find node" - return nil + return fmt.Errorf("could not find node") } - output := result.(*FarmerBotAction) - - output.Result.Params = append(output.Args.Params, Params{Key: "nodeid", Value: strconv.FormatUint(uint64(r.nodeID), 10)}) + output := result.(*uint32) + *output = r.nodeID return nil default: - return fmt.Errorf("fn: %s not supported", d.Action) + return fmt.Errorf("fn: %s not supported", fn) } } @@ -126,7 +122,7 @@ func TestSchedulerEmpty(t *testing.T) { }, PublicIpsCount: 1, Name: "req", - FarmId: 10, + FarmID: 10, PublicConfig: true, Certified: false, Dedicated: false, @@ -177,7 +173,7 @@ func TestSchedulerSuccess(t *testing.T) { MRU: 11, }, Name: "req", - FarmId: 1, + FarmID: 1, PublicConfig: true, PublicIpsCount: 1, Certified: false, @@ -232,7 +228,7 @@ func TestSchedulerSuccessOn4thPage(t *testing.T) { }, PublicConfig: true, Name: "req", - FarmId: 1, + FarmID: 1, PublicIpsCount: 1, Certified: false, }) @@ -293,6 +289,7 @@ func TestSchedulerFailure(t *testing.T) { assert.Error(t, err, fmt.Sprintf("scheduler-failure-%s", key)) } } + func TestSchedulerFailureAfterSuccess(t *testing.T) { proxy := &GridProxyClientMock{} rmbClient := &RMBClientMock{ @@ -335,7 +332,7 @@ func TestSchedulerFailureAfterSuccess(t *testing.T) { }, PublicIpsCount: 1, Name: "req", - FarmId: 1, + FarmID: 1, PublicConfig: true, Certified: false, }) @@ -350,7 +347,7 @@ func TestSchedulerFailureAfterSuccess(t *testing.T) { }, PublicIpsCount: 1, Name: "req", - FarmId: 1, + FarmID: 1, PublicConfig: true, Certified: false, }) @@ -400,7 +397,7 @@ func TestSchedulerSuccessAfterSuccess(t *testing.T) { }, PublicIpsCount: 1, Name: "req", - FarmId: 1, + FarmID: 1, PublicConfig: true, Certified: false, }) @@ -415,7 +412,7 @@ func TestSchedulerSuccessAfterSuccess(t *testing.T) { }, PublicIpsCount: 1, Name: "req", - FarmId: 1, + FarmID: 1, PublicConfig: true, Certified: false, }) From b7078f79b7719a479cb215c6aa06bee6dba0adb7 Mon Sep 17 00:00:00 2001 From: rawdaGastan Date: Tue, 2 Jul 2024 13:52:50 +0300 Subject: [PATCH 3/5] add health check to scheduler --- internal/provider/scheduler/request.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/internal/provider/scheduler/request.go b/internal/provider/scheduler/request.go index 3b7113e8..a03bdf87 100644 --- a/internal/provider/scheduler/request.go +++ b/internal/provider/scheduler/request.go @@ -14,7 +14,7 @@ var ( type Request struct { Capacity Capacity Name string - FarmId uint32 + FarmID uint32 PublicConfig bool PublicIpsCount uint32 Certified bool @@ -28,8 +28,9 @@ func (r *Request) constructFilter(twinID uint64) (f proxyTypes.NodeFilter) { // grid proxy should support filtering a node by certification type. f.Status = []string{statusUP} f.AvailableFor = &twinID - if r.FarmId != 0 { - f.FarmIDs = []uint64{uint64(r.FarmId)} + f.Healthy = &trueVal + if r.FarmID != 0 { + f.FarmIDs = []uint64{uint64(r.FarmID)} } if r.Capacity.HRU != 0 { f.FreeHRU = &r.Capacity.HRU From 5f4abccc2c242a4ab078cc63a4c412e25a0f0483 Mon Sep 17 00:00:00 2001 From: rawdaGastan Date: Tue, 2 Jul 2024 14:59:55 +0300 Subject: [PATCH 4/5] fix units in farmerbot --- internal/provider/scheduler/farmer_bot.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/provider/scheduler/farmer_bot.go b/internal/provider/scheduler/farmer_bot.go index 20966152..b1c3b5fe 100644 --- a/internal/provider/scheduler/farmer_bot.go +++ b/internal/provider/scheduler/farmer_bot.go @@ -64,15 +64,15 @@ type NodeFilterOption struct { func buildNodeOptions(r *Request) NodeFilterOption { options := NodeFilterOption{} if r.Capacity.HRU != 0 { - options.HRU = r.Capacity.HRU + options.HRU = r.Capacity.HRU / (1024 * 1024 * 1024) } if r.Capacity.SRU != 0 { - options.SRU = r.Capacity.SRU + options.SRU = r.Capacity.SRU / (1024 * 1024 * 1024) } if r.Capacity.MRU != 0 { - options.MRU = r.Capacity.MRU + options.MRU = r.Capacity.MRU / (1024 * 1024 * 1024) } if r.Capacity.CRU != 0 { From 6a4d0072cf24ef2c022d411c6de2f9b16ab8fa7d Mon Sep 17 00:00:00 2001 From: rawdaGastan Date: Tue, 2 Jul 2024 16:28:38 +0300 Subject: [PATCH 5/5] set timeout to farmerbot rmb calls --- internal/provider/scheduler/farmer_bot.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/provider/scheduler/farmer_bot.go b/internal/provider/scheduler/farmer_bot.go index b1c3b5fe..2d8ce59c 100644 --- a/internal/provider/scheduler/farmer_bot.go +++ b/internal/provider/scheduler/farmer_bot.go @@ -9,11 +9,15 @@ import ( ) const ( + rmbTimeout = 40 FarmerBotVersionAction = "farmerbot.farmmanager.version" FarmerBotFindNodeAction = "farmerbot.nodemanager.findnode" ) func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool { + ctx, cancel := context.WithTimeout(ctx, rmbTimeout) + defer cancel() + info, err := s.getFarmInfo(ctx, farmID) if err != nil { return false @@ -32,6 +36,9 @@ func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool { } func (n *Scheduler) farmerBotSchedule(ctx context.Context, r *Request) (uint32, error) { + ctx, cancel := context.WithTimeout(ctx, rmbTimeout) + defer cancel() + info, err := n.getFarmInfo(ctx, r.FarmID) if err != nil { return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmID)