Next Gen: Serialization / RPC #429
Replies: 8 comments 2 replies
-
@felixguendling thanks for starting this discussion! I'm not exactly sure what the best way to structure this is, but I posted your version 1 code above. Offline, in email, Alex and I provided some feedback on version 1. I think that feedback is fairly generic and applicable to any approach.
-
I'm wondering if there may be a way to provide defaults. For example, version 0 of a message could have no annotations on its fields.
-
Questions around extension points: currently we have one extension point per type. I think it would be great to find a way to have extension points that take a version into account, e.g. (pseudocode):
struct felix {
int x = 0;
};
struct parent {
field<felix, 3, 6> _f;
};
I'd like to be able to extend the actual
-
Updated version addressing
Changed parts here:
template <typename T,
fixed_str FieldName = "unnamed", // For JSON/XML/CSV/... serialization
typename MinVersion = min_version<0>,
typename MaxVersion = max_version<kCurrVersion>>
struct field {
static constexpr char const* Name = FieldName;
static constexpr int MinV = MinVersion::v;
static constexpr int MaxV = MaxVersion::v;
field() = default;
field(T v) : _val{std::move(v)} {}
operator T const& () const { return _val; }
T _val;
};
// Maybe not necessary.
// Current behaviour: always write/expect unversioned fields
constexpr auto const kDefaultUnversionedMinV = 0;
constexpr auto const kDefaultUnversionedMaxV = kCurrVersion;
template <typename T, typename = void>
struct is_versioned : std::false_type {};
template <typename T>
struct is_versioned<T, std::void_t<decltype(std::declval<T>().MinV)>> : std::true_type {};
template <typename T>
inline auto static constexpr is_versioned_v = is_versioned<T>::value;
// ================
// json_stdout_writer.h
// ----------------
#include <ostream>
struct json_writer { std::ostream& _out; };
template <int Version, typename T>
void write(json_writer& jw, T const& val) {
using Type = std::decay_t<T>;
if constexpr (std::is_aggregate_v<Type>) {
jw._out << "{\n";
for_each_field(val, [&](auto&& f) {
if constexpr (is_versioned_v<std::decay_t<decltype(f)>>) {
if (Version >= f.MinV && Version <= f.MaxV) {
jw._out << " \"" << f.Name << "\": ";
write<Version>(jw, f._val);
jw._out << '\n';
}
} else {
if (Version >= kDefaultUnversionedMinV &&
Version <= kDefaultUnversionedMaxV) {
jw._out << " \"unnamed\": ";
write<Version>(jw, f);
jw._out << '\n';
}
}
});
jw._out << "}\n";
} else if constexpr (std::is_same_v<std::string, Type>) {
jw._out << '"' << val << "\"";
} else {
jw._out << val;
}
}
// ================
// msg_definitions.h
// ----------------
struct special_struct { int _x, _y, _z; };
template <int Version>
std::enable_if_t<Version >= 6> // versions >= 6: generic array
write(json_writer& jw, special_struct const& val) {
jw._out << '[' << val._x << ',' << val._y << ',' << val._z << ']';
}
template <int Version>
std::enable_if_t<Version < 6> // versions < 6: compact encoding, only valid for x, y, z < 10
write(json_writer& jw, special_struct const& val) {
jw._out << (val._x * 100 + val._y * 10 + val._z);
}
struct my_msg {
// field with custom writers (compact before v6, array from v6 on)
field<special_struct, "Special"> _special;
// unversioned field (will always be serialized)
std::string _payload;
// version 0, 1, 2, 3, 4 do send this field -> expect it
// version 5 and onward do not send this field -> don't expect it
field<int, "TTL", min_version<0>, max_version<4>> _ttl;
// version 0, 1 do not send this field -> don't expect it
// version >= 2 do send this field -> expect it
field<int, "Timeout", min_version<2>> _timeout;
};
// ================
// main.cc
// ----------------
#include <iostream>
int main() {
json_writer w{ ._out = std::cout };
std::cout << "Message to Server 1 (version 4)\n";
write<4>(w, my_msg{._special{{4, 5, 6}}, ._payload{"Hello"}, ._ttl{120}, ._timeout{30}});
std::cout << "\n";
// won't print TTL (TTL has max_version<4>, so it is dropped from v5 on; current version is v6)
std::cout << "Message to Server 2 (version 6)\n";
write<6>(w, my_msg{._special{{4, 5, 6}}, ._payload{"Hello"}, ._ttl{120}, ._timeout{30}});
std::cout << "\n";
}
Full version: https://godbolt.org/z/3q8Tvo
-
In my previous post, the version parameter had to be available at compile time. Since this is an unrealistic assumption (peers send their version as the initial message at runtime), the following code makes the version a runtime parameter. Essential new parts:
namespace detail {
template <int Version, typename Writer, typename Msg>
bool write_if_version_matches(int const version, Writer&& w, Msg&& msg) {
if (Version == version || (Version == kCurrVersion && kCurrVersion <= version)) {
write<Version>(std::forward<Writer>(w), std::forward<Msg>(msg));
return true;
}
return false;
}
template <typename Writer, typename Msg, int... VersionSeq>
void write_impl(int const version, Writer&& w, Msg&& msg, std::integer_sequence<int, VersionSeq...>) {
(write_if_version_matches<VersionSeq>(version, std::forward<Writer>(w), std::forward<Msg>(msg)) || ...);
}
}
template <typename Writer, typename Msg>
void write(int const version, Writer&& w, Msg&& msg) {
detail::write_impl(
version,
std::forward<Writer>(w),
std::forward<Msg>(msg),
std::make_integer_sequence<int, kCurrVersion + 1>());
}
// ================
// peer.h
// ----------------
#include <iostream>
struct peer {
template <typename Msg>
void send(Msg&& msg) {
write(_version, _writer, std::forward<Msg>(msg));
}
int _version;
json_writer _writer{ ._out = std::cout };
};
// ================
// main.cc
// ----------------
int main() {
peer p1{._version{4}}, p2{._version{7}};
std::cout << "Message to Server 1 (version " << p1._version << ")\n";
p1.send(my_msg{._special{{4, 5, 6}}, ._payload{"Hello"}, ._ttl{120}, ._timeout{30}});
std::cout << "\n";
// won't print TTL (TTL has max_version<4>, so it is dropped from v5 on; version 7 is served with the current version v6)
std::cout << "Message to Server 2 (version " << p2._version << ")\n";
p2.send(my_msg{._special{{4, 5, 6}}, ._payload{"Hello"}, ._ttl{120}, ._timeout{30}});
std::cout << "\n";
}
Full version: https://godbolt.org/z/WT4z8n
-
@felixguendling, these concepts that you have put together are really great. I really like the idea of giving developers the power to write message definitions using annotated fields and extension points:
struct msg {
field<int, "TTL", min_version<0>, max_version<4>> ttl;
};

As long as it continues to make sense, I think we should try this out. I'm going to push back against the design a little to see if there are weak spots or areas to improve. I think there is a workable fallback solution (hand-coded encoders/decoders), but it'd be great if we can use meta-programming techniques to be safer, faster, and nicer to use.

per-message metadata

Independent of how the API looks, I think it's good to think about what sort of metadata we really need for message encoding. Looking at Ceph's message encoding, two bytes are used per message to encode version information. Ceph has likely encountered the full spectrum of possibilities here, so it seems like a good bet that this is sufficient. Here is a simplified example to demonstrate the metadata.
struct message {
// start(version, compat_version)
// VERSION <- current version of the message (i.e. this code)
// COMPAT_VERSION <- oldest code that can decode this message
void encode() {
start(2, 1); // both of these are encoded in message header
encode(field);
encode(field);
}
// start(VERSION)
// VERSION <- current version of the message
void decode() {
start(2);
decode(field);
if (version <= 2) {
// SOME DYNAMIC CASE
}
if (version < 2 and compat_version > 1) {
// SOME DYNAMIC CASE
}
}

So versioning is effectively at the message level, and per-field treatments are handled in code with control flow. New fields can be appended without requiring special cases: old code will simply not decode those fields. Again, this isn't an endorsement of the API, just an example of the information used. When decoding, if the data can't be decoded given the supported versions, the decoder throws. I think we need this too: some ability to catch mistakes.

performance

The per-field granularity in your solution is intriguing, but it seems to introduce unnecessary costs, because all versioning is applied per field in isolation instead of exploiting message-level versioning, resulting in an I don't have a good intuition with the Are there cases with the

breaking out of meta programming

There are some practical reasons why I think we do need to be able to break out of the meta-programming magic and hand-code the serialization. For example, in this code snippet (https://github.com/vectorizedio/redpanda/blob/dev/src/v/storage/index_state.cc#L111) we had a bug that wrote the wrong data to disk, and we had to handle this explicitly and create an ad-hoc policy for the situation. Here is another recent, complex example that occurred because we don't have a comprehensive approach to versioning the components of a composite structure (e.g. a nested type with its own distinct versioning): https://github.com/vectorizedio/redpanda/blob/dev/src/v/raft/configuration.cc#L450. I could imagine other cases down the road where we decide to break backwards compatibility in rare cases because the costs are worth it. Being able to break out of the prescribed approach with annotated fields feels necessary.

api

I think it is fair to say that nearly all the message types we will have are likely to have (1) auxiliary data and (2) access methods. For example:
I'm not sure if the
class message {
struct wire_msg {
...
};
extra fields;
methods;
};

short term goals

In the short term we need to ensure our ability to evolve message types on the wire and on disk. I think everything above and what you are working on is in scope here. I think we should consider the simplifications we can make by hand-coding each of the encoders. My observations are that (1) we currently have only a handful of types that are serialized, so writing all of the encoders/decoders by hand would be very little code, and (2) there are very few special cases, i.e. most of the messages are at version 1. This would leave us free to clean up the existing system we have for writing encoders. We have something right now that works well, but it is very verbose. This short-term phase includes switching existing types over to the new protocol with versioning baked in.

medium term goal

In the medium term we need to tackle network-level issues, such as exchanging information about the versions/features of connected peers and, in some cases, making this information available within the context of encoding. Later development will also need to exchange information at the connection level, so we should ensure that we have a base version that we can transition away from. I think this is already done; we just need to verify that it is sufficient for evolution.
-
This new version introduces versioning attached to each message (i.e. a message header). There are two version fields (one byte each) in front of each message: the version the message was serialized with, and the minimum version required to read it (compatibility/compat version). The reasoning is that this is more flexible and generic, because it is independent of the source of the versioning information (file name/file header vs. a server sending its version in a handshake message). Therefore, we introduce a message struct that is the parent of each user-defined message:
struct my_other_msg : public message<my_other_msg> {
field<int, "SequenceNumber1"> _seq1;
field<int, "SequenceNumber2"> _seq2;
};
(The code currently contains a min/max version for each message; however, this will be removed in the next step.)
The current version is available here:
This mainly addresses @dotnwat's comments regarding per-message metadata.
Regarding performance:
Regarding api: one could either introduce a field annotation that expresses "do not (de)serialize" (something like
Version 2
#include <string>
#include <string_view>
namespace cista {
#if defined(_MSC_VER)
#define CISTA_SIG __FUNCSIG__
#elif defined(__clang__) || defined(__GNUC__)
#define CISTA_SIG __PRETTY_FUNCTION__
#else
#error unsupported compiler
#endif
template <typename T>
constexpr std::string_view type_str() {
#if defined(__clang__)
constexpr std::string_view prefix =
"std::string_view cista::type_str() [T = ";
constexpr std::string_view suffix = "]";
#elif defined(_MSC_VER)
constexpr std::string_view prefix =
"class std::basic_string_view<char,struct std::char_traits<char> > "
"__cdecl cista::type_str<";
constexpr std::string_view suffix = ">(void)";
#else
constexpr std::string_view prefix =
"constexpr std::string_view cista::type_str() [with T = ";
constexpr std::string_view suffix =
"; std::string_view = std::basic_string_view<char>]";
#endif
auto sig = std::string_view{CISTA_SIG};
sig.remove_prefix(prefix.size());
sig.remove_suffix(suffix.size());
return sig;
}
} // namespace cista
// ================
// for_each_field.h
// ----------------
#include <type_traits>
#include <tuple>
namespace detail {
struct instance {
template <typename Type>
operator Type() const;
};
template <typename Aggregate, typename IndexSequence = std::index_sequence<>,
typename = void>
struct arity_impl : IndexSequence {};
template <typename Aggregate, std::size_t... Indices>
struct arity_impl<Aggregate, std::index_sequence<Indices...>,
std::void_t<decltype(Aggregate{
(static_cast<void>(Indices), std::declval<instance>())...,
std::declval<instance>()})>>
: arity_impl<Aggregate,
std::index_sequence<Indices..., sizeof...(Indices)>> {};
} // namespace detail
template <typename T>
constexpr std::size_t arity() {
return detail::arity_impl<std::decay_t<T>>().size();
}
template <typename T>
inline auto to_tuple(T& t) {
constexpr auto const a = arity<T>() - 1; // -1: the (empty) message<> base class counts as one aggregate element
static_assert(a >= 2 && a <= 8, "to_tuple: only 2 to 8 members are supported");
if constexpr (a == 2) {
auto& [p1, p2] = t;
return std::tie(p1, p2);
} else if constexpr (a == 3) {
auto& [p1, p2, p3] = t;
return std::tie(p1, p2, p3);
} else if constexpr (a == 4) {
auto& [p1, p2, p3, p4] = t;
return std::tie(p1, p2, p3, p4);
} else if constexpr (a == 5) {
auto& [p1, p2, p3, p4, p5] = t;
return std::tie(p1, p2, p3, p4, p5);
} else if constexpr (a == 6) {
auto& [p1, p2, p3, p4, p5, p6] = t;
return std::tie(p1, p2, p3, p4, p5, p6);
} else if constexpr (a == 7) {
auto& [p1, p2, p3, p4, p5, p6, p7] = t;
return std::tie(p1, p2, p3, p4, p5, p6, p7);
} else if constexpr (a == 8) {
auto& [p1, p2, p3, p4, p5, p6, p7, p8] = t;
return std::tie(p1, p2, p3, p4, p5, p6, p7, p8);
}
}
template <typename T, typename Fn>
inline void for_each_field(T& t, Fn&& fn) {
if constexpr (std::is_pointer_v<T>) {
if (t != nullptr) {
for_each_field(*t, std::forward<Fn>(fn));
}
} else if constexpr (std::is_scalar_v<T>) {
fn(t);
} else {
std::apply([&](auto&&... args) { (fn(args), ...); }, to_tuple(t));
}
}
// ================
// fixed_string.h
// ----------------
template<unsigned N>
struct fixed_str {
char buf[N + 1]{};
constexpr fixed_str(char const* s) {
for (auto i = 0U; i != N; ++i) {
buf[i] = s[i];
}
}
constexpr operator char const*() const { return buf; }
};
template<unsigned N> fixed_str(char const (&)[N]) -> fixed_str<N - 1>;
// ================
// field.h
// ----------------
#include <utility>
// Globally defined in the codebase.
// Increment after message definition changes.
static constexpr auto const kCurrVersion = 6;
// Helper types to make field definitions more explicit (self-documenting).
template <int V> struct min_version { static constexpr auto const v = V; };
template <int V> struct max_version { static constexpr auto const v = V; };
// When fading out fields max_version should be set to (kCurrVersion - 1)
//
// Example: (min_version=0 to keep it simple)
// current version 4 should stop supporting field X
// -> versions <=3 send & receive X to/from everyone
// -> versions >=4 send & receive X to/from version <=3,
// skip X to/from versions >=4
//
// Other solution: never remove fields
// => less efficient but easier to implement / reason about
template <typename T,
fixed_str FieldName = "unnamed", // For JSON/XML/CSV/... serialization
typename MinVersion = min_version<0>,
typename MaxVersion = max_version<kCurrVersion>>
struct field {
static constexpr char const* Name = FieldName;
static constexpr int MinV = MinVersion::v;
static constexpr int MaxV = MaxVersion::v;
field() = default;
field(T v) : _val{std::move(v)} {}
operator T const& () const { return _val; }
T _val;
};
// Maybe not necessary.
// Current behaviour: always write/expect unversioned fields
constexpr auto const kDefaultUnversionedMinV = 0;
constexpr auto const kDefaultUnversionedMaxV = kCurrVersion;
template <typename T, typename = void>
struct is_versioned : std::false_type {};
template <typename T>
struct is_versioned<T, std::void_t<decltype(std::declval<T>().MinV)>> : std::true_type {};
template <typename T>
inline auto static constexpr is_versioned_v = is_versioned<T>::value;
// ================
// write.h
// ----------------
namespace detail {
template <int Version, typename Writer, typename Msg>
bool write_if_version_matches(int const version, Writer&& w, Msg&& msg) {
if (Version == version || (Version == kCurrVersion && kCurrVersion <= version)) {
write<Version>(std::forward<Writer>(w), std::forward<Msg>(msg));
return true;
}
return false;
}
template <typename Writer, typename Msg, int... VersionSeq>
void write_impl(int const version, Writer&& w, Msg&& msg, std::integer_sequence<int, VersionSeq...>) {
(write_if_version_matches<VersionSeq>(version, std::forward<Writer>(w), std::forward<Msg>(msg)) || ...);
}
}
template <typename Writer, typename Msg>
void write(int const version, Writer&& w, Msg&& msg) {
detail::write_impl(
version,
std::forward<Writer>(w),
std::forward<Msg>(msg),
std::make_integer_sequence<int, kCurrVersion + 1>());
}
// ================
// message.h
// ----------------
#include <type_traits>
template <typename T,
typename MinVersion = min_version<0>,
typename MaxVersion = max_version<kCurrVersion>>
struct message {
static constexpr int MessageMinV = MinVersion::v;
static constexpr int MessageMaxV = MaxVersion::v;
template <typename SerializeFn>
void write(SerializeFn&& serialize) const {
serialize(*this);
}
};
template <typename T>
struct inherits_from_message {
static constexpr auto const value =
std::is_base_of_v<
message<T,
min_version<T::MessageMinV>,
max_version<T::MessageMaxV>>,
T
>;
};
template <typename T, typename = void>
struct is_versioned_message : std::false_type {};
template <typename T>
struct is_versioned_message<T, std::void_t<decltype(std::declval<T>().MessageMinV)>> : std::true_type {};
template <typename T>
static inline constexpr auto const is_message_v =
std::conjunction_v<
is_versioned_message<T>,
inherits_from_message<T>
>;
// ================
// json_writer.h
// ----------------
#include <iostream>
struct json_writer { std::ostream& _out; };
template <int Version, typename T>
void write(json_writer& jw, T const& val) {
using Type = std::decay_t<T>;
if constexpr (is_versioned_v<Type>) {
if constexpr (Version >= Type::MinV && Version <= Type::MaxV) {
jw._out << " \"" << val.Name << "\": ";
write<Version>(jw, val._val);
jw._out << '\n';
}
} else if constexpr (is_message_v<Type> || std::is_aggregate_v<Type>) {
jw._out << "type: " << cista::type_str<Type>() << "\n";
if constexpr (is_message_v<Type>) {
jw._out << "header: min_v="
<< ((Type::MessageMinV != Type::MessageMaxV ? 128 : 0) + Type::MessageMinV);
if (Type::MessageMinV != Type::MessageMaxV) {
jw._out << ", max_v=" << Type::MessageMaxV;
}
jw._out << "\n";
}
jw._out << "{\n";
auto first = true;
for_each_field(val, [&](auto&& f) {
if constexpr (is_versioned_v<std::decay_t<decltype(f)>>) {
write<Version>(jw, f);
} else {
if constexpr (Version >= kDefaultUnversionedMinV &&
Version <= kDefaultUnversionedMaxV) {
jw._out << " \"unnamed\": ";
write<Version>(jw, f);
jw._out << '\n';
}
}
});
jw._out << "}\n";
} else if constexpr (std::is_same_v<std::string, Type>) {
jw._out << cista::type_str<Type>() << ": " << '"' << val << "\"";
} else {
jw._out << cista::type_str<Type>() << ": " << val;
}
}
// ================
// msg_definitions.h
// ----------------
struct special_struct { int _x, _y, _z; };
template <int Version>
std::enable_if_t<Version >= 6> // versions >= 6: generic array
write(json_writer& jw, special_struct const& val) {
jw._out << '[' << val._x << ',' << val._y << ',' << val._z << ']';
}
template <int Version>
std::enable_if_t<Version < 6> // versions < 6: compact encoding, only valid for x, y, z < 10
write(json_writer& jw, special_struct const& val) {
jw._out << (val._x * 100 + val._y * 10 + val._z);
}
struct my_msg : public message<my_msg, min_version<3>, max_version<3>> {
// field with custom writers (compact before v6, array from v6 on)
field<special_struct, "Special"> _special;
// unversioned field (will always be serialized)
std::string _payload;
// version 0, 1, 2, 3, 4 do send this field -> expect it
// version 5 and onward do not send this field -> don't expect it
field<int, "TTL", min_version<0>, max_version<4>> _ttl;
// version 0, 1 do not send this field -> don't expect it
// version >= 2 do send this field -> expect it
field<int, "Timeout", min_version<2>> _timeout;
};
struct my_other_msg : public message<my_other_msg, min_version<2>, max_version<3>> {
field<int, "SequenceNumber1"> _seq1;
field<int, "SequenceNumber2"> _seq2;
};
static_assert(is_message_v<my_msg>);
static_assert(!is_message_v<int>);
// ================
// peer.h
// ----------------
#include <iostream>
struct peer {
template <typename Msg>
void send(Msg&& msg) {
write(_version, _writer, std::forward<Msg>(msg));
}
int _version;
json_writer _writer{ ._out = std::cout };
};
// ================
// main.cc
// ----------------
int main() {
peer p1{._version = 4}, p2{._version = 7};
std::cout << "Message to Server 1 (version " << p1._version << ")\n";
p1.send(my_msg{._special{{4, 5, 6}}, ._payload{"Hello"}, ._ttl{120}, ._timeout{30}});
std::cout << "\n";
// won't print TTL (TTL has max_version<4>, so it is dropped from v5 on; version 7 is served with the current version v6)
std::cout << "Message to Server 2 (version " << p2._version << ")\n";
p2.send(my_msg{._special{{4, 5, 6}}, ._payload{"Hello"}, ._ttl{120}, ._timeout{30}});
std::cout << "\n";
std::cout << "Message to Server 2 (version " << p2._version << ")\n";
p2.send(my_other_msg{._seq1 = 1337});
std::cout << "\n";
}
-
Hey @dotnwat thank you for your feedback!
Sounds like a great idea!
Maybe we can make the format compatible with the old format?
I think member variables that are not of type
I'm not sure if we need version (regardless of min/max or curr/compat) as template parameters at all. I tend to say no. They are completely runtime dependent.
I think having human readable messages representations could make debugging, logging, etc. really simple. The question is: is it worth the effort? I would assume so. However, if there are already tools in place for these tasks, we should skip this and only implement a binary serialization.
This should be a global compile-time constant indicating the current serialization format version for the whole codebase. However, maybe this is better done on a per-message-type basis?
Yes, I should at least rename them, but I now think that having them as template parameters is the wrong approach. Rather than fixing this prototype, I'll change it directly in the actual Redpanda codebase.
This code is not for the message but for fields. The idea was to allow for unversioned fields (
-
The current approach to RPC has no easy way of versioning message types [0] and does not provide a way to communicate a set of capabilities a server provides at the start of a session. Of course, this can be done manually because the current mechanism is customizable for each message type. However, the basic approach does not distinguish protocol versions.
The purpose of this discussion is to determine what a new RPC mechanism that supports versioning (upward and downward compatibility) could look like, so that servers with different versions in the cluster can seamlessly communicate with each other. Additionally, the RPC serialization format should be used for on-disk data storage as well. Therefore, newer versions need to be able to deserialize the on-disk format of older versions.
The (de)serialization mechanisms should stay customizable to allow for the handling of special cases with manually written (as opposed to generated by the compiler through meta-programming) serialization and deserialization procedures.
Communication of server capabilities at the beginning of each session can probably be implemented separately from the serialization format. However, the session initialization (the first messages exchanged between servers) should probably contain the server's software version, which is then used to configure the RPC mechanism.
For debugging purposes, and perhaps to ease message exchange with software written in scripting languages (Python, JavaScript, etc.), it may be useful to have a way to serialize to and from JSON. This requires each field to have a unique name (the keys of a JSON object).
Version 1
A first prototype mostly to demonstrate a potential syntax to define a message type is available here: https://godbolt.org/z/cr8Ezz