Skip to content

Commit

Permalink
V210 (#98)
Browse files Browse the repository at this point in the history
* Custom handshake/proro/resolver (#80)

* Compression feature (#85)

* Fix DIST connection reader

* fix issue #89 (#90)

* fix issue87

* sync

* Proxy (#97)

* Raft (#99)

Raft implementation

* update README.md

* update gen/README.md

* add github template

* fix conflict
  • Loading branch information
halturin authored Apr 19, 2022
1 parent e9c6905 commit 79bebaa
Show file tree
Hide file tree
Showing 85 changed files with 16,019 additions and 5,057 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/testLinuxWindowsMacOS.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.17

- name: Test
run: go test -v ./...
Expand All @@ -29,7 +29,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.17

- name: Test
run: go test -v ./...
Expand All @@ -42,7 +42,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16
go-version: 1.17

- name: Test
run: go test -v ./...
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
*~
cmd/epmd/epmd
coverage.txt
coverage.html
tests/coverage.txt
tests/coverage.html
*.swp
tags
.session
cover.out
tests/cover.out
examples/sandbox

25 changes: 25 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,31 @@ All notable changes to this project will be documented in this file.
This format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

#### [v2.0.0](https://github.com/ergo-services/ergo/releases/tag/v1.999.200) 2021-10-12 [tag version v1.999.200] ####

* Added support of Erlang/OTP 24 (including [Alias](https://blog.erlang.org/My-OTP-24-Highlights/#eep-53-process-aliases) feature and [Remote Spawn](https://blog.erlang.org/OTP-23-Highlights/#distributed-spawn-and-the-new-erpc-module) introduced in Erlang/OTP 23)
* **Important**: This release includes refined API (without backward compatibility) for a more convenient way to create OTP-designed microservices. Make sure to update your code.
* **Important**: Project repository has been moved to [https://github.com/ergo-services/ergo](https://github.com/ergo-services/ergo). It is still available on the old URL [https://github.com/halturin/ergo](https://github.com/halturin/ergo) and GitHub will redirect all requests to the new one (thanks to GitHub for this feature).
* Introduced new behavior `gen.Saga`. It implements Saga design pattern - a sequence of transactions that updates each service state and publishes the result (or cancels the transaction or triggers the next transaction step). `gen.Saga` also provides a feature of interim results (can be used as transaction progress or as a part of pipeline processing), time deadline (to limit transaction lifespan), two-phase commit (to make distributed transaction atomic). Here is example [examples/gensaga](examples/gensaga).
* Introduced new methods `Process.Direct` and `Process.DirectWithTimeout` to make direct request to the actor (`gen.Server` or inherited object). If an actor has no implementation of `HandleDirect` callback it returns `ErrUnsupportedRequest` as a error.
* Introduced new callback `HandleDirect` in the `gen.Server` interface as a handler for requests made by `Process.Direct` or `Process.DirectWithTimeout`. It should be easy to interact with actors from outside.
* Introduced new types intended to be used to interact with Erlang/Elixir
* `etf.ListImproper` to support improper lists like `[a|b]` (a cons cell).
* `etf.String` (an alias for the Golang string) encodes as a binary in order to support Elixir string type (which is `binary()` type)
* `etf.Charlist` (an alias for the Golang string) encodes as a list of chars `[]rune` in order to support Erlang string type (which is `charlist()` type)
* Introduced new methods `Node.ProvideRemoteSpawn`, `Node.RevokeRemoteSpawn`, `Process.RemoteSpawn`.
* Introduced new interfaces `Marshaler` (method `MarshalETF`) and `Unmarshaler` (method `UnmarshalETF`) for the custom encoding/decoding data.
* Improved performance for the local messaging (up to 3 times for some cases)
* Added example [examples/http](examples/http) to demonsrate how HTTP server can be integrated into the Ergo node.
* Added example [examples/gendemo](examples/gendemo) - how to create a custom behavior (design pattern) on top of the `gen.Server`. Take inspiration from the [gen/stage.go](gen/stage.go) or [gen/saga.go](gen/saga.go) design patterns.
* Added support FreeBSD, OpenBSD, NetBSD, DragonFly.
* Fixed RPC issue #45
* Fixed internal timer issue #48
* Fixed memory leaks #53
* Fixed double panic issue #52
* Fixed Atom Cache race conditioned issue #54
* Fixed ETF encoder issues #64 #66

#### [1.2.0](https://github.com/ergo-services/ergo/releases/tag/v1.2.0) - 2021-04-07 ####

* Added TLS support. Introduced new option `TLSmode` in `ergo.NodeOptions` with the following values:
Expand Down
152 changes: 92 additions & 60 deletions README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion debug.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//+build debug
//go:build debug
// +build debug

package ergo

Expand Down
25 changes: 23 additions & 2 deletions ergo.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,44 @@ import (
"github.com/ergo-services/ergo/erlang"
"github.com/ergo-services/ergo/gen"
"github.com/ergo-services/ergo/node"
"github.com/ergo-services/ergo/proto/dist"
)

// StartNode create new node with name and cookie string
func StartNode(name string, cookie string, opts node.Options) (node.Node, error) {
return StartNodeWithContext(context.Background(), name, cookie, opts)
}

// CreateNodeWithContext create new node with specified context, name and cookie string
// StartNodeWithContext create new node with specified context, name and cookie string
func StartNodeWithContext(ctx context.Context, name string, cookie string, opts node.Options) (node.Node, error) {
version := node.Version{
Release: Version,
Prefix: VersionPrefix,
OTP: VersionOTP,
}
if opts.Env == nil {
opts.Env = make(map[gen.EnvKey]interface{})
}
opts.Env[node.EnvKeyVersion] = version

// add erlang support application
opts.Applications = append([]gen.ApplicationBehavior{&erlang.KernelApp{}}, opts.Applications...)

return node.StartWithContext(context.WithValue(ctx, "version", version), name, cookie, opts)
if opts.Handshake == nil {
// create default handshake for the node (Erlang Dist Handshake)
opts.Handshake = dist.CreateHandshake(dist.HandshakeOptions{})
}

if opts.Proto == nil {
// create default proto handler (Erlang Dist Proto)
protoOptions := node.DefaultProtoOptions()
opts.Proto = dist.CreateProto(protoOptions)
}

if opts.StaticRoutesOnly == false && opts.Resolver == nil {
// create default resolver (with enabled Erlang EPMD server)
opts.Resolver = dist.CreateResolverWithLocalEPMD("", dist.DefaultEPMDPort)
}

return node.StartWithContext(ctx, name, cookie, opts)
}
6 changes: 3 additions & 3 deletions erlang/appmon.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type jobDetails struct {
}

// Init initializes process state using arbitrary arguments
// Init -> state
func (am *appMon) Init(process *gen.ServerProcess, args ...etf.Term) error {
lib.Log("APP_MON: Init %#v", args)
from := args[0]
Expand All @@ -37,10 +36,11 @@ func (am *appMon) Init(process *gen.ServerProcess, args ...etf.Term) error {
return nil
}

// HandleCast
func (am *appMon) HandleCast(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
var appState *appMonState = process.State.(*appMonState)
lib.Log("APP_MON: HandleCast: %#v", message)
node := process.Env("ergo:Node").(node.Node)
node := process.Env(node.EnvKeyNode).(node.Node)
switch message {
case "sendStat":

Expand Down Expand Up @@ -137,7 +137,7 @@ func (am *appMon) HandleCast(process *gen.ServerProcess, message etf.Term) gen.S
}

func (am *appMon) makeAppTree(process gen.Process, app etf.Atom) etf.Tuple {
node := process.Env("ergo:Node").(node.Node)
node := process.Env(node.EnvKeyNode).(node.Node)
appInfo, err := node.ApplicationInfo(string(app))
if err != nil {
return nil
Expand Down
4 changes: 3 additions & 1 deletion erlang/erlang.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ type erlang struct {
gen.Server
}

// Init
func (e *erlang) Init(process *gen.ServerProcess, args ...etf.Term) error {
lib.Log("ERLANG: Init: %#v", args)
return nil
}

// HandleCall
func (e *erlang) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) {
lib.Log("ERLANG: HandleCall: %#v, From: %#v", message, from)

Expand Down Expand Up @@ -99,7 +101,7 @@ func processInfo(p gen.Process, pid etf.Pid, property etf.Term) etf.Term {
case etf.Atom("priority"):
// values = append(values, etf.Tuple{p[i], 0})
case etf.Atom("reductions"):
values = append(values, etf.Tuple{p[i], info.Reductions})
values = append(values, etf.Tuple{p[i], 0})
case etf.Atom("registered_name"):
values = append(values, etf.Tuple{p[i], process.Name()})
case etf.Atom("sequential_trace_token"):
Expand Down
1 change: 1 addition & 0 deletions erlang/global_name_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type globalNameServer struct {
gen.Server
}

// HandleCast
func (gns *globalNameServer) HandleCast(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
return gen.ServerStatusOK
}
21 changes: 14 additions & 7 deletions erlang/net_kernel.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,50 +13,54 @@ import (
"github.com/ergo-services/ergo/lib/osdep"
)

// KernelApp
type KernelApp struct {
gen.Application
}

// Load
func (nka *KernelApp) Load(args ...etf.Term) (gen.ApplicationSpec, error) {
return gen.ApplicationSpec{
Name: "erlang",
Description: "Erlang support app",
Version: "v.1.0",
Children: []gen.ApplicationChildSpec{
gen.ApplicationChildSpec{
{
Child: &netKernelSup{},
Name: "net_kernel_sup",
},
},
}, nil
}

// Start
func (nka *KernelApp) Start(p gen.Process, args ...etf.Term) {}

type netKernelSup struct {
gen.Supervisor
}

// Init
func (nks *netKernelSup) Init(args ...etf.Term) (gen.SupervisorSpec, error) {
return gen.SupervisorSpec{
Children: []gen.SupervisorChildSpec{
gen.SupervisorChildSpec{
{
Name: "net_kernel",
Child: &netKernel{},
},
gen.SupervisorChildSpec{
{
Name: "global_name_server",
Child: &globalNameServer{},
},
gen.SupervisorChildSpec{
{
Name: "rex",
Child: &rex{},
},
gen.SupervisorChildSpec{
{
Name: "observer_backend",
Child: &observerBackend{},
},
gen.SupervisorChildSpec{
{
Name: "erlang",
Child: &erlang{},
},
Expand All @@ -75,12 +79,14 @@ type netKernel struct {
routinesCtx map[etf.Pid]context.CancelFunc
}

// Init
func (nk *netKernel) Init(process *gen.ServerProcess, args ...etf.Term) error {
lib.Log("NET_KERNEL: Init: %#v", args)
nk.routinesCtx = make(map[etf.Pid]context.CancelFunc)
return nil
}

// HandleCall
func (nk *netKernel) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (reply etf.Term, status gen.ServerStatus) {
lib.Log("NET_KERNEL: HandleCall: %#v, From: %#v", message, from)
status = gen.ServerStatusOK
Expand Down Expand Up @@ -124,6 +130,7 @@ func (nk *netKernel) HandleCall(process *gen.ServerProcess, from gen.ServerFrom,
return
}

// HandleInfo
func (nk *netKernel) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
lib.Log("NET_KERNEL: HandleInfo: %#v", message)
switch m := message.(type) {
Expand All @@ -147,7 +154,7 @@ func sendProcInfo(p gen.Process, to etf.Pid) {
etf.Atom("etop_proc_info"), // record name #etop_proc_info
list[i].Self(), // pid
0, // mem
info.Reductions, // reds
0, // reds
etf.Atom(list[i].Name()), // etf.Tuple{etf.Atom("ergo"), etf.Atom(list[i].Name()), 0}, // name
0, // runtime
info.CurrentFunction, // etf.Tuple{etf.Atom("ergo"), etf.Atom(info.CurrentFunction), 0}, // cf
Expand Down
6 changes: 3 additions & 3 deletions erlang/observer_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@ type observerBackend struct {
}

// Init initializes process state using arbitrary arguments
// Init(...) -> state
func (o *observerBackend) Init(process *gen.ServerProcess, args ...etf.Term) error {
lib.Log("OBSERVER: Init: %#v", args)

funProcLibInitialCall := func(a ...etf.Term) etf.Term {
return etf.Tuple{etf.Atom("proc_lib"), etf.Atom("init_p"), 5}
}
node := process.Env("ergo:Node").(node.Node)
node := process.Env(node.EnvKeyNode).(node.Node)
node.ProvideRPC("proc_lib", "translate_initial_call", funProcLibInitialCall)

funAppmonInfo := func(a ...etf.Term) etf.Term {
Expand All @@ -42,6 +41,7 @@ func (o *observerBackend) Init(process *gen.ServerProcess, args ...etf.Term) err
return nil
}

// HandleCall
func (o *observerBackend) HandleCall(state *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) {
lib.Log("OBSERVER: HandleCall: %v, From: %#v", message, from)
function := message.(etf.Tuple).Element(1).(etf.Atom)
Expand All @@ -68,7 +68,7 @@ func (o *observerBackend) HandleCall(state *gen.ServerProcess, from gen.ServerFr

func (o *observerBackend) sysInfo(p gen.Process) etf.List {
// observer_backend:sys_info()
node := p.Env("ergo:Node").(node.Node)
node := p.Env(node.EnvKeyNode).(node.Node)
processCount := etf.Tuple{etf.Atom("process_count"), len(p.ProcessList())}
processLimit := etf.Tuple{etf.Atom("process_limit"), 262144}
atomCount := etf.Tuple{etf.Atom("atom_count"), 0}
Expand Down
10 changes: 8 additions & 2 deletions erlang/rex.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type rex struct {
methods map[modFun]gen.RPC
}

// Init
func (r *rex) Init(process *gen.ServerProcess, args ...etf.Term) error {
lib.Log("REX: Init: %#v", args)
// Do not overwrite existing methods if this process restarted
Expand All @@ -42,11 +43,12 @@ func (r *rex) Init(process *gen.ServerProcess, args ...etf.Term) error {
}
r.methods[mf] = nil
}
node := process.Env("ergo:Node").(node.Node)
node := process.Env(node.EnvKeyNode).(node.Node)
node.ProvideRemoteSpawn("erpc", &erpc{})
return nil
}

// HandleCall
func (r *rex) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, message etf.Term) (etf.Term, gen.ServerStatus) {
lib.Log("REX: HandleCall: %#v, From: %#v", message, from)
switch m := message.(type) {
Expand All @@ -63,7 +65,7 @@ func (r *rex) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, messag
return reply, gen.ServerStatusOK
}

to := gen.ProcessID{string(module), process.NodeName()}
to := gen.ProcessID{Name: string(module), Node: process.NodeName()}
m := etf.Tuple{m.Element(3), m.Element(4)}
reply, err := process.Call(to, m)
if err != nil {
Expand All @@ -78,11 +80,13 @@ func (r *rex) HandleCall(process *gen.ServerProcess, from gen.ServerFrom, messag
return reply, gen.ServerStatusOK
}

// HandleInfo
func (r *rex) HandleInfo(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
// add this handler to suppres any messages from erlang
return gen.ServerStatusOK
}

// HandleDirect
func (r *rex) HandleDirect(process *gen.ServerProcess, message interface{}) (interface{}, error) {
switch m := message.(type) {
case gen.MessageManageRPC:
Expand Down Expand Up @@ -179,6 +183,7 @@ type erpc struct {
gen.Server
}

// Init
func (e *erpc) Init(process *gen.ServerProcess, args ...etf.Term) error {
lib.Log("ERPC [%v]: Init: %#v", process.Self(), args)
mfa := erpcMFA{
Expand All @@ -192,6 +197,7 @@ func (e *erpc) Init(process *gen.ServerProcess, args ...etf.Term) error {

}

// HandleCast
func (e *erpc) HandleCast(process *gen.ServerProcess, message etf.Term) gen.ServerStatus {
lib.Log("ERPC [%v]: HandleCast: %#v", process.Self(), message)
mfa := message.(erpcMFA)
Expand Down
Loading

0 comments on commit 79bebaa

Please sign in to comment.