Skip to content

Commit

Permalink
MINOR: remove get prefix for internal IQ methods (#16954)
Browse files Browse the repository at this point in the history
Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
  • Loading branch information
mjsax authored Aug 25, 2024
1 parent 9eb7e1a commit 4ae0ab3
Show file tree
Hide file tree
Showing 16 changed files with 114 additions and 114 deletions.
18 changes: 9 additions & 9 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
final boolean callingThreadIsNotCurrentStreamThread = !streamThread.getName().equals(Thread.currentThread().getName());
if (streamThread.isThreadAlive() && (callingThreadIsNotCurrentStreamThread || numLiveStreamThreads() == 1)) {
log.info("Removing StreamThread " + streamThread.getName());
final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
final Optional<String> groupInstanceID = streamThread.groupInstanceID();
streamThread.requestLeaveGroupDuringShutdown();
streamThread.shutdown();
if (!streamThread.getName().equals(Thread.currentThread().getName())) {
Expand Down Expand Up @@ -1630,7 +1630,7 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu

private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
return thread -> {
final Optional<String> groupInstanceId = thread.getGroupInstanceID();
final Optional<String> groupInstanceId = thread.groupInstanceID();
if (groupInstanceId.isPresent()) {
log.debug("Sending leave group trigger to removing instance from consumer group: {}.",
groupInstanceId.get());
Expand Down Expand Up @@ -1685,7 +1685,7 @@ public void cleanUp() {
*/
public Collection<StreamsMetadata> metadataForAllStreamsClients() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
return streamsMetadataState.allMetadata();
}

/**
Expand All @@ -1705,7 +1705,7 @@ public Collection<StreamsMetadata> metadataForAllStreamsClients() {
*/
public Collection<StreamsMetadata> streamsMetadataForStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
return streamsMetadataState.allMetadataForStore(storeName);
}

/**
Expand All @@ -1722,7 +1722,7 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, keySerializer);
return streamsMetadataState.keyQueryMetadataForKey(storeName, key, keySerializer);
}

/**
Expand All @@ -1739,7 +1739,7 @@ public <K> KeyQueryMetadata queryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getKeyQueryMetadataForKey(storeName, key, partitioner);
return streamsMetadataState.keyQueryMetadataForKey(storeName, key, partitioner);
}

/**
Expand All @@ -1766,7 +1766,7 @@ public <T> T store(final StoreQueryParameters<T> storeQueryParameters) {
"Cannot get state store " + storeName + " because no such store is registered in the topology."
);
}
return queryableStoreProvider.getStore(storeQueryParameters);
return queryableStoreProvider.store(storeQueryParameters);
}

/**
Expand Down Expand Up @@ -2049,7 +2049,7 @@ protected Map<String, Map<Integer, LagInfo>> allLocalStorePartitionLags(final Li
final long changelogPosition = allChangelogPositions.getOrDefault(entry.getKey(), earliestOffset);
final long latestOffset = entry.getValue().offset();
final LagInfo lagInfo = new LagInfo(changelogPosition == Task.LATEST_OFFSET ? latestOffset : changelogPosition, latestOffset);
final String storeName = streamsMetadataState.getStoreForChangelogTopic(entry.getKey().topic());
final String storeName = streamsMetadataState.storeForChangelogTopic(entry.getKey().topic());
localStorePartitionLags.computeIfAbsent(storeName, ignored -> new TreeMap<>())
.put(entry.getKey().partition(), lagInfo);
}
Expand Down Expand Up @@ -2116,7 +2116,7 @@ public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) {
final TaskId taskId = task.id();
final int partition = taskId.partition();
if (request.isAllPartitions() || request.getPartitions().contains(partition)) {
final StateStore store = task.getStore(storeName);
final StateStore store = task.store(storeName);
if (store != null) {
final StreamThread.State state = thread.state();
final boolean active = task.isActive();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public void markChangelogAsCorrupted(final Collection<TopicPartition> partitions
}

@Override
public StateStore getStore(final String name) {
public StateStore store(final String name) {
return stateMgr.getStore(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ public boolean commitNeeded() {
}

@Override
public StateStore getStore(final String name) {
return task.getStore(name);
public StateStore store(final String name) {
return task.store(name);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ public boolean isStartingRunningOrPartitionAssigned() {
private volatile State state = State.CREATED;
private volatile ThreadMetadata threadMetadata;
private StreamThread.StateListener stateListener;
private final Optional<String> getGroupInstanceID;
private final Optional<String> groupInstanceID;

private final ChangelogReader changelogReader;
private final ConsumerRebalanceListener rebalanceListener;
Expand Down Expand Up @@ -629,7 +629,7 @@ public StreamThread(final Time time,
this.originalReset = originalReset;
this.nextProbingRebalanceMs = nextProbingRebalanceMs;
this.nonFatalExceptionsToHandle = nonFatalExceptionsToHandle;
this.getGroupInstanceID = mainConsumer.groupMetadata().groupInstanceId();
this.groupInstanceID = mainConsumer.groupMetadata().groupInstanceId();

this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG));
final int dummyThreadIdx = 1;
Expand Down Expand Up @@ -1586,8 +1586,8 @@ public String toString(final String indent) {
return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
}

public Optional<String> getGroupInstanceID() {
return getGroupInstanceID;
public Optional<String> groupInstanceID() {
return groupInstanceID;
}

public void requestLeaveGroupDuringShutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public StreamsMetadata getLocalMetadata() {
*
* @return all the {@link StreamsMetadata}s in a {@link KafkaStreams} application
*/
public Collection<StreamsMetadata> getAllMetadata() {
public Collection<StreamsMetadata> allMetadata() {
return Collections.unmodifiableList(allMetadata);
}

Expand All @@ -112,10 +112,10 @@ public Collection<StreamsMetadata> getAllMetadata() {
* @param storeName the storeName to find metadata for
* @return A collection of {@link StreamsMetadata} that have the provided storeName
*/
public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
public synchronized Collection<StreamsMetadata> allMetadataForStore(final String storeName) {
Objects.requireNonNull(storeName, "storeName cannot be null");
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalArgumentException("Cannot invoke the getAllMetadataForStore(storeName) method when"
throw new IllegalArgumentException("Cannot invoke the allMetadataForStore(storeName) method when"
+ "using named topologies, please use the overload that accepts"
+ "a topologyName parameter to identify the correct store");
}
Expand Down Expand Up @@ -149,7 +149,7 @@ public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final Str
* @param topologyName the storeName to find metadata for
* @return A collection of {@link StreamsMetadata} that have the provided storeName
*/
public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName, final String topologyName) {
public synchronized Collection<StreamsMetadata> allMetadataForStore(final String storeName, final String topologyName) {
Objects.requireNonNull(storeName, "storeName cannot be null");
Objects.requireNonNull(topologyName, "topologyName cannot be null");

Expand Down Expand Up @@ -193,7 +193,7 @@ public synchronized Collection<StreamsMetadata> getAllMetadataForTopology(final
/**
* Find the {@link KeyQueryMetadata}s for a given storeName and key. This method will use the
* {@link DefaultStreamPartitioner} to locate the store. If a custom partitioner has been used
* please use {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)} instead.
* please use {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, StreamPartitioner)} instead.
*
* Note: the key may not exist in the {@link org.apache.kafka.streams.processor.StateStore},
* this method provides a way of finding which {@link KeyQueryMetadata} it would exist on.
Expand All @@ -206,32 +206,32 @@ public synchronized Collection<StreamsMetadata> getAllMetadataForTopology(final
* if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found,
* or null if no matching metadata could be found.
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer) {
Objects.requireNonNull(keySerializer, "keySerializer can't be null");
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)"
throw new IllegalArgumentException("Cannot invoke the KeyQueryMetadataForKey(storeName, key, keySerializer)"
+ "method when using named topologies, please use the overload that"
+ "accepts a topologyName parameter to identify the correct store");
}
return getKeyQueryMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer));
return keyQueryMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer));
}

/**
* See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, Serializer)}
* See {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, Serializer)}
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer,
final String topologyName) {
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final Serializer<K> keySerializer,
final String topologyName) {
Objects.requireNonNull(keySerializer, "keySerializer can't be null");
return getKeyQueryMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer),
topologyName);
return keyQueryMetadataForKey(storeName,
key,
new DefaultStreamPartitioner<>(keySerializer),
topologyName);
}

/**
Expand All @@ -247,14 +247,14 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
* @return The {@link KeyQueryMetadata} for the storeName and key or {@link KeyQueryMetadata#NOT_AVAILABLE}
* if streams is (re-)initializing, or {@code null} if no matching metadata could be found.
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner) {
Objects.requireNonNull(storeName, "storeName can't be null");
Objects.requireNonNull(key, "key can't be null");
Objects.requireNonNull(partitioner, "partitioner can't be null");
if (topologyMetadata.hasNamedTopologies()) {
throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, partitioner)"
throw new IllegalArgumentException("Cannot invoke the keyQueryMetadataForKey(storeName, key, partitioner)"
+ "method when using named topologies, please use the overload that"
+ "accepts a topologyName parameter to identify the correct store");
}
Expand All @@ -276,16 +276,16 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
if (sourceTopicsInfo == null) {
return null;
}
return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
return keyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo);
}

/**
* See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, StreamPartitioner)}
* See {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, StreamPartitioner)}
*/
public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final String topologyName) {
public synchronized <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final String topologyName) {
Objects.requireNonNull(storeName, "storeName can't be null");
Objects.requireNonNull(key, "key can't be null");
Objects.requireNonNull(partitioner, "partitioner can't be null");
Expand All @@ -300,7 +300,7 @@ public synchronized <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String
if (sourceTopicsInfo == null) {
return null;
}
return getKeyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo, topologyName);
return keyQueryMetadataForKey(storeName, key, partitioner, sourceTopicsInfo, topologyName);
}

/**
Expand Down Expand Up @@ -478,10 +478,10 @@ private List<StreamsMetadata> rebuildMetadataForSingleTopology(final Map<HostInf
return maybeMulticastPartitions.get().iterator().next();
};

private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo) {
private <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo) {

final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
final Set<TopicPartition> matchingPartitions = new HashSet<>();
Expand Down Expand Up @@ -511,11 +511,11 @@ private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
return new KeyQueryMetadata(activeHost, standbyHosts, partition);
}

private <K> KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo,
final String topologyName) {
private <K> KeyQueryMetadata keyQueryMetadataForKey(final String storeName,
final K key,
final StreamPartitioner<? super K, ?> partitioner,
final SourceTopicsInfo sourceTopicsInfo,
final String topologyName) {
Objects.requireNonNull(topologyName, "topology name must not be null");
final Integer partition = getPartition.apply(partitioner.partitions(sourceTopicsInfo.topicWithMostPartitions, key, null, sourceTopicsInfo.maxPartitions));
final Set<TopicPartition> matchingPartitions = new HashSet<>();
Expand Down Expand Up @@ -566,7 +566,7 @@ private boolean isInitialized() {
return partitionsByTopic != null && !partitionsByTopic.isEmpty() && localMetadata.get() != null;
}

public String getStoreForChangelogTopic(final String topicName) {
public String storeForChangelogTopic(final String topicName) {
return topologyMetadata.storeForChangelogTopic(topicName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ default boolean commitRequested() {

// IQ related methods

StateStore getStore(final String name);
StateStore store(final String name);

/**
* @return the offsets of all the changelog partitions associated with this task,
Expand Down
Loading

0 comments on commit 4ae0ab3

Please sign in to comment.