Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
sgreben committed Aug 29, 2024
0 parents commit 20dd1b8
Show file tree
Hide file tree
Showing 8 changed files with 675 additions and 0 deletions.
36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# sliding-topk

Sliding HeavyKeeper, as described in ["A Sketch Framework for Approximate Data Stream Processing in Sliding Windows"](https://yangtonghome.github.io/uploads/SlidingSketch_TKDE2022_final.pdf)

```go
import (
topk "github.com/keilerkonzept/sliding-topk"
)

func main() {
// make a new sketch keeping track of k=3 items over a window of the last 60 ticks
// use width=1024 x depth=3 = 3072 buckets
sketch := topk.New(3, 60, topk.WithWidth(1024),topk.WithDepth(3))

log.Println("the sketch takes", sketch.SizeBytes(), "bytes in memory")

sketch.Incr("an item") // count "an item" 1 time
sketch.Add("an item", 123) // count "an item" 123 times
sketch.Tick(1) // advance time by one tick
sketch.Add("another item", 4) // count "another item" 4 times
sketch.Tick(2) // advance time by two ticks
sketch.Add("an item", 5) // count "an item" 5 more times
sketch.Add("yet another item", 6) // count "yet another item" 6 times

if sketch.Query("an item") {
// "an item" is in the top K items observed within the last 60 ticks
}

_ = sketch.Count("another item") // return the estimated count for "another item"

for entry := range sketch.TopK() {// TopK() rseturn all top K items as a slice of {Item,Count} structs
log.Println(entry.Item, "counted", entry.Count, "times")
}

sketch.Reset() // reset to New() state
}
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
module github.com/keilerkonzept/sliding-topk

go 1.23.0

require github.com/google/go-cmp v0.6.0

require github.com/OneOfOne/xxhash v1.2.8
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
github.com/OneOfOne/xxhash v1.2.8 h1:31czK/TI9sNkxIKfaUfGlU47BAxQ0ztGgd9vPyqimf8=
github.com/OneOfOne/xxhash v1.2.8/go.mod h1:eZbhyaAYD41SGSSsnmcpxVoRiQ/MPUTjUdIIOT9Um7Q=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
16 changes: 16 additions & 0 deletions internal/unsafeutil/unsafeutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package unsafeutil

import "unsafe"

// Bytes converts a string into a byte slice.
func Bytes(s string) (b []byte) {
return unsafe.Slice(unsafe.StringData(s), len(s))
}

// String converts a byte slice into a string.
func String(b []byte) (s string) {
if len(b) == 0 {
return ""
}
return unsafe.String(&b[0], len(b))
}
135 changes: 135 additions & 0 deletions min_heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package topk

import (
"container/heap"
"unsafe"
)

type HeapItem struct {
Fingerprint uint32
Item string
Count uint32
}

type MinHeap struct {
Items []HeapItem
Index map[string]int
StoredKeysBytes int
}

func NewMinHeap(k int) *MinHeap {
return &MinHeap{
Items: make([]HeapItem, k),
Index: make(map[string]int, k),
}
}

var _ heap.Interface = &MinHeap{}

const (
sizeOfBucketMinHeapStruct = int(unsafe.Sizeof(MinHeap{}))
sizeOfHeapBucket = int(unsafe.Sizeof(HeapItem{}))
sizeOfIndex = int(unsafe.Sizeof(map[string]int{}))
)

func (h MinHeap) SizeBytes() int {
structSize := sizeOfBucketMinHeapStruct
bucketsSize := len(h.Items)*sizeOfHeapBucket + h.StoredKeysBytes
indexSize := sizeOfIndex + (sizeofInt+sizeofString)*len(h.Index)
return structSize + bucketsSize + indexSize
}

func (h MinHeap) Full() bool { return len(h.Items) == cap(h.Items) }
func (h MinHeap) Len() int { return len(h.Items) }
func (h MinHeap) Less(i, j int) bool {
ic := h.Items[i].Count
jc := h.Items[j].Count
if ic == jc {
return h.Items[i].Item < h.Items[j].Item
}
return ic < jc
}
func (h MinHeap) Swap(i, j int) {
itemi := h.Items[i].Item
itemj := h.Items[j].Item
h.Items[i], h.Items[j] = h.Items[j], h.Items[i]
h.Index[itemi] = j
h.Index[itemj] = i
}

func (h *MinHeap) Push(x interface{}) {
b := x.(HeapItem)
h.Items = append(h.Items, b)
h.Index[b.Item] = len(h.Items) - 1
}

func (h *MinHeap) Pop() interface{} {
old := h.Items
n := len(old)
x := old[n-1]
h.Items = old[0 : n-1]
delete(h.Index, x.Item)
return x
}

// Min returns the minimum count in the heap or 0 if the heap is empty.
func (h MinHeap) Min() uint32 {
if len(h.Items) == 0 {
return 0
}
return h.Items[0].Count
}

func (h MinHeap) Find(item string) (i int) {
if i, ok := h.Index[item]; ok {
return i
}
return -1
}

func (h MinHeap) Contains(item string) bool {
_, ok := h.Index[item]
return ok
}

func (h MinHeap) Get(item string) *HeapItem {
if i, ok := h.Index[item]; ok {
return &h.Items[i]
}
return nil
}

func (h *MinHeap) Update(item string, fingerprint uint32, count uint32) {
if count < h.Min() && h.Full() { // not in top k: ignore
return
}

if i := h.Find(item); i >= 0 { // already in heap: update count
h.Items[i].Count = count
heap.Fix(h, i)
return
}

h.StoredKeysBytes += len(item)

if !h.Full() { // heap not full: add to heap
h.Push(HeapItem{
Count: count,
Fingerprint: fingerprint,
Item: item,
})
return
}

// replace min on heap
minItem := h.Items[0].Item
h.StoredKeysBytes -= len(minItem)
delete(h.Index, minItem)
h.Items[0] = HeapItem{
Count: count,
Fingerprint: fingerprint,
Item: item,
}
h.Index[item] = 0
heap.Fix(h, 0)
}
10 changes: 10 additions & 0 deletions sizeof.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package topk

import "unsafe"

const (
sizeofString = int(unsafe.Sizeof(""))
sizeofInt = int(unsafe.Sizeof(int(0)))
sizeofUInt32 = int(unsafe.Sizeof(uint32(0)))
sizeofFloat32 = int(unsafe.Sizeof(float32(0)))
)
Loading

0 comments on commit 20dd1b8

Please sign in to comment.