13 changes: 10 additions & 3 deletions build.go
@@ -100,7 +100,7 @@ func persistSegmentBaseToWriter(sb *SegmentBase, w io.Writer) (int, error) {
}

err = persistFooter(sb.numDocs, sb.storedIndexOffset, sb.fieldsIndexOffset, sb.sectionsIndexOffset,
- sb.docValueOffset, sb.chunkMode, sb.memCRC, br)
+ sb.docValueOffset, sb.chunkMode, sb.memCRC, br, sb.writerId)
if err != nil {
return 0, err
}
@@ -159,7 +159,7 @@ func persistStoredFieldValues(fieldID int,
}

func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64,
- storedIndexOffset uint64, sectionsIndexOffset uint64) (*SegmentBase, error) {
+ storedIndexOffset uint64, sectionsIndexOffset uint64, writerId string) (*SegmentBase, error) {
sb := &SegmentBase{
mem: mem,
memCRC: memCRC,
@@ -178,12 +178,19 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
fieldsMap: make(map[string]uint16),
dictLocs: make([]uint64, 0),
fieldsInv: make([]string, 0),
+ writerId: writerId,
}
sb.updateSize()

+ fileReader, err := NewFileReader(sb.writerId)
+ if err != nil {
+ return nil, err
+ }
+ sb.fileReader = fileReader

// load the data/section starting offsets for each field
// by via the sectionsIndexOffset as starting point.
- err := sb.loadFieldsNew()
+ err = sb.loadFieldsNew()
if err != nil {
return nil, err
}
143 changes: 143 additions & 0 deletions callbacks.go
@@ -0,0 +1,143 @@
// Copyright (c) 2025 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package zap

// This file provides a mechanism for users of zap to provide callbacks
// that can process data before it is written to disk, and after it is read
// from disk. This can be used for things like encryption, compression, etc.

// The user is responsible for ensuring that the writer and reader callbacks
// are compatible with each other, and that any state needed by the callbacks
// is managed appropriately. For example, if the writer callback uses a
// unique key or nonce per write, the reader callback must be able to
// determine the correct key or nonce to use for each read.

// The callbacks are identified by an id string, which is returned by the
// WriterCallbackGetter. The same id string is passed to the ReaderCallbackGetter
// when creating a reader. This allows the reader to determine which
// callback to use for a given file.

// Additionally, if the writer callback needs a unique counter or nonce
// per write, the CounterGetter can be used to provide that. The counter
// is passed to the writer callback along with the data to be written.
// The counter is not passed to the reader callback, as it is assumed that
// the reader callback can determine the correct counter to use based
// on the data being read.

// An example implementation using AES-GCM is provided in callbacks_test.go
// within initFileCallbacks.

// Default no-op implementation. Is called before writing any user data to a file.
var WriterCallbackGetter = func() (string, func(data []byte, _ []byte) ([]byte, error), error) {
Member

From what I understand, this variable is a hook in idiomatic Go terms.
Rename it to WriterHook.
A hook is a function that lets custom code be injected into the regular code flow. Also, I feel that shipping dummy / no-op hooks defeats the purpose of a hook; better to avoid the default implementation altogether and keep only the type.

Member

Check this implementation out. It is a drop-in replacement for the count.go file.
It handles the following issues:

  • No need for a new fileWriter wrapper type (augment the existing chw).
  • A single WriterHook to obtain both the tokenGenerator and the dataProcessor.
  • Overflow handling: if the counter resets itself after overflow, should we not generate a new counter? I am assuming this was a bug and have handled it here. Feel free to comment on this (edge?) case.
  • The reader is a bit wonky, and the callback is a bit hard to read. I have added a cleaner approach, do check it out. You basically encapsulate the mem in a separate MemoryReader object that optionally uses a decoder if required. This way we do not have to worry about decoding the mem slice later on if we add more code.

package zap

import (
	"hash/crc32"
	"io"

	segment "github.com/blevesearch/scorch_segment_api/v2"
)

// WriteToken represents a per-write unique value that provides
// context or variability for each write operation. It can be used
// for purposes such as maintaining a counter, nonce, or initialization
// vector (IV) to ensure each write is distinct.
type WriteToken []byte

// Increment increases the WriteToken value in place, treating it as a
// big-endian counter. When the token overflows, it wraps around to zero.
// Returns true if overflow occurred.
func (t WriteToken) Increment() bool {
	for i := len(t) - 1; i >= 0; i-- {
		if t[i] < 255 {
			t[i]++
			return false
		}
		t[i] = 0
	}
	return true // Overflow occurred
}

// WriterHookResult contains the ID, callback, and token generator for a writer
type WriterHookResult struct {
	ID        []byte
	Processor DataProcessor
	Generator TokenGenerator
}

// ReaderHookResult contains the decoder for a reader
type ReaderHookResult struct {
	Decoder DataDecoder
}

type (
	// TokenGenerator is any function that can be used to generate a new WriteToken.
	TokenGenerator func() (WriteToken, error)
	// DataProcessor is any function that can process data with the provided token before writing.
	DataProcessor func(data []byte, token WriteToken) ([]byte, error)
	// DataDecoder is any function that can process raw data before it is returned to the reader.
	DataDecoder func(data []byte) ([]byte, error)
	// WriterHook is invoked to configure writer behavior. It provides the token
	// generator (which can be called on overflow), callback function, and ID.
	// If the hook returns nil, the writer operates in normal mode without callback processing.
	WriterHook func() (*WriterHookResult, error)
	// ReaderHook is invoked to configure reader behavior.
	// If the hook returns nil, the reader operates in normal mode without callback processing.
	ReaderHook func() (*ReaderHookResult, error)
)

var (
	// DefaultWriterHook can be set by the user to enable callback processing globally
	DefaultWriterHook WriterHook
	// DefaultReaderHook can be set by the user to enable callback processing globally for readers
	DefaultReaderHook ReaderHook
)

// CountHashWriter is a wrapper around a Writer which counts the number of
// bytes which have been written and computes a crc32 hash. It optionally
// supports a DataProcessor that processes data before writing.
type CountHashWriter struct {
	w         io.Writer
	crc       uint32
	n         int
	s         segment.StatsReporter
	id        []byte
	processor DataProcessor
	token     WriteToken
	generator TokenGenerator
}

// NewCountHashWriter returns a CountHashWriter which wraps the provided Writer.
// If DefaultWriterHook is set, it uses it to configure callback processing.
// Otherwise, it operates as a regular writer.
func NewCountHashWriter(w io.Writer) (*CountHashWriter, error) {
	return newCountHashWriter(w, nil, DefaultWriterHook)
}

// NewCountHashWriterWithStatsReporter returns a CountHashWriter with stats reporting.
// If DefaultWriterHook is set, it uses it to configure callback processing.
func NewCountHashWriterWithStatsReporter(w io.Writer, s segment.StatsReporter) (*CountHashWriter, error) {
	return newCountHashWriter(w, s, DefaultWriterHook)
}

// NewCountHashWriterWithHook returns a CountHashWriter that uses the provided hook
// for callback processing.
func NewCountHashWriterWithHook(w io.Writer, hook WriterHook) (*CountHashWriter, error) {
	return newCountHashWriter(w, nil, hook)
}

// NewCountHashWriterWithHookAndStats returns a CountHashWriter with custom hook and stats reporting
func NewCountHashWriterWithHookAndStats(w io.Writer, s segment.StatsReporter, hook WriterHook) (*CountHashWriter, error) {
	return newCountHashWriter(w, s, hook)
}

// newCountHashWriter is the internal constructor that handles hook processing
func newCountHashWriter(w io.Writer, s segment.StatsReporter, hook WriterHook) (*CountHashWriter, error) {
	chw := &CountHashWriter{
		w: w,
		s: s,
	}

	// If hook is provided, invoke it to get configuration
	if hook != nil {
		result, err := hook()
		if err != nil {
			return nil, err
		}

		// Hook can return nil to indicate no callback processing desired
		if result != nil {
			chw.id = result.ID
			chw.processor = result.Processor
			chw.generator = result.Generator

			// Generate initial token
			if chw.generator != nil {
				token, err := chw.generator()
				if err != nil {
					return nil, err
				}
				chw.token = token
			}
		}
	}

	return chw, nil
}

// Write writes the provided bytes to the wrapped writer and counts the bytes.
// If a callback is configured, it processes the data and increments the token first.
// On token overflow, it generates a new token using the generator.
func (c *CountHashWriter) Write(b []byte) (int, error) {
	data := b
	var err error

	// Process data through callback if configured
	if c.processor != nil {
		data, err = c.processor(b, c.token)
		if err != nil {
			return 0, err
		}

		// Increment token and check for overflow
		if c.token.Increment() {
			// Overflow occurred, generate new token
			if c.generator != nil {
				newToken, err := c.generator()
				if err != nil {
					return 0, err
				}
				c.token = newToken
			}
		}
	}

	// Write the (possibly processed) data
	n, err := c.w.Write(data)
	c.crc = crc32.Update(c.crc, crc32.IEEETable, data[:n])
	c.n += n

	if c.s != nil {
		c.s.ReportBytesWritten(uint64(n))
	}

	return len(b), err // Return original length for io.Writer contract
}

// Count returns the number of bytes written
func (c *CountHashWriter) Count() int {
	return c.n
}

// Sum32 returns the CRC-32 hash of the content written to this writer
func (c *CountHashWriter) Sum32() uint32 {
	return c.crc
}

// ID returns the writer ID if one was set by the hook
func (c *CountHashWriter) ID() []byte {
	return c.id
}

// Token returns the current token value
func (c *CountHashWriter) Token() WriteToken {
	return c.token
}

// --------------------------------------------------------------------
// MemoryReader wraps a memory-mapped byte slice and applies an optional
// decoder callback to all data reads. If no decoder is configured, it acts
// as a transparent passthrough.
type MemoryReader struct {
	mem     []byte      // memory-mapped byte slice
	decoder DataDecoder // optional decoder for read operations
}

// NewMemoryReader creates a new MemoryReader with the default reader hook.
// If DefaultReaderHook is nil, no decoder is applied.
func NewMemoryReader(mem []byte) (*MemoryReader, error) {
	return newMemoryReader(mem, DefaultReaderHook)
}

// NewMemoryReaderWithHook creates a new MemoryReader with a custom reader hook.
func NewMemoryReaderWithHook(mem []byte, hook ReaderHook) (*MemoryReader, error) {
	return newMemoryReader(mem, hook)
}

// newMemoryReader is the internal constructor that handles hook processing.
func newMemoryReader(mem []byte, hook ReaderHook) (*MemoryReader, error) {
	rv := &MemoryReader{
		mem: mem,
	}

	// Invoke hook to get decoder configuration if provided
	if hook != nil {
		result, err := hook()
		if err != nil {
			return nil, err
		}

		if result != nil {
			rv.decoder = result.Decoder
		}
	}

	return rv, nil
}

// Read returns a slice of memory from start to end, applying the decoder if configured.
// Returns io.EOF if the range is invalid or out of bounds.
func (r *MemoryReader) Read(start, end uint64) ([]byte, error) {
	if start >= uint64(len(r.mem)) || end > uint64(len(r.mem)) || start > end {
		return nil, io.EOF
	}

	data := r.mem[start:end]

	// Apply decoder if configured
	if r.decoder != nil {
		var err error
		data, err = r.decoder(data)
		if err != nil {
			return nil, err
		}
	}

	return data, nil
}
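
The overflow case from the list above can be exercised in isolation. This is a minimal sketch, not part of the proposed file: `WriteToken` and `Increment` are copied from the code above, while `nextToken` and the 2-byte token are assumptions made to keep the example short.

```go
package main

import "fmt"

// WriteToken mirrors the type proposed in the comment above.
type WriteToken []byte

// Increment treats the token as a big-endian counter and reports wrap-around.
func (t WriteToken) Increment() bool {
	for i := len(t) - 1; i >= 0; i-- {
		if t[i] < 255 {
			t[i]++
			return false
		}
		t[i] = 0
	}
	return true
}

// nextToken is a hypothetical helper: it advances the token and, on overflow,
// asks the generator for a fresh one instead of reusing the wrapped value.
func nextToken(t WriteToken, gen func() (WriteToken, error)) (WriteToken, error) {
	if !t.Increment() {
		return t, nil
	}
	if gen == nil {
		return t, nil // no generator: keep the wrapped (all-zero) token
	}
	return gen()
}

func main() {
	// Fixed generator output, chosen only so the result is easy to spot.
	gen := func() (WriteToken, error) { return WriteToken{0xAA, 0x00}, nil }

	t := WriteToken{0xFF, 0xFE}
	t, _ = nextToken(t, gen)
	fmt.Printf("%x\n", []byte(t)) // one step below the maximum: plain increment

	t, _ = nextToken(t, gen)
	fmt.Printf("%x\n", []byte(t)) // overflow: replaced by the generator's token
}
```

Whether a wrapped counter should be replaced or treated as an error is a design decision for the PR; the sketch only shows the replace-on-overflow variant.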

	return "", func(data []byte, _ []byte) ([]byte, error) {
		return data, nil
	}, nil
}

// Default no-op implementation. Is called after reading any user data from a file.
var ReaderCallbackGetter = func(string) (func(data []byte) ([]byte, error), error) {
Member

ReaderHook. Again, better to keep it as a type and avoid the no-op implementation entirely, using a nil variable instead.

	return func(data []byte) ([]byte, error) {
		return data, nil
	}, nil
}

// Default no-op implementation. Is called once per write call if a unique counter is
// needed by the writer callback.
var CounterGetter = func() ([]byte, error) {
Member

This extra hook feels unnecessary. What is the purpose of having a separate hook for something that is tied to the WriterHook? Better to consolidate into the WriterHook's signature itself. Ref above comment
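
A rough sketch of what the consolidation could look like, reusing the AES-GCM scheme from callbacks_test.go. The type names follow the earlier review comment but are redeclared locally here; `newAESGCMHooks` is a hypothetical helper, not an existing zap function.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// Local stand-ins for the types proposed in the earlier review comment.
// They are assumptions for this sketch, not the merged zap API.
type (
	WriteToken     []byte
	TokenGenerator func() (WriteToken, error)
	DataProcessor  func(data []byte, token WriteToken) ([]byte, error)
	DataDecoder    func(data []byte) ([]byte, error)
)

type WriterHookResult struct {
	ID        []byte
	Processor DataProcessor
	Generator TokenGenerator
}

// newAESGCMHooks (hypothetical) builds one writer hook that carries the ID,
// the nonce generator and the processor, plus a matching decoder.
func newAESGCMHooks(key []byte, id string) (func() (*WriterHookResult, error), DataDecoder, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, nil, err
	}
	n := aead.NonceSize()

	hook := func() (*WriterHookResult, error) {
		return &WriterHookResult{
			ID: []byte(id),
			Generator: func() (WriteToken, error) {
				tok := make(WriteToken, n)
				_, err := rand.Read(tok)
				return tok, err
			},
			Processor: func(data []byte, tok WriteToken) ([]byte, error) {
				// Layout: ciphertext, then tag, then nonce (as in the test callback).
				return append(aead.Seal(nil, tok, data, nil), tok...), nil
			},
		}, nil
	}

	decode := func(data []byte) ([]byte, error) {
		if len(data) < n {
			return nil, errors.New("ciphertext too short")
		}
		return aead.Open(nil, data[len(data)-n:], data[:len(data)-n], nil)
	}
	return hook, decode, nil
}

func main() {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		panic(err)
	}
	hook, decode, err := newAESGCMHooks(key, "test-key-id")
	if err != nil {
		panic(err)
	}
	res, err := hook()
	if err != nil {
		panic(err)
	}
	tok, err := res.Generator()
	if err != nil {
		panic(err)
	}
	sealed, err := res.Processor([]byte("segment bytes"), tok)
	if err != nil {
		panic(err)
	}
	plain, err := decode(sealed)
	if err != nil {
		panic(err)
	}
	fmt.Printf("id=%s sealed=%d bytes plain=%q\n", res.ID, len(sealed), plain)
}
```

With this shape there is a single place to configure, and a nil hook can mean "no processing" without any default no-op function.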

	return nil, nil
}

// fileWriter wraps a CountHashWriter and applies a user provided
// writer callback to the data being written.
type fileWriter struct {
	writerCB func(data []byte, counter []byte) ([]byte, error)
	counter  []byte
	id       string
	c        *CountHashWriter
}
Comment on lines +62 to +69
Member

This feels unnecessary, given the numerous files being touched to accommodate it. Consider augmenting the existing CountHashWriter. Ref above comment.


func NewFileWriter(c *CountHashWriter) (*fileWriter, error) {
	var err error
	rv := &fileWriter{c: c}
	rv.id, rv.writerCB, err = WriterCallbackGetter()
	if err != nil {
		return nil, err
	}
	rv.counter, err = CounterGetter()
	if err != nil {
		return nil, err
	}

	return rv, nil
}

func (w *fileWriter) Write(data []byte) (int, error) {
	return w.c.Write(data)
}

// process applies the writer callback to the data, if one is set
// and increments the counter if one is set.
func (w *fileWriter) process(data []byte) ([]byte, error) {
	if w.writerCB != nil {
		w.incrementCounter()
		return w.writerCB(data, w.counter)
	}
	return data, nil
}

func (w *fileWriter) incrementCounter() {
	if w.counter != nil {
		for i := len(w.counter) - 1; i >= 0; i-- {
			if w.counter[i] < 255 {
				w.counter[i]++
				return
			}
			w.counter[i] = 0
		}
	}
}

func (w *fileWriter) Count() int {
	return w.c.Count()
}

func (w *fileWriter) Sum32() uint32 {
	return w.c.Sum32()
}

// fileReader wraps a reader callback to be applied to data read from a file.
type fileReader struct {
	callback func(data []byte) ([]byte, error)
Member

This is not extensible. If tomorrow we need to read a new mem slice, we need to:

  1. Get the subslice we want
  2. Call the process API
  3. Then read the actual subslice.

We may forget to perform step 2, so it needs to be strictly enforced. Check the code given in the referenced comment.
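
To illustrate the enforcement point: if the raw slice is reachable only through a read method, the decode step cannot be skipped. A minimal sketch under that assumption; `decodingReader`, `errOutOfRange`, and the toy `xorDecode` are illustrative names, not part of zap.

```go
package main

import (
	"errors"
	"fmt"
)

// decodingReader is a hypothetical, trimmed-down stand-in for the MemoryReader
// suggested earlier. The raw slice is unexported, so every read goes through Read.
type decodingReader struct {
	mem    []byte
	decode func([]byte) ([]byte, error) // nil means passthrough
}

var errOutOfRange = errors.New("range out of bounds")

// Read performs steps 1 to 3 from the list above in one call:
// slice the range, run the decoder, return the result.
func (r *decodingReader) Read(start, end uint64) ([]byte, error) {
	if start > end || end > uint64(len(r.mem)) {
		return nil, errOutOfRange
	}
	data := r.mem[start:end]
	if r.decode == nil {
		return data, nil
	}
	return r.decode(data)
}

// xorDecode is a toy decoder used only for illustration.
func xorDecode(data []byte) ([]byte, error) {
	out := make([]byte, len(data))
	for i, b := range data {
		out[i] = b ^ 0x5A
	}
	return out, nil
}

func main() {
	stored := []byte{'h' ^ 0x5A, 'i' ^ 0x5A} // "hi" as it would sit on disk
	r := &decodingReader{mem: stored, decode: xorDecode}

	got, err := r.Read(0, 2)
	fmt.Println(string(got), err)

	_, err = r.Read(1, 5)
	fmt.Println(err)
}
```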

	id       string
}

func NewFileReader(id string) (*fileReader, error) {
	var err error
	rv := &fileReader{id: id}
	rv.callback, err = ReaderCallbackGetter(id)
	if err != nil {
		return nil, err
	}

	return rv, nil
}

// process applies the reader callback to the data, if one is set
func (r *fileReader) process(data []byte) ([]byte, error) {
	if r.callback != nil {
		return r.callback(data)
	}
	return data, nil
}
131 changes: 131 additions & 0 deletions callbacks_test.go
@@ -0,0 +1,131 @@
// Copyright (c) 2025 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

//go:build vectors
// +build vectors

package zap

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"fmt"
	"testing"
)

func initFileCallbacks(t *testing.T) {
	key := make([]byte, 32)
	keyId := "test-key-id"

	if _, err := rand.Read(key); err != nil {
		t.Fatalf("Failed to generate random key: %v", err)
	}

	block, err := aes.NewCipher(key)
	if err != nil {
		t.Fatalf("Failed to create AES cipher: %v", err)
	}

	aesgcm, err := cipher.NewGCM(block)
	if err != nil {
		t.Fatalf("Failed to create AES GCM: %v", err)
	}

	CounterGetter = func() ([]byte, error) {
		counter := make([]byte, 12)
		if _, err := rand.Read(counter); err != nil {
			return nil, err
		}
		return counter, nil
	}

	writerCallback := func(data, counter []byte) ([]byte, error) {
		ciphertext := aesgcm.Seal(nil, counter, data, nil)
		result := append(ciphertext, counter...)
		return result, nil
	}

	readerCallback := func(data []byte) ([]byte, error) {
		if len(data) < 12 {
			return nil, fmt.Errorf("ciphertext too short")
		}

		counter := data[len(data)-12:]
		ciphertext := data[:len(data)-12]
		plaintext, err := aesgcm.Open(nil, counter, ciphertext, nil)
		if err != nil {
			return nil, err
		}
		return plaintext, nil
	}

	WriterCallbackGetter = func() (string, func(data []byte, counter []byte) ([]byte, error), error) {
		return keyId, writerCallback, nil
	}

	ReaderCallbackGetter = func(id string) (func(data []byte) ([]byte, error), error) {
		if id != keyId {
			return nil, fmt.Errorf("unknown callback ID: %s", id)
		}
		return readerCallback, nil
	}
}

func TestFileCallbacks(t *testing.T) {
	initFileCallbacks(t)

	TestOpen(t)
	TestOpenMulti(t)
	TestOpenMultiWithTwoChunks(t)
	TestSegmentVisitableDocValueFieldsList(t)
	TestSegmentDocsWithNonOverlappingFields(t)
	TestMergedSegmentDocsWithNonOverlappingFields(t)

	TestChunkedContentCoder(t)
	TestChunkedContentCoders(t)

	TestDictionary(t)
	TestDictionaryError(t)
	TestDictionaryBug1156(t)

	TestEnumerator(t)

	TestVecPostingsIterator(t)
	TestVectorSegment(t)
	TestHashCode(t)
	TestPersistedVectorSegment(t)
	TestValidVectorMerge(t)

	TestChunkIntCoder(t)
	TestChunkLengthToOffsets(t)
	TestChunkReadBoundaryFromOffsets(t)

	TestMerge(t)
	TestMergeWithEmptySegment(t)
	TestMergeWithEmptySegments(t)
	TestMergeWithEmptySegmentFirst(t)
	TestMergeWithEmptySegmentsFirst(t)
	TestMergeAndDrop(t)
	TestMergeAndDropAllFromOneSegment(t)
	TestMergeWithUpdates(t)
	TestMergeWithUpdatesOnManySegments(t)
	TestMergeWithUpdatesOnOneDoc(t)
	TestMergeBytesWritten(t)
	TestUnder32Bits(t)

	TestSynonymSegment(t)

	TestRoaringSizes(t)
}
8 changes: 8 additions & 0 deletions contentcoder.go
@@ -140,6 +140,14 @@ func (c *chunkedContentCoder) flushContents() error {
c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
// write the compressed data to the final data
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
+ if fw, ok := c.w.(*fileWriter); ok && fw != nil {
Member

Why are you encrypting each compressed chunk individually? Can you not encrypt everything at once, in the final Write method?
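
One measurable difference between the two options is size overhead. With a callback shaped like the AES-GCM example in callbacks_test.go (ciphertext, tag, then the nonce appended), each processed unit grows by the tag plus the nonce. A small sketch of that arithmetic; `sealedSize`, `newGCM`, and the chunk sizes are hypothetical and only serve the illustration.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"fmt"
)

// sealedSize (hypothetical) returns the stored size of n plaintext bytes after
// an AEAD seal with the nonce appended, as the test callback does.
func sealedSize(aead cipher.AEAD, n int) int {
	return n + aead.Overhead() + aead.NonceSize()
}

// newGCM (hypothetical) builds an AES-GCM AEAD from a raw key.
func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

func main() {
	aead, err := newGCM(make([]byte, 32)) // all-zero key, for illustration only
	if err != nil {
		panic(err)
	}

	chunks := []int{1024, 1024, 512} // illustrative chunk sizes, not taken from zap
	perChunk, total := 0, 0
	for _, c := range chunks {
		perChunk += sealedSize(aead, c)
		total += c
	}
	fmt.Println("per-chunk total:", perChunk)
	fmt.Println("single-pass total:", sealedSize(aead, total))
}
```

Size is only one side of the trade-off; whether chunks must stay individually readable after processing is a separate question for the PR author.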

+ // process the compressed data using the callback
+ var err error
+ c.compressed, err = fw.process(c.compressed)
+ if err != nil {
+ return err
+ }
+ }
c.incrementBytesWritten(uint64(len(c.compressed)))
c.final = append(c.final, c.compressed...)

6 changes: 5 additions & 1 deletion docvalues.go
@@ -197,10 +197,14 @@ func (di *docValueReader) loadDvChunk(chunkNumber uint64, s *SegmentBase) error
offset += uint64(read)
}

+ var err error
compressedDataLoc := chunkMetaLoc + offset
dataLength := curChunkEnd - compressedDataLoc
di.incrementBytesRead(uint64(dataLength + offset))
- di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
+ di.curChunkData, err = s.fileReader.process(s.mem[compressedDataLoc : compressedDataLoc+dataLength])
Member

Use the MemoryReader struct here.
mem will no longer be a []byte but a *MemoryReader.

di.curChunkData, err = s.mem.Read(compressedDataLoc, compressedDataLoc+dataLength)
if err != nil {
	return err
}

+ if err != nil {
+ return err
+ }
di.curChunkNum = chunkNumber
di.uncompressed = di.uncompressed[:0]
return nil