KAFKA-12829: Remove the deprecated method `init(ProcessorContext, StateStore)` from the `StateStore` interface (#16906)

Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax <[email protected]>
xijiu authored Aug 29, 2024
1 parent ca0cc35 commit 291523e
Showing 69 changed files with 198 additions and 1,054 deletions.
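For custom store implementers, the practical effect of this change is that `init(StateStoreContext, StateStore)` is now the only initialization entry point on `StateStore`; the deprecated `ProcessorContext` overload can no longer be overridden or delegated to, and the `StoreToProcessorContextAdapter` bridge the old default implementation delegated through is no longer referenced by the interface. A minimal sketch of what a user-defined store might look like against the new contract (the class name, the in-memory map, and the lambda-based restore callback are illustrative assumptions, not part of this patch):

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

import java.util.HashMap;
import java.util.Map;

// Illustrative in-memory store; shows the single remaining init(...) contract.
public class MyInMemoryStore implements StateStore {

    private final String name;
    private final Map<Bytes, byte[]> data = new HashMap<>();
    private volatile boolean open = false;

    public MyInMemoryStore(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        // Restore one record at a time from the changelog into the local map.
        final StateRestoreCallback restoreCallback =
            (key, value) -> data.put(Bytes.wrap(key), value);

        // The root store (not a wrapping layer) must be registered with the context.
        context.register(root, restoreCallback);
        open = true;
    }

    @Override
    public void flush() {
        // nothing buffered in this sketch
    }

    @Override
    public void close() {
        data.clear();
        open = false;
    }

    @Override
    public boolean persistent() {
        return false;
    }

    @Override
    public boolean isOpen() {
        return open;
    }
}

Stores written against the removed overload only need to move their registration logic into this method; the registration rules themselves are unchanged.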
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.annotation.InterfaceStability.Evolving;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
 import org.apache.kafka.streams.query.FailureReason;
 import org.apache.kafka.streams.query.Position;
 import org.apache.kafka.streams.query.PositionBound;
@@ -54,31 +53,6 @@ public interface StateStore {
      */
     String name();
 
-    /**
-     * Initializes this state store.
-     * <p>
-     * The implementation of this function must register the root store in the context via the
-     * {@link org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, StateRestoreCallback)} function,
-     * where the first {@link StateStore} parameter should always be the passed-in {@code root} object, and
-     * the second parameter should be an object of user's implementation
-     * of the {@link StateRestoreCallback} interface used for restoring the state store from the changelog.
-     * <p>
-     * Note that if the state store engine itself supports bulk writes, users can implement another
-     * interface {@link BatchingStateRestoreCallback} which extends {@link StateRestoreCallback} to
-     * let users implement bulk-load restoration logic instead of restoring one record at a time.
-     * <p>
-     * This method is not called if {@link StateStore#init(StateStoreContext, StateStore)}
-     * is implemented.
-     *
-     * @throws IllegalStateException If store gets registered after initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     * @deprecated Since 2.7.0. Callers should invoke {@link #init(StateStoreContext, StateStore)} instead.
-     *             Implementers may choose to implement this method for backward compatibility or to throw an
-     *             informative exception instead.
-     */
-    @Deprecated
-    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);
-
     /**
      * Initializes this state store.
      * <p>
@@ -95,9 +69,7 @@ public interface StateStore {
      * @throws IllegalStateException If store gets registered after initialized is already finished
      * @throws StreamsException if the store's change log does not contain the partition
      */
-    default void init(final StateStoreContext context, final StateStore root) {
-        init(StoreToProcessorContextAdapter.adapt(context), root);
-    }
+    void init(final StateStoreContext context, final StateStore root);
 
     /**
      * Flush any cached data
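The surviving javadoc above still directs implementers to register a `StateRestoreCallback`; for storage engines that can absorb bulk writes, `BatchingStateRestoreCallback` remains the batch-oriented alternative mentioned in the removed text. A hedged sketch of registering a batching callback from the new `init` (the abstract `writeBatch` hook is a hypothetical engine-level method, not a Kafka Streams API):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.BatchingStateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

import java.util.Collection;
import java.util.Collections;

// Sketch: a store whose engine supports bulk writes can restore the changelog in batches.
public abstract class BulkRestoringStore implements StateStore {

    // Hypothetical bulk-write hook provided by the concrete storage engine.
    protected abstract void writeBatch(Collection<KeyValue<byte[], byte[]>> records);

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        final BatchingStateRestoreCallback callback = new BatchingStateRestoreCallback() {
            @Override
            public void restoreAll(final Collection<KeyValue<byte[], byte[]>> records) {
                writeBatch(records); // one engine-level write per restored batch
            }

            @Override
            public void restore(final byte[] key, final byte[] value) {
                // single-record fallback required by the parent StateRestoreCallback contract
                writeBatch(Collections.singletonList(KeyValue.pair(key, value)));
            }
        };
        context.register(root, callback);
    }
}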
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -49,13 +48,6 @@ public void flush() {
         throw new UnsupportedOperationException(ERROR_MESSAGE);
     }
 
-    @Deprecated
-    @Override
-    public void init(final ProcessorContext context,
-                     final StateStore root) {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
-    }
-
     @Override
     public void init(final StateStoreContext context,
                      final StateStore root) {
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Windowed;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
 import org.apache.kafka.streams.state.KeyValueIterator;
@@ -43,13 +42,6 @@ private AbstractReadWriteDecorator(final T inner) {
         super(inner);
     }
 
-    @Deprecated
-    @Override
-    public void init(final ProcessorContext context,
-                     final StateStore root) {
-        throw new UnsupportedOperationException(ERROR_MESSAGE);
-    }
-
     @Override
     public void init(final StateStoreContext context,
                      final StateStore root) {
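Both decorator classes above previously had to stub out the deprecated overload with an `UnsupportedOperationException`; with the overload gone, a wrapping store only needs to forward the single remaining `init`. A simplified sketch of that delegation pattern (this standalone `DelegatingStore` is illustrative and not the internal decorator hierarchy used in the actual files):

import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;

// Sketch of a delegating decorator: only one init overload is left to forward.
public class DelegatingStore implements StateStore {

    private final StateStore inner;

    public DelegatingStore(final StateStore inner) {
        this.inner = inner;
    }

    @Override
    public String name() {
        return inner.name();
    }

    @Override
    public void init(final StateStoreContext context, final StateStore root) {
        inner.init(context, root); // no deprecated overload left to stub out
    }

    @Override
    public void flush() {
        inner.flush();
    }

    @Override
    public void close() {
        inner.close();
    }

    @Override
    public boolean persistent() {
        return inner.persistent();
    }

    @Override
    public boolean isOpen() {
        return inner.isOpen();
    }
}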

This file was deleted.

@@ -22,12 +22,11 @@
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
 import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
-import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.query.Position;
@@ -45,6 +44,7 @@
 import java.util.Optional;
 
 import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+import static org.apache.kafka.streams.processor.internals.ProcessorContextUtils.asInternalProcessorContext;
 
 public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
@@ -55,9 +55,7 @@ public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Seg
     protected final Optional<KeySchema> indexKeySchema;
     private final long retentionPeriod;
 
-
-    protected ProcessorContext context;
-    private StateStoreContext stateStoreContext;
+    protected InternalProcessorContext internalProcessorContext;
     private Sensor expiredRecordSensor;
     protected long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
     protected boolean consistencyEnabled = false;
@@ -146,7 +144,7 @@ void putIndex(final Bytes indexKey, final byte[] value) {
 
         final long timestamp = indexKeySchema.get().segmentTimestamp(indexKey);
         final long segmentId = segments.segmentId(timestamp);
-        final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+        final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
 
         if (segment != null) {
             segment.put(indexKey, value);
@@ -160,7 +158,7 @@ byte[] getIndex(final Bytes indexKey) {
 
         final long timestamp = indexKeySchema.get().segmentTimestamp(indexKey);
         final long segmentId = segments.segmentId(timestamp);
-        final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+        final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
 
         if (segment != null) {
             return segment.get(indexKey);
@@ -175,7 +173,7 @@ void removeIndex(final Bytes indexKey) {
 
         final long timestamp = indexKeySchema.get().segmentTimestamp(indexKey);
         final long segmentId = segments.segmentId(timestamp);
-        final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+        final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
 
         if (segment != null) {
             segment.delete(indexKey);
@@ -188,14 +186,14 @@ public void put(final Bytes rawBaseKey,
         final long timestamp = baseKeySchema.segmentTimestamp(rawBaseKey);
         observedStreamTime = Math.max(observedStreamTime, timestamp);
         final long segmentId = segments.segmentId(timestamp);
-        final S segment = segments.getOrCreateSegmentIfLive(segmentId, context, observedStreamTime);
+        final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
 
         if (segment == null) {
-            expiredRecordSensor.record(1.0d, context.currentSystemTimeMs());
+            expiredRecordSensor.record(1.0d, internalProcessorContext.currentSystemTimeMs());
             LOG.warn("Skipping record for expired segment.");
         } else {
             synchronized (position) {
-                StoreQueryUtils.updatePosition(position, stateStoreContext);
+                StoreQueryUtils.updatePosition(position, internalProcessorContext);
 
                 // Put to index first so that if put to base failed, when we iterate index, we will
                 // find no base value. If put to base first but putting to index fails, when we iterate
@@ -241,28 +239,26 @@ public String name() {
         return name;
     }
 
-    @Deprecated
     @Override
-    public void init(final ProcessorContext context,
-                     final StateStore root) {
-        this.context = context;
+    public void init(final StateStoreContext stateStoreContext, final StateStore root) {
+        this.internalProcessorContext = asInternalProcessorContext(stateStoreContext);
 
-        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(context);
+        final StreamsMetricsImpl metrics = ProcessorContextUtils.metricsImpl(stateStoreContext);
         final String threadId = Thread.currentThread().getName();
-        final String taskName = context.taskId().toString();
+        final String taskName = stateStoreContext.taskId().toString();
 
         expiredRecordSensor = TaskMetrics.droppedRecordsSensor(
             threadId,
             taskName,
             metrics
         );
 
-        final File positionCheckpointFile = new File(context.stateDir(), name() + ".position");
+        final File positionCheckpointFile = new File(stateStoreContext.stateDir(), name() + ".position");
         this.positionCheckpoint = new OffsetCheckpoint(positionCheckpointFile);
         this.position = StoreQueryUtils.readPositionFromCheckpoint(positionCheckpoint);
         segments.setPosition(this.position);
 
-        segments.openExisting(context, observedStreamTime);
+        segments.openExisting(internalProcessorContext, observedStreamTime);
 
         // register and possibly restore the state from the logs
         stateStoreContext.register(
@@ -274,18 +270,12 @@ public void init(final ProcessorContext context,
         open = true;
 
         consistencyEnabled = StreamsConfig.InternalConfig.getBoolean(
-            context.appConfigs(),
+            stateStoreContext.appConfigs(),
             IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED,
             false
         );
     }
 
-    @Override
-    public void init(final StateStoreContext context, final StateStore root) {
-        this.stateStoreContext = context;
-        init(StoreToProcessorContextAdapter.adapt(context), root);
-    }
-
     @Override
     public void flush() {
         segments.flush();