Conversation

@Likith101

No description provided.

@Likith101 Likith101 changed the title MB-65860: Added new file writers and readers MB-65860: Added new file writers and readers (WIP) Jul 9, 2025
@Likith101 Likith101 changed the title MB-65860: Added new file writers and readers (WIP) MB-65860: Added new file writers and readers Aug 13, 2025
// within initFileCallbacks.

// Default no-op implementation. Is called before writing any user data to a file.
var WriterCallbackGetter = func() (string, func(data []byte, _ []byte) ([]byte, error), error) {

From what I understand, this method is a hook in the idiomatic Go sense, so rename it to WriterHook.
A hook is a function that allows custom logic to be injected into the regular code flow. Also, I feel that having a dummy no-op hook defeats the purpose of a hook; better to avoid it altogether and retain just the type, with a nil default.
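The "keep it as a type with a nil default" suggestion could look something like this minimal, self-contained sketch. The names and signatures here (`WriterHook`, `writeData`) are hypothetical simplifications for illustration, not the PR's actual API:

```go
package main

import "fmt"

// Hypothetical sketch: the hook is a function type whose package-level
// default is nil, instead of a no-op implementation.
type WriterHook func() (process func(data []byte) ([]byte, error), err error)

// DefaultWriterHook being nil means "no hook installed".
var DefaultWriterHook WriterHook

// writeData applies the hook's processor when one is installed, and is a
// plain passthrough otherwise; no dummy function is ever invoked.
func writeData(data []byte) ([]byte, error) {
	if DefaultWriterHook == nil {
		return data, nil
	}
	process, err := DefaultWriterHook()
	if err != nil {
		return nil, err
	}
	return process(data)
}

func main() {
	out, _ := writeData([]byte("abc"))
	fmt.Printf("%s\n", out) // hook unset: data passes through unchanged

	DefaultWriterHook = func() (func([]byte) ([]byte, error), error) {
		return func(d []byte) ([]byte, error) {
			return append([]byte("hooked:"), d...), nil
		}, nil
	}
	out, _ = writeData([]byte("abc"))
	fmt.Printf("%s\n", out) // hook set: the processor runs
}
```

Call sites pay only a nil check on the normal path, and there is no dummy implementation to maintain.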


Check this implementation out: a drop-in replacement for the count.go file. It addresses the following issues:

  • No need for a new fileWriter wrapper type (the existing CountHashWriter is augmented instead).
  • A single WriterHook obtains both the TokenGenerator and the DataProcessor.
  • Overflow handling: if the counter resets itself after an overflow, should we not generate a new counter? I am assuming that this was a bug and have handled it here. Feel free to comment on this (edge?) case.
  • The reader is a bit wonky, with the callback being hard to read. I have added a better approach, so check it out: you encapsulate the mem slice in a separate MemoryReader object that optionally applies a decoder. This way we do not have to worry about remembering to decode the mem slice later on as more code is added.

package zap

import (
	"hash/crc32"
	"io"

	segment "github.com/blevesearch/scorch_segment_api/v2"
)

// WriteToken represents a per-write unique value that provides
// context or variability for each write operation. It can be used
// for purposes such as maintaining a counter, nonce, or initialization
// vector (IV) to ensure each write is distinct.
type WriteToken []byte

// Increment increases the WriteToken value in place, treating it as a
// big-endian counter. When the token overflows, it wraps around to zero.
// Returns true if overflow occurred.
func (t WriteToken) Increment() bool {
	for i := len(t) - 1; i >= 0; i-- {
		if t[i] < 255 {
			t[i]++
			return false
		}
		t[i] = 0
	}
	return true // Overflow occurred
}

// WriterHookResult contains the ID, callback, and token generator for a writer
type WriterHookResult struct {
	ID        []byte
	Processor DataProcessor
	Generator TokenGenerator
}

// ReaderHookResult contains the decoder for a reader
type ReaderHookResult struct {
	Decoder DataDecoder
}

type (
	// TokenGenerator is any function that can be used to generate a new WriteToken.
	TokenGenerator func() (WriteToken, error)
	// DataProcessor is any function that can process data with the provided token before writing.
	DataProcessor func(data []byte, token WriteToken) ([]byte, error)
	// DataDecoder is any function that can process raw data after it is read.
	DataDecoder func(data []byte) ([]byte, error)
	// WriterHook is invoked to configure writer behavior. It provides the token
	// generator (which can be called on overflow), callback function, and ID.
	// If the hook returns nil, the writer operates in normal mode without callback processing.
	WriterHook func() (*WriterHookResult, error)
	// ReaderHook is invoked to configure reader behavior.
	// If the hook returns nil, the reader operates in normal mode without callback processing.
	ReaderHook func() (*ReaderHookResult, error)
)

var (
	// DefaultWriterHook can be set by the user to enable callback processing globally
	DefaultWriterHook WriterHook
	// DefaultReaderHook can be set by the user to enable callback processing globally for readers
	DefaultReaderHook ReaderHook
)

// CountHashWriter is a wrapper around a Writer which counts the number of
// bytes which have been written and computes a crc32 hash. It optionally
// supports a DataProcessor (configured via a WriterHook) that processes data before writing.
type CountHashWriter struct {
	w         io.Writer
	crc       uint32
	n         int
	s         segment.StatsReporter
	id        []byte
	processor DataProcessor
	token     WriteToken
	generator TokenGenerator
}

// NewCountHashWriter returns a CountHashWriter which wraps the provided Writer.
// If DefaultWriterHook is set, it uses it to configure callback processing.
// Otherwise, it operates as a regular writer.
func NewCountHashWriter(w io.Writer) (*CountHashWriter, error) {
	return newCountHashWriter(w, nil, DefaultWriterHook)
}

// NewCountHashWriterWithStatsReporter returns a CountHashWriter with stats reporting.
// If DefaultWriterHook is set, it uses it to configure callback processing.
func NewCountHashWriterWithStatsReporter(w io.Writer, s segment.StatsReporter) (*CountHashWriter, error) {
	return newCountHashWriter(w, s, DefaultWriterHook)
}

// NewCountHashWriterWithHook returns a CountHashWriter that uses the provided hook
// for callback processing.
func NewCountHashWriterWithHook(w io.Writer, hook WriterHook) (*CountHashWriter, error) {
	return newCountHashWriter(w, nil, hook)
}

// NewCountHashWriterWithHookAndStats returns a CountHashWriter with custom hook and stats reporting
func NewCountHashWriterWithHookAndStats(w io.Writer, s segment.StatsReporter, hook WriterHook) (*CountHashWriter, error) {
	return newCountHashWriter(w, s, hook)
}

// newCountHashWriter is the internal constructor that handles hook processing
func newCountHashWriter(w io.Writer, s segment.StatsReporter, hook WriterHook) (*CountHashWriter, error) {
	chw := &CountHashWriter{
		w: w,
		s: s,
	}

	// If hook is provided, invoke it to get configuration
	if hook != nil {
		result, err := hook()
		if err != nil {
			return nil, err
		}

		// Hook can return nil to indicate no callback processing desired
		if result != nil {
			chw.id = result.ID
			chw.processor = result.Processor
			chw.generator = result.Generator

			// Generate initial token
			if chw.generator != nil {
				token, err := chw.generator()
				if err != nil {
					return nil, err
				}
				chw.token = token
			}
		}
	}

	return chw, nil
}

// Write writes the provided bytes to the wrapped writer and counts the bytes.
// If a callback is configured, it processes the data and increments the token first.
// On token overflow, it generates a new token using the generator.
func (c *CountHashWriter) Write(b []byte) (int, error) {
	data := b
	var err error

	// Process data through callback if configured
	if c.processor != nil {
		data, err = c.processor(b, c.token)
		if err != nil {
			return 0, err
		}

		// Increment token and check for overflow
		if c.token.Increment() {
			// Overflow occurred, generate new token
			if c.generator != nil {
				newToken, err := c.generator()
				if err != nil {
					return 0, err
				}
				c.token = newToken
			}
		}
	}

	// Write the (possibly processed) data
	n, err := c.w.Write(data)
	c.crc = crc32.Update(c.crc, crc32.IEEETable, data[:n])
	c.n += n

	if c.s != nil {
		c.s.ReportBytesWritten(uint64(n))
	}

	return len(b), err // Return original length for io.Writer contract
}

// Count returns the number of bytes written
func (c *CountHashWriter) Count() int {
	return c.n
}

// Sum32 returns the CRC-32 hash of the content written to this writer
func (c *CountHashWriter) Sum32() uint32 {
	return c.crc
}

// ID returns the writer ID if one was set by the hook
func (c *CountHashWriter) ID() []byte {
	return c.id
}

// Token returns the current token value
func (c *CountHashWriter) Token() WriteToken {
	return c.token
}

// --------------------------------------------------------------------
// MemoryReader wraps a memory-mapped byte slice and applies an optional
// decoder callback to all data reads. If no decoder is configured, it acts
// as a transparent passthrough.
type MemoryReader struct {
	mem     []byte      // memory-mapped byte slice
	decoder DataDecoder // optional decoder for read operations
}

// NewMemoryReader creates a new MemoryReader with the default reader hook.
// If DefaultReaderHook is nil, no decoder is applied.
func NewMemoryReader(mem []byte) (*MemoryReader, error) {
	return newMemoryReader(mem, DefaultReaderHook)
}

// NewMemoryReaderWithHook creates a new MemoryReader with a custom reader hook.
func NewMemoryReaderWithHook(mem []byte, hook ReaderHook) (*MemoryReader, error) {
	return newMemoryReader(mem, hook)
}

// newMemoryReader is the internal constructor that handles hook processing.
func newMemoryReader(mem []byte, hook ReaderHook) (*MemoryReader, error) {
	rv := &MemoryReader{
		mem: mem,
	}

	// Invoke hook to get decoder configuration if provided
	if hook != nil {
		result, err := hook()
		if err != nil {
			return nil, err
		}

		if result != nil {
			rv.decoder = result.Decoder
		}
	}

	return rv, nil
}

// Read returns a slice of memory from start to end, applying the decoder if configured.
// Returns io.EOF if the range is invalid or out of bounds.
func (r *MemoryReader) Read(start, end uint64) ([]byte, error) {
	if start >= uint64(len(r.mem)) || end > uint64(len(r.mem)) || start > end {
		return nil, io.EOF
	}

	data := r.mem[start:end]

	// Apply decoder if configured
	if r.decoder != nil {
		var err error
		data, err = r.decoder(data)
		if err != nil {
			return nil, err
		}
	}

	return data, nil
}


// Default no-op implementation. Is called after reading any user data from a file.
var ReaderCallbackGetter = func(string) (func(data []byte) ([]byte, error), error) {

Again, better to keep this as a ReaderHook type and avoid the no-op implementation entirely, using a nil variable instead.


// Default no-op implementation. Is called once per write call if a unique counter is
// needed by the writer callback.
var CounterGetter = func() ([]byte, error) {

This extra hook feels unnecessary. What is the purpose of having a separate hook for something that is tied to the WriterHook? Better to consolidate it into the WriterHook's signature itself; ref the above comment.

Comment on lines +62 to +69
// fileWriter wraps a CountHashWriter and applies a user provided
// writer callback to the data being written.
type fileWriter struct {
	writerCB func(data []byte, counter []byte) ([]byte, error)
	counter  []byte
	id       string
	c        *CountHashWriter
}

This feels unnecessary, alongside the numerous files being touched to accommodate it. Consider augmenting the existing CountHashWriter instead; ref the above comment.


// fileReader wraps a reader callback to be applied to data read from a file.
type fileReader struct {
	callback func(data []byte) ([]byte, error)

This is not extensible: if tomorrow we need to read a new mem slice, we need to

  1. Get the subslice we want
  2. Call the process API
  3. Then read the actual subslice

We may forget to perform step 2, so it needs to be strictly enforced. Check the code given in the ref comment.

c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
// write the compressed data to the final data
c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
if fw, ok := c.w.(*fileWriter); ok && fw != nil {

Why are you encrypting each piece of compressed data individually? Can you not encrypt it all at once, in the finalized Write method?

dataLength := curChunkEnd - compressedDataLoc
di.incrementBytesRead(uint64(dataLength + offset))
di.curChunkData = s.mem[compressedDataLoc : compressedDataLoc+dataLength]
di.curChunkData, err = s.fileReader.process(s.mem[compressedDataLoc : compressedDataLoc+dataLength])

Use the MemoryReader struct: mem will no longer be a []byte but a *MemoryReader.

di.curChunkData, err = s.mem.Read(compressedDataLoc, compressedDataLoc+dataLength)
if err != nil {
	return err
}

@CascadingRadium

I think I get the main idea here, but want some clarification regarding the following points:

  • When the system is creating a segment (a []byte, basically), we use the CountHashWriter to write any data we want into the []byte before flushing to disk. Is there any reason why we need to manually process each bit of data selectively before writing to the []byte? Can we not just interleave the code to encrypt data within the existing code (within the Write method itself)?
    I have added some code that demonstrates optimal callback use (which is more readable as well). Check the code added and let me know if there's a misunderstanding here.

  • You have mentioned that the reader should be able to discern the WriteToken (counter) itself. I believe this is something that the user should handle. How do you plan on handling overflows with this? If the original counter overflowed (say) and created a new counter (I'm not sure if that's the right way to handle it), how will the reader be aware of this fact? (I am assuming that the current system uses exactly one counter from both the writer's and the reader's POV, making it easier to synchronize the two.)

Will give another pass when you provide some clarity regarding the aforementioned points.
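To make the synchronization question above concrete: one hypothetical way a reader could stay in sync without any out-of-band state is to derive each token deterministically from the writer ID and the write index, rather than incrementing a shared counter. This is only an illustration of the question (the `deriveToken` helper and its inputs are invented for the example), not a proposal from the PR:

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// deriveToken (hypothetical) computes the token for write #writeIndex from
// the writer ID alone, so a reader can recompute it independently and
// counter overflow never arises as a separate case.
func deriveToken(writerID []byte, writeIndex uint64) []byte {
	var idx [8]byte
	binary.BigEndian.PutUint64(idx[:], writeIndex)
	h := sha256.Sum256(append(append([]byte{}, writerID...), idx[:]...))
	return h[:16]
}

func main() {
	id := []byte("segment-0001")  // hypothetical writer ID
	w := deriveToken(id, 42)      // token the writer used for write #42
	r := deriveToken(id, 42)      // token the reader recomputes on its own
	fmt.Println(string(w) == string(r))
}
```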

@abhinavdangeti

Per plan, let's target this to the unstable branch.
