Skip to content

Commit

Permalink
chore: err113 (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
FoseFx authored Dec 22, 2024
1 parent 89e2023 commit 7a6256a
Show file tree
Hide file tree
Showing 45 changed files with 488 additions and 174 deletions.
1 change: 0 additions & 1 deletion .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ linters:
# TODOs: #865
- cyclop # TODO
- dupl # TODO
- err113 # TODO
- exhaustruct # TODO
- gci # TODO
- gocognit # TODO
Expand Down
26 changes: 26 additions & 0 deletions libs/common/hwerr/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package hwerr

import (
"context"
"fmt"
"hwlocale"

zlog "github.com/rs/zerolog"
"google.golang.org/genproto/googleapis/rpc/errdetails"
genstatus "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/protoadapt"
Expand Down Expand Up @@ -54,3 +56,27 @@ func LocalizedMessage(ctx context.Context, locale hwlocale.Locale) *errdetails.L
Message: str,
}
}

type InvalidEnumError struct {
Enum string
Value string
}

func (e InvalidEnumError) Error() string {
return fmt.Sprintf("invalid enum: %q is not a valid %q", e.Value, e.Enum)
}

// ProtoStatusError implements the Error interface on *genstatus.Status
type ProtoStatusError struct {
status *genstatus.Status
}

func (e ProtoStatusError) Error() string {
return e.status.GetMessage()
}

func NewProtoStatusError(status *genstatus.Status) ProtoStatusError {
return ProtoStatusError{
status: status,
}
}
2 changes: 1 addition & 1 deletion libs/common/hwgrpc/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func authInterceptor(ctx context.Context) (context.Context, error) {
Msg("only fake auth is enabled! no attempt verifying token. falling back to fake token instead")
claims, tokenExpires, err = auth.VerifyFakeToken(ctx, token)
} else {
// verify token -> if fakeToken is used claims will be nil and we will get an error
// verify token -> if fakeToken is used claims will be nil, and we will get an error
claims, tokenExpires, err = auth.VerifyIDToken(ctx, token)
}

Expand Down
4 changes: 3 additions & 1 deletion libs/common/hwgrpc/organization_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ func StreamOrganizationInterceptor(
return next(req, stream)
}

var ErrOrganizationIdMissing = errors.New("organization.id missing in id token")

// organizationInterceptor parses and injects the organization id of the OIDC claims into the current context
// This is a separate function to allow endpoints to not fail when an organization id is not provided
func organizationInterceptor(ctx context.Context) (context.Context, error) {
Expand All @@ -61,7 +63,7 @@ func organizationInterceptor(ctx context.Context) (context.Context, error) {
}

if len(claims.Organization.Id) == 0 {
return nil, errors.New("organization.id missing in id token")
return nil, ErrOrganizationIdMissing
}

// parse organizationID
Expand Down
4 changes: 3 additions & 1 deletion libs/common/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func newTraceProvider(ctx context.Context, res *resource.Resource) (*trace.Trace
return traceProvider, nil
}

var ErrOtelTraceExporterInvalid = errors.New("OTEL_TRACE_EXPORTER invalid")

// newTraceExporter returns a new trace.SpanExporter based on the OTEL_TRACE_EXPORTER env variable
// A SpanExporter pushes traces to a tracing database. For more configuration see the corresponding documentation.
func newTraceExporter(ctx context.Context) (trace.SpanExporter, error) {
Expand All @@ -145,6 +147,6 @@ func newTraceExporter(ctx context.Context) (trace.SpanExporter, error) {
// more info: https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace/[email protected]
return otlptracehttp.New(ctx)
default:
return nil, errors.New("OTEL_TRACE_EXPORTER invalid")
return nil, ErrOtelTraceExporterInvalid
}
}
3 changes: 2 additions & 1 deletion libs/hwauthz/spicedb/spicedb.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package spicedb

import (
"common/hwerr"
"context"
"errors"
"fmt"
Expand Down Expand Up @@ -217,7 +218,7 @@ func (s *SpiceDBAuthZ) BulkCheck(ctx context.Context, checks []hwauthz.Permissio
resc := req.GetResource()

if pberr := pair.GetError(); pberr != nil {
err := fmt.Errorf("spicedb: error while checking permissions: %s", pberr.GetMessage())
err := fmt.Errorf("spicedb: error while checking permissions: %w", hwerr.NewProtoStatusError(pberr))
log.Error().Err(err).Msg("spicedb: error while checking permissions")
return nil, err
}
Expand Down
9 changes: 9 additions & 0 deletions libs/hwdb/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"common/hwerr"
"context"
"errors"
"fmt"
"hwlocale"
"reflect"
"strings"
"telemetry"

"github.com/google/uuid"

"github.com/jackc/pgerrcode"
"github.com/jackc/pgx/v5/pgconn"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -261,3 +264,9 @@ func pgConnErr(ctx context.Context, connErr *pgconn.ConnectError) error {
Msg("connection issue")
return genericStatusError(ctx, "database connection issue")
}

type RecordNotFoundError uuid.UUID

func (e RecordNotFoundError) Error() string {
return fmt.Sprintf("could not find record with id %q", uuid.UUID(e).String())
}
53 changes: 43 additions & 10 deletions libs/hwes/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ func (a *AggregateBase) RegisterEventListener(eventType string, eventHandler eve
return a
}

type EventTypeInvalidError string

func (e EventTypeInvalidError) Error() string {
return fmt.Sprintf("event type '%s' is invalid", string(e))
}

// HandleEvent finds and calls the registered event handler
// based on the type of the passed event.
// The executed event handler can modify the in-memory data of the aggregate.
Expand All @@ -133,7 +139,7 @@ func (a *AggregateBase) HandleEvent(event Event) error {

eventHandler, found := a.eventHandlers[event.EventType]
if !found {
return fmt.Errorf("event type '%s' is invalid", event.EventType)
return EventTypeInvalidError(event.EventType)
}

if err := eventHandler(event); err != nil {
Expand All @@ -155,13 +161,27 @@ func (a *AggregateBase) ClearUncommittedEvents() {
a.uncommittedEvents = make([]Event, 0)
}

type EventAggregateMismatchError struct {
Targeted uuid.UUID
Got uuid.UUID
}

func (e EventAggregateMismatchError) Error() string {
return fmt.Sprintf("event applied to aggregate %q but was targeted at aggregate %q",
e.Got.String(), e.Targeted.String())
}

var ErrLoadDeletedAggregate = errors.New("AggregateBase.Load: aggregate has been marked as deleted")

// Load applies events to an aggregate by utilizing the registered event listeners
// Currently not in use. Could be helpful for testing.
func (a *AggregateBase) Load(events []Event) error {
for _, event := range events {
if event.GetAggregateID() != a.GetID() {
return fmt.Errorf("event applied to aggregate '%s' but was targeted at aggregate '%s'",
a.GetID(), event.GetAggregateID())
return fmt.Errorf("AggregateBase.Load: %w", EventAggregateMismatchError{
Targeted: a.GetID(),
Got: event.GetAggregateID(),
})
}

if err := a.HandleEvent(event); err != nil {
Expand All @@ -172,7 +192,7 @@ func (a *AggregateBase) Load(events []Event) error {
a.version++
}
if a.IsDeleted() {
return errors.New("AggregateBase.Load: aggregate has been marked as deleted")
return fmt.Errorf("AggregateBase.Load: %w", ErrLoadDeletedAggregate)
}

return nil
Expand All @@ -183,8 +203,10 @@ func (a *AggregateBase) Load(events []Event) error {
// Apply -> You apply a *new* event to the aggregate that could be persisted
func (a *AggregateBase) Apply(event Event) error {
if event.GetAggregateID() != a.GetID() {
return fmt.Errorf("event applied to aggregate '%s' but was targeted at aggregate '%s'",
a.GetID(), event.GetAggregateID())
return fmt.Errorf("AggregateBase.Apply: %w", EventAggregateMismatchError{
Targeted: a.GetID(),
Got: event.GetAggregateID(),
})
}

if err := a.HandleEvent(event); err != nil {
Expand All @@ -197,18 +219,29 @@ func (a *AggregateBase) Apply(event Event) error {
return nil
}

type EventOutOfDateError struct {
AggregateVersion uint64
EventVersion uint64
}

func (e EventOutOfDateError) Error() string {
return fmt.Sprintf("event version (%d) is lower than aggregate version (%d)",
e.EventVersion, e.AggregateVersion)
}

// Progress should be called after all events are loaded though an aggregate store.
// The passed event gets applied to an aggregate by utilizing the registered event listeners.
// Progress -> You progress the state of an aggregate
func (a *AggregateBase) Progress(event Event) error {
if event.GetAggregateID() != a.GetID() {
return fmt.Errorf("event applied to aggregate '%s' but was targeted at aggregate '%s'",
a.GetID(), event.GetAggregateID())
return fmt.Errorf("AggregateBase.Progress: %w", EventAggregateMismatchError{
Targeted: a.GetID(),
Got: event.GetAggregateID(),
})
}

if event.GetVersion() < a.GetVersion() {
return fmt.Errorf("event version of %d is lower then aggregate version of %d",
event.GetVersion(), a.GetVersion())
return EventOutOfDateError{EventVersion: event.GetVersion(), AggregateVersion: a.GetVersion()}
}

if err := a.HandleEvent(event); err != nil {
Expand Down
21 changes: 21 additions & 0 deletions libs/hwes/errs/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package errs

import (
"errors"
"fmt"

"github.com/google/uuid"
)

var (
ErrAlreadyExists = errors.New("cannot create an already existing aggregate")
ErrOrganizationMissing = errors.New("organization is missing from event")
ErrCommitterMissing = errors.New("committer is missing from event")
ErrPayloadMissing = errors.New("payload is empty")
)

type NotFoundError uuid.UUID

func (e NotFoundError) Error() string {
return fmt.Sprintf("record with id %s not found", uuid.UUID(e).String())
}
34 changes: 24 additions & 10 deletions libs/hwes/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"hwutil"
"hwutil/errs"
"strings"
"telemetry"
"time"
Expand Down Expand Up @@ -70,13 +71,16 @@ type metadata struct {
func GetEventTypeOptionOfProtoMessageV1(protoMessage proto.Message) string {
eventTypeOption := pbEventsV1.E_EventType

protoEventType, ok := proto.GetExtension(protoMessage.ProtoReflect().Descriptor().Options(), eventTypeOption).(string)
descriptorOptions := protoMessage.ProtoReflect().Descriptor().Options()
protoEventTypeAny := proto.GetExtension(descriptorOptions, eventTypeOption)
protoEventType, ok := protoEventTypeAny.(string)
if !ok {
panic(fmt.Sprintf(
"String type assertion for eventType '%s' on protoMessage '%s' failed.",
eventTypeOption.TypeDescriptor().FullName(),
protoMessage.ProtoReflect().Descriptor().FullName(),
))
panic(
fmt.Errorf("not a string: eventType %q on protoMessage %q: %w",
eventTypeOption.TypeDescriptor().FullName(),
protoMessage.ProtoReflect().Descriptor().FullName(),
errs.NewCastError("string", protoEventTypeAny)),
)
}

if protoEventType == "" {
Expand Down Expand Up @@ -164,6 +168,12 @@ func NewEventFromProto(aggregate Aggregate, message proto.Message, opts ...Event
return event, nil
}

type StreamIdMalformedError string

func (e StreamIdMalformedError) Error() string {
return fmt.Sprintf("cannot resolve aggregateType and aggregateID from streamID %q", string(e))
}

// resolveAggregateIDAndTypeFromStreamID extracts the aggregateID and aggregateType of a given streamID
// See aggregate.GetTypeID
//
Expand All @@ -181,13 +191,13 @@ func resolveAggregateIDAndTypeFromStreamID(streamID string) (aID uuid.UUID, aggr
aggregateTypeStr = streamIDParts[0]
aggregateIDStr = streamIDParts[1]
} else {
err = fmt.Errorf("cannot resolve aggregateType and aggregateID from streamID '%s'", streamID)
err = StreamIdMalformedError(streamID)
return
}

aggregateType = AggregateType(aggregateTypeStr)
if aggregateType == "" {
err = fmt.Errorf("resolved empty aggregateType from streamID '%s'", streamID)
err = StreamIdMalformedError(streamID)
return
}

Expand Down Expand Up @@ -326,9 +336,11 @@ func (e *Event) SetProtoData(message proto.Message) error {
return nil
}

var ErrGetJsonOnProtoData = errors.New("data of event is marked as proto, use GetProtoData instead")

func (e *Event) GetJsonData(data interface{}) error {
if e.DataIsProto {
return errors.New("data of this event is marked as proto, use GetProtoData instead")
return ErrGetJsonOnProtoData
}

if jsonable, ok := data.(hwutil.JSONAble); ok {
Expand All @@ -337,9 +349,11 @@ func (e *Event) GetJsonData(data interface{}) error {
return json.Unmarshal(e.Data, data)
}

var ErrGetProtoOnJsonData = errors.New("data of event is not marked as proto, use GetJsonData instead")

func (e *Event) GetProtoData(message proto.Message) error {
if !e.DataIsProto {
return errors.New("data of this event is not marked as proto, use GetJsonData instead")
return ErrGetProtoOnJsonData
}

return protojson.Unmarshal(e.Data, message)
Expand Down
9 changes: 7 additions & 2 deletions libs/hwes/eventstoredb/aggregate_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,17 @@ func NewAggregateStore(es *esdb.Client) *AggregateStore {
return &AggregateStore{es: es}
}

var ErrNoAppliedEvents = errors.New("aggregate has no applied events. " +
"Consider to persist and load the aggregate first")

// getExpectedRevisionByPreviousRead implements a strategy for our getExpectedRevision strategy pattern.
// This function resolves the version by returning the version of the last applied event of our aggregate.
func (as *AggregateStore) getExpectedRevisionByPreviousRead(
_ context.Context,
a hwes.Aggregate,
) (esdb.ExpectedRevision, error) {
if len(a.GetAppliedEvents()) == 0 {
return nil, errors.New("aggregate has no applied events. Consider to persist and load the aggregate first")
return nil, ErrNoAppliedEvents
}
lastAppliedEvent := a.GetAppliedEvents()[len(a.GetAppliedEvents())-1]
eventNumber := lastAppliedEvent.GetVersion()
Expand Down Expand Up @@ -97,6 +100,8 @@ func (as *AggregateStore) doSave(
return common.ConsistencyToken(r.NextExpectedVersion), nil
}

var ErrAggregateWasDeleted = errors.New("aggregate has been marked as deleted")

// Implements AggregateStore interface

func (as *AggregateStore) Load(ctx context.Context, aggregate hwes.Aggregate) error {
Expand Down Expand Up @@ -131,7 +136,7 @@ func (as *AggregateStore) Load(ctx context.Context, aggregate hwes.Aggregate) er
}

if aggregate.IsDeleted() {
return errors.New("AggregateStore.Load: aggregate has been marked as deleted")
return fmt.Errorf("AggregateStore.Load: %w", ErrAggregateWasDeleted)
}

return nil
Expand Down
Loading

0 comments on commit 7a6256a

Please sign in to comment.