Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions Sources/Kafka/KafkaAcknowledgedMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ public struct KafkaAcknowledgedMessage {
public var topic: String
/// The partition that the message was sent to.
public var partition: KafkaPartition
/// The headers of the message.
public var headers: [KafkaHeader]
/// The key of the message.
public var key: ByteBuffer?
/// The body of the message.
Expand All @@ -33,9 +35,6 @@ public struct KafkaAcknowledgedMessage {
internal init(messagePointer: UnsafePointer<rd_kafka_message_t>) throws {
let rdKafkaMessage = messagePointer.pointee

let valueBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.payload, count: rdKafkaMessage.len)
self.value = ByteBuffer(bytes: valueBufferPointer)

guard rdKafkaMessage.err == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: rdKafkaMessage.err)
}
Expand All @@ -47,15 +46,7 @@ public struct KafkaAcknowledgedMessage {

self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))

if let keyPointer = rdKafkaMessage.key {
let keyBufferPointer = UnsafeRawBufferPointer(
start: keyPointer,
count: rdKafkaMessage.key_len
)
self.key = .init(bytes: keyBufferPointer)
} else {
self.key = nil
}
(self.key, self.value, self.headers) = try KafkaConsumerMessage.extractContent(fromMessage: messagePointer)

self.offset = KafkaOffset(rawValue: Int(rdKafkaMessage.offset))
}
Expand Down
138 changes: 74 additions & 64 deletions Sources/Kafka/KafkaConsumerMessage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public struct KafkaConsumerMessage {
}

internal var _value: MessageContent

/// The topic that the message was received from.
public var topic: String
/// The partition that the message was received from.
Expand Down Expand Up @@ -84,8 +84,6 @@ public struct KafkaConsumerMessage {
// self.value = ByteBuffer()
// self.headers = [KafkaHeader]()
// }

private static let bufferAlignment: Int = MemoryLayout<UInt64>.size

/// Initialize ``KafkaConsumerMessage`` from `rd_kafka_message_t` pointer.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
Expand Down Expand Up @@ -119,67 +117,9 @@ public struct KafkaConsumerMessage {
var timestamp: Timestamp? = nil

if rdKafkaMessage.err != RD_KAFKA_RESP_ERR__PARTITION_EOF {

var bufferSize = 0
var headersCount = 0
try Self.forEachHeader(inMessage: messagePointer) { _, value in
headersCount += 1
bufferSize += Int(value.count).roundUpToMultipleOf(Self.bufferAlignment)
}
bufferSize += Int(rdKafkaMessage.key_len).roundUpToMultipleOf(Self.bufferAlignment)
bufferSize += Int(rdKafkaMessage.len)

var buffer = ByteBufferAllocator().buffer(capacity: bufferSize)

try Self.forEachHeader(inMessage: messagePointer) { _, value in
if value.count > 0 {
buffer.writeBytes(value)
let alignment = Int(value.count).roundUpToMultipleOf(Self.bufferAlignment) - value.count
buffer.moveWriterIndex(forwardBy: alignment)
}
}

if rdKafkaMessage.key_len > 0 {
let keyBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.key, count: rdKafkaMessage.key_len)
buffer.writeBytes(keyBufferPointer)
let alignment = Int(rdKafkaMessage.key_len).roundUpToMultipleOf(Self.bufferAlignment) - rdKafkaMessage.key_len
buffer.moveWriterIndex(forwardBy: alignment)
}

buffer.writeBytes(valueBufferPointer)

var headers = [KafkaHeader]()
headers.reserveCapacity(headersCount)
try Self.forEachHeader(inMessage: messagePointer) { key, value in
let valueBuffer: ByteBuffer? = {
if value.count > 0 {
buffer.moveWriterIndex(to: buffer.readerIndex + value.count)
let ret = buffer.slice()
let newIndex = buffer.readerIndex + Int(value.count).roundUpToMultipleOf(Self.bufferAlignment)
buffer.moveWriterIndex(to: newIndex)
buffer.moveReaderIndex(to: newIndex)
return ret
} else {
return nil
}
}()
let header = KafkaHeader(key: String(cString: key), value: valueBuffer)
headers.append(header)
}
self.headers = headers

if rdKafkaMessage.key_len > 0 {
buffer.moveWriterIndex(to: buffer.readerIndex + rdKafkaMessage.key_len)
self.key = buffer.slice()
let newIndex = buffer.readerIndex + Int(rdKafkaMessage.key_len).roundUpToMultipleOf(Self.bufferAlignment)
buffer.moveWriterIndex(to: newIndex)
buffer.moveReaderIndex(to: newIndex)
} else {
self.key = nil
}

buffer.moveWriterIndex(to: buffer.readerIndex + valueBufferPointer.count)
self._value = .buffer(buffer)
var valueBuffer: ByteBuffer
(self.key, valueBuffer, self.headers) = try Self.extractContent(fromMessage: messagePointer)
self._value = .buffer(valueBuffer)

var timestampType = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE
let kafkaTimestamp = rd_kafka_message_timestamp(messagePointer, &timestampType)
Expand Down Expand Up @@ -212,6 +152,76 @@ extension KafkaConsumerMessage: Sendable {}
// MARK: - Helpers

extension KafkaConsumerMessage {
/// Alignment (in bytes) for the packed regions inside the shared backing buffer:
/// each header value and the key are padded up to a multiple of this value
/// (`MemoryLayout<UInt64>.size`, i.e. 8) before the next region is written.
static let bufferAlignment: Int = MemoryLayout<UInt64>.size

/// Copies the key, value, and headers of an `rd_kafka_message_t` into a single
/// shared `ByteBuffer` allocation and returns slices into it.
///
/// Layout of the backing buffer (regions written in this order):
/// `[header value 0 | pad][header value 1 | pad]…[key | pad][value]`,
/// where each region except the final value is padded up to a multiple of
/// `Self.bufferAlignment`. Sizing, writing, and slicing each make their own
/// pass over the headers, so `forEachHeader` is invoked three times.
///
/// - Parameter messagePointer: Pointer to a librdkafka message; the pointed-to
///   key/payload/header memory is copied, not retained.
/// - Returns: Optional key slice (`nil` when `key_len == 0`), the value slice,
///   and the parsed `KafkaHeader`s (a header with an empty value gets `nil`).
/// - Throws: Rethrows any error from `Self.forEachHeader`.
///
/// NOTE(review): correctness of the slicing passes assumes `forEachHeader`
/// yields headers in the same order and with the same value lengths on every
/// pass — confirm this invariant holds for librdkafka's header list.
static func extractContent(
fromMessage messagePointer: UnsafePointer<rd_kafka_message_t>
) throws -> (key: ByteBuffer?, value: ByteBuffer, headers: [KafkaHeader]) {
let rdKafkaMessage = messagePointer.pointee

let valueBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.payload, count: rdKafkaMessage.len)

// Pass 1 (sizing): compute the total capacity so the backing buffer is
// allocated exactly once. Header values and the key are rounded up to the
// alignment; the message value is appended last and needs no padding.
var bufferSize = 0
var headersCount = 0
try Self.forEachHeader(inMessage: messagePointer) { _, value in
headersCount += 1
bufferSize += Int(value.count).roundUpToMultipleOf(Self.bufferAlignment)
}
bufferSize += Int(rdKafkaMessage.key_len).roundUpToMultipleOf(Self.bufferAlignment)
bufferSize += Int(rdKafkaMessage.len)

var buffer = ByteBufferAllocator().buffer(capacity: bufferSize)

// Pass 2 (copying): write every non-empty header value, advancing the
// writer index past the padding so each region starts aligned.
try Self.forEachHeader(inMessage: messagePointer) { _, value in
if value.count > 0 {
buffer.writeBytes(value)
let alignment = Int(value.count).roundUpToMultipleOf(Self.bufferAlignment) - value.count
buffer.moveWriterIndex(forwardBy: alignment)
}
}

// Copy the key (if any) into its own aligned region after the headers.
if rdKafkaMessage.key_len > 0 {
let keyBufferPointer = UnsafeRawBufferPointer(start: rdKafkaMessage.key, count: rdKafkaMessage.key_len)
buffer.writeBytes(keyBufferPointer)
let alignment = Int(rdKafkaMessage.key_len).roundUpToMultipleOf(Self.bufferAlignment) - rdKafkaMessage.key_len
buffer.moveWriterIndex(forwardBy: alignment)
}

// The message value is the final, unpadded region.
buffer.writeBytes(valueBufferPointer)

// Pass 3 (slicing): walk the headers again, carving a slice per non-empty
// value. The trick: temporarily pull the writer index back to the end of
// the current region so `slice()` covers exactly that region, then advance
// both reader and writer indices past the region's padding to position on
// the next one. Slices share storage with `buffer` — no further copies.
var headers = [KafkaHeader]()
headers.reserveCapacity(headersCount)
try Self.forEachHeader(inMessage: messagePointer) { key, value in
let valueBuffer: ByteBuffer? = {
if value.count > 0 {
buffer.moveWriterIndex(to: buffer.readerIndex + value.count)
let ret = buffer.slice()
let newIndex = buffer.readerIndex + Int(value.count).roundUpToMultipleOf(Self.bufferAlignment)
buffer.moveWriterIndex(to: newIndex)
buffer.moveReaderIndex(to: newIndex)
return ret
} else {
// Empty header value is represented as a nil buffer, not an
// empty one.
return nil
}
}()
let header = KafkaHeader(key: String(cString: key), value: valueBuffer)
headers.append(header)
}

// Carve the key slice using the same writer-pullback technique.
var key: ByteBuffer?
if rdKafkaMessage.key_len > 0 {
buffer.moveWriterIndex(to: buffer.readerIndex + rdKafkaMessage.key_len)
key = buffer.slice()
let newIndex = buffer.readerIndex + Int(rdKafkaMessage.key_len).roundUpToMultipleOf(Self.bufferAlignment)
buffer.moveWriterIndex(to: newIndex)
buffer.moveReaderIndex(to: newIndex)
}

// What remains between reader and writer index is exactly the value
// region, so `buffer` itself is returned as the value.
buffer.moveWriterIndex(to: buffer.readerIndex + valueBufferPointer.count)

return (key: key, value: buffer, headers: headers)
}

/// Iterates over ``KafkaHeader``s from a `rd_kafka_message_t` pointer
/// applying the `body` function for each header.
///
Expand Down
Loading