
Commit 5ef2bfc

Merge branch 'AutoMQ:main' into main
2 parents 669053e + d11ef07 commit 5ef2bfc

33 files changed: +562 -160 lines changed

core/src/main/java/kafka/automq/failover/FailoverControlManager.java
Lines changed: 0 additions & 16 deletions

@@ -265,22 +265,6 @@ private static List<FailedWal> getFailedWal(List<NodeRuntimeMetadata> allNodes)
             .filter(NodeRuntimeMetadata::shouldFailover)
             .map(DefaultFailedWal::from)
             .collect(Collectors.toCollection(ArrayList::new));
-        maybeRemoveControllerNode(allNodes, result);
         return result;
     }
-
-    private static void maybeRemoveControllerNode(List<NodeRuntimeMetadata> allNodes, List<FailedWal> failedWALList) {
-        long inactiveControllerCount = allNodes.stream()
-            .filter(NodeRuntimeMetadata::isController)
-            .filter(node -> !node.isActive())
-            .count();
-        if (inactiveControllerCount > 1) {
-            LOGGER.warn("{} controller nodes is inactive, will not failover any controller node", inactiveControllerCount);
-            Set<Integer> controllerNodeIds = allNodes.stream()
-                .filter(NodeRuntimeMetadata::isController)
-                .map(NodeRuntimeMetadata::id)
-                .collect(Collectors.toSet());
-            failedWALList.removeIf(wal -> controllerNodeIds.contains(wal.nodeId()));
-        }
-    }
 }
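
With the guard gone, getFailedWal now treats an inactive controller like any other failed node. For reference, a self-contained sketch of the removed guard's semantics, using a hypothetical Node record in place of NodeRuntimeMetadata:

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ControllerGuardSketch {
    record Node(int id, boolean isController, boolean isActive) { }

    // Returns the node ids the old guard exempted from failover:
    // every controller, but only when more than one controller is inactive.
    static Set<Integer> exemptedNodeIds(List<Node> allNodes) {
        long inactiveControllers = allNodes.stream()
            .filter(Node::isController)
            .filter(node -> !node.isActive())
            .count();
        if (inactiveControllers <= 1) {
            return Set.of();
        }
        return allNodes.stream().filter(Node::isController).map(Node::id).collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        List<Node> nodes = List.of(new Node(1, true, false), new Node(2, true, false), new Node(3, false, false));
        // Prints the exempted ids {1, 2} (order unspecified): both inactive controllers
        // were previously skipped; after this commit they fail over like any other node.
        System.out.println(exemptedNodeIds(nodes));
    }
}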

core/src/main/java/kafka/automq/partition/snapshot/PartitionSnapshotsManager.java
Lines changed: 46 additions & 27 deletions

@@ -52,7 +52,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;

@@ -69,7 +71,8 @@ public class PartitionSnapshotsManager {
     private final String confirmWalConfig;
     private final ConfirmWAL confirmWAL;

-    public PartitionSnapshotsManager(Time time, AutoMQConfig config, ConfirmWAL confirmWAL, Supplier<AutoMQVersion> versionGetter) {
+    public PartitionSnapshotsManager(Time time, AutoMQConfig config, ConfirmWAL confirmWAL,
+        Supplier<AutoMQVersion> versionGetter) {
         this.time = time;
         this.confirmWalConfig = config.walConfig();
         this.confirmWAL = confirmWAL;

@@ -119,9 +122,7 @@ public CompletableFuture<AutomqGetPartitionSnapshotResponse> handle(AutomqGetPar
                 newSession = true;
             }
         }
-        CompletableFuture<AutomqGetPartitionSnapshotResponse> resp = session.snapshotsDelta(request.data().version());
-        CompletableFuture<Void> commitCf = (request.data().requestCommit() || newSession) ? confirmWAL.commit(0, false) : CompletableFuture.completedFuture(null);
-        return commitCf.exceptionally(nil -> null).thenCompose(nil -> resp);
+        return session.snapshotsDelta(request.data().version(), request.data().requestCommit() || newSession);
     }

     private synchronized int nextSessionId() {

@@ -150,6 +151,7 @@ protected List<CompletableFuture<Void>> initialValue() {
     private final Map<Partition, PartitionSnapshotVersion> synced = new HashMap<>();
     private final List<Partition> removed = new ArrayList<>();
     private long lastGetSnapshotsTimestamp = time.milliseconds();
+    private final Set<CompletableFuture<Void>> inflightCommitCfSet = ConcurrentHashMap.newKeySet();

     public Session(int sessionId) {
         this.sessionId = sessionId;

@@ -159,13 +161,50 @@ public synchronized int sessionEpoch() {
         return sessionEpoch;
     }

-    public synchronized CompletableFuture<AutomqGetPartitionSnapshotResponse> snapshotsDelta(short requestVersion) {
+    public synchronized CompletableFuture<AutomqGetPartitionSnapshotResponse> snapshotsDelta(short requestVersion,
+        boolean requestCommit) {
         AutomqGetPartitionSnapshotResponseData resp = new AutomqGetPartitionSnapshotResponseData();
         sessionEpoch++;
+        lastGetSnapshotsTimestamp = time.milliseconds();
         resp.setSessionId(sessionId);
         resp.setSessionEpoch(sessionEpoch);
-        Map<Uuid, List<PartitionSnapshot>> topic2partitions = new HashMap<>();
+        long finalSessionEpoch = sessionEpoch;
+        CompletableFuture<Void> collectPartitionSnapshotsCf;
+        if (!requestCommit && inflightCommitCfSet.isEmpty()) {
+            collectPartitionSnapshotsCf = collectPartitionSnapshots(resp);
+        } else {
+            collectPartitionSnapshotsCf = CompletableFuture.completedFuture(null);
+        }
+        return collectPartitionSnapshotsCf
+            .thenApply(nil -> {
+                if (requestVersion > ZERO_ZONE_V0_REQUEST_VERSION) {
+                    if (finalSessionEpoch == 1) {
+                        // return the WAL config in the session first response
+                        resp.setConfirmWalConfig(confirmWalConfig);
+                    }
+                    resp.setConfirmWalEndOffset(confirmWAL.confirmOffset().bufferAsBytes());
+                }
+                if (requestCommit) {
+                    // Commit after generating the snapshots.
+                    // Then the snapshot-read partitions could read from snapshot-read cache or block cache.
+                    CompletableFuture<Void> commitCf = confirmWAL.commit(0, false);
+                    inflightCommitCfSet.add(commitCf);
+                    commitCf.whenComplete((rst, ex) -> inflightCommitCfSet.remove(commitCf));
+                }
+                return new AutomqGetPartitionSnapshotResponse(resp);
+            });
+    }

+    public synchronized void onPartitionClose(Partition partition) {
+        removed.add(partition);
+    }
+
+    public synchronized boolean expired() {
+        return time.milliseconds() - lastGetSnapshotsTimestamp > 60000;
+    }
+
+    private CompletableFuture<Void> collectPartitionSnapshots(AutomqGetPartitionSnapshotResponseData resp) {
+        Map<Uuid, List<PartitionSnapshot>> topic2partitions = new HashMap<>();
         List<CompletableFuture<Void>> completeCfList = COMPLETE_CF_LIST_LOCAL.get();
         completeCfList.clear();
         removed.forEach(partition -> {

@@ -195,31 +234,11 @@ public synchronized CompletableFuture<AutomqGetPartitionSnapshotResponse> snapsh
             topics.add(topic);
         });
         resp.setTopics(topics);
-        lastGetSnapshotsTimestamp = time.milliseconds();
-        long finalSessionEpoch = sessionEpoch;
-        CompletableFuture<AutomqGetPartitionSnapshotResponse> retCf = CompletableFuture.allOf(completeCfList.toArray(new CompletableFuture[0]))
-            .thenApply(nil -> {
-                if (requestVersion > ZERO_ZONE_V0_REQUEST_VERSION) {
-                    if (finalSessionEpoch == 1) {
-                        // return the WAL config in the session first response
-                        resp.setConfirmWalConfig(confirmWalConfig);
-                    }
-                    resp.setConfirmWalEndOffset(confirmWAL.confirmOffset().bufferAsBytes());
-                }
-                return new AutomqGetPartitionSnapshotResponse(resp);
-            });
+        CompletableFuture<Void> retCf = CompletableFuture.allOf(completeCfList.toArray(new CompletableFuture[0]));
         completeCfList.clear();
         return retCf;
     }

-    public synchronized void onPartitionClose(Partition partition) {
-        removed.add(partition);
-    }
-
-    public synchronized boolean expired() {
-        return time.milliseconds() - lastGetSnapshotsTimestamp > 60000;
-    }
-
     private PartitionSnapshot snapshot(Partition partition, PartitionSnapshotVersion oldVersion,
         PartitionSnapshotVersion newVersion, List<CompletableFuture<Void>> completeCfList) {
         if (newVersion == null) {
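
The reordering means a commit no longer gates the snapshot response: collection is skipped while a commit is requested or still in flight, and the commit itself fires only after the response data is built. A condensed, runnable sketch of that control flow (collect and commit are hypothetical stand-ins for collectPartitionSnapshots and ConfirmWAL#commit):

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotsDeltaFlowSketch {
    static final Set<CompletableFuture<Void>> INFLIGHT_COMMITS = ConcurrentHashMap.newKeySet();

    static CompletableFuture<String> snapshotsDelta(boolean requestCommit) {
        // Collect snapshots only when no commit is requested and none is in flight;
        // otherwise answer with an empty delta for this round.
        CompletableFuture<Void> collectCf = (!requestCommit && INFLIGHT_COMMITS.isEmpty())
            ? collect()
            : CompletableFuture.completedFuture(null);
        return collectCf.thenApply(nil -> {
            if (requestCommit) {
                // Fire the commit after the response is assembled and track it,
                // so later rounds skip collection until it completes.
                CompletableFuture<Void> commitCf = commit();
                INFLIGHT_COMMITS.add(commitCf);
                commitCf.whenComplete((rst, ex) -> INFLIGHT_COMMITS.remove(commitCf));
            }
            return "response";
        });
    }

    static CompletableFuture<Void> collect() { return CompletableFuture.completedFuture(null); }

    static CompletableFuture<Void> commit() { return CompletableFuture.runAsync(() -> { }); }
}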

core/src/main/java/kafka/automq/utils/ClientUtils.java
Lines changed: 39 additions & 4 deletions

@@ -23,6 +23,7 @@
 import kafka.server.KafkaConfig;

 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;

 import java.util.List;
 import java.util.Map;

@@ -34,19 +35,53 @@
 public class ClientUtils {
     public static Properties clusterClientBaseConfig(KafkaConfig kafkaConfig) {
         ListenerName listenerName = kafkaConfig.interBrokerListenerName();
+
         List<EndPoint> endpoints = asJava(kafkaConfig.effectiveAdvertisedBrokerListeners());
         Optional<EndPoint> endpointOpt = endpoints.stream().filter(e -> listenerName.equals(e.listenerName())).findFirst();
         if (endpointOpt.isEmpty()) {
             throw new IllegalArgumentException("Cannot find " + listenerName + " in endpoints " + endpoints);
         }
+
         EndPoint endpoint = endpointOpt.get();
-        Map<String, ?> securityConfig = kafkaConfig.originalsWithPrefix(listenerName.saslMechanismConfigPrefix(kafkaConfig.saslMechanismInterBrokerProtocol()));
+        SecurityProtocol securityProtocol = kafkaConfig.interBrokerSecurityProtocol();
+        Map<String, Object> parsedConfigs = kafkaConfig.valuesWithPrefixOverride(listenerName.configPrefix());
+
+        // mirror ChannelBuilders#channelBuilderConfigs
+        kafkaConfig.originals().entrySet().stream()
+            .filter(entry -> !parsedConfigs.containsKey(entry.getKey()))
+            // exclude already parsed listener prefix configs
+            .filter(entry -> !(entry.getKey().startsWith(listenerName.configPrefix())
+                && parsedConfigs.containsKey(entry.getKey().substring(listenerName.configPrefix().length()))))
+            // exclude keys like `{mechanism}.some.prop` if the "listener.name." prefix is present and key `some.prop` exists in parsed configs
+            .filter(entry -> !parsedConfigs.containsKey(entry.getKey().substring(entry.getKey().indexOf('.') + 1)))
+            .forEach(entry -> parsedConfigs.put(entry.getKey(), entry.getValue()));
+
         Properties clientConfig = new Properties();
-        clientConfig.putAll(securityConfig);
-        clientConfig.put("security.protocol", kafkaConfig.interBrokerSecurityProtocol().toString());
-        clientConfig.put("sasl.mechanism", kafkaConfig.saslMechanismInterBrokerProtocol());
+        parsedConfigs.entrySet().stream()
+            .filter(entry -> entry.getValue() != null)
+            .filter(entry -> isSecurityKey(entry.getKey(), listenerName))
+            .forEach(entry -> clientConfig.put(entry.getKey(), entry.getValue()));
+
+        String interBrokerSaslMechanism = kafkaConfig.saslMechanismInterBrokerProtocol();
+        if (interBrokerSaslMechanism != null && !interBrokerSaslMechanism.isEmpty()) {
+            kafkaConfig.originalsWithPrefix(listenerName.saslMechanismConfigPrefix(interBrokerSaslMechanism)).entrySet().stream()
+                .filter(entry -> entry.getValue() != null)
+                .forEach(entry -> clientConfig.put(entry.getKey(), entry.getValue()));
+            clientConfig.putIfAbsent("sasl.mechanism", interBrokerSaslMechanism);
+        }
+
+        clientConfig.put("security.protocol", securityProtocol.toString());
         clientConfig.put("bootstrap.servers", String.format("%s:%d", endpoint.host(), endpoint.port()));
         return clientConfig;
     }

+    // Filter out non-security broker options (e.g. compression.type, log.retention.hours) so internal clients
+    // only inherit listener-specific SSL/SASL settings.
+    private static boolean isSecurityKey(String key, ListenerName listenerName) {
+        return key.startsWith("ssl.")
+            || key.startsWith("sasl.")
+            || key.startsWith("security.")
+            || key.startsWith(listenerName.configPrefix());
+    }
+
 }
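
A usage sketch, assuming a live KafkaConfig: because of the isSecurityKey filter, the returned Properties carries only bootstrap.servers, security.protocol, and listener-scoped ssl./sasl./security. keys, so broker-only options cannot leak into internal clients:

import kafka.automq.utils.ClientUtils;
import kafka.server.KafkaConfig;

import org.apache.kafka.clients.admin.Admin;

import java.util.Properties;

public class InternalAdminSketch {
    static Admin newInternalAdmin(KafkaConfig kafkaConfig) {
        // Derive client security settings from the inter-broker listener.
        Properties props = ClientUtils.clusterClientBaseConfig(kafkaConfig);
        // Broker-only options such as compression.type or log.retention.hours
        // have already been filtered out by isSecurityKey.
        return Admin.create(props);
    }
}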

core/src/main/java/kafka/automq/zerozone/ConfirmWALProvider.java
Lines changed: 3 additions & 1 deletion

@@ -21,8 +21,10 @@

 import com.automq.stream.s3.wal.WriteAheadLog;

+import java.util.concurrent.CompletableFuture;
+
 public interface ConfirmWALProvider {

-    WriteAheadLog readOnly(String walConfig, int nodeId);
+    CompletableFuture<WriteAheadLog> readOnly(String walConfig, int nodeId);

 }

core/src/main/java/kafka/automq/zerozone/DefaultConfirmWALProvider.java
Lines changed: 12 additions & 2 deletions

@@ -31,9 +31,12 @@
 import com.automq.stream.utils.Time;

 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;

 public class DefaultConfirmWALProvider implements ConfirmWALProvider {
+    private static final ExecutorService START_EXECUTOR = java.util.concurrent.Executors.newCachedThreadPool();
     private final Map<Short, ObjectStorage> objectStorages = new ConcurrentHashMap<>();
     private final String clusterId;
     private final Time time = Time.SYSTEM;

@@ -43,7 +46,7 @@ public DefaultConfirmWALProvider(String clusterId) {
     }

     @Override
-    public WriteAheadLog readOnly(String walConfig, int nodeId) {
+    public CompletableFuture<WriteAheadLog> readOnly(String walConfig, int nodeId) {
         BucketURI bucketURI = BucketURI.parse(walConfig);
         ObjectStorage objectStorage = objectStorages.computeIfAbsent(bucketURI.bucketId(), id -> {
             try {

@@ -64,6 +67,13 @@ public WriteAheadLog readOnly(String walConfig, int nodeId) {
             .withNodeId(nodeId)
             .withOpenMode(OpenMode.READ_ONLY)
             .build();
-        return new ObjectWALService(time, objectStorage, objectWALConfig);
+        ObjectWALService wal = new ObjectWALService(time, objectStorage, objectWALConfig);
+        return CompletableFuture.runAsync(() -> {
+            try {
+                wal.start();
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+        }, START_EXECUTOR).thenApply(v -> wal);
     }
 }
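
Since readOnly now returns a future that completes only after wal.start() finishes on START_EXECUTOR, callers chain on it instead of catching a thrown IOException; a minimal sketch (the bucket URI is a placeholder):

import kafka.automq.zerozone.ConfirmWALProvider;

import com.automq.stream.s3.wal.WriteAheadLog;

import java.util.concurrent.CompletableFuture;

public class ReadOnlyWalSketch {
    static CompletableFuture<WriteAheadLog> openForNode(ConfirmWALProvider provider, int nodeId) {
        // The WAL in the completed future has already been started; start failures
        // surface as an exceptionally completed future rather than a thrown exception.
        return provider.readOnly("0@s3://wal-bucket?region=us-east-1", nodeId)
            .whenComplete((wal, ex) -> {
                if (ex != null) {
                    // handle or propagate the start failure asynchronously
                }
            });
    }
}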

core/src/main/java/kafka/automq/zerozone/LinkRecordDecoder.java renamed to core/src/main/java/kafka/automq/zerozone/DefaultLinkRecordDecoder.java
Lines changed: 13 additions & 7 deletions

@@ -22,26 +22,32 @@
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MutableRecordBatch;

+import com.automq.stream.s3.cache.SnapshotReadCache;
 import com.automq.stream.s3.model.StreamRecordBatch;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;

+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;

-public class LinkRecordDecoder implements Function<StreamRecordBatch, CompletableFuture<StreamRecordBatch>> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(LinkRecordDecoder.class);
+public class DefaultLinkRecordDecoder implements com.automq.stream.api.LinkRecordDecoder {
+    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultLinkRecordDecoder.class);
     private final RouterChannelProvider channelProvider;

-    public LinkRecordDecoder(RouterChannelProvider channelProvider) {
+    public DefaultLinkRecordDecoder(RouterChannelProvider channelProvider) {
         this.channelProvider = channelProvider;
     }

     @Override
-    public CompletableFuture<StreamRecordBatch> apply(StreamRecordBatch src) {
+    public int decodedSize(ByteBuf linkRecordBuf) {
+        return LinkRecord.decodedSize(linkRecordBuf);
+    }
+
+    @Override
+    public CompletableFuture<StreamRecordBatch> decode(StreamRecordBatch src) {
         try {
             LinkRecord linkRecord = LinkRecord.decode(src.getPayload());
             ChannelOffset channelOffset = linkRecord.channelOffset();

@@ -57,8 +63,8 @@ public CompletableFuture<StreamRecordBatch> apply(StreamRecordBatch src) {
             recordBatch.setPartitionLeaderEpoch(linkRecord.partitionLeaderEpoch());
             StreamRecordBatch streamRecordBatch = new StreamRecordBatch(src.getStreamId(), src.getEpoch(), src.getBaseOffset(),
                 -src.getCount(), Unpooled.wrappedBuffer(records.buffer()));
-            // duplicated copy the payload
-            streamRecordBatch.encoded();
+            // The buf will be released after the finally block, so we need to copy the data via #encoded.
+            streamRecordBatch.encoded(SnapshotReadCache.ENCODE_ALLOC);
             return streamRecordBatch;
         } finally {
             src.release();
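
The new LinkRecordDecoder interface splits sizing from decoding, so a cache can account for the decoded footprint before resolving the full batch; a hypothetical caller sketch:

import kafka.automq.zerozone.DefaultLinkRecordDecoder;

import com.automq.stream.s3.model.StreamRecordBatch;

import java.util.concurrent.CompletableFuture;

public class DecoderUsageSketch {
    static CompletableFuture<StreamRecordBatch> resolve(DefaultLinkRecordDecoder decoder, StreamRecordBatch linkBatch) {
        // Size the decoded record from the link payload alone, without fetching it.
        int decodedSize = decoder.decodedSize(linkBatch.getPayload());
        reserveCacheSpace(decodedSize); // hypothetical accounting hook
        return decoder.decode(linkBatch);
    }

    static void reserveCacheSpace(int bytes) {
        // hypothetical: e.g. wait until the snapshot-read cache can hold `bytes`
    }
}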

core/src/main/java/kafka/automq/zerozone/DefaultRouterChannelProvider.java
Lines changed: 2 additions & 13 deletions

@@ -37,7 +37,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;

@@ -84,11 +83,6 @@ public RouterChannel channel() {
             .withType(WAL_TYPE)
             .build();
         ObjectWALService wal = new ObjectWALService(Time.SYSTEM, objectStorage(), config);
-        try {
-            wal.start();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
         RouterChannel routerChannel = new ObjectRouterChannel(this.nodeId, channelId, wal);
         routerChannel.nextEpoch(epoch.getCurrent());
         routerChannel.trim(epoch.getCommitted());

@@ -106,11 +100,6 @@ public RouterChannel readOnlyChannel(int node) {
         return routerChannels.computeIfAbsent(node, nodeId -> {
             ObjectWALConfig config = ObjectWALConfig.builder().withClusterId(clusterId).withNodeId(node).withOpenMode(OpenMode.READ_ONLY).withType(WAL_TYPE).build();
             ObjectWALService wal = new ObjectWALService(Time.SYSTEM, objectStorage(), config);
-            try {
-                wal.start();
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
             return new ObjectRouterChannel(nodeId, channelId, wal);
         });
     }

@@ -127,8 +116,8 @@ public void addEpochListener(EpochListener listener) {

     @Override
     public void close() {
-        FutureUtil.suppress(() -> routerChannel.close(), LOGGER);
-        routerChannels.forEach((nodeId, channel) -> FutureUtil.suppress(channel::close, LOGGER));
+        FutureUtil.suppress(() -> routerChannel.close().get(), LOGGER);
+        routerChannels.forEach((nodeId, channel) -> FutureUtil.suppress(() -> channel.close().get(), LOGGER));
     }

     @Override
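
RouterChannel#close evidently returns a CompletableFuture now, and close() blocks on it so the provider does not return before in-flight channel I/O drains; a minimal equivalent of the suppress-and-wait pattern, with suppress as a stand-in for FutureUtil.suppress:

import org.slf4j.Logger;

import java.util.concurrent.CompletableFuture;

public class CloseSketch {
    interface ThrowingRunnable {
        void run() throws Throwable;
    }

    // Stand-in for FutureUtil.suppress: log and swallow failures so one channel's
    // close error cannot prevent the remaining channels from closing.
    static void suppress(ThrowingRunnable runnable, Logger logger) {
        try {
            runnable.run();
        } catch (Throwable t) {
            logger.error("suppressed exception", t);
        }
    }

    static void closeAll(Iterable<CompletableFuture<Void>> closeFutures, Logger logger) {
        for (CompletableFuture<Void> cf : closeFutures) {
            suppress(cf::get, logger); // block until each asynchronous close completes
        }
    }
}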

core/src/main/java/kafka/automq/zerozone/LinkRecord.java
Lines changed: 18 additions & 0 deletions

@@ -23,11 +23,14 @@
 import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.TimestampType;

+import com.automq.stream.s3.wal.impl.DefaultRecordOffset;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;

 public class LinkRecord {
     private static final byte MAGIC_V0 = (byte) 0x00;
+    private static final int CHANNEL_OFFSET_OFFSET = 1 /* magic */ + 8 /* last offset */ + 4 /* timestamp type */ + 8 /* max timestamp */ + 4 /* leader epoch */;
     private final long lastOffset;
     private final TimestampType timestampType;
     private final long maxTimestamp;

@@ -110,4 +113,19 @@ public static LinkRecord decode(ByteBuf buf) {
         buf.readBytes(channelOffset);
         return new LinkRecord(lastOffset, timestampType, maxTimestamp, partitionLeaderEpoch, ChannelOffset.of(channelOffset));
     }
+
+    /**
+     * Get the size of the linked record.
+     */
+    public static int decodedSize(ByteBuf linkRecordBuf) {
+        ByteBuf buf = linkRecordBuf.slice();
+        byte magic = buf.getByte(0);
+        if (magic != MAGIC_V0) {
+            throw new UnsupportedOperationException("Unsupported magic: " + magic);
+        }
+        int channelOffsetSize = buf.readableBytes() - CHANNEL_OFFSET_OFFSET;
+        ByteBuf channelOffsetBuf = Unpooled.buffer(channelOffsetSize);
+        buf.getBytes(CHANNEL_OFFSET_OFFSET, channelOffsetBuf);
+        return DefaultRecordOffset.of(ChannelOffset.of(channelOffsetBuf).walRecordOffset()).size();
+    }
 }
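
The CHANNEL_OFFSET_OFFSET constant pins the fixed header at 25 bytes (magic, last offset, timestamp type, max timestamp, leader epoch); everything after it is the channel offset. A small check of that layout arithmetic:

public class LinkRecordLayoutSketch {
    public static void main(String[] args) {
        int magic = 1, lastOffset = 8, timestampType = 4, maxTimestamp = 8, leaderEpoch = 4;
        // 1 + 8 + 4 + 8 + 4 = 25: the channel offset starts at byte 25 of a MAGIC_V0 link record.
        System.out.println(magic + lastOffset + timestampType + maxTimestamp + leaderEpoch); // 25
    }
}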

0 commit comments

Comments
 (0)