Merge pull request #60 from anyproto/remove-gothreads
cleanup
requilence authored Jun 12, 2023
2 parents 6914ccb + 72a9402 commit 969eab8
Showing 16 changed files with 279 additions and 786 deletions.
4 changes: 0 additions & 4 deletions .github/dependabot.yml
@@ -28,10 +28,6 @@ updates:
- dependency-name: github.com/improbable-eng/grpc-web
versions:
- 0.14.0
- dependency-name: github.com/hsanjuan/ipfs-lite
versions:
- 1.1.18
- 1.1.19
- dependency-name: golang.org/x/text
versions:
- 0.3.6
2 changes: 0 additions & 2 deletions README.md
@@ -132,8 +132,6 @@ https://github.com/njpatel/grpcc
- `docker-compose up` - run the prometheus/grafana
- use `ANYTYPE_PROM=0.0.0.0:9094` when running middleware to enable metrics collection. Client commands metrics available only in gRPC mode
- open http://127.0.0.1:3000 to view collected metrics in Grafana. You can find several dashboards there:
- **Threads gRPC client** for go-threads client metrics(when you make requests to other nodes)
- **Threads gRPC server** for go-threads server metrics(when other nodes make requests to you)
- **MW** internal middleware metrics such as changes, added and created threads histograms
- **MW commands server** metrics for clients commands. Works only in grpc-server mode

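For readers following the metrics instructions in the README hunk above, here is a minimal, self-contained Go sketch of how a service can expose Prometheus metrics on an address taken from an environment variable such as `ANYTYPE_PROM`. It is an illustration only, not the middleware's actual wiring; the counter name and the env-var handling are assumptions.

```go
package main

import (
	"log"
	"net/http"
	"os"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// commandsServed is a stand-in metric; the real middleware registers its own.
var commandsServed = prometheus.NewCounter(prometheus.CounterOpts{
	Name: "mw_commands_served_total",
	Help: "Number of client commands handled (illustrative).",
})

func main() {
	addr := os.Getenv("ANYTYPE_PROM") // e.g. "0.0.0.0:9094"; empty means metrics stay disabled
	if addr == "" {
		log.Println("metrics disabled: ANYTYPE_PROM not set")
		return
	}
	prometheus.MustRegister(commandsServed)
	http.Handle("/metrics", promhttp.Handler()) // Prometheus scrapes this endpoint
	log.Printf("serving metrics on %s", addr)
	log.Fatal(http.ListenAndServe(addr, nil))
}
```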
11 changes: 5 additions & 6 deletions core/block/editor/smartblock/smartblock.go
@@ -485,7 +485,7 @@ func (sb *smartBlock) onMetaChange(details *types.Struct) {
// if we've got update for ourselves, we are only interested in local-only details, because the rest details changes will be appended when applying records in the current sb
diff = pbtypes.StructFilterKeys(diff, bundle.LocalRelationsKeys)
if len(diff.GetFields()) > 0 {
log.With("thread", sb.Id()).Debugf("onMetaChange current object: %s", pbtypes.Sprint(diff))
log.With("objectID", sb.Id()).Debugf("onMetaChange current object: %s", pbtypes.Sprint(diff))
}
}

@@ -553,7 +553,7 @@ func (sb *smartBlock) navigationalLinks(s *state.State) []string {
return true
})
if err != nil {
log.With("thread", s.RootId()).Errorf("failed to iterate over simple blocks: %s", err)
log.With("objectID", s.RootId()).Errorf("failed to iterate over simple blocks: %s", err)
}

var det *types.Struct
@@ -771,7 +771,7 @@ func (sb *smartBlock) Apply(s *state.State, flags ...ApplyFlag) (err error) {
afterReportChangeTime := time.Now()
if hooks {
if e := sb.execHooks(HookAfterApply, ApplyInfo{State: sb.Doc.(*state.State), Events: msgs, Changes: changes}); e != nil {
log.With("thread", sb.Id()).Warnf("after apply execHooks error: %v", e)
log.With("objectID", sb.Id()).Warnf("after apply execHooks error: %v", e)
}
}
afterApplyHookTime := time.Now()
@@ -910,9 +910,8 @@ func (sb *smartBlock) injectLocalDetails(s *state.State) error {
return nil, nil
})
if err != nil {
log.With("thread", sb.Id()).
With("sbType", sb.Type()).
Errorf("failed to update pending details: %v", err)
log.With("objectID", sb.Id()).
With("sbType", sb.Type()).Errorf("failed to update pending details: %v", err)
}

// inject also derived keys, because it may be a good idea to have created date and creator cached,
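The recurring change in this and the following files is a rename of the structured-log key from `thread` to `objectID`, matching the move away from go-threads terminology. A minimal sketch of the pattern, using zap's sugared logger as a stand-in for the project's own logging wrapper (an assumption), looks like this:

```go
package main

import (
	"errors"

	"go.uber.org/zap"
)

func main() {
	log := zap.NewExample().Sugar()
	objectID := "bafyexampleobjectid"          // hypothetical ID, for illustration only
	err := errors.New("failed to load object") // stand-in error

	// Before this commit the call sites tagged records with the go-threads era key:
	//   log.With("thread", objectID).Errorf("failed to iterate over simple blocks: %s", err)

	// After the commit the same call sites use the object-centric key:
	log.With("objectID", objectID).Errorf("failed to iterate over simple blocks: %s", err)
}
```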
4 changes: 2 additions & 2 deletions core/block/editor/state/change.go
@@ -163,7 +163,7 @@ func (s *State) GetAndUnsetFileKeys() (keys []pb.ChangeFileKeys) {
func (s *State) ApplyChangeIgnoreErr(changes ...*pb.ChangeContent) {
for _, ch := range changes {
if err := s.applyChange(ch); err != nil {
log.With("thread", s.RootId()).Warnf("error while applying change %T: %v; ignore", ch.Value, err)
log.With("objectID", s.RootId()).Warnf("error while applying change %T: %v; ignore", ch.Value, err)
}
}
return
@@ -513,7 +513,7 @@ func (s *State) fillChanges(msgs []simple.EventMessage) {
b1, _ = msg.Msg.Marshal()
b2, _ = msgs[i-1].Msg.Marshal()
if bytes.Equal(b1, b2) {
log.With("thread", s.rootId).Errorf("duplicate change: " + pbtypes.Sprint(msg.Msg))
log.With("objectID", s.rootId).Errorf("duplicate change: " + pbtypes.Sprint(msg.Msg))
}
}
}
6 changes: 3 additions & 3 deletions core/block/editor/state/state.go
@@ -624,7 +624,7 @@ func (s *State) apply(fast, one, withLayouts bool) (msgs []simple.EventMessage,
prevModifiedDate := pbtypes.Get(s.parent.LocalDetails(), bundle.RelationKeyLastModifiedDate.String())
if s.localDetails != nil {
if _, isNull := prevModifiedDate.GetKind().(*types.Value_NullValue); prevModifiedDate == nil || isNull {
log.With("thread", s.rootId).Debugf("failed to revert prev modifed date: prev date is nil")
log.With("objectID", s.rootId).Debugf("failed to revert prev modifed date: prev date is nil")
} else {
s.localDetails.Fields[bundle.RelationKeyLastModifiedDate.String()] = prevModifiedDate
}
@@ -706,7 +706,7 @@ func (s *State) processTrailingDuplicatedEvents(msgs []simple.EventMessage) (fil
continue
}
if bytes.Equal(prev, curr) {
log.With("thread", s.RootId()).Debugf("found trailing duplicated event %s", e.Msg.String())
log.With("objectID", s.RootId()).Debugf("found trailing duplicated event %s", e.Msg.String())
continue
}
prev = curr
@@ -1194,7 +1194,7 @@ func (s *State) DepSmartIds(blocks, details, relations, objTypes, creatorModifie
return true
})
if err != nil {
log.With("thread", s.RootId()).Errorf("failed to iterate over simple blocks: %s", err)
log.With("objectID", s.RootId()).Errorf("failed to iterate over simple blocks: %s", err)
}
}

2 changes: 1 addition & 1 deletion core/block/editor/subobjectcollection.go
@@ -257,7 +257,7 @@ func (c *SubObjectCollection) updateSubObject(info smartblock.ApplyInfo) (err er
} else if keyUnset := ch.GetStoreKeyUnset(); keyUnset != nil {
err = c.removeObject(st, strings.Join(keyUnset.Path, addr.SubObjectCollectionIdSeparator))
if err != nil {
log.With("threadId", c.Id()).Errorf("failed to remove object %s: %v", strings.Join(keyUnset.Path, addr.SubObjectCollectionIdSeparator), err)
log.With("objectID", c.Id()).Errorf("failed to remove object %s: %v", strings.Join(keyUnset.Path, addr.SubObjectCollectionIdSeparator), err)
return err
}
}
2 changes: 1 addition & 1 deletion core/block/editor/template/template.go
@@ -709,7 +709,7 @@ var WithDataviewID = func(id string, dataview model.BlockContentOfDataview, forc
forceViews && len(dvBlock.Model().GetDataview().Relations) != len(dataview.Dataview.Relations) ||
forceViews && !pbtypes.DataviewViewsEqualSorted(dvBlock.Model().GetDataview().Views, dataview.Dataview.Views) {

/* log.With("thread", s.RootId()).With("name", pbtypes.GetString(s.Details(), "name")).Warnf("dataview needs to be migrated: %v, %v, %v, %v",
/* log.With("object" s.RootId()).With("name", pbtypes.GetString(s.Details(), "name")).Warnf("dataview needs to be migrated: %v, %v, %v, %v",
len(dvBlock.Model().GetDataview().Relations) == 0,
!slice.UnsortedEquals(dvBlock.Model().GetDataview().Source, dataview.Dataview.Source),
len(dvBlock.Model().GetDataview().Views) == 0,
8 changes: 4 additions & 4 deletions core/block/export/export.go
@@ -136,9 +136,9 @@ func (e *export) Export(req pb.RpcObjectListExportRequest) (path string, succeed
for docId := range docs {
did := docId
if err = queue.Wait(func() {
log.With("threadId", did).Debugf("write doc")
log.With("objectID", did).Debugf("write doc")
if werr := e.writeDoc(req.Format, wr, docs, queue, did, req.IncludeFiles, req.IsJson); werr != nil {
log.With("threadId", did).Warnf("can't export doc: %v", werr)
log.With("objectID", did).Warnf("can't export doc: %v", werr)
} else {
succeed++
}
@@ -265,12 +265,12 @@ func (e *export) getExistedObjects(includeArchived bool) (map[string]*types.Stru
func (e *export) writeMultiDoc(mw converter.MultiConverter, wr writer, docs map[string]*types.Struct, queue process.Queue) (succeed int, err error) {
for did := range docs {
if err = queue.Wait(func() {
log.With("threadId", did).Debugf("write doc")
log.With("objectID", did).Debugf("write doc")
werr := e.bs.Do(did, func(b sb.SmartBlock) error {
return mw.Add(b.NewState().Copy())
})
if err != nil {
log.With("threadId", did).Warnf("can't export doc: %v", werr)
log.With("objectID", did).Warnf("can't export doc: %v", werr)
} else {
succeed++
}
6 changes: 3 additions & 3 deletions core/block/import/objectcreator.go
@@ -96,7 +96,7 @@ func (oc *ObjectCreator) Create(ctx *session.Context,
}

if err = converter.UpdateLinksToObjects(st, oldIDtoNew, newID); err != nil {
log.With("object", newID).Errorf("failed to update objects ids: %s", err.Error())
log.With("objectID", newID).Errorf("failed to update objects ids: %s", err.Error())
}

if sn.SbType == coresb.SmartBlockTypeWorkspace {
@@ -116,7 +116,7 @@ func (oc *ObjectCreator) Create(ctx *session.Context,
if payload := createPayloads[newID]; payload.RootRawChange != nil {
respDetails, err = oc.createNewObject(ctx, payload, st, newID, oldIDtoNew)
if err != nil {
log.With("object", newID).Errorf("failed to create %s: %s", newID, err.Error())
log.With("objectID", newID).Errorf("failed to create %s: %s", newID, err.Error())
return nil, "", err
}
} else {
@@ -153,7 +153,7 @@ func (oc *ObjectCreator) createNewObject(ctx *session.Context,
}
})
if err != nil {
log.With("object", newID).Errorf("failed to create %s: %s", newID, err.Error())
log.With("objectID", newID).Errorf("failed to create %s: %s", newID, err.Error())
return nil, err
}
respDetails := sb.Details()
20 changes: 10 additions & 10 deletions core/indexer/indexer.go
@@ -199,7 +199,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock2.DocInfo, options .

err = i.store.SaveLastIndexedHeadsHash(info.Id, headHashToIndex)
if err != nil {
log.With("thread", info.Id).Errorf("failed to save indexed heads hash: %v", err)
log.With("objectID", info.Id).Errorf("failed to save indexed heads hash: %v", err)
}
}

@@ -211,14 +211,14 @@ func (i *indexer) Index(ctx context.Context, info smartblock2.DocInfo, options .

lastIndexedHash, err := i.store.GetLastIndexedHeadsHash(info.Id)
if err != nil {
log.With("thread", info.Id).Errorf("failed to get last indexed heads hash: %v", err)
log.With("object", info.Id).Errorf("failed to get last indexed heads hash: %v", err)
}

if opts.SkipIfHeadsNotChanged {
if headHashToIndex == "" {
log.With("thread", info.Id).Errorf("heads hash is empty")
log.With("objectID", info.Id).Errorf("heads hash is empty")
} else if lastIndexedHash == headHashToIndex {
log.With("thread", info.Id).Debugf("heads not changed, skipping indexing")
log.With("objectID", info.Id).Debugf("heads not changed, skipping indexing")

// todo: the optimization temporarily disabled to see the metrics
//return nil
@@ -232,7 +232,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock2.DocInfo, options .
if indexLinks {
if err = i.store.UpdateObjectLinks(info.Id, info.Links); err != nil {
hasError = true
log.With("thread", info.Id).Errorf("failed to save object links: %v", err)
log.With("objectID", info.Id).Errorf("failed to save object links: %v", err)
}
}

Expand All @@ -241,15 +241,15 @@ func (i *indexer) Index(ctx context.Context, info smartblock2.DocInfo, options .
if err := i.store.UpdateObjectDetails(info.Id, details); err != nil {
if errors.Is(err, objectstore.ErrDetailsNotChanged) {
metrics.ObjectDetailsHeadsNotChangedCounter.Add(1)
log.With("objectId", info.Id).With("hashesAreEqual", lastIndexedHash == headHashToIndex).With("lastHashIsEmpty", lastIndexedHash == "").With("skipFlagSet", opts.SkipIfHeadsNotChanged).Debugf("details have not changed")
log.With("objectID", info.Id).With("hashesAreEqual", lastIndexedHash == headHashToIndex).With("lastHashIsEmpty", lastIndexedHash == "").With("skipFlagSet", opts.SkipIfHeadsNotChanged).Debugf("details have not changed")
} else {
hasError = true
log.With("thread", info.Id).Errorf("can't update object store: %v", err)
log.With("objectID", info.Id).Errorf("can't update object store: %v", err)
}
} else {
// todo: remove temp log
if lastIndexedHash == headHashToIndex {
l := log.With("objectId", info.Id).
l := log.With("objectID", info.Id).
With("hashesAreEqual", lastIndexedHash == headHashToIndex).
With("lastHashIsEmpty", lastIndexedHash == "").
With("skipFlagSet", opts.SkipIfHeadsNotChanged)
@@ -268,7 +268,7 @@ func (i *indexer) Index(ctx context.Context, info smartblock2.DocInfo, options .
// todo: the optimization temporarily disabled to see the metrics
if true || !(opts.SkipFullTextIfHeadsNotChanged && lastIndexedHash == headHashToIndex) {
if err := i.store.AddToIndexQueue(info.Id); err != nil {
log.With("thread", info.Id).Errorf("can't add id to index queue: %v", err)
log.With("objectID", info.Id).Errorf("can't add id to index queue: %v", err)
}
}

@@ -683,7 +683,7 @@ func (i *indexer) reindexIdsIgnoreErr(ctx context.Context, ids ...string) (succe
for _, id := range ids {
err := i.reindexDoc(ctx, id)
if err != nil {
log.With("thread", id).Errorf("failed to reindex: %v", err)
log.With("objectID", id).Errorf("failed to reindex: %v", err)
} else {
successfullyReindexed++
}
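The indexer hunks above revolve around one optimization: remember the hash of the object's heads after each successful index, and skip re-indexing when the stored hash matches the current one (the early return is temporarily disabled in the diff so the metrics can be observed). Below is a rough, self-contained Go sketch of that idea, with an in-memory store standing in for the real objectstore and a simplified interface that is an assumption, not the project's actual API.

```go
package main

import "fmt"

// headsStore records the heads hash that was current the last time an object was indexed.
type headsStore struct{ lastIndexed map[string]string }

func (s *headsStore) GetLastIndexedHeadsHash(id string) string { return s.lastIndexed[id] }
func (s *headsStore) SaveLastIndexedHeadsHash(id, hash string) { s.lastIndexed[id] = hash }

// index re-indexes an object unless its heads hash is unchanged since the last run.
func index(store *headsStore, objectID, headsHash string, skipIfHeadsNotChanged bool) {
	if skipIfHeadsNotChanged && headsHash != "" &&
		store.GetLastIndexedHeadsHash(objectID) == headsHash {
		fmt.Println("heads not changed, skipping indexing of", objectID)
		return
	}
	fmt.Println("indexing", objectID) // details, links and full-text would be updated here
	store.SaveLastIndexedHeadsHash(objectID, headsHash)
}

func main() {
	store := &headsStore{lastIndexed: map[string]string{}}
	index(store, "obj-1", "hash-a", true) // first pass: indexed
	index(store, "obj-1", "hash-a", true) // same heads: skipped
	index(store, "obj-1", "hash-b", true) // heads moved: indexed again
}
```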
34 changes: 0 additions & 34 deletions docs/proto.md
@@ -1387,8 +1387,6 @@
- [Restrictions](#anytype.model.Restrictions)
- [Restrictions.DataviewRestrictions](#anytype.model.Restrictions.DataviewRestrictions)
- [SmartBlockSnapshotBase](#anytype.model.SmartBlockSnapshotBase)
- [ThreadCreateQueueEntry](#anytype.model.ThreadCreateQueueEntry)
- [ThreadDeeplinkPayload](#anytype.model.ThreadDeeplinkPayload)

- [Account.StatusType](#anytype.model.Account.StatusType)
- [Block.Align](#anytype.model.Block.Align)
@@ -21905,38 +21903,6 @@ stored |




<a name="anytype.model.ThreadCreateQueueEntry"></a>

### ThreadCreateQueueEntry



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| collectionThread | [string](#string) | | |
| threadId | [string](#string) | | |






<a name="anytype.model.ThreadDeeplinkPayload"></a>

### ThreadDeeplinkPayload



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| key | [string](#string) | | |
| addrs | [string](#string) | repeated | |








1 change: 1 addition & 0 deletions metrics/events.go
@@ -144,6 +144,7 @@ func (c ReindexEvent) ToEvent() *Event {

const RecordCreateEventThresholdMs = 30

// todo: reimplement or remove
type RecordCreateEvent struct {
PrepareMs int64
NewRecordMs int64
7 changes: 0 additions & 7 deletions pkg/lib/ipfs/ipfs.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/lib/localstore/objectstore/objects.go
@@ -612,7 +612,7 @@ func hasObjectId(txn noctxds.Txn, id string) (bool, error) {
func (s *dsObjectStore) getObjectInfo(txn noctxds.Txn, id string) (*model.ObjectInfo, error) {
sbt, err := s.sbtProvider.Type(id)
if err != nil {
log.With("thread", id).Errorf("failed to extract smartblock type %s", id) // todo rq: surpess error?
log.With("objectID", id).Errorf("failed to extract smartblock type %s", id) // todo rq: surpess error?
return nil, ErrNotAnObject
}
if sbt == smartblock.SmartBlockTypeArchive {