44 "encoding/json"
55 "fmt"
66 "sync"
7+
8+ "github.com/cespare/xxhash"
79)
810
911var SHARD_COUNT = 32
@@ -16,40 +18,60 @@ type Stringer interface {
1618// A "thread" safe map of type string:Anything.
1719// To avoid lock bottlenecks this map is dived to several (SHARD_COUNT) map shards.
1820type ConcurrentMap [K comparable , V any ] struct {
19- smap * sync.Map
21+ shards []* sync.Map
22+ sharding func (key K ) uint32
2023}
2124
22- func create [K comparable , V any ]() ConcurrentMap [K , V ] {
25+ func create [K comparable , V any ](fn32 func ( key K ) uint32 ) ConcurrentMap [K , V ] {
2326 m := ConcurrentMap [K , V ]{
24- smap : & sync.Map {},
27+ shards : make ([]* sync.Map , SHARD_COUNT ),
28+ sharding : fn32 ,
29+ }
30+
31+ for i := 0 ; i < SHARD_COUNT ; i ++ {
32+ m .shards [i ] = & sync.Map {}
2533 }
2634 return m
2735}
2836
2937// Creates a new concurrent map.
3038func New [V any ]() ConcurrentMap [string , V ] {
31- return create [string , V ]()
39+ return create [string , V ](GetShardIndex )
3240}
3341
3442// Creates a new concurrent map.
3543func NewStringer [K Stringer , V any ]() ConcurrentMap [K , V ] {
36- return create [K , V ]()
44+
45+ return create [K , V ](strfnv32 [K ])
46+ }
47+
48+ func strfnv32 [K fmt.Stringer ](key K ) uint32 {
49+ return GetShardIndex (key .String ())
3750}
3851
3952// Creates a new concurrent map.
40- func NewWithCustomShardingFunction [K comparable , V any ]() ConcurrentMap [K , V ] {
41- return create [K , V ]()
53+ func NewWithCustomShardingFunction [K comparable , V any ](fn32 func (key K ) uint32 ) ConcurrentMap [K , V ] {
54+ return create [K , V ](fn32 )
55+ }
56+
57+ func GetShardIndex (key string ) uint32 {
58+ return uint32 (xxhash .Sum64 ([]byte (key )))
59+ }
60+ func (m ConcurrentMap [K , V ]) getShard (key K ) * sync.Map {
61+ index := m .sharding (key )
62+ return m .shards [index % uint32 (SHARD_COUNT )]
4263}
4364
4465func (m ConcurrentMap [K , V ]) MSet (data map [K ]V ) {
4566 for key , value := range data {
46- m .smap .Store (key , value )
67+ m .getShard ( key ) .Store (key , value )
4768 }
4869}
4970
5071// Sets the given value under the specified key.
5172func (m ConcurrentMap [K , V ]) Set (key K , value V ) {
52- m .smap .Store (key , value )
73+ shard := m .getShard (key )
74+ shard .Store (key , value )
5375}
5476
5577// Callback to return new element to be inserted into the map
@@ -61,22 +83,23 @@ type UpsertCb[V any] func(exist bool, valueInMap V, newValue V) V
6183// Insert or Update - updates existing element or inserts a new one using UpsertCb
6284func (m ConcurrentMap [K , V ]) Upsert (key K , value V , cb UpsertCb [V ]) (res V ) {
6385 var val V
64- v , ok := m .smap .Load (key )
86+ shard := m .getShard (key )
87+ v , ok := shard .Load (key )
6588 if valueNew , ok1 := v .(V ); ok1 {
6689 val = valueNew
6790 }
6891 res = cb (ok , val , value )
69- m . smap .Store (key , res )
92+ shard .Store (key , res )
7093 return res
7194}
7295
7396// Sets the given value under the specified key if no value was associated with it.
7497func (m ConcurrentMap [K , V ]) SetIfAbsent (key K , value V ) bool {
7598 // Get map shard.
76-
77- _ , ok := m . smap .Load (key )
99+ shard := m . getShard ( key )
100+ _ , ok := shard .Load (key )
78101 if ! ok {
79- m . smap .Store (key , value )
102+ shard .Store (key , value )
80103 }
81104 return ! ok
82105}
@@ -85,7 +108,7 @@ func (m ConcurrentMap[K, V]) SetIfAbsent(key K, value V) bool {
85108func (m ConcurrentMap [K , V ]) Get (key K ) (V , bool ) {
86109 // Get shard
87110 var value V
88- val , ok := m .smap .Load (key )
111+ val , ok := m .getShard ( key ) .Load (key )
89112 if valueNew , ok1 := val .(V ); ok1 {
90113 return valueNew , ok
91114 }
@@ -95,22 +118,24 @@ func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
95118// Count returns the number of elements within the map.
96119func (m ConcurrentMap [K , V ]) Count () int {
97120 count := 0
98- m .smap .Range (func (key , value any ) bool {
99- count ++
100- return true
101- })
121+ for i := range m .shards {
122+ m .shards [i ].Range (func (key , value any ) bool {
123+ count ++
124+ return true
125+ })
126+ }
102127 return count
103128}
104129
105130// Looks up an item under specified key
106131func (m ConcurrentMap [K , V ]) Has (key K ) bool {
107- _ , ok := m .smap .Load (key )
132+ _ , ok := m .getShard ( key ) .Load (key )
108133 return ok
109134}
110135
111136// Remove removes an element from the map.
112137func (m ConcurrentMap [K , V ]) Remove (key K ) {
113- m .smap .Delete (key )
138+ m .getShard ( key ) .Delete (key )
114139}
115140
116141// RemoveCb is a callback executed in a map.RemoveCb() call, while Lock is held
@@ -123,23 +148,25 @@ type RemoveCb[K any, V any] func(key K, v V, exists bool) bool
123148func (m ConcurrentMap [K , V ]) RemoveCb (key K , cb RemoveCb [K , V ]) bool {
124149 // Try to get shard.
125150 var value V
126- v , ok := m .smap .Load (key )
151+ shard := m .getShard (key )
152+ v , ok := shard .Load (key )
127153 if valueNew , ok1 := v .(V ); ok1 {
128154 value = valueNew
129155 }
130156 remove := cb (key , value , ok )
131157 if remove && ok {
132- m . smap .Delete (key )
158+ shard .Delete (key )
133159 }
134160 return remove
135161}
136162
137163// Pop removes an element from the map and returns it
138164func (m ConcurrentMap [K , V ]) Pop (key K ) (v V , exists bool ) {
139165 var value V
140- val , exists := m .smap .Load (key )
166+ shard := m .getShard (key )
167+ val , exists := shard .Load (key )
141168 if valueNew , ok1 := val .(V ); ok1 {
142- m . smap .Delete (key )
169+ shard .Delete (key )
143170 return valueNew , exists
144171 }
145172 return value , exists
@@ -160,18 +187,16 @@ type Tuple[K comparable, V any] struct {
160187//
161188// Deprecated: using IterBuffered() will get a better performence
162189func (m ConcurrentMap [K , V ]) Iter () <- chan Tuple [K , V ] {
163- ch := make (chan Tuple [K , V ])
164- count := 0
165- mcount := m .Count ()
190+
191+ ch := make (chan Tuple [K , V ], m .Count ())
166192 go func () {
167- m .smap .Range (func (key , value any ) bool {
168- ch <- Tuple [K , V ]{key .(K ), value .(V )}
169- count ++
170- if count == mcount {
171- close (ch )
172- }
173- return true
174- })
193+ for i := range m .shards {
194+ m .shards [i ].Range (func (key , value any ) bool {
195+ ch <- Tuple [K , V ]{key .(K ), value .(V )}
196+ return true
197+ })
198+ }
199+ close (ch )
175200 }()
176201 return ch
177202}
@@ -204,29 +229,29 @@ type IterCb[K comparable, V any] func(key K, v V)
204229// Callback based iterator, cheapest way to read
205230// all elements in a map.
206231func (m ConcurrentMap [K , V ]) IterCb (fn IterCb [K , V ]) {
207- m .smap .Range (func (key , value any ) bool {
208- fn (key .(K ), value .(V ))
209- return true
210- })
211-
232+ for i := range m .shards {
233+ m .shards [i ].Range (func (key , value any ) bool {
234+ fn (key .(K ), value .(V ))
235+ return true
236+ })
237+ }
212238}
213239
214240// Keys returns all keys as []string
215241func (m ConcurrentMap [K , V ]) Keys () []K {
216242 count := m .Count ()
217- i := 0
218243 ch := make (chan K , count )
219244 // Foreach shard.
220245 go func () {
221- m . smap . Range ( func ( key , value any ) bool {
222- ch <- key .( K )
223- i ++
224- if i == count {
225- close ( ch )
226- }
227- return true
228- })
229-
246+ for i := range m . shards {
247+ m . shards [ i ]. Range ( func ( key , value any ) bool {
248+ ch <- key .( K )
249+ i ++
250+
251+ return true
252+ })
253+ }
254+ close ( ch )
230255 }()
231256
232257 // Generate keys
0 commit comments