@@ -19,6 +19,7 @@ import (
19
19
"os"
20
20
"path/filepath"
21
21
"strings"
22
+ "sync"
22
23
"time"
23
24
24
25
"github.com/anyproto/any-sync/app"
@@ -49,12 +50,15 @@ type ftSearchTantivy struct {
49
50
index * tantivy.TantivyContext
50
51
schema * tantivy.Schema
51
52
parserPool * fastjson.ParserPool
53
+ mu sync.Mutex
52
54
}
53
55
54
56
func (f * ftSearchTantivy ) BatchDeleteObjects (ids []string ) error {
55
57
if len (ids ) == 0 {
56
58
return nil
57
59
}
60
+ f .mu .Lock ()
61
+ defer f .mu .Unlock ()
58
62
start := time .Now ()
59
63
defer func () {
60
64
spentMs := time .Since (start ).Milliseconds ()
@@ -74,6 +78,8 @@ func (f *ftSearchTantivy) BatchDeleteObjects(ids []string) error {
74
78
}
75
79
76
80
func (f * ftSearchTantivy ) DeleteObject (objectId string ) error {
81
+ f .mu .Lock ()
82
+ defer f .mu .Unlock ()
77
83
return f .index .DeleteDocuments (fieldIdRaw , objectId )
78
84
}
79
85
@@ -206,6 +212,8 @@ func (f *ftSearchTantivy) tryToBuildSchema(schema *tantivy.Schema) (*tantivy.Tan
206
212
}
207
213
208
214
func (f * ftSearchTantivy ) Index (doc SearchDoc ) error {
215
+ f .mu .Lock ()
216
+ defer f .mu .Unlock ()
209
217
metrics .ObjectFTUpdatedCounter .Inc ()
210
218
tantivyDoc , err := f .convertDoc (doc )
211
219
if err != nil {
@@ -256,7 +264,9 @@ func (f *ftSearchTantivy) BatchIndex(ctx context.Context, docs []SearchDoc, dele
256
264
l .Debugf ("ft index done" )
257
265
}
258
266
}()
267
+ f .mu .Lock ()
259
268
err = f .index .DeleteDocuments (fieldIdRaw , deletedDocs ... )
269
+ f .mu .Unlock ()
260
270
if err != nil {
261
271
return err
262
272
}
@@ -268,6 +278,8 @@ func (f *ftSearchTantivy) BatchIndex(ctx context.Context, docs []SearchDoc, dele
268
278
}
269
279
tantivyDocs = append (tantivyDocs , tantivyDoc )
270
280
}
281
+ f .mu .Lock ()
282
+ defer f .mu .Unlock ()
271
283
return f .index .AddAndConsumeDocuments (tantivyDocs ... )
272
284
}
273
285
0 commit comments