Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update farmerbot #954

Merged
merged 5 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions internal/provider/resource_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pkg/errors"
"github.com/threefoldtech/terraform-provider-grid/internal/provider/scheduler"
"github.com/threefoldtech/tfgrid-sdk-go/grid-client/deployer"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
"github.com/threefoldtech/zos/pkg/gridtypes"
)

Expand Down Expand Up @@ -132,7 +133,7 @@ func parseRequests(d *schema.ResourceData, assignment map[string]uint32) []sched

reqs = append(reqs, scheduler.Request{
Name: mp["name"].(string),
FarmId: uint32(mp["farm_id"].(int)),
FarmID: uint32(mp["farm_id"].(int)),
PublicConfig: mp["public_config"].(bool),
PublicIpsCount: uint32(mp["public_ips_count"].(int)),
Certified: mp["certified"].(bool),
Expand All @@ -158,7 +159,12 @@ func schedule(ctx context.Context, d *schema.ResourceData, meta interface{}) dia
assignment := parseAssignment(d)
reqs := parseRequests(d, assignment)

scheduler := scheduler.NewScheduler(tfPluginClient.GridProxyClient, uint64(tfPluginClient.TwinID), tfPluginClient.RMB)
rpcClient, ok := tfPluginClient.RMB.(*peer.RpcClient)
if !ok {
return diag.FromErr(fmt.Errorf("failed to cast rmb client into rpc client"))
}

scheduler := scheduler.NewScheduler(tfPluginClient.GridProxyClient, uint64(tfPluginClient.TwinID), rpcClient)
if err := scheduler.ProcessRequests(ctx, reqs, assignment); err != nil {
return diag.FromErr(err)
}
Expand Down
146 changes: 42 additions & 104 deletions internal/provider/scheduler/farmer_bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,73 +4,30 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/google/uuid"
"github.com/pkg/errors"
)

const (
rmbTimeout = 40
FarmerBotVersionAction = "farmerbot.farmmanager.version"
FarmerBotFindNodeAction = "farmerbot.nodemanager.findnode"
FarmerBotRMBFunction = "execute_job"
)

type FarmerBotAction struct {
Guid string `json:"guid"`
TwinID uint32 `json:"twinid"`
Action string `json:"action"`
Args FarmerBotArgs `json:"args"`
Result FarmerBotArgs `json:"result"`
State string `json:"state"`
Start uint64 `json:"start"`
End uint64 `json:"end"`
GracePeriod uint32 `json:"grace_period"`
Error string `json:"error"`
Timeout uint32 `json:"timeout"`
SourceTwinID uint32 `json:"src_twinid"`
SourceAction string `json:"src_action"`
Dependencies []string `json:"dependencies"`
}

type FarmerBotArgs struct {
Args []Args `json:"args"`
Params []Params `json:"params"`
}

type Args struct {
RequiredHRU *uint64 `json:"required_hru,omitempty"`
RequiredSRU *uint64 `json:"required_sru,omitempty"`
RequiredCRU *uint64 `json:"required_cru,omitempty"`
RequiredMRU *uint64 `json:"required_mru,omitempty"`
NodeExclude []uint32 `json:"node_exclude,omitempty"`
Dedicated *bool `json:"dedicated,omitempty"`
PublicConfig *bool `json:"public_config,omitempty"`
PublicIPs *uint32 `json:"public_ips"`
Certified *bool `json:"certified,omitempty"`
}

type Params struct {
Key string `json:"key"`
Value interface{} `json:"value"`
}

func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool {
args := []Args{}
params := []Params{}
data := buildFarmerBotAction(farmID, uint32(s.twinID), args, params, FarmerBotVersionAction)
ctx, cancel := context.WithTimeout(ctx, rmbTimeout)
defer cancel()

info, err := s.getFarmInfo(ctx, farmID)
if err != nil {
return false
}

dst := info.farmerTwinID
var output FarmerBotAction

err = s.rmbClient.Call(ctx, dst, FarmerBotRMBFunction, data, &output)
service := fmt.Sprintf("farmerbot-%d", farmID)
var version string
err = s.rmbClient.CallWithSession(ctx, info.farmerTwinID, &service, FarmerBotVersionAction, nil, &version)
if err != nil {
log.Printf("error while pinging farmerbot on farm %d with farmer twin %d. %s", farmID, dst, err.Error())
}
Expand All @@ -79,94 +36,75 @@ func (s *Scheduler) hasFarmerBot(ctx context.Context, farmID uint32) bool {
}

func (n *Scheduler) farmerBotSchedule(ctx context.Context, r *Request) (uint32, error) {
info, err := n.getFarmInfo(ctx, r.FarmId)
if err != nil {
return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmId)
}
params := buildFarmerBotParams(r)
args := buildFarmerBotArgs(r)
data := buildFarmerBotAction(info.farmerTwinID, uint32(n.twinID), args, params, FarmerBotFindNodeAction)
output := FarmerBotAction{}
ctx, cancel := context.WithTimeout(ctx, rmbTimeout)
defer cancel()

err = n.rmbClient.Call(ctx, info.farmerTwinID, FarmerBotRMBFunction, data, &output)
info, err := n.getFarmInfo(ctx, r.FarmID)
if err != nil {
return 0, err
}
if len(output.Result.Params) < 1 {
return 0, fmt.Errorf("cannot find an eligible node on farm %d", r.FarmId)
return 0, errors.Wrapf(err, "failed to get farm %d info", r.FarmID)
}
nodeId, err := strconv.ParseUint(output.Result.Params[0].Value.(string), 10, 32)
if err != nil {

data := buildNodeOptions(r)
var nodeID uint32

service := fmt.Sprintf("farmerbot-%d", r.FarmID)
if err := n.rmbClient.CallWithSession(ctx, info.farmerTwinID, &service, FarmerBotFindNodeAction, data, &nodeID); err != nil {
return 0, err
}
log.Printf("got a node with id %d", nodeId)
return uint32(nodeId), nil

log.Printf("got a node with id %d", nodeID)
return nodeID, nil
}

func buildFarmerBotArgs(r *Request) []Args {
return []Args{}
type NodeFilterOption struct {
NodesExcluded []uint32 `json:"nodes_excluded,omitempty"`
Certified bool `json:"certified,omitempty"`
Dedicated bool `json:"dedicated,omitempty"`
PublicConfig bool `json:"public_config,omitempty"`
PublicIPs uint64 `json:"public_ips,omitempty"`
HRU uint64 `json:"hru,omitempty"` // in GB
SRU uint64 `json:"sru,omitempty"` // in GB
CRU uint64 `json:"cru,omitempty"`
MRU uint64 `json:"mru,omitempty"` // in GB
}
func buildFarmerBotParams(r *Request) []Params {
params := []Params{}

func buildNodeOptions(r *Request) NodeFilterOption {
options := NodeFilterOption{}
if r.Capacity.HRU != 0 {
params = append(params, Params{Key: "required_hru", Value: r.Capacity.HRU})
options.HRU = r.Capacity.HRU / (1024 * 1024 * 1024)
}

if r.Capacity.SRU != 0 {
params = append(params, Params{Key: "required_sru", Value: r.Capacity.SRU})
options.SRU = r.Capacity.SRU / (1024 * 1024 * 1024)
}

if r.Capacity.MRU != 0 {
params = append(params, Params{Key: "required_mru", Value: r.Capacity.MRU})
options.MRU = r.Capacity.MRU / (1024 * 1024 * 1024)
}

if r.Capacity.CRU != 0 {
params = append(params, Params{Key: "required_cru", Value: r.Capacity.CRU})
options.CRU = r.Capacity.CRU
}

if len(r.NodeExclude) != 0 {
value := strings.Trim(strings.Join(strings.Fields(fmt.Sprint(r.NodeExclude)), ","), "")
params = append(params, Params{Key: "node_exclude", Value: value})
options.NodesExcluded = append(options.NodesExcluded, r.NodeExclude...)
}

if r.Dedicated {
params = append(params, Params{Key: "dedicated", Value: r.Dedicated})
options.Dedicated = r.Dedicated
}

if r.PublicConfig {
params = append(params, Params{Key: "public_config", Value: r.PublicConfig})
options.PublicConfig = r.PublicConfig
}

if r.PublicIpsCount > 0 {
params = append(params, Params{Key: "public_ips", Value: r.PublicIpsCount})
options.PublicIPs = uint64(r.PublicIpsCount)
}

if r.Certified {
params = append(params, Params{Key: "certified", Value: r.Certified})
options.Certified = r.Certified
}
return params
}

func buildFarmerBotAction(farmerTwinID uint32, sourceTwinID uint32, args []Args, params []Params, action string) FarmerBotAction {
return FarmerBotAction{
Guid: uuid.NewString(),
TwinID: farmerTwinID,
Action: action,
Args: FarmerBotArgs{
Args: args,
Params: params,
},
Result: FarmerBotArgs{
Args: []Args{},
Params: []Params{},
},
State: "init",
Start: uint64(time.Now().Unix()),
End: 0,
GracePeriod: 0,
Error: "",
Timeout: 6000,
SourceTwinID: sourceTwinID,
Dependencies: []string{},
}
return options
}
7 changes: 4 additions & 3 deletions internal/provider/scheduler/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ var (
type Request struct {
Capacity Capacity
Name string
FarmId uint32
FarmID uint32
PublicConfig bool
PublicIpsCount uint32
Certified bool
Expand All @@ -28,8 +28,9 @@ func (r *Request) constructFilter(twinID uint64) (f proxyTypes.NodeFilter) {
// grid proxy should support filtering a node by certification type.
f.Status = []string{statusUP}
f.AvailableFor = &twinID
if r.FarmId != 0 {
f.FarmIDs = []uint64{uint64(r.FarmId)}
f.Healthy = &trueVal
if r.FarmID != 0 {
f.FarmIDs = []uint64{uint64(r.FarmID)}
}
if r.Capacity.HRU != 0 {
f.FreeHRU = &r.Capacity.HRU
Expand Down
10 changes: 5 additions & 5 deletions internal/provider/scheduler/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestFulfilsSuccess(t *testing.T) {
SRU: 3,
HRU: 3,
},
FarmId: 1,
FarmID: 1,
PublicIpsCount: 1,
PublicConfig: false,
}, farm), true, "fullfil-success")
Expand All @@ -53,7 +53,7 @@ func TestFulfilsFail(t *testing.T) {
SRU: 3,
HRU: 3,
},
FarmId: 1,
FarmID: 1,
PublicIpsCount: 0,
PublicConfig: false,
}
Expand All @@ -66,7 +66,7 @@ func TestFulfilsFail(t *testing.T) {
"mru": func(r *Request) { r.Capacity.MRU = 4 },
"sru": func(r *Request) { r.Capacity.SRU = 9 },
"hru": func(r *Request) { r.Capacity.HRU = 4 },
"farm_id": func(r *Request) { r.FarmId = 2 },
"farm_id": func(r *Request) { r.FarmID = 2 },
"public_ips_count": func(r *Request) { r.PublicIpsCount = 3 },
"public_config": func(r *Request) { r.PublicConfig = true },
}
Expand All @@ -86,7 +86,7 @@ func TestConstructFilter(t *testing.T) {
HRU: 3,
},
Name: "a",
FarmId: 1,
FarmID: 1,
PublicIpsCount: 1,
PublicConfig: false,
Certified: true,
Expand All @@ -99,7 +99,7 @@ func TestConstructFilter(t *testing.T) {
assert.Equal(t, *con.FreeHRU, uint64(3), "construct-filter-hru")
assert.Empty(t, con.Country, "construct-filter-country")
assert.Empty(t, con.City, "construct-filter-city")
assert.Equal(t, con.FarmIDs, []uint64{uint64(r.FarmId)}, "construct-filter-farm-ids")
assert.Equal(t, con.FarmIDs, []uint64{uint64(r.FarmID)}, "construct-filter-farm-ids")
assert.Equal(t, *con.FreeIPs, uint64(1), "construct-filter-free-ips")
assert.Empty(t, con.IPv4, "construct-filter-ipv4")
assert.Empty(t, con.IPv6, "construct-filter-ipv6")
Expand Down
22 changes: 12 additions & 10 deletions internal/provider/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ import (
"github.com/pkg/errors"
proxy "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/client"
proxyTypes "github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go"
)

// NoNodesFoundErr for empty nodes returned from scheduler
var NoNodesFoundErr = errors.New("couldn't find a node satisfying the given requirements")

type rmbClient interface {
CallWithSession(ctx context.Context, twin uint32, session *string, fn string, data interface{}, result interface{}) error
}

// Scheduler struct for scheduling
type Scheduler struct {
nodes map[uint32]nodeInfo
farms map[uint32]farmInfo
twinID uint64
gridProxyClient proxy.Client
rmbClient rmb.Client
rmbClient rmbClient
}

// nodeInfo related to scheduling
Expand All @@ -45,7 +48,7 @@ func (node *nodeInfo) fulfils(r *Request, farm farmInfo) bool {
if r.Capacity.MRU > node.FreeCapacity.MRU ||
r.Capacity.HRU > node.FreeCapacity.HRU ||
r.Capacity.SRU > node.FreeCapacity.SRU ||
(r.FarmId != 0 && node.Node.FarmID != int(r.FarmId)) ||
(r.FarmID != 0 && node.Node.FarmID != int(r.FarmID)) ||
(r.PublicConfig && node.Node.PublicConfig.Domain == "") ||
(r.PublicIpsCount > uint32(farm.freeIPs)) ||
(r.Dedicated && !node.Node.Dedicated) ||
Expand All @@ -57,7 +60,7 @@ func (node *nodeInfo) fulfils(r *Request, farm farmInfo) bool {
}

// NewScheduler generates a new scheduler
func NewScheduler(gridProxyClient proxy.Client, twinID uint64, rmbClient rmb.Client) Scheduler {
func NewScheduler(gridProxyClient proxy.Client, twinID uint64, rmbClient rmbClient) Scheduler {
return Scheduler{
nodes: map[uint32]nodeInfo{},
gridProxyClient: gridProxyClient,
Expand Down Expand Up @@ -137,12 +140,11 @@ func (n *Scheduler) addNodes(nodes []proxyTypes.Node) {

// Schedule makes sure there's at least one node that satisfies the given request
func (n *Scheduler) Schedule(ctx context.Context, r *Request) (uint32, error) {
// TODO: update farmerbot
// if r.FarmId != 0 {
// if n.hasFarmerBot(ctx, r.FarmId) {
// return n.farmerBotSchedule(ctx, r)
// }
// }
if r.FarmID != 0 {
if n.hasFarmerBot(ctx, r.FarmID) {
return n.farmerBotSchedule(ctx, r)
}
}
return n.gridProxySchedule(ctx, r)
}

Expand Down
Loading
Loading