Skip to content

Commit

Permalink
Merge pull request #954 from threefoldtech/development_update_farmerbot
Browse files Browse the repository at this point in the history
update farmerbot
  • Loading branch information
rawdaGastan committed Jul 4, 2024
2 parents b8ba9e3 + 6a4d007 commit b67c938
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 141 deletions.
10 changes: 8 additions & 2 deletions internal/provider/resource_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/threefoldtech/terraform-provider-grid/internal/provider/scheduler"
"github.com/threefoldtech/tfgrid-sdk-go/grid-client/deployer"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
"github.com/threefoldtech/zos/pkg/gridtypes"
)

Expand Down Expand Up @@ -132,7 +133,7 @@ func parseRequests(d *schema.ResourceData, assignment map[string]uint32) []sched

reqs = append(reqs, scheduler.Request{
Name: mp["name"].(string),
FarmId: uint32(mp["farm_id"].(int)),
FarmID: uint32(mp["farm_id"].(int)),
PublicConfig: mp["public_config"].(bool),
PublicIpsCount: uint32(mp["public_ips_count"].(int)),
Certified: mp["certified"].(bool),
Expand All @@ -158,7 +159,12 @@ func schedule(ctx context.Context, d *schema.ResourceData, meta interface{}) dia
assignment := parseAssignment(d)
reqs := parseRequests(d, assignment)

scheduler := scheduler.NewScheduler(tfPluginClient.GridProxyClient, uint64(tfPluginClient.TwinID), tfPluginClient.RMB)
rpcClient, ok := tfPluginClient.RMB.(*peer.RpcClient)
if !ok {
return diag.FromErr(fmt.Errorf("failed to cast rmb client into rpc client"))
}

scheduler := scheduler.NewScheduler(tfPluginClient.GridProxyClient, uint64(tfPluginClient.TwinID), rpcClient)
if err := scheduler.ProcessRequests(ctx, reqs, assignment); err != nil {
return diag.FromErr(err)
}
Expand Down
146 changes: 42 additions & 104 deletions internal/provider/scheduler/farmer_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,30 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
)

const (
rmbTimeout = 40
FarmerBotVersionAction = "farmerbot.farmmanager.version"
FarmerBotFindNodeAction = "farmerbot.nodemanager.findnode"
FarmerBotRMBFunction = "execute_job"
)

type FarmerBotAction struct {
Guid string `json:"guid"`
TwinID uint32 `json:"twinid"`
Action string `json:"action"`
Args FarmerBotArgs `json:"args"`
Result FarmerBotArgs `json:"result"`
State string `json:"state"`
Start uint64 `json:"start"`
End uint64 `json:"end"`
GracePeriod uint32 `json:"grace_period"`
Error string `json:"error"`
Timeout uint32 `json:"timeout"`
SourceTwinID uint32 `json:"src_twinid"`
SourceAction string `json:"src_action"`
Dependencies []string `json:"dependencies"`
}

type FarmerBotArgs struct {
Args []Args `json:"args"`
Params []Params `json:"params"`
}

type Args struct {
RequiredHRU *uint64 `json:"required_hru,omitempty"`
RequiredSRU *uint64 `json:"required_sru,omitempty"`
RequiredCRU *uint64 `json:"required_cru,omitempty"`
RequiredMRU *uint64 `json:"required_mru,omitempty"`
NodeExclude []uint32 `json:"node_exclude,omitempty"`
Dedicated *bool `json:"dedicated,omitempty"`
PublicConfig *bool `json:"public_config,omitempty"`
PublicIPs *uint32 `json:"public_ips"`
Certified *bool `json:"certified,omitempty"`
}

type Params struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}

func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool {
args := []Args{}
params := []Params{}
data := buildFarmerBotAction(farmID, uint32(s.twinID), args, params, FarmerBotVersionAction)
ctx, cancel := context.WithTimeout(ctx, rmbTimeout)
defer cancel()

info, err := s.getFarmInfo(ctx, farmID)
if err != nil {
return false
}

dst := info.farmerTwinID
var output FarmerBotAction

err = s.rmbClient.Call(ctx, dst, FarmerBotRMBFunction, data, &output)
service := fmt.Sprintf("farmerbot-%d", farmID)
var version string
err = s.rmbClient.CallWithSession(ctx, info.farmerTwinID, &service, FarmerBotVersionAction, nil, &version)
if err != nil {
log.Printf("error while pinging farmerbot on farm %d with farmer twin %d. %s", farmID, dst, err.Error())
}
Expand All @@ -79,94 +36,75 @@ func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool {
}

func (n *Scheduler) farmerBotSchedule(ctx context.Context, r *Request) (uint32, error) {
info, err := n.getFarmInfo(ctx, r.FarmId)
if err != nil {
return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmId)
}
params := buildFarmerBotParams(r)
args := buildFarmerBotArgs(r)
data := buildFarmerBotAction(info.farmerTwinID, uint32(n.twinID), args, params, FarmerBotFindNodeAction)
output := FarmerBotAction{}
ctx, cancel := context.WithTimeout(ctx, rmbTimeout)
defer cancel()

err = n.rmbClient.Call(ctx, info.farmerTwinID, FarmerBotRMBFunction, data, &output)
info, err := n.getFarmInfo(ctx, r.FarmID)
if err != nil {
return 0, err
}
if len(output.Result.Params) < 1 {
return 0, fmt.Errorf("cannot find an eligible node on farm %d", r.FarmId)
return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmID)
}
nodeId, err := strconv.ParseUint(output.Result.Params[0].Value.(string), 10, 32)
if err != nil {

data := buildNodeOptions(r)
var nodeID uint32

service := fmt.Sprintf("farmerbot-%d", r.FarmID)
if err := n.rmbClient.CallWithSession(ctx, info.farmerTwinID, &service, FarmerBotFindNodeAction, data, &nodeID); err != nil {
return 0, err
}
log.Printf("got a node with id %d", nodeId)
return uint32(nodeId), nil

log.Printf("got a node with id %d", nodeID)
return nodeID, nil
}

func buildFarmerBotArgs(r *Request) []Args {
return []Args{}
type NodeFilterOption struct {
NodesExcluded []uint32 `json:"nodes_excluded,omitempty"`
Certified bool `json:"certified,omitempty"`
Dedicated bool `json:"dedicated,omitempty"`
PublicConfig bool `json:"public_config,omitempty"`
PublicIPs uint64 `json:"public_ips,omitempty"`
HRU uint64 `json:"hru,omitempty"` // in GB
SRU uint64 `json:"sru,omitempty"` // in GB
CRU uint64 `json:"cru,omitempty"`
MRU uint64 `json:"mru,omitempty"` // in GB
}
func buildFarmerBotParams(r *Request) []Params {
params := []Params{}

func buildNodeOptions(r *Request) NodeFilterOption {
options := NodeFilterOption{}
if r.Capacity.HRU != 0 {
params = append(params, Params{Key: "required_hru", Value: r.Capacity.HRU})
options.HRU = r.Capacity.HRU / (1024 * 1024 * 1024)
}

if r.Capacity.SRU != 0 {
params = append(params, Params{Key: "required_sru", Value: r.Capacity.SRU})
options.SRU = r.Capacity.SRU / (1024 * 1024 * 1024)
}

if r.Capacity.MRU != 0 {
params = append(params, Params{Key: "required_mru", Value: r.Capacity.MRU})
options.MRU = r.Capacity.MRU / (1024 * 1024 * 1024)
}

if r.Capacity.CRU != 0 {
params = append(params, Params{Key: "required_cru", Value: r.Capacity.CRU})
options.CRU = r.Capacity.CRU
}

if len(r.NodeExclude) != 0 {
value := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(r.NodeExclude)), ","), "")
params = append(params, Params{Key: "node_exclude", Value: value})
options.NodesExcluded = append(options.NodesExcluded, r.NodeExclude...)
}

if r.Dedicated {
params = append(params, Params{Key: "dedicated", Value: r.Dedicated})
options.Dedicated = r.Dedicated
}

if r.PublicConfig {
params = append(params, Params{Key: "public_config", Value: r.PublicConfig})
options.PublicConfig = r.PublicConfig
}

if r.PublicIpsCount > 0 {
params = append(params, Params{Key: "public_ips", Value: r.PublicIpsCount})
options.PublicIPs = uint64(r.PublicIpsCount)
}

if r.Certified {
params = append(params, Params{Key: "certified", Value: r.Certified})
options.Certified = r.Certified
}
return params
}

func buildFarmerBotAction(farmerTwinID uint32, sourceTwinID uint32, args []Args, params []Params, action string) FarmerBotAction {
return FarmerBotAction{
Guid: uuid.NewString(),
TwinID: farmerTwinID,
Action: action,
Args: FarmerBotArgs{
Args: args,
Params: params,
},
Result: FarmerBotArgs{
Args: []Args{},
Params: []Params{},
},
State: "init",
Start: uint64(time.Now().Unix()),
End: 0,
GracePeriod: 0,
Error: "",
Timeout: 6000,
SourceTwinID: sourceTwinID,
Dependencies: []string{},
}
return options
}
7 changes: 4 additions & 3 deletions internal/provider/scheduler/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
type Request struct {
Capacity Capacity
Name string
FarmId uint32
FarmID uint32
PublicConfig bool
PublicIpsCount uint32
Certified bool
Expand All @@ -28,8 +28,9 @@ func (r *Request) constructFilter(twinID uint64) (f proxyTypes.NodeFilter) {
// grid proxy should support filtering a node by certification type.
f.Status = []string{statusUP}
f.AvailableFor = &twinID
if r.FarmId != 0 {
f.FarmIDs = []uint64{uint64(r.FarmId)}
f.Healthy = &trueVal
if r.FarmID != 0 {
f.FarmIDs = []uint64{uint64(r.FarmID)}
}
if r.Capacity.HRU != 0 {
f.FreeHRU = &r.Capacity.HRU
Expand Down
10 changes: 5 additions & 5 deletions internal/provider/scheduler/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestFulfilsSuccess(t *testing.T) {
SRU: 3,
HRU: 3,
},
FarmId: 1,
FarmID: 1,
PublicIpsCount: 1,
PublicConfig: false,
}, farm), true, "fullfil-success")
Expand All @@ -53,7 +53,7 @@ func TestFulfilsFail(t *testing.T) {
SRU: 3,
HRU: 3,
},
FarmId: 1,
FarmID: 1,
PublicIpsCount: 0,
PublicConfig: false,
}
Expand All @@ -66,7 +66,7 @@ func TestFulfilsFail(t *testing.T) {
"mru": func(r *Request) { r.Capacity.MRU = 4 },
"sru": func(r *Request) { r.Capacity.SRU = 9 },
"hru": func(r *Request) { r.Capacity.HRU = 4 },
"farm_id": func(r *Request) { r.FarmId = 2 },
"farm_id": func(r *Request) { r.FarmID = 2 },
"public_ips_count": func(r *Request) { r.PublicIpsCount = 3 },
"public_config": func(r *Request) { r.PublicConfig = true },
}
Expand All @@ -86,7 +86,7 @@ func TestConstructFilter(t *testing.T) {
HRU: 3,
},
Name: "a",
FarmId: 1,
FarmID: 1,
PublicIpsCount: 1,
PublicConfig: false,
Certified: true,
Expand All @@ -99,7 +99,7 @@ func TestConstructFilter(t *testing.T) {
assert.Equal(t, *con.FreeHRU, uint64(3), "construct-filter-hru")
assert.Empty(t, con.Country, "construct-filter-country")
assert.Empty(t, con.City, "construct-filter-city")
assert.Equal(t, con.FarmIDs, []uint64{uint64(r.FarmId)}, "construct-filter-farm-ids")
assert.Equal(t, con.FarmIDs, []uint64{uint64(r.FarmID)}, "construct-filter-farm-ids")
assert.Equal(t, *con.FreeIPs, uint64(1), "construct-filter-free-ips")
assert.Empty(t, con.IPv4, "construct-filter-ipv4")
assert.Empty(t, con.IPv6, "construct-filter-ipv6")
Expand Down
22 changes: 12 additions & 10 deletions internal/provider/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ import (
"github.com/pkg/errors"
proxy "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/client"
proxyTypes "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
)

// NoNodesFoundErr for empty nodes returned from scheduler
var NoNodesFoundErr = errors.New("couldn't find a node satisfying the given requirements")

type rmbClient interface {
CallWithSession(ctx context.Context, twin uint32, session *string, fn string, data interface{}, result interface{}) error
}

// Scheduler struct for scheduling
type Scheduler struct {
nodes map[uint32]nodeInfo
farms map[uint32]farmInfo
twinID uint64
gridProxyClient proxy.Client
rmbClient rmb.Client
rmbClient rmbClient
}

// nodeInfo related to scheduling
Expand All @@ -45,7 +48,7 @@ func (node *nodeInfo) fulfils(r *Request, farm farmInfo) bool {
if r.Capacity.MRU > node.FreeCapacity.MRU ||
r.Capacity.HRU > node.FreeCapacity.HRU ||
r.Capacity.SRU > node.FreeCapacity.SRU ||
(r.FarmId != 0 && node.Node.FarmID != int(r.FarmId)) ||
(r.FarmID != 0 && node.Node.FarmID != int(r.FarmID)) ||
(r.PublicConfig && node.Node.PublicConfig.Domain == "") ||
(r.PublicIpsCount > uint32(farm.freeIPs)) ||
(r.Dedicated && !node.Node.Dedicated) ||
Expand All @@ -57,7 +60,7 @@ func (node *nodeInfo) fulfils(r *Request, farm farmInfo) bool {
}

// NewScheduler generates a new scheduler
func NewScheduler(gridProxyClient proxy.Client, twinID uint64, rmbClient rmb.Client) Scheduler {
func NewScheduler(gridProxyClient proxy.Client, twinID uint64, rmbClient rmbClient) Scheduler {
return Scheduler{
nodes: map[uint32]nodeInfo{},
gridProxyClient: gridProxyClient,
Expand Down Expand Up @@ -137,12 +140,11 @@ func (n *Scheduler) addNodes(nodes []proxyTypes.Node) {

// Schedule makes sure there's at least one node that satisfies the given request
func (n *Scheduler) Schedule(ctx context.Context, r *Request) (uint32, error) {
// TODO: update farmerbot
// if r.FarmId != 0 {
// if n.hasFarmerBot(ctx, r.FarmId) {
// return n.farmerBotSchedule(ctx, r)
// }
// }
if r.FarmID != 0 {
if n.hasFarmerBot(ctx, r.FarmID) {
return n.farmerBotSchedule(ctx, r)
}
}
return n.gridProxySchedule(ctx, r)
}

Expand Down
Loading

0 comments on commit b67c938

Please sign in to comment.