From 8ab3e9bdcced17777374acb9b423fa736a147d80 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Mon, 18 Jan 2021 16:44:01 +0000 Subject: [PATCH 01/13] Draft version (work in progress) --- .github/workflows/dotnet.yml | 2 +- Directory.Build.props | 2 +- Directory.Build.targets | 2 + README.md | 4 + .../Abstractions/BatchedQueryResults.cs | 27 ++ src/KsqlDb.Client/Abstractions/ColumnType.cs | 15 ++ .../Abstractions/ExecuteStatementResult.cs | 38 +++ src/KsqlDb.Client/Abstractions/FieldInfo.cs | 32 +++ src/KsqlDb.Client/Abstractions/IClient.cs | 146 +++++++++++ .../Abstractions/Objects/KSqlArray.cs | 80 ++++++ .../Abstractions/Objects/KSqlNull.cs | 11 + .../Abstractions/Objects/KSqlObject.cs | 93 +++++++ src/KsqlDb.Client/Abstractions/QueryInfo.cs | 39 +++ .../QueryResults/QueryResultRow.cs | 64 +++++ .../QueryResults/StreamedQueryResult.cs | 41 +++ src/KsqlDb.Client/Abstractions/QueryType.cs | 18 ++ .../Abstractions/RowInsertAcknowledgment.cs | 18 ++ .../Abstractions/SourceDescription.cs | 77 ++++++ src/KsqlDb.Client/Abstractions/SourceType.cs | 8 + .../Abstractions/StreamOrTableInfo.cs | 45 ++++ src/KsqlDb.Client/Abstractions/TopicInfo.cs | 39 +++ src/KsqlDb.Client/Client.cs | 175 +++++++++++++ src/KsqlDb.Client/Exceptions/ErrorDetails.cs | 31 +++ .../Exceptions/HttpMessageDetails.cs | 59 +++++ .../Exceptions/HttpRequestMessageDetails.cs | 50 ++++ .../Exceptions/HttpResponseMessageDetails.cs | 41 +++ .../Exceptions/KsqlDbException.cs | 51 ++++ src/KsqlDb.Client/InternalsVisibleTo.cs | 3 + .../Content/IKsqlRequestHttpContentFactory.cs | 27 ++ .../KsqlV1RequestHttpContentFactory.cs | 111 ++++++++ .../KsqlApiV1/KSqlDbHttpClient.cs | 236 ++++++++++++++++++ .../KsqlApiV1/Requests/CloseStreamRequest.cs | 19 ++ .../KsqlApiV1/Requests/KsqlRequest.cs | 41 +++ .../KsqlApiV1/Requests/QueryStreamRequest.cs | 32 +++ .../KsqlApiV1/Requests/TargetStream.cs | 9 + .../Responses/InsertStreamAckResponse.cs | 24 ++ .../KsqlApiV1/Responses/KsqlResponse.cs | 72 ++++++ src/KsqlDb.Client/KsqlDb.Client.csproj | 6 +- src/KsqlDb.Client/Parsers/KObjectParser.cs | 102 ++++++++ .../Parsers/QueryResultRowParser.cs | 60 +++++ src/KsqlDb.Client/Serdes/IJsonSerializer.cs | 71 ++++++ src/KsqlDb.Client/Serdes/JsonSerializer.cs | 65 +++++ .../Serdes/KSqlArrayConverter.cs | 15 ++ src/KsqlDb.Client/Serdes/KSqlNullConverter.cs | 14 ++ .../Serdes/KSqlObjectConverter.cs | 16 ++ .../DescribeTests.cs | 44 ++++ .../KsqlDb.Client.IntegrationTests.csproj | 7 +- .../ListStreamsTests.cs | 34 +++ .../ListTablesTests.cs | 34 +++ .../ListTopicsTests.cs | 33 +++ .../StreamInsertsTests.cs | 73 ++++++ .../StreamQueryTests.cs | 128 ++++++++++ .../TestClient.cs | 70 ++++++ .../UnitTest1.cs | 13 - .../KsqlDb.Client.UnitTests.csproj | 7 +- .../Parsers/KObjectParserTests.cs | 217 ++++++++++++++++ .../QueryResultRowParserTests.cs | 25 ++ test/KsqlDb.Client.UnitTests/UnitTest1.cs | 13 - test/docker-compose.yml | 43 +++- test/run-integration-tests.ps1 | 9 + test/setup-ksqldb-test-env.ps1 | 28 +++ 61 files changed, 2861 insertions(+), 48 deletions(-) create mode 100644 src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs create mode 100644 src/KsqlDb.Client/Abstractions/ColumnType.cs create mode 100644 src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs create mode 100644 src/KsqlDb.Client/Abstractions/FieldInfo.cs create mode 100644 src/KsqlDb.Client/Abstractions/IClient.cs create mode 100644 src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs create mode 100644 src/KsqlDb.Client/Abstractions/Objects/KSqlNull.cs create mode 100644 src/KsqlDb.Client/Abstractions/Objects/KSqlObject.cs create mode 100644 src/KsqlDb.Client/Abstractions/QueryInfo.cs create mode 100644 src/KsqlDb.Client/Abstractions/QueryResults/QueryResultRow.cs create mode 100644 src/KsqlDb.Client/Abstractions/QueryResults/StreamedQueryResult.cs create mode 100644 src/KsqlDb.Client/Abstractions/QueryType.cs create mode 100644 src/KsqlDb.Client/Abstractions/RowInsertAcknowledgment.cs create mode 100644 src/KsqlDb.Client/Abstractions/SourceDescription.cs create mode 100644 src/KsqlDb.Client/Abstractions/SourceType.cs create mode 100644 src/KsqlDb.Client/Abstractions/StreamOrTableInfo.cs create mode 100644 src/KsqlDb.Client/Abstractions/TopicInfo.cs create mode 100644 src/KsqlDb.Client/Client.cs create mode 100644 src/KsqlDb.Client/Exceptions/ErrorDetails.cs create mode 100644 src/KsqlDb.Client/Exceptions/HttpMessageDetails.cs create mode 100644 src/KsqlDb.Client/Exceptions/HttpRequestMessageDetails.cs create mode 100644 src/KsqlDb.Client/Exceptions/HttpResponseMessageDetails.cs create mode 100644 src/KsqlDb.Client/Exceptions/KsqlDbException.cs create mode 100644 src/KsqlDb.Client/InternalsVisibleTo.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Content/IKsqlRequestHttpContentFactory.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Content/KsqlV1RequestHttpContentFactory.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/KSqlDbHttpClient.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Requests/CloseStreamRequest.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Requests/KsqlRequest.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Requests/QueryStreamRequest.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Requests/TargetStream.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Responses/InsertStreamAckResponse.cs create mode 100644 src/KsqlDb.Client/KsqlApiV1/Responses/KsqlResponse.cs create mode 100644 src/KsqlDb.Client/Parsers/KObjectParser.cs create mode 100644 src/KsqlDb.Client/Parsers/QueryResultRowParser.cs create mode 100644 src/KsqlDb.Client/Serdes/IJsonSerializer.cs create mode 100644 src/KsqlDb.Client/Serdes/JsonSerializer.cs create mode 100644 src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs create mode 100644 src/KsqlDb.Client/Serdes/KSqlNullConverter.cs create mode 100644 src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/DescribeTests.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs create mode 100644 test/KsqlDb.Client.IntegrationTests/TestClient.cs delete mode 100644 test/KsqlDb.Client.IntegrationTests/UnitTest1.cs create mode 100644 test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs create mode 100644 test/KsqlDb.Client.UnitTests/QueryResultRowParserTests.cs delete mode 100644 test/KsqlDb.Client.UnitTests/UnitTest1.cs create mode 100644 test/run-integration-tests.ps1 create mode 100644 test/setup-ksqldb-test-env.ps1 diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index bea8d15..66ec618 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -123,7 +123,7 @@ jobs: with: name: docker-compose - name: Start ksqlDB in docker - run: docker-compose up -d --scale additional-ksqldb-server=0 + run: docker-compose up -d - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - name: Run test diff --git a/Directory.Build.props b/Directory.Build.props index 6bccdaa..1077141 100644 --- a/Directory.Build.props +++ b/Directory.Build.props @@ -1,6 +1,6 @@  - alex.basiuk@outlook.com + Alex Basiuk © Alex Basiuk KsqlDb.Client ksqlDB Client. diff --git a/Directory.Build.targets b/Directory.Build.targets index bb3185c..563b579 100644 --- a/Directory.Build.targets +++ b/Directory.Build.targets @@ -3,6 +3,8 @@ + + diff --git a/README.md b/README.md index 42df90d..edfc095 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,7 @@ ===================================================== ![Build](https://github.com/alex-basiuk/ksqlDB-client-dotnet/workflows/Build/badge.svg) + + +In development scenarios when server and client have a priori knowledge that both will speak HTTP/2 unencrypted, you may establish an HTTP/2 connection over cleartext by setting an AppContext switch or an environment variable (DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_HTTP2UNENCRYPTEDSUPPORT=1). +`AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);` diff --git a/src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs b/src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs new file mode 100644 index 0000000..a5e8a60 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs @@ -0,0 +1,27 @@ +using System.Collections.Generic; +using KsqlDb.Api.Client.Abstractions.QueryResults; + +namespace KsqlDb.Api.Client.Abstractions +{ + /// + /// A batches query result. + /// + public class BatchedQueryResults + { + /// + /// The result rows async enumerator. + /// + IReadOnlyCollection ResultRows { get; } + + /// + /// The Id of the underlying push query if applicable. + /// + public string? QueryId { get; } + + public BatchedQueryResults(IReadOnlyCollection resultRows, string? queryId) + { + ResultRows = resultRows; + QueryId = queryId; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/ColumnType.cs b/src/KsqlDb.Client/Abstractions/ColumnType.cs new file mode 100644 index 0000000..17dd95f --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/ColumnType.cs @@ -0,0 +1,15 @@ +namespace KsqlDb.Api.Client.Abstractions +{ + public enum ColumnType + { + String, + Integer, + Bigint, + Double, + Boolean, + Decimal, + Array, + Map, + Struct + } +} diff --git a/src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs b/src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs new file mode 100644 index 0000000..138b6b6 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs @@ -0,0 +1,38 @@ +using KsqlDb.Api.Client.Abstractions; + +namespace KSQL.API.Client +{ + /// + /// The result of the method. + /// + public class ExecuteStatementResult + { + /// + /// Returns the ID of a newly started persistent query, if applicable. The return value is empty + /// for all statements other than 'CREATE ... AS * SELECT' and 'INSERT * INTO ... AS SELECT' + /// statements, as only these statements start persistent queries. For statements that start + /// persistent queries, the return value may still be empty if either: + /// + /// + /// + /// The statement was not executed on the server by the time the server response was sent. + /// This typically does not happen under normal server operation, but may happen if the ksqlDB + /// server's command runner thread is stuck, or if the configured value for ksql.server.command.response.timeout.ms + /// is too low. + /// + /// + /// + /// + /// The ksqlDB server version is lower than 0.11.0. + /// + /// + /// + /// + string? QueryId; + + public ExecuteStatementResult(string? queryId) + { + QueryId = queryId; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/FieldInfo.cs b/src/KsqlDb.Client/Abstractions/FieldInfo.cs new file mode 100644 index 0000000..6e51f86 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/FieldInfo.cs @@ -0,0 +1,32 @@ +using KsqlDb.Api.Client.Abstractions; + +namespace KSQL.API.Client +{ + /// + /// A field/column of a ksqlDB stream/table. + /// + public class FieldInfo + { + /// + /// The field name. + /// + public string Name { get; } + + /// + /// The field type. + /// + public ColumnType Type { get; } + + /// + /// Whether this field is a key field, rather than a value field. + /// + public bool IsKey { get; } + + public FieldInfo(string name, ColumnType type, bool isKey) + { + Name = name; + Type = type; + IsKey = isKey; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/IClient.cs b/src/KsqlDb.Client/Abstractions/IClient.cs new file mode 100644 index 0000000..8a6692c --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/IClient.cs @@ -0,0 +1,146 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using KSQL.API.Client; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Abstractions.QueryResults; + +namespace KsqlDb.Api.Client.Abstractions +{ + public interface IClient + { + /// + /// Executes a pull or push query supplied in the and streams the result. + /// + /// The query statement. + /// The cancellation token that cancels the operation. + /// + /// A task that completes when the server response is received. + /// + Task StreamQuery(string sql, CancellationToken cancellationToken = default); + + /// + /// Executes a pull or push query supplied in the and streams the result. + /// + /// The query statement. + /// The query properties. + /// The cancellation token that cancels the operation. + /// + /// A task that completes when the server response is received. + /// + Task StreamQuery(string sql, IDictionary properties, CancellationToken cancellationToken = default); + + /// + /// Executes a pull or push query supplied in the and returns the result as a single batch or rows. + /// + /// The query statement. + /// The cancellation token that cancels the operation. + /// + /// A task that completes when the server response is received. + /// + Task BatchQuery(string sql, CancellationToken cancellationToken = default); + + /// + /// Executes a pull or push query supplied in the and returns the result as a single batch or rows. + /// + /// The query statement. + /// The query properties. + /// The cancellation token that cancels the operation. + /// + /// A task that completes when the server response is received. + /// + Task BatchQuery(string sql, IDictionary properties, CancellationToken cancellationToken = default); + + /// + /// Terminates a push query with the specified , + /// + /// The Id of the query to terminate. + /// The cancellation token. + /// A task that completes once the server response is received. + Task TerminatePushQuery(string queryId, CancellationToken cancellationToken = default); + + /// + /// Inserts a new into the + /// + /// The target stream name. + /// The row to be inserted into the stream. + /// The cancellation token. + /// A task that completes when the operation is completed. + Task InsertRow(string streamName, KSqlObject row, CancellationToken cancellationToken = default); + + /// + /// Inserts the asynchronous stream of rows in to the stream . + /// + /// The target stream name. + /// The asynchronous stream of rows. + /// The cancellation token. + /// The asynchronous stream of row acknowledgments. + IAsyncEnumerable StreamInserts(string streamName, IAsyncEnumerable rows, CancellationToken cancellationToken = default); + + /// + /// Sends a DDL/DML statement to the ksqlDB server. + /// This method supports the following statements: + /// - 'CREATE'; + /// - 'CREATE ... AS SELECT'; + /// - 'DROP', 'TERMINATE'; + /// - 'INSERT INTO ... AS SELECT'. + /// Each request should contain exactly one statement. Requests that contain multiple statements will be rejected by the client. + /// + /// The SQL statement to be executed. + /// The cancellation token. + /// A task that completes when the operation is completed. + Task ExecuteStatement(string sql, CancellationToken cancellationToken = default); + + /// + /// Sends a DDL/DML statement to the ksqlDB server. + /// This method supports the following statements: + /// - 'CREATE'; + /// - 'CREATE ... AS SELECT'; + /// - 'DROP', 'TERMINATE'; + /// - 'INSERT INTO ... AS SELECT'. + /// Each request should contain exactly one statement. Requests that contain multiple statements will be rejected by the client. + /// + /// The SQL statement to be executed. + /// The statement properties. + /// The cancellation token. + /// A task that completes when the operation is completed. + Task ExecuteStatement(string sql, IDictionary properties, CancellationToken cancellationToken = default); + + /// + /// Returns the list of ksqlDB streams from the ksqlDB server's metastore. + /// + /// The cancellation token. + /// A task that returns a collection of streams when completes. + Task> ListStreams(CancellationToken cancellationToken = default); + + /// + /// Returns the list of ksqlDB tables from the ksqlDB server's metastore. + /// + /// The cancellation token. + /// A task that returns a collection of tables when completes. + Task> ListTables(CancellationToken cancellationToken = default); + + /// + /// Returns the list of Kafka topics available for use with ksqlDB. + /// + /// The cancellation token. + /// A task that returns a collection of topics when completes. + Task> ListTopics(CancellationToken cancellationToken = default); + + /// + /// Returns the list of queries currently running on the ksqlDB server. + /// + /// The cancellation token. + /// A task that returns a collection of queries when completes. + Task> ListQueries(CancellationToken cancellationToken = default); + + /// + /// Returns metadata about the ksqlDB stream or table of the provided name. + /// + /// The stream or table name. + /// The cancellation token. + /// A task that returns metadata for the stream or table. + Task DescribeSource(string sourceName, CancellationToken cancellationToken = default); + } +} diff --git a/src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs b/src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs new file mode 100644 index 0000000..c630a99 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs @@ -0,0 +1,80 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace KsqlDb.Api.Client.Abstractions.Objects +{ + public class KSqlArray + { + private readonly List _items; + + public KSqlArray() : this(new List()) + { + } + + private KSqlArray(List items) => _items = items; + + public int Count => _items.Count; + + /// + /// + /// + /// + /// + public object this[int index] => _items[index]; + + /// + /// Returns the value at a specified index as a . + /// + /// The index. + /// The value at . + /// If the is invalid. + /// If the value is not a . + public string GetString(int index) => GetValue(index); + public int GetInteger(int index) => GetValue(index); + public long GetLong(int index) => GetValue(index); + public double GetDouble(int index) => GetValue(index); + public decimal GetDecimal(int index) => GetValue(index); + public KSqlArray GetKSqlArray(int index) => GetValue(index); + public KSqlObject GetKSqlObject(int index) => GetValue(index); + public bool IsNull(int index) => _items[index] is KSqlNull; + + public bool Remove(object value) => _items.Remove(value); + public void RemoveAt(int index) => _items.RemoveAt(index); + + public KSqlArray Add(string value) => AddValue(value); + public KSqlArray Add(int value) => AddValue(value); + public KSqlArray Add(long value) => AddValue(value); + public KSqlArray Add(double value) => AddValue(value); + public KSqlArray Add(decimal value) => AddValue(value); + public KSqlArray Add(KSqlArray value) => AddValue(value); + public KSqlArray Add(KSqlObject value) => AddValue(value); + public KSqlArray AddNull() => AddValue(KSqlNull.Instance); + public KSqlArray AddRange(KSqlArray array) + { + _items.AddRange(array._items); + return this; + } + + /// + /// Shallow copy. + /// + /// + public KSqlArray Copy() => new KSqlArray(_items); + + public IReadOnlyList AsReadOnlyList() => ImmutableArray.CreateRange(_items); + + private T GetValue(int index) + { + var item = _items[index]; + if (_items[index] is T value) return value; + throw new InvalidCastException($"The value at index {index} is not a {typeof(T).FullName}, it's a {item.GetType().FullName}"); + } + + internal KSqlArray AddValue(object value) + { + _items.Add(value); + return this; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/Objects/KSqlNull.cs b/src/KsqlDb.Client/Abstractions/Objects/KSqlNull.cs new file mode 100644 index 0000000..d9334c6 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/Objects/KSqlNull.cs @@ -0,0 +1,11 @@ +namespace KsqlDb.Api.Client.Abstractions.Objects +{ + public class KSqlNull + { + public static readonly KSqlNull Instance = new KSqlNull(); + + private KSqlNull() + { + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/Objects/KSqlObject.cs b/src/KsqlDb.Client/Abstractions/Objects/KSqlObject.cs new file mode 100644 index 0000000..d9e4115 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/Objects/KSqlObject.cs @@ -0,0 +1,93 @@ +using System; +using System.Collections.Generic; +using System.Collections.Immutable; + +namespace KsqlDb.Api.Client.Abstractions.Objects +{ + public class KSqlObject + { + private readonly Dictionary _map; + + public KSqlObject() : this(new Dictionary()) + { + } + + private KSqlObject(Dictionary map) => _map = map; + + public static KSqlObject FromArray(IList keys, KSqlArray values) + { + if (keys is null) throw new ArgumentNullException(nameof(keys)); + if (values is null) throw new ArgumentNullException(nameof(values)); + + if (keys.Count != values.Count) throw new ArgumentException($"Size of {nameof(keys)} and {nameof(values)} must match."); + + var result = new KSqlObject(); + for (int i = 0; i < keys.Count; i++) + { + result.AddValue(keys[i], values[i]); + } + + return result; + } + + public int Count => _map.Count; + + public IReadOnlyCollection FieldNames => _map.Keys; + + public object this[string key] => _map[key]; + + public bool ContainsKey(string key) => _map.ContainsKey(key); + + public string? TryGetString(string key) => TryGetValue(key); + public int? TryGetInteger(string key) => TryGetValue(key); + public long? TryGetLong(string key) => TryGetValue(key); + public double? TryGetDouble(string key) => TryGetValue(key); + public decimal? TryGetDecimal(string key) => TryGetValue(key); + public KSqlArray? TryGetKSqlArray(string key) => TryGetValue(key); + public KSqlObject? TryGetKSqlObject(string key) => TryGetValue(key); + public bool IsNull(string key) => _map[key] is KSqlNull; + + public bool Remove(string key) => _map.Remove(key); + + public KSqlObject Add(string key, string value) => AddValue(key, value); + public KSqlObject Add(string key, int value) => AddValue(key, value); + public KSqlObject Add(string key, long value) => AddValue(key, value); + public KSqlObject Add(string key, double value) => AddValue(key, value); + public KSqlObject Add(string key, decimal value) => AddValue(key, value); + public KSqlObject Add(string key, bool value) => AddValue(key, value); + public KSqlObject Add(string key, KSqlArray value) => AddValue(key, value); + public KSqlObject Add(string key, KSqlObject value) => AddValue(key, value); + public KSqlObject AddNull(string key) => AddValue(key, KSqlNull.Instance); + + public KSqlObject MergeIn(KSqlObject other) + { + foreach (var (key, value) in other._map) + { + _map.Add(key, value); + } + + return this; + } + + /// + /// Shallow copy. + /// + /// + public KSqlObject Copy() => new KSqlObject(_map); + + public ImmutableDictionary AsImmutableDictionary() => ImmutableDictionary.CreateRange(_map); + + private T? TryGetValue(string key) + { + if (!_map.TryGetValue(key, out object? value)) return default; + if (value is T targetValue) return targetValue; + throw new InvalidCastException($"The value associated with \"{key}\" key is not a {typeof(T).FullName}, it's a {value.GetType().FullName}"); + } + + internal KSqlObject AddValue(string key, object value) + { + _map.Add(key, value); + return this; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/QueryInfo.cs b/src/KsqlDb.Client/Abstractions/QueryInfo.cs new file mode 100644 index 0000000..256ecb5 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/QueryInfo.cs @@ -0,0 +1,39 @@ +namespace KSQL.API.Client +{ + /// + /// Metadata for a ksqlDB query. + /// + public class QueryInfo + { + /// + /// The type of this query + /// + QueryType QueryType { get; } + + /// + /// The ID of this query, used for control operations such as terminating the query. + /// + string Id { get; } + + /// + /// Returns the ksqlDB statement text corresponding to this query. This text may not be exactly the + /// statement submitted in order to start the query, but submitting this statement will result + /// in exactly this query. + /// + string Sql { get; } + + /// + /// Returns the ksqlDB stream or table sink and the underlying topic that this query writes to, if this query is + /// persistent. If this query is a push query, then the value is not set. + /// + (string name, string topic)? Sink { get; } + + public QueryInfo(QueryType queryType, string id, string sql, (string name, string topic)? sink) + { + QueryType = queryType; + Id = id; + Sql = sql; + Sink = sink; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/QueryResults/QueryResultRow.cs b/src/KsqlDb.Client/Abstractions/QueryResults/QueryResultRow.cs new file mode 100644 index 0000000..0db6619 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/QueryResults/QueryResultRow.cs @@ -0,0 +1,64 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using KsqlDb.Api.Client.Abstractions.Objects; + +namespace KsqlDb.Api.Client.Abstractions.QueryResults +{ + public class QueryResultRow + { + private readonly KSqlArray _values; + private readonly Dictionary _columnNameToIndex; + public IReadOnlyList<(string name, Type type)> ColumnNamesAndTypes { get; } + + public QueryResultRow(KSqlArray values, Dictionary columnNameToIndex, IReadOnlyList<(string, Type)> columnNamesAndTypes) + { + _values = values ?? throw new ArgumentNullException(nameof(values)); + _columnNameToIndex = columnNameToIndex ?? throw new ArgumentNullException(nameof(columnNameToIndex)); + ColumnNamesAndTypes = columnNamesAndTypes ?? throw new ArgumentNullException(nameof(columnNamesAndTypes)); + } + + public KSqlArray AsArray => _values.Copy(); + + public KSqlObject AsObject + { + get + { + string[] fieldNames = ColumnNamesAndTypes.Select(t => t.name).ToArray(); + return KSqlObject.FromArray(fieldNames, _values); + } + } + + public object this[int columnIndex] => _values[AdjustIndex(columnIndex)]; + public object this[string columnName] => _values[IndexFromName(columnName)]; + + public string GetString(int columnIndex) => _values.GetString(AdjustIndex(columnIndex)); + public string GetString(string columnName) => _values.GetString(IndexFromName(columnName)); + public int GetInteger(int columnIndex) => _values.GetInteger(AdjustIndex(columnIndex)); + public int GetInteger(string columnName) => _values.GetInteger(IndexFromName(columnName)); + public long GetLong(int columnIndex) => _values.GetLong(AdjustIndex(columnIndex)); + public long GetLong(string columnName) => _values.GetLong(IndexFromName(columnName)); + public double GetDouble(int columnIndex) => _values.GetDouble(AdjustIndex(columnIndex)); + public double GetDouble(string columnName) => _values.GetDouble(IndexFromName(columnName)); + public decimal GetDecimal(int columnIndex) => _values.GetDecimal(AdjustIndex(columnIndex)); + public decimal GetDecimal(string columnName) => _values.GetDecimal(IndexFromName(columnName)); + public KSqlArray GetKSqlArray(int columnIndex) => _values.GetKSqlArray(AdjustIndex(columnIndex)); + public KSqlArray GetKSqlArray(string columnName) => _values.GetKSqlArray(IndexFromName(columnName)); + public KSqlObject GetKSqlObject(int columnIndex) => _values.GetKSqlObject(AdjustIndex(columnIndex)); + public KSqlObject GetKSqlObject(string columnName) => _values.GetKSqlObject(IndexFromName(columnName)); + public bool IsNull(int columnIndex) => _values.IsNull(AdjustIndex(columnIndex)); + public bool IsNull(string columnName) => _values.IsNull(IndexFromName(columnName)); + + private int IndexFromName(string columnName) => + _columnNameToIndex.TryGetValue(columnName, out int columnIndex) + ? columnIndex + : throw new ArgumentException($"No column exists with name: {columnName}"); + + private int AdjustIndex(int columnIndex) + { + if (columnIndex <= 1) throw new ArgumentException($"Column index cannot be less than 1. The supplied value is {columnIndex}", nameof(columnIndex)); + if (columnIndex > _values.Count) throw new ArgumentException($"Column index cannot be greater than number of columns which is {_values.Count}. The supplied value is {columnIndex}", nameof(columnIndex)); + return columnIndex - 1; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/QueryResults/StreamedQueryResult.cs b/src/KsqlDb.Client/Abstractions/QueryResults/StreamedQueryResult.cs new file mode 100644 index 0000000..7a9cde4 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/QueryResults/StreamedQueryResult.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; + +namespace KsqlDb.Api.Client.Abstractions.QueryResults +{ + /// + /// A streamed query result. + /// + public class StreamedQueryResult + { + /// + /// The column names and types. + /// + public IReadOnlyCollection<(string name, Type type)> Columns { get; } + + /// + /// The asynchronous stream of result rows. + /// + public IAsyncEnumerable Rows { get; } + + /// + /// The Id of the underlying push query if applicable. + /// + public string? QueryId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The column names and types. + /// The async stream of result rows. + /// The optional query Id. + public StreamedQueryResult(IReadOnlyCollection<(string name, Type type)> columns, + IAsyncEnumerable resultRows, + string? queryId) + { + Columns = columns ?? throw new ArgumentNullException(nameof(columns)); + Rows = resultRows ?? throw new ArgumentNullException(nameof(resultRows)); + QueryId = queryId; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/QueryType.cs b/src/KsqlDb.Client/Abstractions/QueryType.cs new file mode 100644 index 0000000..852b601 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/QueryType.cs @@ -0,0 +1,18 @@ +namespace KSQL.API.Client +{ + /// + /// A type of the ksqlDB running query. + /// + public enum QueryType + { + /// + /// The persistent query. + /// + Persistent, + + /// + /// The push query. + /// + Push + } +} diff --git a/src/KsqlDb.Client/Abstractions/RowInsertAcknowledgment.cs b/src/KsqlDb.Client/Abstractions/RowInsertAcknowledgment.cs new file mode 100644 index 0000000..f14b21f --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/RowInsertAcknowledgment.cs @@ -0,0 +1,18 @@ +using KsqlDb.Api.Client.Abstractions; + +namespace KSQL.API.Client +{ + /// + /// An acknowledgment from the ksqlDB server that a row has been successfully inserted into a ksqlDB stream. + /// + public class RowInsertAcknowledgment + { + /// + /// The corresponding sequence number for the acknowledgment. + /// Sequence numbers start at zero for each new request. + /// + public long SequenceNumber { get; } + + public RowInsertAcknowledgment(long sequenceNumber) => SequenceNumber = sequenceNumber; + } +} diff --git a/src/KsqlDb.Client/Abstractions/SourceDescription.cs b/src/KsqlDb.Client/Abstractions/SourceDescription.cs new file mode 100644 index 0000000..03d786e --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/SourceDescription.cs @@ -0,0 +1,77 @@ +using System.Collections.Generic; +using KSQL.API.Client; + +namespace KsqlDb.Api.Client.Abstractions +{ + /// + /// Metadata for a ksqlDB stream or table. + /// + public class SourceDescription + { + /// + /// The name of this stream or table. + /// + public string Name { get; } + + /// + /// The type of this source. + /// + public SourceType Type { get; } + + /// + /// The collection of fields (key and value) present in this stream/table. + /// + public IReadOnlyCollection Fields { get; } + + /// + /// The name of the Kafka topic underlying this ksqlDB stream/table. + /// + public string Topic { get; } + + /// + /// The serialization formats of the key and the value used in this stream/table. + /// + public (string key, string value) SerializationFormat { get; } + + /// + /// The collection of ksqlDB queries currently reading from this stream/table. + /// + public IReadOnlyCollection ReadQueries { get; } + + /// + /// The collection of ksqlDB queries currently writing to this stream/table. + /// + public IReadOnlyCollection WriteQueries { get; } + + /// + /// The name of the column configured as the TIMESTAMP for this stream/table, if any. + /// + public string? TimestampColumnName { get; } + + /// + /// The type of the window (e.g., "TUMBLING", "HOPPING", "SESSION") associated with this source, if this source is a windowed table. Else, empty. + /// + public string? WindowType { get; } + + /// + /// Returns the ksqlDB statement text used to create this stream/table. This text may not be + /// exactly the statement submitted in order to create this stream/table, but submitting this + /// statement will result in exactly this stream/table being created. + /// + public string SqlStatement { get; } + + public SourceDescription(string name, SourceType type, IReadOnlyCollection fields, string topic, (string key, string value) serializationFormat, IReadOnlyCollection readQueries, IReadOnlyCollection writeQueries, string? timestampColumnName, string? windowType, string sqlStatement) + { + Name = name; + Type = type; + Fields = fields; + Topic = topic; + SerializationFormat = serializationFormat; + ReadQueries = readQueries; + WriteQueries = writeQueries; + TimestampColumnName = timestampColumnName; + WindowType = windowType; + SqlStatement = sqlStatement; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/SourceType.cs b/src/KsqlDb.Client/Abstractions/SourceType.cs new file mode 100644 index 0000000..a4005f3 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/SourceType.cs @@ -0,0 +1,8 @@ +namespace KSQL.API.Client +{ + public enum SourceType + { + Stream, + Table + } +} diff --git a/src/KsqlDb.Client/Abstractions/StreamOrTableInfo.cs b/src/KsqlDb.Client/Abstractions/StreamOrTableInfo.cs new file mode 100644 index 0000000..459a298 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/StreamOrTableInfo.cs @@ -0,0 +1,45 @@ +using System.Diagnostics; + +namespace KsqlDb.Api.Client.Abstractions +{ + /// + /// Metadata for a ksqlDB stream. + /// + [DebuggerDisplay("Name={" + nameof(Name) + "}")] + public class StreamOrTableInfo + { + /// + /// The name of this stream or table. + /// + public string Name { get; } + + /// + /// The name of the Kafka topic underlying this ksqlDB stream or table. + /// + public string Topic { get; } + + /// + /// The key format of the data in this stream or table. + /// + public string KeyFormat { get; } + + /// + /// The value format of the data in this stream or table. + /// + public string ValueFormat { get; } + + /// + /// Whether the key is windowed. + /// + public bool? IsWindowed { get; } + + public StreamOrTableInfo(string name, string topic, string keyFormat, string valueFormat, bool? isWindowed = null) + { + Name = name; + Topic = topic; + KeyFormat = keyFormat; + ValueFormat = valueFormat; + IsWindowed = isWindowed; + } + } +} diff --git a/src/KsqlDb.Client/Abstractions/TopicInfo.cs b/src/KsqlDb.Client/Abstractions/TopicInfo.cs new file mode 100644 index 0000000..9db01b2 --- /dev/null +++ b/src/KsqlDb.Client/Abstractions/TopicInfo.cs @@ -0,0 +1,39 @@ +using System.Collections.Generic; +using System.Diagnostics; + +namespace KsqlDb.Api.Client.Abstractions +{ + /// + /// Metadata for a Kafka topic available for use with ksqlDB. + /// + [DebuggerDisplay("Topic={" + nameof(Topic) + "}")] + public class TopicInfo + { + /// + /// The name of this topic. + /// + public string Topic { get; } + + /// + /// The number of replicas for each topic partition. + /// + public int NumberOfPartitions => ReplicasPerPartition.Count; + + /// + /// The number of replicas for each topic partition. + /// + /// + /// A dictionary where the key represents a partition index and the value represents the corresponding number of replicas. + /// + /// + /// The size of the dictionary is equal to . + /// + public IReadOnlyCollection ReplicasPerPartition { get; } + + public TopicInfo(string topic, IReadOnlyCollection replicasPerPartition) + { + Topic = topic; + ReplicasPerPartition = replicasPerPartition; + } + } +} diff --git a/src/KsqlDb.Client/Client.cs b/src/KsqlDb.Client/Client.cs new file mode 100644 index 0000000..af7ee07 --- /dev/null +++ b/src/KsqlDb.Client/Client.cs @@ -0,0 +1,175 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Text.RegularExpressions; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using KSQL.API.Client; +using KsqlDb.Api.Client.Abstractions; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Abstractions.QueryResults; +using KsqlDb.Api.Client.Exceptions; +using KsqlDb.Api.Client.KsqlApiV1; +using KsqlDb.Api.Client.KsqlApiV1.Responses; +using KsqlDb.Api.Client.Parsers; + +namespace KsqlDb.Api.Client +{ + internal class Client : IClient + { + private static readonly Regex _batchRequestValidationRegex = new Regex(@"EMIT\s+CHANGES\s+^(LIMIT\s+\d+)", RegexOptions.Singleline | RegexOptions.IgnoreCase | RegexOptions.Compiled); + private static readonly Dictionary _emptyProperties = new Dictionary(); + private readonly KSqlDbHttpClient _httpClient; + + public Client(KSqlDbHttpClient httpClient) + { + _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); + } + + public Task StreamQuery(string sql, CancellationToken cancellationToken = default) => StreamQuery(sql, _emptyProperties, cancellationToken); + + public async Task StreamQuery(string sql, IDictionary properties, CancellationToken cancellationToken = default) + { + ThrowIfInvalidStatement(sql); + const int bufferCapacity = 200; //TODO Make it configurable + var channel = Channel.CreateBounded(bufferCapacity); + _ = _httpClient.PostQueryStream(channel.Writer, sql, properties, cancellationToken); + + if (!await channel.Reader.WaitToReadAsync(cancellationToken)) + { + // We should end up here only in case of http error. + // Observer the task and throw the underlying exception. + await channel.Reader.Completion; + } + + var header = await channel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); // Can throw ChannelClosedException + string? queryId = header.TryGetProperty("queryId", out var queryIdElement) ? queryIdElement.GetString() : default; + var parser = new QueryResultRowParser(header); + var rows = channel.Reader.ReadAllAsync(cancellationToken).Select(parser.Parse); + return new StreamedQueryResult(parser.ColumnNamesAndTypes, rows, queryId); + } + + public Task BatchQuery(string sql, CancellationToken cancellationToken = default) => BatchQuery(sql, _emptyProperties, cancellationToken); + + public async Task BatchQuery(string sql, IDictionary properties, CancellationToken cancellationToken = default) + { + if (!_batchRequestValidationRegex.Match(sql).Success) throw new ArgumentException("The SQL statement must be either a pull query (without EMIT CHANGES clause) or a limiting push query (with EMIT CHANGES LIMIT {rows_number}).", nameof(sql)); + var streamedResult = await StreamQuery(sql, properties, cancellationToken); + var rows = await streamedResult.Rows.ToArrayAsync(cancellationToken).ConfigureAwait(false); + return new BatchedQueryResults(rows, streamedResult.QueryId); + } + + public Task TerminatePushQuery(string queryId, CancellationToken cancellationToken = default) => _httpClient.PostKqsl($"TERMINATE {queryId}", cancellationToken: cancellationToken); + + public async Task InsertRow(string streamName, KSqlObject row, CancellationToken cancellationToken = default) + { + var channel = Channel.CreateBounded(1); + channel.Writer.TryWrite(row); + channel.Writer.Complete(); + var acks = StreamInserts(streamName, channel.Reader.ReadAllAsync(cancellationToken), cancellationToken).ConfigureAwait(true); + await foreach (var ack in acks) + { + return ack; + } + + throw new KsqlDbException("An acknowledgement hasn't been received"); + } + + public async IAsyncEnumerable StreamInserts(string streamName, IAsyncEnumerable rows, [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + const int bufferCapacity = 200; //TODO Make it configurable + var channel = Channel.CreateBounded(bufferCapacity); + _ = _httpClient.PostInsertStream(streamName, rows, channel.Writer, cancellationToken); + + while (await channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) + { + while (channel.Reader.TryRead(out var item)) + { + if (string.Equals(item.Status, "ok", StringComparison.OrdinalIgnoreCase)) + { + yield return new RowInsertAcknowledgment(item.Seq); + } + else if (string.Equals(item.Status, "error", StringComparison.OrdinalIgnoreCase)) + { + throw new KsqlDbException("Received an error while trying to insert into stream.") + { + Body = new ErrorDetails { ErrorCode = item.ErrorCode ?? default, Message = item.Message } + }; + } + else + { + throw new KsqlDbException($"Received an unrecognized status while trying to insert into stream: {item.Status}."); + } + } + } + + await channel.Reader.Completion; + } + + public Task ExecuteStatement(string sql, CancellationToken cancellationToken = default) => ExecuteStatement(sql, _emptyProperties, cancellationToken); + + public async Task ExecuteStatement(string sql, IDictionary properties, CancellationToken cancellationToken = default) + { + var responses = await _httpClient.PostKqsl(sql, properties, cancellationToken: cancellationToken).ConfigureAwait(false); + var response = EnsureSingleEntityResponse(responses); + return new ExecuteStatementResult(response.CommandId); + } + + public async Task> ListStreams(CancellationToken cancellationToken = default) + { + static StreamOrTableInfo Map(KsqlResponse.StreamOrTableInfo stream) => new StreamOrTableInfo(stream.Name, stream.Topic, "KAFKA", stream.Format); + return await ListEntities("LIST STREAMS;", r => r.Streams, Map, cancellationToken).ConfigureAwait(false); + } + + public async Task> ListTables(CancellationToken cancellationToken = default) + { + //TODO REST Response has only 1 format field. + static StreamOrTableInfo Map(KsqlResponse.StreamOrTableInfo table) => new StreamOrTableInfo(table.Name, table.Topic, "KAFKA", table.Format, table.IsWindowed); + return await ListEntities("LIST TABLES;", r => r.Tables, Map, cancellationToken).ConfigureAwait(false); + } + + public async Task> ListTopics(CancellationToken cancellationToken = default) + { + static TopicInfo Map(KsqlResponse.TopicInfo topic) => new TopicInfo(topic.Name, topic.ReplicaInfo); + return await ListEntities("LIST TOPICS;", r => r.Topics, Map, cancellationToken).ConfigureAwait(false); + } + + public async Task> ListQueries(CancellationToken cancellationToken = default) + { + // TODO REST response doesn't match Java client + static QueryInfo Map(KsqlResponse.QueryInfo query) => new QueryInfo(query.Sinks == null ? QueryType.Push : QueryType.Persistent, query.Id, query.QueryString, null); + return await ListEntities("LIST QUERIES;", r => r.Queries, Map, cancellationToken).ConfigureAwait(false); + } + + public Task DescribeSource(string sourceName, CancellationToken cancellationToken = default) => throw new System.NotImplementedException(); + + private async Task> ListEntities(string sql, Func getEntityArray, Func mapInputToOutput, CancellationToken cancellationToken = default) + { + var responses = await _httpClient.PostKqsl(sql, cancellationToken: cancellationToken).ConfigureAwait(false); + var response = EnsureSingleEntityResponse(responses); + var entitiesArray = getEntityArray(response); + if (entitiesArray == null || entitiesArray.Length == 0) return Array.Empty(); + + var result = new TOutput[entitiesArray.Length]; + for (int i = 0; i < result.Length; i++) + { + result[i] = mapInputToOutput(entitiesArray[i]); + } + return result; + } + + private static KsqlResponse EnsureSingleEntityResponse(KsqlResponse[]? responses) => + responses?.SingleOrDefault() ?? throw new KsqlDbException($"Unexpected number of entities in server response: {responses?.Length ?? 0}. Expected 1."); + + private static void ThrowIfInvalidStatement(string sql) + { + if (sql is null) throw new ArgumentNullException(nameof(sql)); + var trimmedSql = sql.AsSpan().Trim(); + if (trimmedSql[^1] != ';') throw new ArgumentException($"Missing semicolon in the SQL statement: {sql}", nameof(sql)); + if (trimmedSql.IndexOf(';') != trimmedSql.Length - 1) throw new ArgumentException($"Only one KSQL statement can be executed at a time. The supplied statement: {sql}"); + } + } +} diff --git a/src/KsqlDb.Client/Exceptions/ErrorDetails.cs b/src/KsqlDb.Client/Exceptions/ErrorDetails.cs new file mode 100644 index 0000000..766eee1 --- /dev/null +++ b/src/KsqlDb.Client/Exceptions/ErrorDetails.cs @@ -0,0 +1,31 @@ +using System.Collections.Generic; +using System.Text.Json.Serialization; + +namespace KsqlDb.Api.Client.Exceptions +{ + /// + /// ksqlDB error details. + /// + public class ErrorDetails + { + /// + /// The error code. + /// + [JsonPropertyName("error_code")] + public int ErrorCode { get; set; } + + /// + /// The error message. + /// +#nullable disable annotations + public string Message { get; set; } +#nullable restore annotations + + /// + /// The additional fields returned by some endpoint. + /// They provide more context for handling the error. + /// + [JsonExtensionData] + public Dictionary? AdditionalFields { get; set; } + } +} diff --git a/src/KsqlDb.Client/Exceptions/HttpMessageDetails.cs b/src/KsqlDb.Client/Exceptions/HttpMessageDetails.cs new file mode 100644 index 0000000..5dff27e --- /dev/null +++ b/src/KsqlDb.Client/Exceptions/HttpMessageDetails.cs @@ -0,0 +1,59 @@ +using System.Collections.Generic; +using System.Linq; +using System.Net.Http; +using System.Net.Http.Headers; + +namespace KsqlDb.Api.Client.Exceptions +{ + /// + /// The base class for and . + /// It contains request/response details which are not linked to the lifetime of the respective . + /// + public class HttpMessageDetails + { + /// + /// The Http message contents. + /// + public string? Content { get; } + + /// + /// The Http headers. + /// + public IDictionary> Headers { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The http message content. + protected HttpMessageDetails(string? content) + { + Content = content; + Headers = new Dictionary>(); + } + + /// + /// Copies Http headers + /// + /// The message headers. + /// The content headers. + protected void CopyHeaders(HttpHeaders? messageHeaders, HttpHeaders? contentHeaders) + { + static IEnumerable EmptyIfNull(IEnumerable? e) => e ?? Enumerable.Empty(); + + var headers = EmptyIfNull(messageHeaders).Concat(EmptyIfNull(contentHeaders)); + + foreach (var (key, value) in headers) + { + if (Headers.TryGetValue(key, out var existingValues)) + { + existingValues = existingValues.Concat(value).ToArray(); + } + else + { + existingValues = value; + } + Headers[key] = existingValues; + } + } + } +} diff --git a/src/KsqlDb.Client/Exceptions/HttpRequestMessageDetails.cs b/src/KsqlDb.Client/Exceptions/HttpRequestMessageDetails.cs new file mode 100644 index 0000000..d973fb1 --- /dev/null +++ b/src/KsqlDb.Client/Exceptions/HttpRequestMessageDetails.cs @@ -0,0 +1,50 @@ +using System; +using System.Collections.Generic; +using System.Net.Http; + +namespace KsqlDb.Api.Client.Exceptions +{ + /// + /// The details of the http request. + /// + /// + /// It copies data from the respective and not linked to the lifetime of the respective . + /// + public class HttpRequestMessageDetails : HttpMessageDetails + { + /// + /// The Http method used by the Http request message. + /// + public HttpMethod Method { get; } + + /// + /// The Uri used for the Http request. + /// + public Uri? RequestUri { get; } + + /// + /// The properties for the Http request. + /// + public IDictionary Properties { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The http request message. + /// The http message content. + /// Throws if is not provided. + public HttpRequestMessageDetails(HttpRequestMessage httpRequest, string? content) + : base(content) + { + if (httpRequest == null) throw new ArgumentNullException(nameof(httpRequest)); + CopyHeaders(httpRequest.Headers, httpRequest.Content?.Headers); + Method = httpRequest.Method; + RequestUri = httpRequest.RequestUri; +#if NET5_0 + Properties = new Dictionary(httpRequest.Options); +#else + Properties = new Dictionary(httpRequest.Properties); +#endif + } + } +} diff --git a/src/KsqlDb.Client/Exceptions/HttpResponseMessageDetails.cs b/src/KsqlDb.Client/Exceptions/HttpResponseMessageDetails.cs new file mode 100644 index 0000000..1a6f089 --- /dev/null +++ b/src/KsqlDb.Client/Exceptions/HttpResponseMessageDetails.cs @@ -0,0 +1,41 @@ +using System.Net; +using System.Net.Http; + +namespace KsqlDb.Api.Client.Exceptions +{ + /// + /// The details of the http response. + /// + /// + /// It copies data from the respective and not linked to the lifetime of the respective . + /// + public class HttpResponseMessageDetails : HttpMessageDetails + { + /// + /// The status code of the Http response. + /// + public HttpStatusCode StatusCode { get; } + + /// + /// The reason phrase. + /// + /// + /// It typically sent along with the status code. + /// + public string? ReasonPhrase { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The http response message. + /// The http message content. + public HttpResponseMessageDetails(HttpResponseMessage? httpResponse, string? content = null) + : base(content) + { + if (httpResponse == null) return; + CopyHeaders(httpResponse.Headers, httpResponse.Content.Headers); + StatusCode = httpResponse.StatusCode; + ReasonPhrase = httpResponse.ReasonPhrase; + } + } +} diff --git a/src/KsqlDb.Client/Exceptions/KsqlDbException.cs b/src/KsqlDb.Client/Exceptions/KsqlDbException.cs new file mode 100644 index 0000000..99b1da9 --- /dev/null +++ b/src/KsqlDb.Client/Exceptions/KsqlDbException.cs @@ -0,0 +1,51 @@ +using System; + +namespace KsqlDb.Api.Client.Exceptions +{ + /// + /// An exception generated from an http response returned bt the ksqlDB. + /// + public class KsqlDbException : Exception + { + /// + /// Initializes a new instance of the class. + /// + public KsqlDbException() + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The exception message. + public KsqlDbException(string message) + : base(message) + { + } + + /// + /// Initializes a new instance of the class. + /// + /// The exception message. + /// Inner exception. + public KsqlDbException(string message, Exception innerException) + : base(message, innerException) + { + } + + /// + /// Gets information about the associated HTTP request. + /// + public HttpRequestMessageDetails? Request { get; set; } + + /// + /// Gets information about the associated HTTP response. + /// + public HttpResponseMessageDetails? Response { get; set; } + + /// + /// Gets or sets the response object. + /// + public ErrorDetails? Body { get; set; } + } +} diff --git a/src/KsqlDb.Client/InternalsVisibleTo.cs b/src/KsqlDb.Client/InternalsVisibleTo.cs new file mode 100644 index 0000000..0d7642f --- /dev/null +++ b/src/KsqlDb.Client/InternalsVisibleTo.cs @@ -0,0 +1,3 @@ +using System.Runtime.CompilerServices; +[assembly:InternalsVisibleTo("KsqlDb.Client.UnitTests")] +[assembly:InternalsVisibleTo("KsqlDb.Client.IntegrationTests")] diff --git a/src/KsqlDb.Client/KsqlApiV1/Content/IKsqlRequestHttpContentFactory.cs b/src/KsqlDb.Client/KsqlApiV1/Content/IKsqlRequestHttpContentFactory.cs new file mode 100644 index 0000000..a5f6baa --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Content/IKsqlRequestHttpContentFactory.cs @@ -0,0 +1,27 @@ +using System.Collections.Generic; +using System.Net.Http; + +namespace KsqlDb.Api.Client.KsqlApiV1.Content +{ + /// + /// The ksqlDb request content factory. + /// + internal interface IKsqlRequestHttpContentFactory + { + /// + /// Create an instance of ksqDb request content. + /// + /// The request. + /// The instance of . + HttpContent CreateContent(object value); + + /// + /// Create an instance of ksqDb delimited request content. + /// + /// The request. + /// The instance of . + HttpContent CreateDelimitedContent(object value); + + public HttpContent CreateDelimitedStreamingContent(string target, IAsyncEnumerable values) where T : class; + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Content/KsqlV1RequestHttpContentFactory.cs b/src/KsqlDb.Client/KsqlApiV1/Content/KsqlV1RequestHttpContentFactory.cs new file mode 100644 index 0000000..1c19441 --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Content/KsqlV1RequestHttpContentFactory.cs @@ -0,0 +1,111 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Threading.Tasks; +using KsqlDb.Api.Client.KsqlApiV1.Requests; +using KsqlDb.Api.Client.Serdes; + +namespace KsqlDb.Api.Client.KsqlApiV1.Content +{ + /// + /// The factory for the V1 request schema. + /// + internal class KsqlV1RequestHttpContentFactory : IKsqlRequestHttpContentFactory + { + private static readonly MediaTypeHeaderValue s_ksqlV1 = new MediaTypeHeaderValue(KSqlDbHttpClient.SupportedMediaType); + private static readonly MediaTypeHeaderValue s_ksqlDelimitedV1 = new MediaTypeHeaderValue(KSqlDbHttpClient.SupportedDelimitedMediaType); + + private readonly IJsonSerializer _jsonSerializer; + + /// + /// Initializes a new instance of the class. + /// + /// The JSON serializer. + public KsqlV1RequestHttpContentFactory(IJsonSerializer jsonSerializer) => _jsonSerializer = jsonSerializer ?? throw new ArgumentNullException(nameof(jsonSerializer)); + + /// + public HttpContent CreateContent(object value) => new KsqlV1HttpContent(_jsonSerializer, value, s_ksqlV1); + + /// + public HttpContent CreateDelimitedContent(object value) => new KsqlV1HttpContent(_jsonSerializer, value, s_ksqlDelimitedV1); + + /// + public HttpContent CreateDelimitedStreamingContent(string target, IAsyncEnumerable values) where T : class + { + /*async Task ToStream(Stream stream) + { + await _jsonSerializer.SerializeAsync(stream, new TargetStream {Target = target}); + await _jsonSerializer.SerializeAsync(stream, Environment.NewLine); + await foreach (T value in values.ConfigureAwait(false)) + { + await _jsonSerializer.SerializeAsync(stream, value); + await _jsonSerializer.SerializeAsync(stream, Environment.NewLine); + } + } + + using var stream = new MemoryStream(); + ToStream(stream); + var sc = new StreamContent(); + sc.Headers.ContentType = s_ksqlDelimitedV1;*/ + return new KsqlV1StreamingHttpContent(_jsonSerializer, new TargetStream {Target = target}, values, s_ksqlDelimitedV1); + } + + private sealed class KsqlV1HttpContent : HttpContent + { + private readonly IJsonSerializer _jsonSerializer; + private readonly object _value; + + public KsqlV1HttpContent(IJsonSerializer jsonSerializer, object value, MediaTypeHeaderValue supportedMediaTypeHeader) + { + _jsonSerializer = jsonSerializer; + _value = value; + Headers.ContentType = supportedMediaTypeHeader; + } + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context) => _jsonSerializer.SerializeAsync(stream, _value); + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } + } + + private sealed class KsqlV1StreamingHttpContent : HttpContent where T : class + { + private readonly IJsonSerializer _jsonSerializer; + private readonly TargetStream _target; + private readonly IAsyncEnumerable _values; + + public KsqlV1StreamingHttpContent(IJsonSerializer jsonSerializer, TargetStream target, IAsyncEnumerable values, MediaTypeHeaderValue supportedMediaTypeHeader) + { + _jsonSerializer = jsonSerializer; + _target = target; + _values = values; + Headers.ContentType = supportedMediaTypeHeader; + } + + protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context) + { + byte[] lineSeparator = Encoding.UTF8.GetBytes(Environment.NewLine); + await _jsonSerializer.SerializeAsync(stream, _target); + await stream.WriteAsync(lineSeparator); + await foreach (T value in _values.ConfigureAwait(false)) + { + await _jsonSerializer.SerializeAsync(stream, value); + await stream.WriteAsync(lineSeparator); + } + } + + protected override bool TryComputeLength(out long length) + { + length = 0; + return false; + } + } + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/KSqlDbHttpClient.cs b/src/KsqlDb.Client/KsqlApiV1/KSqlDbHttpClient.cs new file mode 100644 index 0000000..09535ab --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/KSqlDbHttpClient.cs @@ -0,0 +1,236 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Net.Http; +using System.Net.Http.Headers; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Exceptions; +using KsqlDb.Api.Client.KsqlApiV1.Content; +using KsqlDb.Api.Client.KsqlApiV1.Requests; +using KsqlDb.Api.Client.KsqlApiV1.Responses; +using KsqlDb.Api.Client.Serdes; + +namespace KsqlDb.Api.Client.KsqlApiV1 +{ + internal class KSqlDbHttpClient + { + internal const string SupportedMediaType = "application/vnd.ksql.v1+json"; + internal const string SupportedDelimitedMediaType = "application/vnd.ksqlapi.delimited.v1"; + + private readonly IKsqlRequestHttpContentFactory _httpContentFactory; + private readonly IJsonSerializer _jsonSerializer; + + private static class Endpoints + { + public const string QueryStream = "/query-stream"; + public const string CloseStream = "/close-stream"; + public const string InsertStream = "/inserts-stream"; + public const string Ksql = "/ksql"; + } + + private readonly HttpClient _httpClient; + + public KSqlDbHttpClient(HttpClient httpClient, IKsqlRequestHttpContentFactory httpContentFactory, IJsonSerializer jsonSerializer) + { + _httpClient = httpClient ?? throw new ArgumentNullException(nameof(httpClient)); + _httpContentFactory = httpContentFactory ?? throw new ArgumentNullException(nameof(httpContentFactory)); + _jsonSerializer = jsonSerializer ?? throw new ArgumentNullException(nameof(jsonSerializer)); + //if (_httpClient.BaseAddress.Scheme != Uri.UriSchemeHttps) AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true); + //_httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(SupportedMediaType)); + //_httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue(SupportedDelimitedMediaType)); + } + + public async Task PostQueryStream(ChannelWriter channelWriter, string sql, IDictionary? properties = null, CancellationToken cancellationToken = default) + { + var requestContent = new QueryStreamRequest(sql, properties); + using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, Endpoints.QueryStream) + { + Content = _httpContentFactory.CreateDelimitedContent(requestContent), + Version = new Version(2, 0), +#if NET5_0 + VersionPolicy = HttpVersionPolicy.RequestVersionOrHigher +#endif + }; + + HttpResponseMessage response = null!; + try + { + response = await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + //EnsureResponseMediaType(response, SupportedDelimitedMediaType); // The header is not available + await using var responseStream = await ReadAsStream(response, cancellationToken); + using var sr = new StreamReader(responseStream, Encoding.UTF8); + while (!sr.EndOfStream) + { + string? line = await sr.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(line)) continue; + + //TODO It's convenient to see string representation while developing the component, but it's suboptimal. + var deserializedLine = (JsonElement)_jsonSerializer.Deserialize(line); + await channelWriter.WriteAsync(deserializedLine, cancellationToken); + } + + channelWriter.Complete(); + } + catch (HttpRequestException e) + { + string? responseContent = await ReadAsString(response, cancellationToken); + channelWriter.Complete(new KsqlDbException($"An error occured while requesting {httpRequestMessage.RequestUri} endpoint.", e) + { + Request = new HttpRequestMessageDetails(httpRequestMessage, _jsonSerializer.Serialize(requestContent)), + Response = new HttpResponseMessageDetails(response, responseContent), + Body = _jsonSerializer.TryDeserialize(responseContent), + + }); + } + catch (JsonException e) + { + channelWriter.Complete(new KsqlDbException($"Unable to deserialize JSON returned by {httpRequestMessage.RequestUri} endpoint.", e) + { + Request = new HttpRequestMessageDetails(httpRequestMessage, _jsonSerializer.Serialize(requestContent)), + Response = new HttpResponseMessageDetails(response) + }); + } + finally + { + response.Dispose(); + } + } + + public async Task PostInsertStream(string streamName, IAsyncEnumerable rows, ChannelWriter ackChannelWriter, CancellationToken cancellationToken = default) + { + using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, Endpoints.InsertStream) + { + Content = _httpContentFactory.CreateDelimitedStreamingContent(streamName, rows), + Version = new Version(2, 0), +#if NET5_0 + VersionPolicy = HttpVersionPolicy.RequestVersionOrHigher +#endif + }; + + HttpResponseMessage response = null!; + try + { + response = await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + //EnsureResponseMediaType(response, SupportedDelimitedMediaType); // The header is not available + await using var responseStream = await ReadAsStream(response, cancellationToken); + using var sr = new StreamReader(responseStream, Encoding.UTF8); + while (!sr.EndOfStream) + { + string? line = await sr.ReadLineAsync(); + if (string.IsNullOrWhiteSpace(line)) continue; + + //TODO It's convenient to see string representation while developing the component, but it's suboptimal. + var deserializedLine = _jsonSerializer.Deserialize(line); + await ackChannelWriter.WriteAsync(deserializedLine, cancellationToken); + } + + ackChannelWriter.Complete(); + } + catch (HttpRequestException e) + { + string? responseContent = await ReadAsString(response, cancellationToken); + ackChannelWriter.Complete(new KsqlDbException($"An error occured while requesting {httpRequestMessage.RequestUri} endpoint.", e) + { + Request = new HttpRequestMessageDetails(httpRequestMessage, null), + Response = new HttpResponseMessageDetails(response, responseContent), + Body = _jsonSerializer.TryDeserialize(responseContent), + + }); + } + catch (JsonException e) + { + ackChannelWriter.Complete(new KsqlDbException($"Unable to deserialize JSON returned by {httpRequestMessage.RequestUri} endpoint.", e) + { + Request = new HttpRequestMessageDetails(httpRequestMessage, null), + Response = new HttpResponseMessageDetails(response) + }); + } + finally + { + response.Dispose(); + } + } + + public async Task PostCloseStream(string queryId, CancellationToken cancellationToken = default) + { + var request = new CloseStreamRequest(queryId); + var response = await _httpClient.PostAsync(Endpoints.CloseStream, _httpContentFactory.CreateContent(request), cancellationToken); + response.EnsureSuccessStatusCode(); + } + + public async Task PostKqsl(string sql, IDictionary? properties = null, long? commandSequenceNumber = null, CancellationToken cancellationToken = default) + { + var requestContent = new KsqlRequest(sql, properties, commandSequenceNumber); + + using var httpRequestMessage = new HttpRequestMessage(HttpMethod.Post, Endpoints.Ksql) + { + Content = _httpContentFactory.CreateContent(requestContent), + Version = new Version(2, 0), +#if NET5_0 + VersionPolicy = HttpVersionPolicy.RequestVersionOrHigher +#endif + }; + httpRequestMessage.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(SupportedMediaType)); + + HttpResponseMessage? response = null; + try + { + response = await _httpClient.SendAsync(httpRequestMessage, HttpCompletionOption.ResponseContentRead, cancellationToken).ConfigureAwait(false); + response.EnsureSuccessStatusCode(); + EnsureResponseMediaType(response, "application/json"); //TODO Review? + var responseStream = await ReadAsStream(response, cancellationToken); + return await _jsonSerializer.DeserializeAsync(responseStream, cancellationToken); + } + catch (HttpRequestException e) + { + string? responseContent = await ReadAsString(response, cancellationToken); + throw new KsqlDbException($"An error occurred while requesting {httpRequestMessage.RequestUri} endpoint.", e) + { + Request = new HttpRequestMessageDetails(httpRequestMessage, _jsonSerializer.Serialize(requestContent)), + Response = new HttpResponseMessageDetails(response, responseContent), + Body = _jsonSerializer.TryDeserialize(responseContent) + }; + } + catch (JsonException e) + { + string? responseContent = await ReadAsString(response, cancellationToken); + throw new KsqlDbException($"Unable to deserialize JSON \"{responseContent ?? string.Empty}\" returned by {httpRequestMessage.RequestUri} endpoint.", e); + } + } + + private static async Task ReadAsString(HttpResponseMessage? responseMessage, CancellationToken cancellationToken) + { + if (responseMessage is null) return null; +#if NET5_0 + return await responseMessage.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false); +#else + return await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); +#endif + } + + private static async Task ReadAsStream(HttpResponseMessage? responseMessage, CancellationToken cancellationToken) + { + if (responseMessage is null) return Stream.Null; +#if NET5_0 + return await responseMessage.Content.ReadAsStreamAsync(cancellationToken).ConfigureAwait(false); +#else + return await responseMessage.Content.ReadAsStreamAsync().ConfigureAwait(false); +#endif + } + + private static void EnsureResponseMediaType(HttpResponseMessage response, string expectedMediaType) + { + if (!string.Equals(response.Content.Headers.ContentType?.MediaType, expectedMediaType, StringComparison.Ordinal)) + { + throw new HttpRequestException($"The response media type \"{response.Content.Headers.ContentType?.MediaType ?? string.Empty}\" is not supported."); + } + } + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Requests/CloseStreamRequest.cs b/src/KsqlDb.Client/KsqlApiV1/Requests/CloseStreamRequest.cs new file mode 100644 index 0000000..26ec306 --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Requests/CloseStreamRequest.cs @@ -0,0 +1,19 @@ +namespace KsqlDb.Api.Client.KsqlApiV1.Requests +{ + /// + /// The close stream request. + /// + internal class CloseStreamRequest + { + /// + /// The query Id. + /// + public string? QueryId { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The query Id. + public CloseStreamRequest(string? queryId) => QueryId = queryId; + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Requests/KsqlRequest.cs b/src/KsqlDb.Client/KsqlApiV1/Requests/KsqlRequest.cs new file mode 100644 index 0000000..f51992e --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Requests/KsqlRequest.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; + +namespace KsqlDb.Api.Client.KsqlApiV1.Requests +{ + /// + /// The ksql request. + /// + internal class KsqlRequest + { + /// + /// The semicolon-delimited sequence of SQL statements to run. + /// + public string Ksql { get; } + + /// + /// The property overrides to run the statements with. Refer to the Configuration Parameter Reference for details. + /// + public IDictionary? StreamsProperties { get; } + + /// + /// The optional command sequence number. + /// If specified, the statements will not be run until all existing commands up to and including the specified sequence number have completed. + /// If unspecified, the statements are run immediately. When a command is processed, the result object contains its sequence number. + /// + public long? CommandSequenceNumber { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The semicolon-delimited sequence of SQL statements to run. + /// The optional property overrides to run the statements with. + /// The optional command sequence number. + public KsqlRequest(string? ksql, IDictionary? streamsProperties, long? commandSequenceNumber) + { + Ksql = !string.IsNullOrWhiteSpace(ksql) ? ksql : throw new ArgumentNullException(nameof(ksql)); + StreamsProperties = streamsProperties; + CommandSequenceNumber = commandSequenceNumber; + } + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Requests/QueryStreamRequest.cs b/src/KsqlDb.Client/KsqlApiV1/Requests/QueryStreamRequest.cs new file mode 100644 index 0000000..e0a736a --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Requests/QueryStreamRequest.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; + +namespace KsqlDb.Api.Client.KsqlApiV1.Requests +{ + /// + /// The query stream request. + /// + internal class QueryStreamRequest + { + /// + /// The SELECT statement to run. + /// + public string Sql { get; } + + /// + /// The property overrides to run the statements with. Refer to the Config Reference for details on properties that you can set. + /// + public IDictionary? Properties { get; } + + /// + /// Initializes a new instance of the class. + /// + /// The SELECT statement. + /// The properties. + public QueryStreamRequest(string sql, IDictionary? properties) + { + Sql = !string.IsNullOrWhiteSpace(sql) ? sql : throw new ArgumentNullException(nameof(sql)); + Properties = properties; + } + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Requests/TargetStream.cs b/src/KsqlDb.Client/KsqlApiV1/Requests/TargetStream.cs new file mode 100644 index 0000000..f5d47dd --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Requests/TargetStream.cs @@ -0,0 +1,9 @@ +namespace KsqlDb.Api.Client.KsqlApiV1.Requests +{ + internal class TargetStream + { +#nullable disable annotations + public string Target { get; set; } +#nullable restore annotations + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Responses/InsertStreamAckResponse.cs b/src/KsqlDb.Client/KsqlApiV1/Responses/InsertStreamAckResponse.cs new file mode 100644 index 0000000..4051b9b --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Responses/InsertStreamAckResponse.cs @@ -0,0 +1,24 @@ +using System.Text.Json.Serialization; + +namespace KsqlDb.Api.Client.KsqlApiV1.Responses +{ + internal class InsertStreamAckResponse + { +#nullable disable annotations + public string Status { get; set; } +#nullable restore annotations + + public long Seq { get; set; } + + /// + /// The error code. + /// + [JsonPropertyName("error_code")] + public int? ErrorCode { get; set; } + + /// + /// The error message. + /// + public string? Message { get; set; } + } +} diff --git a/src/KsqlDb.Client/KsqlApiV1/Responses/KsqlResponse.cs b/src/KsqlDb.Client/KsqlApiV1/Responses/KsqlResponse.cs new file mode 100644 index 0000000..68c4f89 --- /dev/null +++ b/src/KsqlDb.Client/KsqlApiV1/Responses/KsqlResponse.cs @@ -0,0 +1,72 @@ +using System.Collections.Generic; +using System.Text.Json.Serialization; +#nullable disable + +namespace KsqlDb.Api.Client.KsqlApiV1.Responses +{ + /// + /// The /ksql endpoint response. + /// + internal class KsqlResponse + { + // Fields that are common to all responses + [JsonPropertyName("@type")] + public string Type { get; set; } + public string StatementText { get; set; } + + public WarningMessage[] Warnings { get; set; } + + // CREATE, DROP and TERMINATE fields + public string CommandId { get; set; } + public CommandInfo CommandStatus { get; set; } + public long CommandSequenceNumber { get; set; } + + // LIST STREAMS and SHOW STREAMS fields + public StreamOrTableInfo[] Streams { get; set; } + + // LIST TABLES, SHOW TABLES fields + public StreamOrTableInfo[] Tables { get; set; } + + // LIST QUERIES, SHOW QUERIES fields + public QueryInfo[] Queries { get; set; } + + // LIST TOPICS QUERIES fields + public TopicInfo[] Topics { get; set; } + + // LIST PROPERTIES, SHOW PROPERTIES fields + public Dictionary Properties { get; set; } + + internal class WarningMessage + { + public string Warning { get; set; } + } + + internal class CommandInfo + { + public string Status { get; set; } + public string Message { get; set; } + } + + internal class StreamOrTableInfo + { + public string Name { get; set; } + public string Topic { get; set; } + public string Format { get; set; } + public string Type { get; set; } + public bool? IsWindowed { get; set; } + } + + internal class QueryInfo + { + public string QueryString { get; set; } + public string Sinks { get; set; } + public string Id { get; set; } + } + + internal class TopicInfo + { + public string Name { get; set; } + public int[] ReplicaInfo { get; set; } + } + } +} diff --git a/src/KsqlDb.Client/KsqlDb.Client.csproj b/src/KsqlDb.Client/KsqlDb.Client.csproj index 6a9a116..e43b883 100644 --- a/src/KsqlDb.Client/KsqlDb.Client.csproj +++ b/src/KsqlDb.Client/KsqlDb.Client.csproj @@ -9,7 +9,11 @@ - + + + + + diff --git a/src/KsqlDb.Client/Parsers/KObjectParser.cs b/src/KsqlDb.Client/Parsers/KObjectParser.cs new file mode 100644 index 0000000..5129468 --- /dev/null +++ b/src/KsqlDb.Client/Parsers/KObjectParser.cs @@ -0,0 +1,102 @@ +using System; +using System.Text.Json; +using KsqlDb.Api.Client.Abstractions.Objects; +// ReSharper disable HeapView.BoxingAllocation + +namespace KsqlDb.Api.Client.Parsers +{ + public class KObjectParser + { + private static readonly char[] _nonPrimitiveTypeElements = {'<', '('}; + private readonly Func _parse; + + public Type TargetType { get; } + + private KObjectParser(Func parse, Type targetType) => (_parse, TargetType) = (parse, targetType); + + public object Parse(JsonElement value) => _parse(value); + + public static KObjectParser Create(string type) + { + var primaryType = GetPrimaryTypeName(type); + if (primaryType.Equals("STRING", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseString, typeof(string)); + if (primaryType.Equals("INTEGER", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseInt, typeof(int)); + if (primaryType.Equals("BIGINT", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseLong, typeof(long)); + if (primaryType.Equals("DOUBLE", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseDouble, typeof(double)); + if (primaryType.Equals("BOOLEAN", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(TryParseBoolean, typeof(bool)); + if (primaryType.Equals("DECIMAL", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseDecimal, typeof(decimal)); + if (primaryType.Equals("ARRAY", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseArray(j, type), typeof(KSqlArray)); + if (primaryType.Equals("MAP", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseMap(j, type), typeof(KSqlObject)); + //TODO Add support of STRUCT + //if (primaryType.Equals("STRUCT", StringComparison.OrdinalIgnoreCase)) return new StructKObjectParser(); + throw new NotSupportedException($"The {type} type is not supported."); + } + + private static ReadOnlySpan GetPrimaryTypeName(ReadOnlySpan type) + { + int specialSymbolIndex = type.IndexOfAny(_nonPrimitiveTypeElements); + return specialSymbolIndex == -1 ? type : type.Slice(0, specialSymbolIndex); + } + + private static object ParseString(JsonElement jsonElement) + { + if (jsonElement.ValueKind != JsonValueKind.String) return KSqlNull.Instance; + object? stringValue = jsonElement.GetString(); + return stringValue ?? KSqlNull.Instance; + } + + private static object ParseInt(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out int output) => element.TryGetInt32(out output)); + + private static object ParseLong(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out long output) => element.TryGetInt64(out output)); + + private static object ParseDouble(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out double output) => element.TryGetDouble(out output)); + + private static object ParseDecimal(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out decimal output) => element.TryGetDecimal(out output)); + + private delegate bool TryGetNumber(JsonElement jsonElement, out T output); + private static object ParseNumber(JsonElement jsonElement, TryGetNumber tryGet) where T : struct + { + if (jsonElement.ValueKind != JsonValueKind.Number) return KSqlNull.Instance; + if (tryGet(jsonElement, out T value)) return value; + return KSqlNull.Instance; + } + + private static object TryParseBoolean(JsonElement jsonElement) => + jsonElement.ValueKind switch + { + JsonValueKind.True => true, + JsonValueKind.False => false, + _ => KSqlNull.Instance + }; + + private static object ParseArray(JsonElement jsonElement, ReadOnlySpan fullTypeName) + { + if (jsonElement.ValueKind != JsonValueKind.Array) return KSqlNull.Instance; + var itemType = fullTypeName.Slice(6, fullTypeName.Length - 7).Trim(); + var itemParser = Create(itemType.ToString()); + var kSqlArray = new KSqlArray(); + foreach (var itemJsonElement in jsonElement.EnumerateArray()) + { + var item = itemParser.Parse(itemJsonElement); + kSqlArray.AddValue(item); + } + + return kSqlArray; + } + + private static object ParseMap(JsonElement jsonElement, ReadOnlySpan fullTypeName) + { + if (jsonElement.ValueKind != JsonValueKind.Object) return KSqlNull.Instance; + var itemType = fullTypeName.Slice(12, fullTypeName.Length - 13).Trim(); + var valueParser = Create(itemType.ToString()); + var kSqlObject = new KSqlObject(); + foreach (var jsonProperty in jsonElement.EnumerateObject()) + { + var item = valueParser.Parse(jsonProperty.Value); + kSqlObject.AddValue(jsonProperty.Name, item); + } + + return kSqlObject; + } + } +} diff --git a/src/KsqlDb.Client/Parsers/QueryResultRowParser.cs b/src/KsqlDb.Client/Parsers/QueryResultRowParser.cs new file mode 100644 index 0000000..a7ce654 --- /dev/null +++ b/src/KsqlDb.Client/Parsers/QueryResultRowParser.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Abstractions.QueryResults; + +namespace KsqlDb.Api.Client.Parsers +{ + internal class QueryResultRowParser + { + public Dictionary ColumnNameToIndex { get; } + public (string, Type)[] ColumnNamesAndTypes { get; } + private readonly KObjectParser[] _kObjectParsers; + + public QueryResultRowParser(JsonElement schema) + { + if (!schema.TryGetProperty("columnNames", out var columnsNamesElement) || + !schema.TryGetProperty("columnTypes", out var columnsTypesElement)) + { + throw new ArgumentException(); + } + + var columnNamesArray = columnsNamesElement.EnumerateArray().ToArray(); + var columnTypesArray = columnsTypesElement.EnumerateArray().ToArray(); + + if (columnNamesArray.Length != columnTypesArray.Length) throw new ArgumentException(); + int columnCount = columnNamesArray.Length; + + _kObjectParsers = new KObjectParser[columnCount]; + ColumnNamesAndTypes = new (string, Type)[columnCount]; + ColumnNameToIndex = new Dictionary(); + + for (int i = 0; i < columnCount; i++) + { + string columnName = columnNamesArray[i].GetString() ?? throw new Exception(); + string columnType = columnTypesArray[i].GetString() ?? throw new Exception(); + ColumnNameToIndex.Add(columnName, i); + var columnParser = KObjectParser.Create(columnType); + _kObjectParsers[i] = columnParser; + ColumnNamesAndTypes[i] = (columnName, columnParser.TargetType); + } + } + + public QueryResultRow Parse(JsonElement row) + { + if (row.ValueKind != JsonValueKind.Array) throw new Exception(); + var values = new KSqlArray(); + int index = 0; + foreach (var value in row.EnumerateArray()) + { + if (index > _kObjectParsers.Length) throw new Exception(); + var parsedValue = _kObjectParsers[index++].Parse(value); + values.AddValue(parsedValue); + } + + return new QueryResultRow(values, ColumnNameToIndex, ColumnNamesAndTypes); + } + } +} diff --git a/src/KsqlDb.Client/Serdes/IJsonSerializer.cs b/src/KsqlDb.Client/Serdes/IJsonSerializer.cs new file mode 100644 index 0000000..50fb3ba --- /dev/null +++ b/src/KsqlDb.Client/Serdes/IJsonSerializer.cs @@ -0,0 +1,71 @@ +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace KsqlDb.Api.Client.Serdes +{ + /// + /// The JSON serializer abstraction. + /// + public interface IJsonSerializer + { + /// + /// Convert the provided value to UTF-8 encoded JSON text and write it to the . + /// + /// A task that represents the asynchronous write operation. + /// The UTF-8 to write to. + /// The value to convert. + /// The which may be used to cancel the write operation. + Task SerializeAsync(Stream utf8JsonStream, object value, CancellationToken cancellationToken = default); + + /// + /// Convert the provided value into a JSON . + /// + /// A JSON representation of the value. + /// The value to convert. + string Serialize(TValue value); + + /// + /// Read the UTF-8 encoded text representing a single JSON value into a . + /// The Stream will be read to completion. + /// + /// A representation of the JSON value. + /// JSON data to parse. + /// + /// The which may be used to cancel the read operation. + /// + /// + /// Thrown when the JSON is invalid, + /// is not compatible with the JSON, + /// or when there is remaining data in the Stream. + /// + ValueTask DeserializeAsync(Stream utf8JsonStream, CancellationToken cancellationToken = default); + + /// + /// Parse the text representing a single JSON value into a . + /// + /// A representation of the JSON value. + /// JSON text to parse. + /// + /// Thrown if is null. + /// + /// + /// Thrown when the JSON is invalid, + /// is not compatible with the JSON, + /// or when there is remaining data in the Stream. + /// + /// Using a is not as efficient as using the + /// UTF-8 methods since the implementation natively uses UTF-8. + /// + TValue Deserialize(string json); + + /// + /// Attempts to parse the text representing a single JSON value into a . + /// + /// JSON text to parse. + /// The result type. + /// A representation of the JSON value. + TValue? TryDeserialize(string? json) where TValue : class; + } +} diff --git a/src/KsqlDb.Client/Serdes/JsonSerializer.cs b/src/KsqlDb.Client/Serdes/JsonSerializer.cs new file mode 100644 index 0000000..9fb2883 --- /dev/null +++ b/src/KsqlDb.Client/Serdes/JsonSerializer.cs @@ -0,0 +1,65 @@ +using System.IO; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Serializer = System.Text.Json.JsonSerializer; + +namespace KsqlDb.Api.Client.Serdes +{ + /// + /// The JSON serializer for ksqlDB requests and responses. + /// + public class JsonSerializer : IJsonSerializer + { + private static readonly JsonSerializerOptions _serializerOptions = new JsonSerializerOptions + { + PropertyNamingPolicy = JsonNamingPolicy.CamelCase, + DictionaryKeyPolicy = JsonNamingPolicy.CamelCase, + AllowTrailingCommas = false, + IgnoreNullValues = true, + ReadCommentHandling = JsonCommentHandling.Disallow, + WriteIndented = false, + Converters = + { + new KSqlObjectConverter(), + new KSqlArrayConverter(), + new KSqlNullConverter() + } + }; + + /// + public string Serialize(TValue value) => Serializer.Serialize(value, _serializerOptions); + + /// + public Task SerializeAsync(Stream utf8JsonStream, object value, CancellationToken cancellationToken = default) => + Serializer.SerializeAsync(utf8JsonStream, value, value.GetType(), _serializerOptions, CancellationToken.None); + + /// + public async ValueTask DeserializeAsync(Stream utf8JsonStream, CancellationToken cancellationToken = default) + { + var value = await Serializer.DeserializeAsync(utf8JsonStream, _serializerOptions, cancellationToken); + return value ?? throw new JsonException($"Unable to deserialize the provided UTF8 JSON stream into {typeof(TValue).FullName}"); + } + + /// + public TValue Deserialize(string json) + { + var value = Serializer.Deserialize(json, _serializerOptions); + return value ?? throw new JsonException($"Unable to deserialize the following JSON into {typeof(TValue).FullName}: {json}"); + } + + /// + public TValue? TryDeserialize(string? json) where TValue : class + { + if (string.IsNullOrWhiteSpace(json)) return default; + try + { + return Deserialize(json); + } + catch (JsonException) + { + return default; + } + } + } +} diff --git a/src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs b/src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs new file mode 100644 index 0000000..70c0287 --- /dev/null +++ b/src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs @@ -0,0 +1,15 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; +using KsqlDb.Api.Client.Abstractions.Objects; +using Serializer = System.Text.Json.JsonSerializer; + +namespace KsqlDb.Api.Client.Serdes +{ + internal class KSqlArrayConverter : JsonConverter + { + public override KSqlArray Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException(); + + public override void Write(Utf8JsonWriter writer, KSqlArray value, JsonSerializerOptions options) => Serializer.Serialize(writer, value.AsReadOnlyList(), options); + } +} diff --git a/src/KsqlDb.Client/Serdes/KSqlNullConverter.cs b/src/KsqlDb.Client/Serdes/KSqlNullConverter.cs new file mode 100644 index 0000000..aaf7c42 --- /dev/null +++ b/src/KsqlDb.Client/Serdes/KSqlNullConverter.cs @@ -0,0 +1,14 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; +using KsqlDb.Api.Client.Abstractions.Objects; + +namespace KsqlDb.Api.Client.Serdes +{ + internal class KSqlNullConverter : JsonConverter + { + public override KSqlNull Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException(); + + public override void Write(Utf8JsonWriter writer, KSqlNull value, JsonSerializerOptions options) => writer.WriteNullValue(); + } +} diff --git a/src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs b/src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs new file mode 100644 index 0000000..95b90dc --- /dev/null +++ b/src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs @@ -0,0 +1,16 @@ +using System; +using System.Text.Json; +using System.Text.Json.Serialization; +using KsqlDb.Api.Client.Abstractions.Objects; +using Serializer = System.Text.Json.JsonSerializer; + +namespace KsqlDb.Api.Client.Serdes +{ + internal class KSqlObjectConverter : JsonConverter + { + + public override KSqlObject Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException(); + + public override void Write(Utf8JsonWriter writer, KSqlObject value, JsonSerializerOptions options) => Serializer.Serialize(writer, value.AsImmutableDictionary(), options); + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs b/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs new file mode 100644 index 0000000..22a4b1b --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs @@ -0,0 +1,44 @@ +using System; +using KsqlDb.Api.Client.Abstractions; +using Xunit; + +namespace KsqlDb.Client.IntegrationTests +{ + public class DescribeTests : IClassFixture + { + private readonly SourceDescriptionsFixture _fixture; + + public DescribeTests(SourceDescriptionsFixture fixture) => _fixture = fixture; + + [Theory] + [InlineData(TestClient.KnownEntities.OrdersStreamName)] + [InlineData(TestClient.KnownEntities.UsersTableName)] + public void Contains_Statement_Text(string entity) + { + Assert.Contains($"DESCRIBE {entity}",_fixture.OrdersStreamDescription.SqlStatement); + } + + /*[Theory] + + public void Orders_Stream_Contains_ExpectedFields() + { + + }*/ + + public sealed class SourceDescriptionsFixture : IDisposable + { + public SourceDescription OrdersStreamDescription { get; } + public SourceDescription UsersTableDescription { get; } + + public SourceDescriptionsFixture() + { + OrdersStreamDescription = TestClient.Instance.DescribeSource(TestClient.KnownEntities.OrdersStreamName).GetAwaiter().GetResult(); + UsersTableDescription = TestClient.Instance.DescribeSource(TestClient.KnownEntities.UsersTableName).GetAwaiter().GetResult(); + } + + public void Dispose() + { + } + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj b/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj index 29c57fe..7ebee02 100644 --- a/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj +++ b/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj @@ -3,12 +3,13 @@ netcoreapp3.1;net5.0 win10-x64;linux-x64 + 9 false - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -20,7 +21,7 @@ - + diff --git a/test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs b/test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs new file mode 100644 index 0000000..55a74b9 --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs @@ -0,0 +1,34 @@ +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace KsqlDb.Client.IntegrationTests +{ + public class ListStreamsTests + { + [Fact] + public async Task Returns_ORDERS_TOPIC_Stream_In_The_List_Of_Streams() + { + // Act + var streams = await TestClient.Instance.ListStreams(); + + // Assert + Assert.Contains(streams, s => s.Name == TestClient.KnownEntities.OrdersStreamName + && s.Topic == TestClient.KnownEntities.OrdersTopicName + && s.KeyFormat == "KAFKA" + && s.ValueFormat == "JSON" + && s.IsWindowed is null); + } + + [Fact] + public async Task Throws_TaskCanceledException_When_Cancelled() + { + // Arrange + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // Act / Assert + await Assert.ThrowsAsync(() => TestClient.Instance.ListStreams(cts.Token)); + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs b/test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs new file mode 100644 index 0000000..2aa27d4 --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs @@ -0,0 +1,34 @@ +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace KsqlDb.Client.IntegrationTests +{ + public class ListTablesTests + { + [Fact] + public async Task Returns_Users_Table_In_The_List_Of_Table() + { + // Act + var tables = await TestClient.Instance.ListTables(); + + // Assert + Assert.Contains(tables, s => s.Name == TestClient.KnownEntities.UsersTableName + && s.Topic == TestClient.KnownEntities.UsersTopicName + && s.KeyFormat == "KAFKA" + && s.ValueFormat == "JSON" + && s.IsWindowed is false); + } + + [Fact] + public async Task Throws_TaskCanceledException_When_Cancelled() + { + // Arrange + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // Act / Assert + await Assert.ThrowsAsync(() => TestClient.Instance.ListTables(cts.Token)); + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs b/test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs new file mode 100644 index 0000000..e6ec52e --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs @@ -0,0 +1,33 @@ +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace KsqlDb.Client.IntegrationTests +{ + public class ListTopicsTests + { + [Fact] + public async Task Returns_Order_Topic_In_The_List_Of_Topics() + { + // Act + var topics = await TestClient.Instance.ListTopics(); + + // Assert + Assert.Contains(topics, t => t.Topic == TestClient.KnownEntities.OrdersTopicName + && t.NumberOfPartitions == 1 + && t.ReplicasPerPartition.SequenceEqual(new[] {1})); + } + + [Fact] + public async Task Throws_TaskCanceledException_When_Cancelled() + { + // Arrange + using var cts = new CancellationTokenSource(); + cts.Cancel(); + + // Act / Assert + await Assert.ThrowsAsync(() => TestClient.Instance.ListTopics(cts.Token)); + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs new file mode 100644 index 0000000..fae45be --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs @@ -0,0 +1,73 @@ +using System; +using System.Linq; +using System.Threading.Tasks; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Exceptions; +using KsqlDb.Api.Client.KsqlApiV1; +using Xunit; +using Xunit.Abstractions; + +namespace KsqlDb.Client.IntegrationTests +{ + public class StreamInsertsTests + { + private readonly TestClient _client; + private const string PrimitiveStreamName = "PrimitiveStream"; + + public StreamInsertsTests(ITestOutputHelper output) + { + _client = new TestClient(output); + } + + [Fact] + public async Task Inserts_Rows_With_Primitive_Type() + { + // Arrange + await CreatePrimitiveStream(); + var rowIndices = Enumerable.Range(1, 100).ToArray(); + var expected = rowIndices.Select(x => (long)x-1); + var rows = rowIndices.Select(CreatePrimitiveRow).ToAsyncEnumerable(); + + // Act + var acks = TestClient.Instance.StreamInserts(PrimitiveStreamName, rows); + + // Assert + var actual = (await acks.ToArrayAsync()).Select(x => x.SequenceNumber); + + Assert.Equal(expected, actual); + } + + private static KSqlObject CreatePrimitiveRow(int rowNumber) => + new KSqlObject() + .Add("i1", rowNumber * 10) + .Add("b2", rowNumber * 10_000) + .Add("b3", rowNumber % 2 == 0) + .Add("d4", 10007d / rowNumber) + .Add("v5", $"{rowNumber} row"); + + private static async Task CreatePrimitiveStream() + { + try + { + await TestClient.Instance.ExecuteStatement($"DROP STREAM {PrimitiveStreamName};"); + } + catch (KsqlDbException) + { + + } + + await TestClient.Instance.ExecuteStatement( + @$"CREATE STREAM {PrimitiveStreamName} ( + i1 INTEGER KEY, + b2 BIGINT, + b3 BOOLEAN, + d4 DOUBLE, + v5 VARCHAR + ) WITH ( + kafka_topic = '{PrimitiveStreamName}', + partitions = 1, + value_format = 'json' + );"); + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs new file mode 100644 index 0000000..25648b8 --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs @@ -0,0 +1,128 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Abstractions.QueryResults; +using KsqlDb.Api.Client.Exceptions; +using Xunit; + +namespace KsqlDb.Client.IntegrationTests +{ + public class StreamQueryTests : IClassFixture + { + private const int ExpectedRowsCount = 3; + private readonly IReadOnlyCollection<(string name, Type type)> _actualColumns; + private readonly QueryResultRow[] _actualRows; + private readonly string _actualQueryId; + + public StreamQueryTests(StreamQueryFixture fixture) => (_actualColumns, _actualRows, _actualQueryId) = (fixture.Columns, fixture.Rows, fixture.QueryId); + + [Fact] + public void Response_Contains_Expected_Number_Of_Result_Rows() + { + Assert.Equal(ExpectedRowsCount, _actualRows.Length); + } + + [Fact] + public void Response_Contains_USERID_Values_In_Expected_Format_Column() + { + string[] actual = _actualRows.Select(r => r.AsArray.GetString(0)).ToArray(); + Assert.All(actual, u => Assert.StartsWith("User_", u)); + } + + [Fact] + public void Response_Contains_REGISTERTIME_Values_In_Expected_Format_Column() + { + long[] actual = _actualRows.Select(r => r.AsArray.GetLong(1)).ToArray(); + Assert.All(actual, t => Assert.InRange(t, 0L, long.MaxValue)); + } + + [Fact] + public void Response_Contains_REGIONID_Values_In_Expected_Format_Column() + { + string[] actual = _actualRows.Select(r => r.AsArray.GetString(3)).ToArray(); + Assert.All(actual, u => Assert.StartsWith("Region_", u)); + } + + [Fact] + public void Response_Contains_GENDER_Values_In_Expected_Format_Column() + { + string[] actual = _actualRows.Select(r => r.AsArray.GetString(2)).ToArray(); + Assert.All(actual, g => Assert.Matches("MALE|FEMALE|OTHER", g)); + } + + [Theory] + [InlineData(0, 0)] + [InlineData(0, 1)] + [InlineData(1, 0)] + [InlineData(1, 1)] + [InlineData(2, 0)] + [InlineData(2, 1)] + public void Response_Contains_INTEREST_Values_In_Expected_Format_Column(int row, int arrayIndex) + { + string actual = _actualRows[row].GetKSqlArray("INTERESTS").GetString(arrayIndex); + Assert.Matches("News|Travel|Movies|Sport|Game|Travel", actual); + } + + [Fact] + public void Response_Contains_Different_USERID_Values() + { + int uniqueValues = _actualRows.Select(r => r.AsArray.GetString(0)).Distinct().Count(); + Assert.True(uniqueValues > 0); + } + + [Fact] + public void Response_Contains_Expected_Column_Definitions() + { + Assert.Collection(_actualColumns, + column => Assert.True(column.name == "USERID" && column.type == typeof(string), "USERID"), + column => Assert.True(column.name == "REGISTERTIME" && column.type == typeof(long), "REGISTERTIME"), + column => Assert.True(column.name == "GENDER" && column.type == typeof(string), "GENDER"), + column => Assert.True(column.name == "REGIONID" && column.type == typeof(string), "REGIONID"), + column => Assert.True(column.name == "INTERESTS" && column.type == typeof(KSqlArray), "INTERESTS"), + column => Assert.True(column.name == "CONTACTINFO" && column.type == typeof(KSqlObject), "CONTACTINFO")); + } + + [Fact] + public void Response_Contains_Non_Empty_QueryId() + { + Assert.NotEmpty(_actualQueryId); + } + + [Fact] + public async Task Throws_When_Supplied_Invalid_Sql() + { + var exception = await Assert.ThrowsAsync(() => TestClient.Instance.StreamQuery("Invalid sql;")); + Assert.NotNull(exception.Body); + Assert.Equal(40001, exception.Body.ErrorCode); + } + + [Fact] + public async Task Throws_When_Queries_Non_Existing_Stream() + { + var exception = await Assert.ThrowsAsync(() => TestClient.Instance.StreamQuery("SELECT * FROM NONEXISTINGSTREAM EMIT CHANGES LIMIT 1;")); + Assert.NotNull(exception.Body); + Assert.Equal(40001, exception.Body.ErrorCode); + } + + [Collection(nameof(StreamQueryFixture))] + public sealed class StreamQueryFixture : IAsyncLifetime + { + public IReadOnlyCollection<(string name, Type type)> Columns { get; set; } + public QueryResultRow[] Rows { get; set; } + public string QueryId { get; set; } + + public async Task InitializeAsync() + { + var properties = new Dictionary { ["ksql.streams.auto.offset.reset"] = "earliest" }; + var queryResult = await TestClient.Instance.StreamQuery($"SELECT * FROM {TestClient.KnownEntities.UsersTableName} EMIT CHANGES LIMIT {ExpectedRowsCount};", properties); + Columns = queryResult.Columns; + QueryId = queryResult.QueryId; + Rows = await queryResult.Rows.ToArrayAsync(); + } + + public Task DisposeAsync() => Task.CompletedTask; + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/TestClient.cs b/test/KsqlDb.Client.IntegrationTests/TestClient.cs new file mode 100644 index 0000000..d58aec4 --- /dev/null +++ b/test/KsqlDb.Client.IntegrationTests/TestClient.cs @@ -0,0 +1,70 @@ +using System; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using KsqlDb.Api.Client.KsqlApiV1; +using KsqlDb.Api.Client.KsqlApiV1.Content; +using KsqlDb.Api.Client.Serdes; +using Xunit.Abstractions; + +namespace KsqlDb.Client.IntegrationTests +{ + internal class TestClient : KsqlDb.Api.Client.Client + { + public static class KnownEntities + { + public const string OrdersTopicName = "orders_topic"; + public const string OrdersStreamName = "ORDERS_STREAM"; + public const string UsersTopicName = "users_topic"; + public const string UsersTableName = "USERS_TABLE"; + } + + public static KsqlDb.Api.Client.Client Instance => new TestClient(); + + private TestClient() : base(CreateHttpClient()) + { + } + + private static KSqlDbHttpClient CreateHttpClient() + { + var httpClient = new HttpClient {BaseAddress = new Uri("http://127.0.0.1:8088")}; + var jsonSerializer = new JsonSerializer(); + return new KSqlDbHttpClient(httpClient, new KsqlV1RequestHttpContentFactory(jsonSerializer), jsonSerializer); + } + + public TestClient(ITestOutputHelper output) : base(CreateHttpClientWithLogging(output)) + { + + } + + public static KSqlDbHttpClient CreateHttpClientWithLogging(ITestOutputHelper output) + { + var httpClient = new HttpClient(new LoggingHandler(output)) {BaseAddress = new Uri("http://127.0.0.1:8088")}; + var jsonSerializer = new JsonSerializer(); + return new KSqlDbHttpClient(httpClient, new KsqlV1RequestHttpContentFactory(jsonSerializer), jsonSerializer); + } + } + + internal class LoggingHandler : DelegatingHandler + { + private readonly ITestOutputHelper _output; + + public LoggingHandler(ITestOutputHelper output) + { + _output = output; + InnerHandler = new HttpClientHandler(); + } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + _output.WriteLine(request.ToString()); + if (request.Content is not null) + { + var content = await request.Content.ReadAsStringAsync(); + _output.WriteLine($"Content: {content}"); + } + + return await base.SendAsync(request, cancellationToken); + } + } +} diff --git a/test/KsqlDb.Client.IntegrationTests/UnitTest1.cs b/test/KsqlDb.Client.IntegrationTests/UnitTest1.cs deleted file mode 100644 index e0bc5e9..0000000 --- a/test/KsqlDb.Client.IntegrationTests/UnitTest1.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using Xunit; - -namespace KsqlDb.Client.IntegrationTests -{ - public class UnitTest1 - { - [Fact] - public void Test1() - { - } - } -} diff --git a/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj b/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj index 29c57fe..e0fbe48 100644 --- a/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj +++ b/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj @@ -4,11 +4,12 @@ netcoreapp3.1;net5.0 win10-x64;linux-x64 false + latest - - + + runtime; build; native; contentfiles; analyzers; buildtransitive all @@ -20,7 +21,7 @@ - + diff --git a/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs b/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs new file mode 100644 index 0000000..911964d --- /dev/null +++ b/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs @@ -0,0 +1,217 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text.Json; +using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Parsers; +using Xunit; + +namespace KsqlDb.Client.UnitTests.Parsers +{ + public class KObjectParserTests + { + [Theory] + [InlineData("STRING")] + [InlineData("String")] + [InlineData("INTEGER")] + [InlineData("BIGINT")] + [InlineData("DOUBLE")] + [InlineData("BOOLEAN")] + [InlineData("DECIMAL")] + [InlineData("ARRAY")] + [InlineData("MAP")] + [InlineData("ARRAY>>")] + [InlineData("STRUCT, AGE INT>")] + public void Creates_A_Parser_For_A_Known_Type(string type) + { + var target = KObjectParser.Create(type); + Assert.NotNull(target); + } + + [Fact] + public void Throws_NotSupportedException_When_Created_For_An_Unknown_Type() + { + Assert.Throws(() => KObjectParser.Create("HASHMAP")); + } + + [Theory] + [InlineData("INTEGER", typeof(int))] + [InlineData("BIGINT", typeof(long))] + [InlineData("DOUBLE", typeof(double))] + [InlineData("DECIMAL", typeof(decimal))] + public void Parses_A_Numeric_Json_Value_And_Returns_An_Instance_Of_The_Expected_Type(string ksqlType, Type expectedType) + { + // Arrange + const int value = 42; + var expected = Convert.ChangeType(value, expectedType); + var jsonElement = Deserialize(value.ToString()); + var target = KObjectParser.Create(ksqlType); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.Equal(expected, actual); + } + + [Theory] + [InlineData("INTEGER", "42.42")] + [InlineData("BIGINT", "42.42")] + public void Parses_A_Non_Numeric_Json_Value_And_Returns_An_Instance_Of_NullKObject_If_A_Numeric_Value_Is_Expected(string ksqlType, string json) + { + // Arrange + var jsonElement = Deserialize(json); + var target = KObjectParser.Create(ksqlType); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + } + + [Theory] + [InlineData("INTEGER")] + [InlineData("BIGINT")] + [InlineData("DOUBLE")] + [InlineData("DECIMAL")] + public void Parses_A_Non_Numeric_Json_Value_And_Returns_An_Instance_Of_NullKObject_If_Unable_To_Parse(string ksqlType) + { + // Arrange + const string value = "not-a-number"; + var jsonElement = Deserialize(JsonSerializer.Serialize(value)); + var target = KObjectParser.Create(ksqlType); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + } + + [Theory] + [InlineData(true)] + [InlineData(false)] + public void Parses_A_Boolean_Json_Value_And_Returns_An_Instance_Of_BooleanKObject(bool value) + { + // Arrange + var jsonElement = Deserialize(JsonSerializer.Serialize(value)); + var target = KObjectParser.Create("BOOLEAN"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + } + + [Fact] + public void Parses_A_Non_Boolean_Json_Value_And_Returns_An_Instance_Of_NullKObject_Type_If_A_Boolean_Is_Expected() + { + // Arrange + var jsonElement = Deserialize("42"); + var target = KObjectParser.Create("BOOLEAN"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + } + + [Fact] + public void Parses_A_String_Json_Value_And_Returns_An_Instance_Of_StringKObject() + { + // Arrange + const string value = "abcd"; + var jsonElement = Deserialize(JsonSerializer.Serialize(value)); + var target = KObjectParser.Create("STRING"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + } + + [Fact] + public void Parses_A_Non_String_Json_Value_And_Returns_An_Instance_Of_NullKObject_Type_Of_String_Is_Expected() + { + // Arrange + var jsonElement = Deserialize("42"); + var target = KObjectParser.Create("STRING"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + } + + [Fact] + public void Parses_A_Json_Array_With_Long_Values_And_Returns_An_Instance_Of_KSqlArray() + { + // Arrange + long[] array = {1, 2, 3, 4}; + var jsonElement = Deserialize(JsonSerializer.Serialize(array)); + var target = KObjectParser.Create("ARRAY"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + + if (actual is not KSqlArray actualArray) return; + var expected = array.Cast(); + Assert.Equal(expected, actualArray.AsReadOnlyList()); + } + + [Fact] + public void Parses_A_Json_Array_With_Map_Values_And_Returns_An_Instance_Of_KSqlArray() + { + // Arrange + var firstItem = new Dictionary { ["key11"] = 1, ["key12"] = 2 }; + var secondItem = new Dictionary { ["key21"] = 3, ["key22"] = 4 }; + var array = new[] {firstItem, secondItem}; + var jsonElement = Deserialize(JsonSerializer.Serialize(array)); + var target = KObjectParser.Create("ARRAY>"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + if (actual is not KSqlArray actualArray) return; + Assert.All(actualArray.AsReadOnlyList(), item => Assert.IsType(item)); + Assert.Collection(actualArray.AsReadOnlyList().OfType(), + item => AssertMap(firstItem, item), + item => AssertMap(secondItem, item)); + } + + [Fact] + public void Parses_A_Json_Map_With_Int_Values_And_Returns_An_Instance_Of_KSqlObject() + { + // Arrange + var map = new Dictionary { ["key1"] = 1, ["key2"] = 2 }; + var jsonElement = Deserialize(JsonSerializer.Serialize(map)); + var target = KObjectParser.Create("MAP"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + if (actual is not KSqlObject actualMap) return; + AssertMap(map, actualMap); + } + + private static void AssertMap(Dictionary expected, KSqlObject actual) + { + Assert.Equal(expected.Keys, actual.FieldNames); + Assert.All(actual.FieldNames, key => Assert.Equal(expected[key], actual.TryGetInteger(key))); + } + + private static JsonElement Deserialize(string json) => (JsonElement)JsonSerializer.Deserialize(json); + } +} diff --git a/test/KsqlDb.Client.UnitTests/QueryResultRowParserTests.cs b/test/KsqlDb.Client.UnitTests/QueryResultRowParserTests.cs new file mode 100644 index 0000000..74163c5 --- /dev/null +++ b/test/KsqlDb.Client.UnitTests/QueryResultRowParserTests.cs @@ -0,0 +1,25 @@ +using System; +using System.Linq; +using System.Text.Json; +using KsqlDb.Api.Client.Parsers; +using Xunit; + +namespace KsqlDb.Client.UnitTests +{ + public class QueryResultRowParserTests + { + [Fact] + public void Pr() + { + // Arrange + const string row = @"{ + ""columnNames"":[""USERID"",""REGISTERTIME"",""REGIONID"",""GENDER"",""INTERESTS"",""CONTACTINFO""], + ""columnTypes"":[""STRING"",""BIGINT"",""STRING"",""STRING"",""ARRAY"",""MAP""] + }"; + var rowJson = (JsonElement)JsonSerializer.Deserialize(row, new JsonSerializerOptions { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }); + + var target = new QueryResultRowParser(rowJson); + + } + } +} diff --git a/test/KsqlDb.Client.UnitTests/UnitTest1.cs b/test/KsqlDb.Client.UnitTests/UnitTest1.cs deleted file mode 100644 index d3df497..0000000 --- a/test/KsqlDb.Client.UnitTests/UnitTest1.cs +++ /dev/null @@ -1,13 +0,0 @@ -using System; -using Xunit; - -namespace KsqlDb.Client.UnitTests -{ - public class UnitTest1 - { - [Fact] - public void Test1() - { - } - } -} diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 1215b3f..82c64d6 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -5,6 +5,7 @@ services: environment: ZOOKEEPER_CLIENT_PORT: 32181 ZOOKEEPER_TICK_TIME: 2000 + restart: always kafka: image: confluentinc/cp-enterprise-kafka:${CP_VERSION} @@ -22,6 +23,7 @@ services: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100 + restart: always schema-registry: image: confluentinc/cp-schema-registry:${CP_VERSION} @@ -31,6 +33,7 @@ services: environment: SCHEMA_REGISTRY_HOST_NAME: schema-registry SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:32181 + restart: always primary-ksqldb-server: image: ${KSQL_IMAGE_BASE}confluentinc/ksqldb-server:${KSQL_VERSION} @@ -47,18 +50,7 @@ services: KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081 KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" - - additional-ksqldb-server: - image: ${KSQL_IMAGE_BASE}confluentinc/ksqldb-server:${KSQL_VERSION} - hostname: additional-ksqldb-server - depends_on: - - primary-ksqldb-server - ports: - - "8090" - environment: - KSQL_LISTENERS: http://0.0.0.0:8090 - KSQL_BOOTSTRAP_SERVERS: kafka:9092 - KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081 + restart: always ksql-datagen: image: confluentinc/ksqldb-examples:${CP_VERSION} @@ -72,13 +64,38 @@ services: cub kafka-ready -b kafka:9092 1 40 && \ echo Waiting for Confluent Schema Registry to be ready... && \ cub sr-ready schema-registry 8081 40 && \ + echo Waiting for ksqlDB to be ready... && \ + cub ksql-server-ready primary-ksqldb-server 8088 40 && \ echo Waiting a few seconds for topic creation to finish... && \ sleep 11 && \ - ksql-datagen quickstart=orders topic=orders_topic bootstrap-server=kafka:9092 && \ + ksql-datagen quickstart=orders topic=orders_topic format=avro msgRate=1 bootstrap-server=kafka:9092 & \ + ksql-datagen quickstart=users_ topic=users_topic format=json msgRate=1 bootstrap-server=kafka:9092 & \ tail -f /dev/null'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: kafka:9092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 + restart: always + control-center: + image: confluentinc/cp-enterprise-control-center:${CP_VERSION} + hostname: control-center + container_name: control-center + depends_on: + - kafka + - schema-registry + - primary-ksqldb-server + ports: + - "9021:9021" + environment: + CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092' + CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://primary-ksqldb-server:8088" + CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://primary-ksqldb-server:8088" + CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + CONTROL_CENTER_REPLICATION_FACTOR: 1 + CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 + CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 + CONFLUENT_METRICS_TOPIC_REPLICATION: 1 + PORT: 9021 + restart: always \ No newline at end of file diff --git a/test/run-integration-tests.ps1 b/test/run-integration-tests.ps1 new file mode 100644 index 0000000..01462f9 --- /dev/null +++ b/test/run-integration-tests.ps1 @@ -0,0 +1,9 @@ +docker-compose up -d + +dotnet build -c Release KsqlDb.Client.IntegrationTests\KsqlDb.Client.IntegrationTests.csproj + +.\setup-ksqldb-test-env.ps1 + +dotnet test --no-build KsqlDb.Client.IntegrationTests\KsqlDb.Client.IntegrationTests.csproj + +docker-compose down \ No newline at end of file diff --git a/test/setup-ksqldb-test-env.ps1 b/test/setup-ksqldb-test-env.ps1 new file mode 100644 index 0000000..b4bb5a4 --- /dev/null +++ b/test/setup-ksqldb-test-env.ps1 @@ -0,0 +1,28 @@ +$ksq_endpoint="http://localhost:8088/ksql" +$create_stream_sql="CREATE STREAM orders_stream (itemid VARCHAR, price DOUBLE, location STRUCT, timestamp VARCHAR) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');" +$create_table_sql="CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');" + +$create_stream_body = @{ + "ksql"=$create_stream_sql +} | ConvertTo-Json + +$create_table_body = @{ + "ksql"=$create_table_sql +} | ConvertTo-Json + +$header = @{ + "Accept"="application/vnd.ksql.v1+json" + "Content-Type"="application/json" +} + +do +{ + $response = Invoke-WebRequest -SkipHttpErrorCheck -Uri $ksq_endpoint -Method POST -Headers $header -Body $create_stream_body + if ($response.statusCode -ne 200) + { + Write-Host "ksqlDb is not ready yet: $response" + Start-Sleep -s 5 + } +} while($response.statusCode -ne 200) # We need to wait until the ksqldb server is up and running and the respective topics are created + +Invoke-WebRequest -Uri $ksq_endpoint -Method POST -Headers $header -Body $create_table_body \ No newline at end of file From cea631564f9b4e74b16ae5fe97b1a742dd096b27 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Mon, 18 Jan 2021 16:54:15 +0000 Subject: [PATCH 02/13] Creating a test stream and table for the integration tests. Removing control center from the docker yml. --- .github/workflows/dotnet.yml | 4 ++++ test/docker-compose.yml | 22 ---------------------- 2 files changed, 4 insertions(+), 22 deletions(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 66ec618..0bc1fab 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -126,5 +126,9 @@ jobs: run: docker-compose up -d - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false + - name: Create orders stream + run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE STREAM orders_stream (itemid VARCHAR, price DOUBLE, location STRUCT, timestamp VARCHAR) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');\", \"streamsProperties\": {}}" + - name: Create users table + run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');\", \"streamsProperties\": {}}" - name: Run test run: dotnet test KsqlDb.Client.IntegrationTests.dll diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 82c64d6..b7d2029 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -76,26 +76,4 @@ services: STREAMS_BOOTSTRAP_SERVERS: kafka:9092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 - restart: always - - control-center: - image: confluentinc/cp-enterprise-control-center:${CP_VERSION} - hostname: control-center - container_name: control-center - depends_on: - - kafka - - schema-registry - - primary-ksqldb-server - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: 'kafka:9092' - CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://primary-ksqldb-server:8088" - CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://primary-ksqldb-server:8088" - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1 - CONFLUENT_METRICS_TOPIC_REPLICATION: 1 - PORT: 9021 restart: always \ No newline at end of file From 0858527dcd7ac883b89d33128643f7ee2061b005 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Tue, 19 Jan 2021 23:12:09 +0000 Subject: [PATCH 03/13] Splitting test data generators into 2 separate containers Fixing test Orders stream definition --- test/docker-compose.yml | 32 +++++++++++++++++++++++++------- test/setup-ksqldb-test-env.ps1 | 2 +- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/test/docker-compose.yml b/test/docker-compose.yml index b7d2029..4697bc4 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -52,10 +52,8 @@ services: KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" restart: always - ksql-datagen: + ksql-datagen-orders: image: confluentinc/ksqldb-examples:${CP_VERSION} - hostname: ksql-datagen - container_name: ksql-datagen depends_on: - kafka - schema-registry @@ -67,10 +65,30 @@ services: echo Waiting for ksqlDB to be ready... && \ cub ksql-server-ready primary-ksqldb-server 8088 40 && \ echo Waiting a few seconds for topic creation to finish... && \ - sleep 11 && \ - ksql-datagen quickstart=orders topic=orders_topic format=avro msgRate=1 bootstrap-server=kafka:9092 & \ - ksql-datagen quickstart=users_ topic=users_topic format=json msgRate=1 bootstrap-server=kafka:9092 & \ - tail -f /dev/null'" + sleep 15 && \ + ksql-datagen quickstart=orders topic=orders_topic format=json msgRate=1 bootstrap-server=kafka:9092'" + environment: + KSQL_CONFIG_DIR: "/etc/ksql" + STREAMS_BOOTSTRAP_SERVERS: kafka:9092 + STREAMS_SCHEMA_REGISTRY_HOST: schema-registry + STREAMS_SCHEMA_REGISTRY_PORT: 8081 + restart: always + + ksql-datagen-users: + image: confluentinc/ksqldb-examples:${CP_VERSION} + depends_on: + - kafka + - schema-registry + - primary-ksqldb-server + command: "bash -c 'echo Waiting for Kafka to be ready... && \ + cub kafka-ready -b kafka:9092 1 40 && \ + echo Waiting for Confluent Schema Registry to be ready... && \ + cub sr-ready schema-registry 8081 40 && \ + echo Waiting for ksqlDB to be ready... && \ + cub ksql-server-ready primary-ksqldb-server 8088 40 && \ + echo Waiting a few seconds for topic creation to finish... && \ + sleep 15 && \ + ksql-datagen quickstart=users_ topic=users_topic format=json msgRate=1 bootstrap-server=kafka:9092'" environment: KSQL_CONFIG_DIR: "/etc/ksql" STREAMS_BOOTSTRAP_SERVERS: kafka:9092 diff --git a/test/setup-ksqldb-test-env.ps1 b/test/setup-ksqldb-test-env.ps1 index b4bb5a4..f6a3877 100644 --- a/test/setup-ksqldb-test-env.ps1 +++ b/test/setup-ksqldb-test-env.ps1 @@ -1,5 +1,5 @@ $ksq_endpoint="http://localhost:8088/ksql" -$create_stream_sql="CREATE STREAM orders_stream (itemid VARCHAR, price DOUBLE, location STRUCT, timestamp VARCHAR) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');" +$create_stream_sql="CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');" $create_table_sql="CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');" $create_stream_body = @{ From 4bd9aad1fa7e850fd747702f536bf8bea5a9afe7 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Tue, 19 Jan 2021 23:39:01 +0000 Subject: [PATCH 04/13] Fixing the broken workflow yaml --- .github/workflows/dotnet.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 0bc1fab..0cad75d 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -127,8 +127,8 @@ jobs: - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - name: Create orders stream - run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE STREAM orders_stream (itemid VARCHAR, price DOUBLE, location STRUCT, timestamp VARCHAR) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');\", \"streamsProperties\": {}}" + run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');\", \"streamsProperties\":\ {}}" - name: Create users table - run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');\", \"streamsProperties\": {}}" + run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');\", \"streamsProperties\":\ {}}" - name: Run test run: dotnet test KsqlDb.Client.IntegrationTests.dll From 86b0b7f2edee4fa89f154a957271ec0cd3d01e26 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Wed, 20 Jan 2021 11:23:14 +0000 Subject: [PATCH 05/13] Attempt to fix the curl steps in the github workflow --- .github/workflows/dotnet.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 0cad75d..22caf8b 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -127,8 +127,12 @@ jobs: - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - name: Create orders stream - run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');\", \"streamsProperties\":\ {}}" + uses: wei/curl@v1 + with: + args: -X POST http://127.0.0.1:8088/ksql -H Accept:\ application/vnd.ksql.v1+json -d { "ksql":\ "CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');", "streamsProperties":\ {}} - name: Create users table - run: curl -X "POST" "http://127.0.0.1:8088/ksql" -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');\", \"streamsProperties\":\ {}}" + uses: wei/curl@v1 + with: + args: -X POST http://127.0.0.1:8088/ksql -H Accept:\ application/vnd.ksql.v1+json -d { "ksql":\ "CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');", "streamsProperties":\ {}} - name: Run test run: dotnet test KsqlDb.Client.IntegrationTests.dll From 381e8beb6ead55f51489f38755a35df97804892c Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Wed, 20 Jan 2021 11:53:15 +0000 Subject: [PATCH 06/13] Another attempt: escaping double quotes. --- .github/workflows/dotnet.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 22caf8b..53d7e6e 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -129,10 +129,10 @@ jobs: - name: Create orders stream uses: wei/curl@v1 with: - args: -X POST http://127.0.0.1:8088/ksql -H Accept:\ application/vnd.ksql.v1+json -d { "ksql":\ "CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');", "streamsProperties":\ {}} + args: -X POST http://127.0.0.1:8088/ksql -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');\",\"streamsProperties\":\ {}}" - name: Create users table uses: wei/curl@v1 with: - args: -X POST http://127.0.0.1:8088/ksql -H Accept:\ application/vnd.ksql.v1+json -d { "ksql":\ "CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');", "streamsProperties":\ {}} + args: -X POST http://127.0.0.1:8088/ksql -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');\",\"streamsProperties\":\ {}}" - name: Run test run: dotnet test KsqlDb.Client.IntegrationTests.dll From d6339bdf0fa6cb1381fc160fec5d811b2a0ea122 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Thu, 21 Jan 2021 22:31:25 +0000 Subject: [PATCH 07/13] Moving creation of test stream and table into docker --- .github/workflows/dotnet.yml | 11 ++--------- test/CreateTestStreams/Dockerfile | 4 ++++ test/CreateTestStreams/createteststreams.sh | 9 +++++++++ test/docker-compose.yml | 7 +++++++ 4 files changed, 22 insertions(+), 9 deletions(-) create mode 100644 test/CreateTestStreams/Dockerfile create mode 100644 test/CreateTestStreams/createteststreams.sh diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 53d7e6e..fbc01fe 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -58,6 +58,7 @@ jobs: path: | test/docker-compose.yml test/.env + test/CreateTestStreams unit-tests: name: Unit tests @@ -123,16 +124,8 @@ jobs: with: name: docker-compose - name: Start ksqlDB in docker - run: docker-compose up -d + run: docker-compose up -d --build - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - - name: Create orders stream - uses: wei/curl@v1 - with: - args: -X POST http://127.0.0.1:8088/ksql -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');\",\"streamsProperties\":\ {}}" - - name: Create users table - uses: wei/curl@v1 - with: - args: -X POST http://127.0.0.1:8088/ksql -H "Accept:\ application/vnd.ksql.v1+json" -d "{ \"ksql\":\ \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');\",\"streamsProperties\":\ {}}" - name: Run test run: dotnet test KsqlDb.Client.IntegrationTests.dll diff --git a/test/CreateTestStreams/Dockerfile b/test/CreateTestStreams/Dockerfile new file mode 100644 index 0000000..89d3de1 --- /dev/null +++ b/test/CreateTestStreams/Dockerfile @@ -0,0 +1,4 @@ +FROM curlimages/curl:7.74.0 +WORKDIR %HOME%/bin +COPY ./createteststreams.sh . +CMD sh createteststreams.sh \ No newline at end of file diff --git a/test/CreateTestStreams/createteststreams.sh b/test/CreateTestStreams/createteststreams.sh new file mode 100644 index 0000000..b789329 --- /dev/null +++ b/test/CreateTestStreams/createteststreams.sh @@ -0,0 +1,9 @@ +#!/bin/bash +echo "Waiting for ksqlDB to be up and running..." +timeout 2m sh -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://primary-ksqldb-server:8088/info)" != "200" ]]; do sleep 5; done' || false +echo "Creating orders_stream..." +curl -X POST http://primary-ksqldb-server:8088/ksql -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON', PARTITIONS=1, REPLICAS=1);\",\"streamsProperties\": {}}" +echo "Creating users_table..." +curl -X POST http://primary-ksqldb-server:8088/ksql -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON', PARTITIONS=1, REPLICAS=1);\",\"streamsProperties\": {}}" +echo "All done" +tail -f /dev/null \ No newline at end of file diff --git a/test/docker-compose.yml b/test/docker-compose.yml index 4697bc4..ac20861 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -94,4 +94,11 @@ services: STREAMS_BOOTSTRAP_SERVERS: kafka:9092 STREAMS_SCHEMA_REGISTRY_HOST: schema-registry STREAMS_SCHEMA_REGISTRY_PORT: 8081 + restart: always + + create-test-streams: + build: CreateTestStreams + depends_on: + - ksql-datagen-orders + - ksql-datagen-users restart: always \ No newline at end of file From b66a7821243ff3e849adcd1e0b1fdd89e1fef748 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Thu, 21 Jan 2021 23:06:47 +0000 Subject: [PATCH 08/13] Adding support of structs to the parser --- src/KsqlDb.Client/Parsers/KObjectParser.cs | 40 ++++++++++++++++++- .../Parsers/KObjectParserTests.cs | 24 +++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/src/KsqlDb.Client/Parsers/KObjectParser.cs b/src/KsqlDb.Client/Parsers/KObjectParser.cs index 5129468..e442c14 100644 --- a/src/KsqlDb.Client/Parsers/KObjectParser.cs +++ b/src/KsqlDb.Client/Parsers/KObjectParser.cs @@ -1,6 +1,8 @@ using System; +using System.Collections.Generic; using System.Text.Json; using KsqlDb.Api.Client.Abstractions.Objects; +using KsqlDb.Api.Client.Exceptions; // ReSharper disable HeapView.BoxingAllocation namespace KsqlDb.Api.Client.Parsers @@ -27,8 +29,7 @@ public static KObjectParser Create(string type) if (primaryType.Equals("DECIMAL", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseDecimal, typeof(decimal)); if (primaryType.Equals("ARRAY", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseArray(j, type), typeof(KSqlArray)); if (primaryType.Equals("MAP", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseMap(j, type), typeof(KSqlObject)); - //TODO Add support of STRUCT - //if (primaryType.Equals("STRUCT", StringComparison.OrdinalIgnoreCase)) return new StructKObjectParser(); + if (primaryType.Equals("STRUCT", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseStruct(j, type), typeof(KSqlObject)); throw new NotSupportedException($"The {type} type is not supported."); } @@ -98,5 +99,40 @@ private static object ParseMap(JsonElement jsonElement, ReadOnlySpan fullT return kSqlObject; } + + private static object ParseStruct(JsonElement jsonElement, ReadOnlySpan fullTypeName) + { + if (jsonElement.ValueKind != JsonValueKind.Object) return KSqlNull.Instance; + var fieldDefinitions = fullTypeName.Slice(7, fullTypeName.Length - 8).Trim(); + var fieldValueParsers = new Dictionary(); + int fieldSeparatorIndex; + do + { + fieldSeparatorIndex = fieldDefinitions.IndexOf(','); + var fieldDefinition = fieldSeparatorIndex >= 0 ? fieldDefinitions.Slice(0, fieldSeparatorIndex) : fieldDefinitions; + int nameAndTypeSeparatorIndex = fieldDefinition.IndexOf(' '); + var name = fieldDefinition.Slice(0, nameAndTypeSeparatorIndex).Trim("` "); + var type = fieldDefinition.Slice(nameAndTypeSeparatorIndex + 1, fieldDefinition.Length - nameAndTypeSeparatorIndex - 1).Trim(); + fieldValueParsers.Add(name.ToString(), Create(type.ToString())); + fieldDefinitions = fieldDefinitions.Slice(fieldSeparatorIndex + 1, fieldDefinitions.Length - fieldSeparatorIndex - 1).Trim(); + } while (fieldSeparatorIndex >= 0); + + var kSqlObject = new KSqlObject(); + foreach (var jsonProperty in jsonElement.EnumerateObject()) + { + if (fieldValueParsers.TryGetValue(jsonProperty.Name, out var valueParser)) + { + var value = valueParser.Parse(jsonProperty.Value); + kSqlObject.AddValue(jsonProperty.Name, value); + } + else + { + string supportedFields = string.Join(",", fieldValueParsers.Keys); + throw new KsqlDbException($"Unable to parse unexpected struct field {jsonProperty.Name}. Expected fields: {supportedFields}"); + } + } + + return kSqlObject; + } } } diff --git a/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs b/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs index 911964d..bb6ab0b 100644 --- a/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs +++ b/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs @@ -22,6 +22,7 @@ public class KObjectParserTests [InlineData("MAP")] [InlineData("ARRAY>>")] [InlineData("STRUCT, AGE INT>")] + [InlineData("STRUCT<`CITY` STRING, `STATE` STRING, `ZIPCODE` BIGINT>")] public void Creates_A_Parser_For_A_Known_Type(string type) { var target = KObjectParser.Create(type); @@ -206,12 +207,35 @@ public void Parses_A_Json_Map_With_Int_Values_And_Returns_An_Instance_Of_KSqlObj AssertMap(map, actualMap); } + [Fact] + public void Parses_A_Struct_And_Returns_An_Instance_Of_KSqlObject() + { + // Arrange + var @struct = new Dictionary { ["CITY"] = "Sevenoaks", ["STATE"] = "Kent", ["ZIPCODE"] = 90021L }; + var jsonElement = Deserialize(JsonSerializer.Serialize(@struct)); + var target = KObjectParser.Create("STRUCT<`CITY` STRING, `STATE` STRING, `ZIPCODE` BIGINT>"); + + // Act + var actual = target.Parse(jsonElement); + + // Assert + Assert.IsType(actual); + if (actual is not KSqlObject actualStruct) return; + AssertStruct(@struct, actualStruct); + } + private static void AssertMap(Dictionary expected, KSqlObject actual) { Assert.Equal(expected.Keys, actual.FieldNames); Assert.All(actual.FieldNames, key => Assert.Equal(expected[key], actual.TryGetInteger(key))); } + private static void AssertStruct(Dictionary expected, KSqlObject actual) + { + Assert.Equal(expected.Keys, actual.FieldNames); + Assert.All(actual.FieldNames, key => Assert.Equal(expected[key], actual[key])); + } + private static JsonElement Deserialize(string json) => (JsonElement)JsonSerializer.Deserialize(json); } } From 5daf8d4ffc69a926fc594ad5a7ee0aece135a4de Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Fri, 22 Jan 2021 22:59:53 +0000 Subject: [PATCH 09/13] Adding devug-via-ssh step to the workflow --- .github/workflows/dotnet.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index fbc01fe..6d2629a 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -129,3 +129,8 @@ jobs: run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - name: Run test run: dotnet test KsqlDb.Client.IntegrationTests.dll + - name: Start SSH session + uses: luchihoratiu/debug-via-ssh@main + with: + NGROK_AUTH_TOKEN: ${{ secrets.NGROK_AUTH_TOKEN }} + SSH_PASS: ${{ secrets.SSH_PASS }} From b753f21e4e5ea0050850c0d70531a575cf1daef2 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Fri, 22 Jan 2021 23:10:45 +0000 Subject: [PATCH 10/13] Adding a stop if failure step to the workflow --- .github/workflows/dotnet.yml | 3 +++ test/KsqlDb.Client.IntegrationTests/DescribeTests.cs | 6 ++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 6d2629a..52ef7e3 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -134,3 +134,6 @@ jobs: with: NGROK_AUTH_TOKEN: ${{ secrets.NGROK_AUTH_TOKEN }} SSH_PASS: ${{ secrets.SSH_PASS }} + - name: Don't kill instace + if: ${{ failure() }} + run: sleep 1h # Prevent to killing instance after failure diff --git a/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs b/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs index 22a4b1b..7ca9eb1 100644 --- a/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs +++ b/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs @@ -4,6 +4,7 @@ namespace KsqlDb.Client.IntegrationTests { +/* public class DescribeTests : IClassFixture { private readonly SourceDescriptionsFixture _fixture; @@ -18,12 +19,12 @@ public void Contains_Statement_Text(string entity) Assert.Contains($"DESCRIBE {entity}",_fixture.OrdersStreamDescription.SqlStatement); } - /*[Theory] + [Theory] public void Orders_Stream_Contains_ExpectedFields() { - }*/ + } public sealed class SourceDescriptionsFixture : IDisposable { @@ -41,4 +42,5 @@ public void Dispose() } } } +*/ } From 33a3b2f1f45fdb56c8e98cb364bc524931dd9307 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Fri, 22 Jan 2021 23:46:46 +0000 Subject: [PATCH 11/13] Removing ssh step --- .github/workflows/dotnet.yml | 10 +--------- .../StreamQueryTests.cs | 15 +++++++++++---- 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 52ef7e3..7261257 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -128,12 +128,4 @@ jobs: - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - name: Run test - run: dotnet test KsqlDb.Client.IntegrationTests.dll - - name: Start SSH session - uses: luchihoratiu/debug-via-ssh@main - with: - NGROK_AUTH_TOKEN: ${{ secrets.NGROK_AUTH_TOKEN }} - SSH_PASS: ${{ secrets.SSH_PASS }} - - name: Don't kill instace - if: ${{ failure() }} - run: sleep 1h # Prevent to killing instance after failure + run: dotnet test KsqlDb.Client.IntegrationTests.dll \ No newline at end of file diff --git a/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs index 25648b8..47d2eca 100644 --- a/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs +++ b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs @@ -116,10 +116,17 @@ public sealed class StreamQueryFixture : IAsyncLifetime public async Task InitializeAsync() { var properties = new Dictionary { ["ksql.streams.auto.offset.reset"] = "earliest" }; - var queryResult = await TestClient.Instance.StreamQuery($"SELECT * FROM {TestClient.KnownEntities.UsersTableName} EMIT CHANGES LIMIT {ExpectedRowsCount};", properties); - Columns = queryResult.Columns; - QueryId = queryResult.QueryId; - Rows = await queryResult.Rows.ToArrayAsync(); + try + { + var queryResult = await TestClient.Instance.StreamQuery($"SELECT * FROM {TestClient.KnownEntities.UsersTableName} EMIT CHANGES LIMIT {ExpectedRowsCount};", properties); + Columns = queryResult.Columns; + QueryId = queryResult.QueryId; + Rows = await queryResult.Rows.ToArrayAsync(); + } + catch (KsqlDbException e) + { + throw new Exception($"Error code: {e.Body?.ErrorCode ?? -1}, Error message: {e.Body?.Message ?? ""}", e); + } } public Task DisposeAsync() => Task.CompletedTask; From 065e6e6c6ae729aa14b0596f211ff785c2b50d05 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Sat, 23 Jan 2021 00:01:27 +0000 Subject: [PATCH 12/13] Setting env variable for unencrypted http2 in integration tests. --- .github/workflows/dotnet.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml index 7261257..430570c 100644 --- a/.github/workflows/dotnet.yml +++ b/.github/workflows/dotnet.yml @@ -128,4 +128,6 @@ jobs: - name: Wait until ksqlDB is up and running run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false - name: Run test - run: dotnet test KsqlDb.Client.IntegrationTests.dll \ No newline at end of file + run: dotnet test KsqlDb.Client.IntegrationTests.dll + env: + DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_HTTP2UNENCRYPTEDSUPPORT: 1 \ No newline at end of file From 52f33d2156dd8a9accd6cbc1adae622364d94696 Mon Sep 17 00:00:00 2001 From: Alex Basiuk Date: Sun, 24 Jan 2021 23:13:04 +0000 Subject: [PATCH 13/13] Added checks that insert stream method actually inserts data Removing redundant scripts and some diagnostics code. --- .../StreamInsertsTests.cs | 109 +++++++++++++----- .../StreamQueryTests.cs | 15 +-- .../TestClient.cs | 7 +- test/run-integration-tests.ps1 | 9 -- test/setup-ksqldb-test-env.ps1 | 28 ----- 5 files changed, 90 insertions(+), 78 deletions(-) delete mode 100644 test/run-integration-tests.ps1 delete mode 100644 test/setup-ksqldb-test-env.ps1 diff --git a/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs index fae45be..f85bd02 100644 --- a/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs +++ b/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs @@ -1,73 +1,128 @@ using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.IO; using System.Linq; +using System.Net.Http; +using System.Text; +using System.Text.Json; using System.Threading.Tasks; using KsqlDb.Api.Client.Abstractions.Objects; using KsqlDb.Api.Client.Exceptions; -using KsqlDb.Api.Client.KsqlApiV1; using Xunit; -using Xunit.Abstractions; namespace KsqlDb.Client.IntegrationTests { public class StreamInsertsTests { - private readonly TestClient _client; + private const int RowsNumber = 100; private const string PrimitiveStreamName = "PrimitiveStream"; - public StreamInsertsTests(ITestOutputHelper output) - { - _client = new TestClient(output); - } - [Fact] - public async Task Inserts_Rows_With_Primitive_Type() + public async Task Inserts_Rows_With_Primitive_Type_Fields() { // Arrange await CreatePrimitiveStream(); - var rowIndices = Enumerable.Range(1, 100).ToArray(); - var expected = rowIndices.Select(x => (long)x-1); - var rows = rowIndices.Select(CreatePrimitiveRow).ToAsyncEnumerable(); + int[] rowIndices = Enumerable.Range(1, RowsNumber).ToArray(); + var expectedSequenceNumbers = rowIndices.Select(x => (long)x-1); + var dataToInsert = rowIndices.Select(CreatePrimitiveRow).ToArray(); + var expectedRows = dataToInsert.Select(GetKsqlObjectValuesSortedByFieldName).ToArray(); + var asyncRows = dataToInsert.ToAsyncEnumerable(); // Act - var acks = TestClient.Instance.StreamInserts(PrimitiveStreamName, rows); + var acks = TestClient.Instance.StreamInserts(PrimitiveStreamName, asyncRows); // Assert - var actual = (await acks.ToArrayAsync()).Select(x => x.SequenceNumber); - - Assert.Equal(expected, actual); + var actualSequenceNumbers = (await acks.ToArrayAsync()).Select(x => x.SequenceNumber); + Assert.Equal(expectedSequenceNumbers, actualSequenceNumbers); + var actualRows = await QueryStream(PrimitiveStreamName, typeof(int), typeof(long), typeof(bool), typeof(double), typeof(string)); + Assert.Equal(expectedRows, actualRows, new TwoDimensionalArrayEqualityComparer()); } private static KSqlObject CreatePrimitiveRow(int rowNumber) => new KSqlObject() - .Add("i1", rowNumber * 10) - .Add("b2", rowNumber * 10_000) - .Add("b3", rowNumber % 2 == 0) - .Add("d4", 10007d / rowNumber) - .Add("v5", $"{rowNumber} row"); + .Add("f1", rowNumber * 10) + .Add("f2", rowNumber * 10_000L) + .Add("f3", rowNumber % 2 == 0) + .Add("f4", 10007d / rowNumber) + .Add("f5", $"{rowNumber} row"); private static async Task CreatePrimitiveStream() { try { + // Drop the stream if already exists await TestClient.Instance.ExecuteStatement($"DROP STREAM {PrimitiveStreamName};"); } catch (KsqlDbException) { - + // The stream doesn't exist } await TestClient.Instance.ExecuteStatement( @$"CREATE STREAM {PrimitiveStreamName} ( - i1 INTEGER KEY, - b2 BIGINT, - b3 BOOLEAN, - d4 DOUBLE, - v5 VARCHAR + f1 INTEGER KEY, + f2 BIGINT, + f3 BOOLEAN, + f4 DOUBLE, + f5 VARCHAR ) WITH ( kafka_topic = '{PrimitiveStreamName}', partitions = 1, value_format = 'json' );"); } + + private static async Task QueryStream(string streamName, params Type[] expectedTypes) + { + using var request = new HttpRequestMessage(HttpMethod.Post, "/query-stream") + { + Content = new StringContent($"{{ \"sql\": \"select * from {streamName} emit changes limit {RowsNumber};\", \"properties\": {{ \"ksql.streams.auto.offset.reset\": \"earliest\" }} }}", Encoding.UTF8, "application/json"), + Version = new Version(2, 0), +#if NET5_0 + VersionPolicy = HttpVersionPolicy.RequestVersionOrHigher +#endif + }; + + var response = await TestClient.RawHttpClient.SendAsync(request); + Debug.Assert(response.IsSuccessStatusCode); + await using var responseStream = await response.Content.ReadAsStreamAsync(); + using var streamReader = new StreamReader(responseStream); + _ = await streamReader.ReadLineAsync(); // skip data schema header + var rows = new List(); + while (!streamReader.EndOfStream) + { + string responseContentLine = await streamReader.ReadLineAsync(); + var parsedRow = JsonSerializer.Deserialize(responseContentLine!); + var processedRow = parsedRow!.Cast().Select((e, i) => JsonSerializer.Deserialize(e.GetRawText(), expectedTypes[i])).ToArray(); + rows.Add(processedRow); + } + + return rows.ToArray(); + } + + private static object[] GetKsqlObjectValuesSortedByFieldName(KSqlObject kSqlObject) => + kSqlObject.AsImmutableDictionary().OrderBy(x => x.Key).Select(x => x.Value).ToArray(); + + private sealed class TwoDimensionalArrayEqualityComparer : IEqualityComparer + { + public bool Equals(object[][] x, object[][] y) => x!.SequenceEqual(y!, new ArrayEqualityComparer()); + public int GetHashCode(object[][] obj) => 0; + } + + private sealed class ArrayEqualityComparer : IEqualityComparer + { + public bool Equals(object[] x, object[] y) + { + if (x!.Length != y!.Length) return false; + for (int i = 0; i < x.Length; i++) + { + if (!x[i].Equals(y[i])) return false; + } + + return true; + } + public int GetHashCode(object[] obj) => 0; + } } } diff --git a/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs index 47d2eca..25648b8 100644 --- a/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs +++ b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs @@ -116,17 +116,10 @@ public sealed class StreamQueryFixture : IAsyncLifetime public async Task InitializeAsync() { var properties = new Dictionary { ["ksql.streams.auto.offset.reset"] = "earliest" }; - try - { - var queryResult = await TestClient.Instance.StreamQuery($"SELECT * FROM {TestClient.KnownEntities.UsersTableName} EMIT CHANGES LIMIT {ExpectedRowsCount};", properties); - Columns = queryResult.Columns; - QueryId = queryResult.QueryId; - Rows = await queryResult.Rows.ToArrayAsync(); - } - catch (KsqlDbException e) - { - throw new Exception($"Error code: {e.Body?.ErrorCode ?? -1}, Error message: {e.Body?.Message ?? ""}", e); - } + var queryResult = await TestClient.Instance.StreamQuery($"SELECT * FROM {TestClient.KnownEntities.UsersTableName} EMIT CHANGES LIMIT {ExpectedRowsCount};", properties); + Columns = queryResult.Columns; + QueryId = queryResult.QueryId; + Rows = await queryResult.Rows.ToArrayAsync(); } public Task DisposeAsync() => Task.CompletedTask; diff --git a/test/KsqlDb.Client.IntegrationTests/TestClient.cs b/test/KsqlDb.Client.IntegrationTests/TestClient.cs index d58aec4..b362f90 100644 --- a/test/KsqlDb.Client.IntegrationTests/TestClient.cs +++ b/test/KsqlDb.Client.IntegrationTests/TestClient.cs @@ -17,9 +17,11 @@ public static class KnownEntities public const string OrdersStreamName = "ORDERS_STREAM"; public const string UsersTopicName = "users_topic"; public const string UsersTableName = "USERS_TABLE"; + } + public static HttpClient RawHttpClient => new() {BaseAddress = new Uri("http://127.0.0.1:8088")}; - public static KsqlDb.Api.Client.Client Instance => new TestClient(); + public static KsqlDb.Api.Client.Client Instance => new TestClient(); private TestClient() : base(CreateHttpClient()) { @@ -27,9 +29,8 @@ private TestClient() : base(CreateHttpClient()) private static KSqlDbHttpClient CreateHttpClient() { - var httpClient = new HttpClient {BaseAddress = new Uri("http://127.0.0.1:8088")}; var jsonSerializer = new JsonSerializer(); - return new KSqlDbHttpClient(httpClient, new KsqlV1RequestHttpContentFactory(jsonSerializer), jsonSerializer); + return new KSqlDbHttpClient(RawHttpClient, new KsqlV1RequestHttpContentFactory(jsonSerializer), jsonSerializer); } public TestClient(ITestOutputHelper output) : base(CreateHttpClientWithLogging(output)) diff --git a/test/run-integration-tests.ps1 b/test/run-integration-tests.ps1 deleted file mode 100644 index 01462f9..0000000 --- a/test/run-integration-tests.ps1 +++ /dev/null @@ -1,9 +0,0 @@ -docker-compose up -d - -dotnet build -c Release KsqlDb.Client.IntegrationTests\KsqlDb.Client.IntegrationTests.csproj - -.\setup-ksqldb-test-env.ps1 - -dotnet test --no-build KsqlDb.Client.IntegrationTests\KsqlDb.Client.IntegrationTests.csproj - -docker-compose down \ No newline at end of file diff --git a/test/setup-ksqldb-test-env.ps1 b/test/setup-ksqldb-test-env.ps1 deleted file mode 100644 index f6a3877..0000000 --- a/test/setup-ksqldb-test-env.ps1 +++ /dev/null @@ -1,28 +0,0 @@ -$ksq_endpoint="http://localhost:8088/ksql" -$create_stream_sql="CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON');" -$create_table_sql="CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON');" - -$create_stream_body = @{ - "ksql"=$create_stream_sql -} | ConvertTo-Json - -$create_table_body = @{ - "ksql"=$create_table_sql -} | ConvertTo-Json - -$header = @{ - "Accept"="application/vnd.ksql.v1+json" - "Content-Type"="application/json" -} - -do -{ - $response = Invoke-WebRequest -SkipHttpErrorCheck -Uri $ksq_endpoint -Method POST -Headers $header -Body $create_stream_body - if ($response.statusCode -ne 200) - { - Write-Host "ksqlDb is not ready yet: $response" - Start-Sleep -s 5 - } -} while($response.statusCode -ne 200) # We need to wait until the ksqldb server is up and running and the respective topics are created - -Invoke-WebRequest -Uri $ksq_endpoint -Method POST -Headers $header -Body $create_table_body \ No newline at end of file