Skip to content

Commit

Permalink
Merge pull request #1481 from anyproto/release-6-fix-file-migration-t…
Browse files Browse the repository at this point in the history
…o-main

Release 6 to main
  • Loading branch information
fat-fellow authored Aug 26, 2024
2 parents 61604b8 + a7829d5 commit 7f0c94a
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 4 deletions.
5 changes: 3 additions & 2 deletions core/block/editor/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,13 @@ func (f *File) Init(ctx *smartblock.InitContext) error {
f.SmartBlock.AddHook(f.reconciler.FileObjectHook(domain.FullID{SpaceID: f.SpaceID(), ObjectID: f.Id()}), smartblock.HookBeforeApply)

if !ctx.IsNewObject {
err = f.fileObjectService.EnsureFileAddedToSyncQueue(domain.FullID{ObjectID: f.Id(), SpaceID: f.SpaceID()}, ctx.State.Details())
fullId := domain.FullID{ObjectID: f.Id(), SpaceID: f.SpaceID()}
err = f.fileObjectService.EnsureFileAddedToSyncQueue(fullId, ctx.State.Details())
if err != nil {
log.Errorf("failed to ensure file added to sync queue: %v", err)
}
f.AddHook(func(applyInfo smartblock.ApplyInfo) error {
return f.fileObjectService.EnsureFileAddedToSyncQueue(domain.FullID{ObjectID: f.Id(), SpaceID: f.SpaceID()}, applyInfo.State.Details())
return f.fileObjectService.EnsureFileAddedToSyncQueue(fullId, applyInfo.State.Details())
}, smartblock.HookOnStateRebuild)
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion core/block/source/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type accountService interface {

type Space interface {
Id() string
IsPersonal() bool
TreeBuilder() objecttreebuilder.TreeBuilder
GetRelationIdByKey(ctx context.Context, key domain.RelationKey) (id string, err error)
GetTypeIdByKey(ctx context.Context, key domain.TypeKey) (id string, err error)
DeriveObjectID(ctx context.Context, uniqueKey domain.UniqueKey) (id string, err error)
StoredIds() []string
IsPersonal() bool
}

type Service interface {
Expand Down
52 changes: 51 additions & 1 deletion core/files/fileobject/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/anyproto/anytype-heart/core/files"
"github.com/anyproto/anytype-heart/core/files/fileoffloader"
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/session"
"github.com/anyproto/anytype-heart/core/syncstatus/filesyncstatus"
"github.com/anyproto/anytype-heart/pb"
"github.com/anyproto/anytype-heart/pkg/lib/bundle"
Expand Down Expand Up @@ -81,6 +82,7 @@ type service struct {
objectStore objectstore.ObjectStore
spaceIdResolver idresolver.Resolver
migrationQueue *persistentqueue.Queue[*migrationItem]
objectArchiver objectArchiver

indexer *indexer

Expand Down Expand Up @@ -120,6 +122,7 @@ func (s *service) Init(a *app.App) error {
s.fileStore = app.MustComponent[filestore.FileStore](a)
s.spaceIdResolver = app.MustComponent[idresolver.Resolver](a)
s.fileOffloader = app.MustComponent[fileoffloader.Service](a)
s.objectArchiver = app.MustComponent[objectArchiver](a)
cfg := app.MustComponent[configProvider](a)

s.indexer = s.newIndexer()
Expand Down Expand Up @@ -148,7 +151,11 @@ func (s *service) Run(_ context.Context) error {
go func() {
defer s.closeWg.Done()

err := s.ensureNotSyncedFilesAddedToQueue()
err := s.deleteMigratedFilesInNonPersonalSpaces(context.Background())
if err != nil {
log.Errorf("delete migrated files in non personal spaces: %v", err)
}
err = s.ensureNotSyncedFilesAddedToQueue()
if err != nil {
log.Errorf("ensure not synced files added to queue: %v", err)
}
Expand All @@ -158,6 +165,49 @@ func (s *service) Run(_ context.Context) error {
return nil
}

type objectArchiver interface {
SetPagesIsArchived(ctx session.Context, req pb.RpcObjectListSetIsArchivedRequest) error
}

func (s *service) deleteMigratedFilesInNonPersonalSpaces(ctx context.Context) error {
personalSpace, err := s.spaceService.GetPersonalSpace(ctx)
if err != nil {
return err
}

objectIds, _, err := s.objectStore.QueryObjectIDs(database.Query{
Filters: []*model.BlockContentDataviewFilter{
{
RelationKey: bundle.RelationKeyFileId.String(),
Condition: model.BlockContentDataviewFilter_NotEmpty,
},
{
RelationKey: bundle.RelationKeyUniqueKey.String(),
Condition: model.BlockContentDataviewFilter_NotEmpty,
},
{
RelationKey: bundle.RelationKeySpaceId.String(),
Condition: model.BlockContentDataviewFilter_NotEqual,
Value: pbtypes.String(personalSpace.Id()),
},
},
})
if err != nil {
return err
}
if len(objectIds) > 0 {
err = s.objectArchiver.SetPagesIsArchived(nil, pb.RpcObjectListSetIsArchivedRequest{
ObjectIds: objectIds,
IsArchived: true,
})
if err != nil {
return err
}
}

return nil
}

// After migrating to new sync queue we need to ensure that all not synced files are added to the queue
func (s *service) ensureNotSyncedFilesAddedToQueue() error {
records, err := s.objectStore.Query(database.Query{
Expand Down
13 changes: 13 additions & 0 deletions core/files/fileobject/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/anyproto/anytype-heart/core/filestorage"
"github.com/anyproto/anytype-heart/core/filestorage/filesync"
"github.com/anyproto/anytype-heart/core/filestorage/rpcstore"
"github.com/anyproto/anytype-heart/core/session"
wallet2 "github.com/anyproto/anytype-heart/core/wallet"
"github.com/anyproto/anytype-heart/core/wallet/mock_wallet"
"github.com/anyproto/anytype-heart/pb"
Expand Down Expand Up @@ -68,6 +69,16 @@ func (c *dummyConfig) Name() string {
return "dummyConfig"
}

type dummyObjectArchiver struct{}

func (a *dummyObjectArchiver) SetPagesIsArchived(ctx session.Context, req pb.RpcObjectListSetIsArchivedRequest) error {
return nil
}

func (a *dummyObjectArchiver) Name() string { return "dummyObjectArchiver" }

func (a *dummyObjectArchiver) Init(_ *app.App) error { return nil }

const testResolveRetryDelay = 5 * time.Millisecond

func newFixture(t *testing.T) *fixture {
Expand All @@ -85,6 +96,7 @@ func newFixture(t *testing.T) *fixture {
eventSender.EXPECT().Broadcast(mock.Anything).Return().Maybe()
fileService := files.New()
spaceService := mock_space.NewMockService(t)
spaceService.EXPECT().GetPersonalSpace(mock.Anything).Return(nil, fmt.Errorf("not needed")).Maybe()
spaceIdResolver := mock_idresolver.NewMockResolver(t)

svc := New(testResolveRetryDelay, testResolveRetryDelay)
Expand Down Expand Up @@ -114,6 +126,7 @@ func newFixture(t *testing.T) *fixture {
a.Register(testutil.PrepareMock(ctx, a, mock_accountservice.NewMockService(ctrl)))
a.Register(testutil.PrepareMock(ctx, a, wallet))
a.Register(&config.Config{DisableFileConfig: true, NetworkMode: pb.RpcAccount_DefaultConfig, PeferYamuxTransport: true})
a.Register(&dummyObjectArchiver{})

err = a.Start(ctx)
require.NoError(t, err)
Expand Down
29 changes: 29 additions & 0 deletions util/debug/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,35 @@ var (
addressPattern = regexp.MustCompile(`\+?0x[0-9a-z]*`)
)

func InlineCallStack() string {
// Allocate space for the call stack
var pcs [32]uintptr

// Skip 3 frames: runtime.Callers, printStack, and the function calling printStack
n := runtime.Callers(2, pcs[:])

// Get the stack frames
frames := runtime.CallersFrames(pcs[:n])

var sep string
buf := &strings.Builder{}
// Iterate through the frames and print them
for {
frame, more := frames.Next()
buf.WriteString(sep)
sep = " -> "
buf.WriteString(frame.Function)
buf.WriteString(" ")
buf.WriteString(frame.File)
buf.WriteString(":")
buf.WriteString(fmt.Sprintf("%d", frame.Line))
if !more {
break
}
}
return buf.String()
}

func ParseGoroutinesDump(trace string, pattern string) string {
var sb strings.Builder

Expand Down

0 comments on commit 7f0c94a

Please sign in to comment.