Skip to content

Commit 880ac3d

Browse files
authored
Merge pull request #1401 from anyproto/GO-3769-sync-status-updates
GO-3769: Sync status updates
2 parents 1b9b641 + 15d2d39 commit 880ac3d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+2780
-2514
lines changed

.golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ issues:
1313
- pb
1414
exclude-files:
1515
- '.*_test.go'
16+
- 'mock*'
1617
- 'testMock/*'
1718
- 'clientlibrary/service/service.pb.go'
1819

.mockery.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,9 @@ packages:
192192
interfaces:
193193
PeerStatusChecker:
194194
SyncDetailsUpdater:
195+
github.com/anyproto/anytype-heart/core/syncstatus/nodestatus:
196+
interfaces:
197+
NodeStatus:
195198
github.com/anyproto/anytype-heart/core/syncstatus/objectsyncstatus:
196199
interfaces:
197200
Updater:
@@ -210,4 +213,6 @@ packages:
210213
github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus:
211214
interfaces:
212215
SpaceIdGetter:
216+
NodeUsage:
217+
NetworkConfig:
213218
Updater:

core/anytype/bootstrap.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ import (
8181
"github.com/anyproto/anytype-heart/core/syncstatus/detailsupdater"
8282
"github.com/anyproto/anytype-heart/core/syncstatus/nodestatus"
8383
"github.com/anyproto/anytype-heart/core/syncstatus/spacesyncstatus"
84+
"github.com/anyproto/anytype-heart/core/syncstatus/syncsubscriptions"
8485
"github.com/anyproto/anytype-heart/core/wallet"
8586
"github.com/anyproto/anytype-heart/metrics"
8687
"github.com/anyproto/anytype-heart/pkg/lib/core"
@@ -263,7 +264,7 @@ func Bootstrap(a *app.App, components ...app.Component) {
263264
Register(treemanager.New()).
264265
Register(block.New()).
265266
Register(indexer.New()).
266-
Register(detailsupdater.NewUpdater()).
267+
Register(detailsupdater.New()).
267268
Register(session.NewHookRunner()).
268269
Register(spacesyncstatus.NewSpaceSyncStatus()).
269270
Register(nodestatus.NewNodeStatus()).
@@ -277,6 +278,7 @@ func Bootstrap(a *app.App, components ...app.Component) {
277278
Register(debug.New()).
278279
Register(collection.New()).
279280
Register(subscription.New()).
281+
Register(syncsubscriptions.New()).
280282
Register(builtinobjects.New()).
281283
Register(bookmark.New()).
282284
Register(importer.New()).

core/block/object/treesyncer/mock_treesyncer/mock_SyncDetailsUpdater.go

+14-16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/block/object/treesyncer/treesyncer.go

+8-34
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"github.com/anyproto/any-sync/net/streampool"
1616
"github.com/anyproto/any-sync/nodeconf"
1717
"go.uber.org/zap"
18-
19-
"github.com/anyproto/anytype-heart/core/domain"
2018
)
2119

2220
var log = logger.NewNamed(treemanager.CName)
@@ -62,14 +60,9 @@ type SyncedTreeRemover interface {
6260
RemoveAllExcept(senderId string, differentRemoteIds []string)
6361
}
6462

65-
type PeerStatusChecker interface {
66-
app.Component
67-
IsPeerOffline(peerId string) bool
68-
}
69-
7063
type SyncDetailsUpdater interface {
7164
app.Component
72-
UpdateDetails(objectId []string, status domain.ObjectSyncStatus, syncError domain.SyncError, spaceId string)
65+
UpdateSpaceDetails(existing, missing []string, spaceId string)
7366
}
7467

7568
type treeSyncer struct {
@@ -84,7 +77,6 @@ type treeSyncer struct {
8477
treeManager treemanager.TreeManager
8578
isRunning bool
8679
isSyncing bool
87-
peerManager PeerStatusChecker
8880
nodeConf nodeconf.NodeConf
8981
syncedTreeRemover SyncedTreeRemover
9082
syncDetailsUpdater SyncDetailsUpdater
@@ -106,7 +98,6 @@ func NewTreeSyncer(spaceId string) treesyncer.TreeSyncer {
10698
func (t *treeSyncer) Init(a *app.App) (err error) {
10799
t.isSyncing = true
108100
t.treeManager = app.MustComponent[treemanager.TreeManager](a)
109-
t.peerManager = app.MustComponent[PeerStatusChecker](a)
110101
t.nodeConf = app.MustComponent[nodeconf.NodeConf](a)
111102
t.syncedTreeRemover = app.MustComponent[SyncedTreeRemover](a)
112103
t.syncDetailsUpdater = app.MustComponent[SyncDetailsUpdater](a)
@@ -161,13 +152,11 @@ func (t *treeSyncer) ShouldSync(peerId string) bool {
161152
return t.isSyncing
162153
}
163154

164-
func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
155+
func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) (err error) {
165156
t.Lock()
166157
defer t.Unlock()
167-
var err error
168158
isResponsible := slices.Contains(t.nodeConf.NodeIds(t.spaceId), peerId)
169-
defer t.sendResultEvent(err, isResponsible, peerId, existing)
170-
t.sendSyncingEvent(peerId, existing, missing, isResponsible)
159+
t.sendSyncEvents(existing, missing, isResponsible)
171160
reqExec, exists := t.requestPools[peerId]
172161
if !exists {
173162
reqExec = newExecutor(t.requests, 0)
@@ -206,31 +195,15 @@ func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missi
206195
return nil
207196
}
208197

209-
func (t *treeSyncer) sendSyncingEvent(peerId string, existing []string, missing []string, nodePeer bool) {
198+
func (t *treeSyncer) sendSyncEvents(existing, missing []string, nodePeer bool) {
210199
if !nodePeer {
211200
return
212201
}
213-
if t.peerManager.IsPeerOffline(peerId) {
214-
t.sendDetailsUpdates(existing, domain.ObjectError, domain.NetworkError)
215-
return
216-
}
217-
if len(existing) != 0 || len(missing) != 0 {
218-
t.sendDetailsUpdates(existing, domain.ObjectSyncing, domain.Null)
219-
}
202+
t.sendDetailsUpdates(existing, missing)
220203
}
221204

222-
func (t *treeSyncer) sendResultEvent(err error, nodePeer bool, peerId string, existing []string) {
223-
if nodePeer && !t.peerManager.IsPeerOffline(peerId) {
224-
if err != nil {
225-
t.sendDetailsUpdates(existing, domain.ObjectError, domain.NetworkError)
226-
} else {
227-
t.sendDetailsUpdates(existing, domain.ObjectSynced, domain.Null)
228-
}
229-
}
230-
}
231-
232-
func (t *treeSyncer) sendDetailsUpdates(existing []string, status domain.ObjectSyncStatus, syncError domain.SyncError) {
233-
t.syncDetailsUpdater.UpdateDetails(existing, status, syncError, t.spaceId)
205+
func (t *treeSyncer) sendDetailsUpdates(existing, missing []string) {
206+
t.syncDetailsUpdater.UpdateSpaceDetails(existing, missing, t.spaceId)
234207
}
235208

236209
func (t *treeSyncer) requestTree(peerId, id string) {
@@ -257,6 +230,7 @@ func (t *treeSyncer) updateTree(peerId, id string) {
257230
syncTree, ok := tr.(synctree.SyncTree)
258231
if !ok {
259232
log.Warn("not a sync tree")
233+
return
260234
}
261235
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
262236
log.Warn("synctree.SyncWithPeer error", zap.Error(err))

core/block/object/treesyncer/treesyncer_test.go

+19-46
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"go.uber.org/mock/gomock"
1717

1818
"github.com/anyproto/anytype-heart/core/block/object/treesyncer/mock_treesyncer"
19-
"github.com/anyproto/anytype-heart/core/domain"
2019
"github.com/anyproto/anytype-heart/tests/testutil"
2120
)
2221

@@ -26,7 +25,6 @@ type fixture struct {
2625
missingMock *mock_objecttree.MockObjectTree
2726
existingMock *mock_synctree.MockSyncTree
2827
treeManager *mock_treemanager.MockTreeManager
29-
checker *mock_treesyncer.MockPeerStatusChecker
3028
nodeConf *mock_nodeconf.MockService
3129
syncStatus *mock_treesyncer.MockSyncedTreeRemover
3230
syncDetailsUpdater *mock_treesyncer.MockSyncDetailsUpdater
@@ -37,16 +35,13 @@ func newFixture(t *testing.T, spaceId string) *fixture {
3735
treeManager := mock_treemanager.NewMockTreeManager(ctrl)
3836
missingMock := mock_objecttree.NewMockObjectTree(ctrl)
3937
existingMock := mock_synctree.NewMockSyncTree(ctrl)
40-
checker := mock_treesyncer.NewMockPeerStatusChecker(t)
41-
checker.EXPECT().Name().Return("checker").Maybe()
4238
nodeConf := mock_nodeconf.NewMockService(ctrl)
4339
nodeConf.EXPECT().Name().Return("nodeConf").AnyTimes()
4440
syncStatus := mock_treesyncer.NewMockSyncedTreeRemover(t)
4541
syncDetailsUpdater := mock_treesyncer.NewMockSyncDetailsUpdater(t)
4642

4743
a := new(app.App)
4844
a.Register(testutil.PrepareMock(context.Background(), a, treeManager)).
49-
Register(testutil.PrepareMock(context.Background(), a, checker)).
5045
Register(testutil.PrepareMock(context.Background(), a, syncStatus)).
5146
Register(testutil.PrepareMock(context.Background(), a, nodeConf)).
5247
Register(testutil.PrepareMock(context.Background(), a, syncDetailsUpdater))
@@ -59,7 +54,6 @@ func newFixture(t *testing.T, spaceId string) *fixture {
5954
missingMock: missingMock,
6055
existingMock: existingMock,
6156
treeManager: treeManager,
62-
checker: checker,
6357
nodeConf: nodeConf,
6458
syncStatus: syncStatus,
6559
syncDetailsUpdater: syncDetailsUpdater,
@@ -91,6 +85,25 @@ func TestTreeSyncer(t *testing.T) {
9185
fx.Close(ctx)
9286
})
9387

88+
t.Run("delayed sync notify sync status", func(t *testing.T) {
89+
ctx := context.Background()
90+
fx := newFixture(t, spaceId)
91+
fx.treeManager.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(fx.existingMock, nil)
92+
fx.existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
93+
fx.treeManager.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(fx.missingMock, nil)
94+
fx.nodeConf.EXPECT().NodeIds(spaceId).Return([]string{peerId})
95+
fx.syncDetailsUpdater.EXPECT().UpdateSpaceDetails([]string{existingId}, []string{missingId}, spaceId)
96+
fx.syncStatus.EXPECT().RemoveAllExcept(peerId, []string{existingId}).Return()
97+
err := fx.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
98+
require.NoError(t, err)
99+
require.NotNil(t, fx.requestPools[peerId])
100+
require.NotNil(t, fx.headPools[peerId])
101+
102+
fx.StartSync()
103+
time.Sleep(100 * time.Millisecond)
104+
fx.Close(ctx)
105+
})
106+
94107
t.Run("sync after run", func(t *testing.T) {
95108
ctx := context.Background()
96109
fx := newFixture(t, spaceId)
@@ -189,45 +202,5 @@ func TestTreeSyncer(t *testing.T) {
189202
require.Equal(t, []string{"before close", "after done"}, events)
190203
mutex.Unlock()
191204
})
192-
t.Run("send offline event", func(t *testing.T) {
193-
ctx := context.Background()
194-
fx := newFixture(t, spaceId)
195-
fx.treeManager.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(fx.existingMock, nil)
196-
fx.existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
197-
fx.treeManager.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(fx.missingMock, nil)
198-
fx.nodeConf.EXPECT().NodeIds(spaceId).Return([]string{peerId})
199-
fx.checker.EXPECT().IsPeerOffline(peerId).Return(true)
200-
fx.syncStatus.EXPECT().RemoveAllExcept(peerId, []string{existingId}).Return()
201-
fx.syncDetailsUpdater.EXPECT().UpdateDetails([]string{"existing"}, domain.ObjectError, domain.NetworkError, "spaceId").Return()
202-
203-
fx.StartSync()
204-
err := fx.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
205-
require.NoError(t, err)
206-
require.NotNil(t, fx.requestPools[peerId])
207-
require.NotNil(t, fx.headPools[peerId])
208-
209-
time.Sleep(100 * time.Millisecond)
210-
fx.Close(ctx)
211-
})
212-
t.Run("send syncing and synced event", func(t *testing.T) {
213-
ctx := context.Background()
214-
fx := newFixture(t, spaceId)
215-
fx.treeManager.EXPECT().GetTree(gomock.Any(), spaceId, existingId).Return(fx.existingMock, nil)
216-
fx.existingMock.EXPECT().SyncWithPeer(gomock.Any(), peerId).Return(nil)
217-
fx.treeManager.EXPECT().GetTree(gomock.Any(), spaceId, missingId).Return(fx.missingMock, nil)
218-
fx.nodeConf.EXPECT().NodeIds(spaceId).Return([]string{peerId})
219-
fx.checker.EXPECT().IsPeerOffline(peerId).Return(false)
220-
fx.syncStatus.EXPECT().RemoveAllExcept(peerId, []string{existingId}).Return()
221-
fx.syncDetailsUpdater.EXPECT().UpdateDetails([]string{"existing"}, domain.ObjectSynced, domain.Null, "spaceId").Return()
222-
fx.syncDetailsUpdater.EXPECT().UpdateDetails([]string{"existing"}, domain.ObjectSyncing, domain.Null, "spaceId").Return()
223-
224-
fx.StartSync()
225-
err := fx.SyncAll(context.Background(), peerId, []string{existingId}, []string{missingId})
226-
require.NoError(t, err)
227-
require.NotNil(t, fx.requestPools[peerId])
228-
require.NotNil(t, fx.headPools[peerId])
229205

230-
time.Sleep(100 * time.Millisecond)
231-
fx.Close(ctx)
232-
})
233206
}

core/domain/syncstatus.go

+13-37
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,29 @@
11
package domain
22

3-
type SyncType int32
4-
5-
const (
6-
Objects SyncType = 0
7-
Files SyncType = 1
8-
)
9-
103
type SpaceSyncStatus int32
114

125
const (
13-
Synced SpaceSyncStatus = 0
14-
Syncing SpaceSyncStatus = 1
15-
Error SpaceSyncStatus = 2
16-
Offline SpaceSyncStatus = 3
17-
Unknown SpaceSyncStatus = 4
6+
SpaceSyncStatusSynced SpaceSyncStatus = 0
7+
SpaceSyncStatusSyncing SpaceSyncStatus = 1
8+
SpaceSyncStatusError SpaceSyncStatus = 2
9+
SpaceSyncStatusOffline SpaceSyncStatus = 3
10+
SpaceSyncStatusUnknown SpaceSyncStatus = 4
1811
)
1912

2013
type ObjectSyncStatus int32
2114

2215
const (
23-
ObjectSynced ObjectSyncStatus = 0
24-
ObjectSyncing ObjectSyncStatus = 1
25-
ObjectError ObjectSyncStatus = 2
26-
ObjectQueued ObjectSyncStatus = 3
16+
ObjectSyncStatusSynced ObjectSyncStatus = 0
17+
ObjectSyncStatusSyncing ObjectSyncStatus = 1
18+
ObjectSyncStatusError ObjectSyncStatus = 2
19+
ObjectSyncStatusQueued ObjectSyncStatus = 3
2720
)
2821

2922
type SyncError int32
3023

3124
const (
32-
Null SyncError = 0
33-
StorageLimitExceed SyncError = 1
34-
IncompatibleVersion SyncError = 2
35-
NetworkError SyncError = 3
36-
Oversized SyncError = 4
25+
SyncErrorNull SyncError = 0
26+
SyncErrorIncompatibleVersion SyncError = 2
27+
SyncErrorNetworkError SyncError = 3
28+
SyncErrorOversized SyncError = 4
3729
)
38-
39-
type SpaceSync struct {
40-
SpaceId string
41-
Status SpaceSyncStatus
42-
SyncError SyncError
43-
SyncType SyncType
44-
}
45-
46-
func MakeSyncStatus(spaceId string, status SpaceSyncStatus, syncError SyncError, syncType SyncType) *SpaceSync {
47-
return &SpaceSync{
48-
SpaceId: spaceId,
49-
Status: status,
50-
SyncError: syncError,
51-
SyncType: syncType,
52-
}
53-
}

0 commit comments

Comments
 (0)