From 6ae7cdb63a93adc78ed8837e0b4a0fc63aa222c8 Mon Sep 17 00:00:00 2001 From: Ivan Sushkov Date: Fri, 23 Feb 2024 13:12:07 +0700 Subject: [PATCH] Mediator pattern --- .../seedwork/mediator/mediator.go | 4 - .../seedwork/mediator/untyped.go | 146 ++++++++++ .../seedwork/mediator/untyped_test.go | 270 ++++++++++++++++++ grade/pkg/collections/slice.go | 10 + 4 files changed, 426 insertions(+), 4 deletions(-) delete mode 100644 grade/internal/infrastructure/seedwork/mediator/mediator.go create mode 100644 grade/internal/infrastructure/seedwork/mediator/untyped.go create mode 100644 grade/internal/infrastructure/seedwork/mediator/untyped_test.go create mode 100644 grade/pkg/collections/slice.go diff --git a/grade/internal/infrastructure/seedwork/mediator/mediator.go b/grade/internal/infrastructure/seedwork/mediator/mediator.go deleted file mode 100644 index 82c8be80..00000000 --- a/grade/internal/infrastructure/seedwork/mediator/mediator.go +++ /dev/null @@ -1,4 +0,0 @@ -package mediator - -type MediatorImp struct { -} diff --git a/grade/internal/infrastructure/seedwork/mediator/untyped.go b/grade/internal/infrastructure/seedwork/mediator/untyped.go new file mode 100644 index 00000000..cf348b37 --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/untyped.go @@ -0,0 +1,146 @@ +package mediator + +import ( + "context" + "errors" + "reflect" + "sync" + + "github.com/emacsway/grade/grade/internal/domain/seedwork/disposable" + "github.com/hashicorp/go-multierror" +) + +var ( + ErrUnsuitableHandlerSignature = errors.New("passed handler has unsuitable signature") +) + +type Handler func(ctx context.Context, command any) (any, error) + +func AsUntyped[T any](handler func(ctx context.Context, command T) (any, error)) Handler { + return func(ctx context.Context, command any) (any, error) { + if typedCommand, ok := command.(T); ok { + return handler(ctx, typedCommand) + } + + return nil, ErrUnsuitableHandlerSignature + } +} + +type RefUntypedMediator struct { + hLock sync.RWMutex + handlers map[reflect.Type]Handler + + sLock sync.RWMutex + subscribers map[reflect.Type]map[reflect.Value]Handler + + pLock sync.RWMutex + pipes []func(next Handler) Handler +} + +func NewRefUntypedMediator() *RefUntypedMediator { + return &RefUntypedMediator{ + hLock: sync.RWMutex{}, + handlers: map[reflect.Type]Handler{}, + + sLock: sync.RWMutex{}, + subscribers: map[reflect.Type]map[reflect.Value]Handler{}, + + pLock: sync.RWMutex{}, + } +} + +func (m *RefUntypedMediator) AddPipe(pipe func(next Handler) Handler) { + m.pLock.Lock() + defer m.pLock.Unlock() + + m.pipes = append(m.pipes, pipe) +} + +func (m *RefUntypedMediator) executeWithPipeline(handler Handler, ctx context.Context, command any) (any, error) { + m.pLock.RLock() + defer m.pLock.RUnlock() + + current := func(ctx context.Context, command any) (any, error) { + return handler(ctx, command) + } + + for ixd := range m.pipes { + reverse := len(m.pipes) - 1 - ixd + current = m.pipes[reverse](current) + } + + return current(ctx, command) +} + +func (m *RefUntypedMediator) Send(ctx context.Context, command any) (any, error) { + m.hLock.RLock() + defer m.hLock.RUnlock() + + commandType := reflect.TypeOf(command) + if handler, found := m.handlers[commandType]; found { + return m.executeWithPipeline(handler, ctx, command) + } + + return nil, nil +} + +func (m *RefUntypedMediator) Register(command any, handler Handler) disposable.Disposable { + m.hLock.Lock() + defer m.hLock.Unlock() + + commandType := reflect.TypeOf(command) + m.handlers[commandType] = handler + + return disposable.NewDisposable(func() { + m.Unregister(command) + }) +} + +func (m *RefUntypedMediator) Unregister(command any) { + m.hLock.Lock() + defer m.hLock.Unlock() + + commandType := reflect.TypeOf(command) + delete(m.handlers, commandType) +} + +func (m *RefUntypedMediator) Subscribe(event any, handler Handler) disposable.Disposable { + m.sLock.Lock() + defer m.sLock.Unlock() + + valueType := reflect.TypeOf(event) + if _, found := m.subscribers[valueType]; !found { + m.subscribers[valueType] = map[reflect.Value]Handler{} + } + + handlerValue := reflect.ValueOf(handler) + m.subscribers[valueType][handlerValue] = handler + + return disposable.NewDisposable(func() { + m.Unsubscribe(event, handler) + }) +} + +func (m *RefUntypedMediator) Unsubscribe(event any, handler Handler) { + m.sLock.Lock() + defer m.sLock.Unlock() + + eventType := reflect.TypeOf(event) + handlerValue := reflect.ValueOf(handler) + + delete(m.subscribers[eventType], handlerValue) +} + +func (m *RefUntypedMediator) Publish(ctx context.Context, event any) error { + m.sLock.RLock() + defer m.sLock.RUnlock() + + var errs error + eventType := reflect.TypeOf(event) + for _, handler := range m.subscribers[eventType] { + _, err := handler(ctx, event) + errs = multierror.Append(errs, err) + } + + return errs +} diff --git a/grade/internal/infrastructure/seedwork/mediator/untyped_test.go b/grade/internal/infrastructure/seedwork/mediator/untyped_test.go new file mode 100644 index 00000000..648f25b8 --- /dev/null +++ b/grade/internal/infrastructure/seedwork/mediator/untyped_test.go @@ -0,0 +1,270 @@ +package mediator + +import ( + "context" + "errors" + "testing" + + "github.com/hashicorp/go-multierror" + "github.com/stretchr/testify/assert" +) + +type ( + Event struct { + name string + } + Command struct { + name string + } +) + +func TestMediator(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + + assertion func(t *testing.T, m *RefUntypedMediator) + }{ + { + name: "test_publish", + + assertion: func(t *testing.T, m *RefUntypedMediator) { + counter := 0 + handler := func(ctx context.Context, t any) (any, error) { + counter++ + + return nil, nil + } + + m.Subscribe(Event{}, handler) + + _ = m.Publish(context.Background(), Event{}) + assert.Equal(t, 1, counter) + }, + }, + + { + name: "test_unsubscribe", + assertion: func(t *testing.T, m *RefUntypedMediator) { + + times := 0 + handler := func(ctx context.Context, e any) (any, error) { + times++ + return nil, nil + } + + times2 := 0 + handler2 := func(ctx context.Context, e any) (any, error) { + times2++ + return nil, nil + } + + times3 := 0 + handler3 := AsUntyped(func(ctx context.Context, e any) (any, error) { + times3++ + return nil, nil + }) + + m.Subscribe(Event{}, handler) + m.Subscribe(Event{}, handler2) + m.Subscribe(Event{}, handler3) + + m.Unsubscribe(Event{}, handler) + _ = m.Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 1, times2) + assert.Equal(t, 1, times3) + + m.Unsubscribe(Event{}, handler3) + _ = m.Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 2, times2) + assert.Equal(t, 1, times3) + }, + }, + + { + name: "test_disposable_event", + assertion: func(t *testing.T, m *RefUntypedMediator) { + + times := 0 + handler := func(ctx context.Context, e any) (any, error) { + times++ + return nil, nil + } + + times2 := 0 + handler2 := func(ctx context.Context, e any) (any, error) { + times2++ + return nil, nil + } + + disposable := m.Subscribe(Event{}, handler) + m.Subscribe(Event{}, handler2) + + disposable.Dispose() + _ = m.Publish(context.Background(), Event{}) + + assert.Equal(t, 0, times) + assert.Equal(t, 1, times2) + }, + }, + + { + name: "test_send", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e any) (any, error) { + times++ + return nil, nil + } + + m.Register(Command{}, handler) + _, _ = m.Send(context.Background(), Command{}) + + assert.Equal(t, 1, times) + }, + }, + + { + name: "test_unregister", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e any) (any, error) { + times++ + + return nil, nil + } + + m.Register(Command{}, handler) + m.Unregister(Command{}) + + _, _ = m.Send(context.Background(), Command{}) + + assert.Equal(t, 0, times) + }, + }, + + { + name: "test_disposable_command", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e any) (any, error) { + times++ + + return nil, nil + } + + disposable := m.Register(Command{}, handler) + disposable.Dispose() + + _, _ = m.Send(context.Background(), Command{}) + + assert.Equal(t, 0, times) + }, + }, + + { + name: "test_unsuitable_params_type_handler", + assertion: func(t *testing.T, m *RefUntypedMediator) { + times := 0 + handler := func(ctx context.Context, e int) (any, error) { + times++ + + return nil, nil + } + + m.Register(Command{}, AsUntyped(handler)) + + _, err := m.Send(context.Background(), Command{}) + assert.Equal(t, ErrUnsuitableHandlerSignature, err) + }, + }, + + { + name: "test_returning_errors", + assertion: func(t *testing.T, m *RefUntypedMediator) { + handlerError := errors.New("") + + handler := func(ctx context.Context, e any) (any, error) { + return nil, handlerError + } + + handler2 := func(ctx context.Context, e any) (any, error) { + return nil, handlerError + } + + handler3 := func(ctx context.Context, e Event) (any, error) { + return nil, handlerError + } + + m.Register(Command{}, handler) + + m.Subscribe(Event{}, handler2) + m.Subscribe(Event{}, AsUntyped(handler3)) + + var errs error + errs = multierror.Append(errs, handlerError, handlerError) + assert.Equal(t, errs, m.Publish(context.Background(), Event{})) + + _, err := m.Send(context.Background(), Command{}) + assert.Equal(t, handlerError, err) + }, + }, + { + name: "test_execute_pipeline", + assertion: func(t *testing.T, m *RefUntypedMediator) { + + m.AddPipe(func(next Handler) Handler { + + return AsUntyped[Command](func(ctx context.Context, command Command) (any, error) { + return next(ctx, Command{name: command.name + "1"}) + }) + + }) + + m.AddPipe(func(next Handler) Handler { + + return AsUntyped[Command](func(ctx context.Context, command Command) (any, error) { + return next(ctx, Command{name: command.name + "2"}) + }) + + }) + + handler := func(ctx context.Context, e any) (any, error) { + command := e.(Command) + return Command{name: command.name + "3"}, nil + } + + m.Register(Command{}, handler) + + res, err := m.Send(context.Background(), Command{}) + assert.NoError(t, err) + + assert.Equal(t, Command{name: "123"}, res) + }, + }, + } + + for _, tt := range tests { + tt := tt + + t.Run(tt.name, func(t *testing.T) { + m := NewRefUntypedMediator() + tt.assertion(t, m) + }) + } +} + +func asd() { + m := NewRefUntypedMediator() + + handler3 := func(ctx context.Context, e Event) (any, error) { + return nil, nil + } + + m.Subscribe(Event{}, AsUntyped(handler3)) +} diff --git a/grade/pkg/collections/slice.go b/grade/pkg/collections/slice.go new file mode 100644 index 00000000..8a79228d --- /dev/null +++ b/grade/pkg/collections/slice.go @@ -0,0 +1,10 @@ +package collections + +func Reverse[T any](values []T) []T { + var reverse = make([]T, 0, len(values)) + for i := len(values) - 1; i >= 0; i-- { + reverse = append(reverse, values[i]) + } + + return reverse +}