diff --git a/.github/workflows/dotnet.yml b/.github/workflows/dotnet.yml
index bea8d15..430570c 100644
--- a/.github/workflows/dotnet.yml
+++ b/.github/workflows/dotnet.yml
@@ -58,6 +58,7 @@ jobs:
path: |
test/docker-compose.yml
test/.env
+ test/CreateTestStreams
unit-tests:
name: Unit tests
@@ -123,8 +124,10 @@ jobs:
with:
name: docker-compose
- name: Start ksqlDB in docker
- run: docker-compose up -d --scale additional-ksqldb-server=0
+ run: docker-compose up -d --build
- name: Wait until ksqlDB is up and running
run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false
- name: Run test
run: dotnet test KsqlDb.Client.IntegrationTests.dll
+ env:
+ DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_HTTP2UNENCRYPTEDSUPPORT: 1
\ No newline at end of file
diff --git a/Directory.Build.props b/Directory.Build.props
index 6bccdaa..1077141 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -1,6 +1,6 @@
- alex.basiuk@outlook.com
+ Alex Basiuk
© Alex Basiuk
KsqlDb.Client
ksqlDB Client.
diff --git a/Directory.Build.targets b/Directory.Build.targets
index bb3185c..563b579 100644
--- a/Directory.Build.targets
+++ b/Directory.Build.targets
@@ -3,6 +3,8 @@
+
+
diff --git a/README.md b/README.md
index 42df90d..edfc095 100644
--- a/README.md
+++ b/README.md
@@ -2,3 +2,7 @@
=====================================================
data:image/s3,"s3://crabby-images/b874a/b874a7434568c31ae7f4499ef75fc6731d058dc3" alt="Build"
+
+
+In development scenarios when server and client have a priori knowledge that both will speak HTTP/2 unencrypted, you may establish an HTTP/2 connection over cleartext by setting an AppContext switch or an environment variable (DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_HTTP2UNENCRYPTEDSUPPORT=1).
+`AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);`
diff --git a/src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs b/src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs
new file mode 100644
index 0000000..a5e8a60
--- /dev/null
+++ b/src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs
@@ -0,0 +1,27 @@
+using System.Collections.Generic;
+using KsqlDb.Api.Client.Abstractions.QueryResults;
+
+namespace KsqlDb.Api.Client.Abstractions
+{
+ ///
+ /// A batches query result.
+ ///
+ public class BatchedQueryResults
+ {
+ ///
+ /// The result rows async enumerator.
+ ///
+ IReadOnlyCollection ResultRows { get; }
+
+ ///
+ /// The Id of the underlying push query if applicable.
+ ///
+ public string? QueryId { get; }
+
+ public BatchedQueryResults(IReadOnlyCollection resultRows, string? queryId)
+ {
+ ResultRows = resultRows;
+ QueryId = queryId;
+ }
+ }
+}
diff --git a/src/KsqlDb.Client/Abstractions/ColumnType.cs b/src/KsqlDb.Client/Abstractions/ColumnType.cs
new file mode 100644
index 0000000..17dd95f
--- /dev/null
+++ b/src/KsqlDb.Client/Abstractions/ColumnType.cs
@@ -0,0 +1,15 @@
+namespace KsqlDb.Api.Client.Abstractions
+{
+ public enum ColumnType
+ {
+ String,
+ Integer,
+ Bigint,
+ Double,
+ Boolean,
+ Decimal,
+ Array,
+ Map,
+ Struct
+ }
+}
diff --git a/src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs b/src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs
new file mode 100644
index 0000000..138b6b6
--- /dev/null
+++ b/src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs
@@ -0,0 +1,38 @@
+using KsqlDb.Api.Client.Abstractions;
+
+namespace KSQL.API.Client
+{
+ ///
+ /// The result of the method.
+ ///
+ public class ExecuteStatementResult
+ {
+ ///
+ /// Returns the ID of a newly started persistent query, if applicable. The return value is empty
+ /// for all statements other than 'CREATE ... AS * SELECT' and 'INSERT * INTO ... AS SELECT'
+ /// statements, as only these statements start persistent queries. For statements that start
+ /// persistent queries, the return value may still be empty if either:
+ ///
+ /// -
+ ///
+ /// The statement was not executed on the server by the time the server response was sent.
+ /// This typically does not happen under normal server operation, but may happen if the ksqlDB
+ /// server's command runner thread is stuck, or if the configured value for
ksql.server.command.response.timeout.ms
+ /// is too low.
+ ///
+ ///
+ /// -
+ ///
+ /// The ksqlDB server version is lower than 0.11.0.
+ ///
+ ///
+ ///
+ ///
+ string? QueryId;
+
+ public ExecuteStatementResult(string? queryId)
+ {
+ QueryId = queryId;
+ }
+ }
+}
diff --git a/src/KsqlDb.Client/Abstractions/FieldInfo.cs b/src/KsqlDb.Client/Abstractions/FieldInfo.cs
new file mode 100644
index 0000000..6e51f86
--- /dev/null
+++ b/src/KsqlDb.Client/Abstractions/FieldInfo.cs
@@ -0,0 +1,32 @@
+using KsqlDb.Api.Client.Abstractions;
+
+namespace KSQL.API.Client
+{
+ ///
+ /// A field/column of a ksqlDB stream/table.
+ ///
+ public class FieldInfo
+ {
+ ///
+ /// The field name.
+ ///
+ public string Name { get; }
+
+ ///
+ /// The field type.
+ ///
+ public ColumnType Type { get; }
+
+ ///
+ /// Whether this field is a key field, rather than a value field.
+ ///
+ public bool IsKey { get; }
+
+ public FieldInfo(string name, ColumnType type, bool isKey)
+ {
+ Name = name;
+ Type = type;
+ IsKey = isKey;
+ }
+ }
+}
diff --git a/src/KsqlDb.Client/Abstractions/IClient.cs b/src/KsqlDb.Client/Abstractions/IClient.cs
new file mode 100644
index 0000000..8a6692c
--- /dev/null
+++ b/src/KsqlDb.Client/Abstractions/IClient.cs
@@ -0,0 +1,146 @@
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Channels;
+using System.Threading.Tasks;
+using KSQL.API.Client;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using KsqlDb.Api.Client.Abstractions.QueryResults;
+
+namespace KsqlDb.Api.Client.Abstractions
+{
+ public interface IClient
+ {
+ ///
+ /// Executes a pull or push query supplied in the and streams the result.
+ ///
+ /// The query statement.
+ /// The cancellation token that cancels the operation.
+ ///
+ /// A task that completes when the server response is received.
+ ///
+ Task StreamQuery(string sql, CancellationToken cancellationToken = default);
+
+ ///
+ /// Executes a pull or push query supplied in the and streams the result.
+ ///
+ /// The query statement.
+ /// The query properties.
+ /// The cancellation token that cancels the operation.
+ ///
+ /// A task that completes when the server response is received.
+ ///
+ Task StreamQuery(string sql, IDictionary properties, CancellationToken cancellationToken = default);
+
+ ///
+ /// Executes a pull or push query supplied in the and returns the result as a single batch or rows.
+ ///
+ /// The query statement.
+ /// The cancellation token that cancels the operation.
+ ///
+ /// A task that completes when the server response is received.
+ ///
+ Task BatchQuery(string sql, CancellationToken cancellationToken = default);
+
+ ///
+ /// Executes a pull or push query supplied in the and returns the result as a single batch or rows.
+ ///
+ /// The query statement.
+ /// The query properties.
+ /// The cancellation token that cancels the operation.
+ ///
+ /// A task that completes when the server response is received.
+ ///
+ Task BatchQuery(string sql, IDictionary properties, CancellationToken cancellationToken = default);
+
+ ///
+ /// Terminates a push query with the specified ,
+ ///
+ /// The Id of the query to terminate.
+ /// The cancellation token.
+ /// A task that completes once the server response is received.
+ Task TerminatePushQuery(string queryId, CancellationToken cancellationToken = default);
+
+ ///
+ /// Inserts a new into the
+ ///
+ /// The target stream name.
+ /// The row to be inserted into the stream.
+ /// The cancellation token.
+ /// A task that completes when the operation is completed.
+ Task InsertRow(string streamName, KSqlObject row, CancellationToken cancellationToken = default);
+
+ ///
+ /// Inserts the asynchronous stream of rows in to the stream .
+ ///
+ /// The target stream name.
+ /// The asynchronous stream of rows.
+ /// The cancellation token.
+ /// The asynchronous stream of row acknowledgments.
+ IAsyncEnumerable StreamInserts(string streamName, IAsyncEnumerable rows, CancellationToken cancellationToken = default);
+
+ ///
+ /// Sends a DDL/DML statement to the ksqlDB server.
+ /// This method supports the following statements:
+ /// - 'CREATE';
+ /// - 'CREATE ... AS SELECT';
+ /// - 'DROP', 'TERMINATE';
+ /// - 'INSERT INTO ... AS SELECT'.
+ /// Each request should contain exactly one statement. Requests that contain multiple statements will be rejected by the client.
+ ///
+ /// The SQL statement to be executed.
+ /// The cancellation token.
+ /// A task that completes when the operation is completed.
+ Task ExecuteStatement(string sql, CancellationToken cancellationToken = default);
+
+ ///
+ /// Sends a DDL/DML statement to the ksqlDB server.
+ /// This method supports the following statements:
+ /// - 'CREATE';
+ /// - 'CREATE ... AS SELECT';
+ /// - 'DROP', 'TERMINATE';
+ /// - 'INSERT INTO ... AS SELECT'.
+ /// Each request should contain exactly one statement. Requests that contain multiple statements will be rejected by the client.
+ ///
+ /// The SQL statement to be executed.
+ /// The statement properties.
+ /// The cancellation token.
+ /// A task that completes when the operation is completed.
+ Task ExecuteStatement(string sql, IDictionary properties, CancellationToken cancellationToken = default);
+
+ ///
+ /// Returns the list of ksqlDB streams from the ksqlDB server's metastore.
+ ///
+ /// The cancellation token.
+ /// A task that returns a collection of streams when completes.
+ Task> ListStreams(CancellationToken cancellationToken = default);
+
+ ///
+ /// Returns the list of ksqlDB tables from the ksqlDB server's metastore.
+ ///
+ /// The cancellation token.
+ /// A task that returns a collection of tables when completes.
+ Task> ListTables(CancellationToken cancellationToken = default);
+
+ ///
+ /// Returns the list of Kafka topics available for use with ksqlDB.
+ ///
+ /// The cancellation token.
+ /// A task that returns a collection of topics when completes.
+ Task> ListTopics(CancellationToken cancellationToken = default);
+
+ ///
+ /// Returns the list of queries currently running on the ksqlDB server.
+ ///
+ /// The cancellation token.
+ /// A task that returns a collection of queries when completes.
+ Task> ListQueries(CancellationToken cancellationToken = default);
+
+ ///
+ /// Returns metadata about the ksqlDB stream or table of the provided name.
+ ///
+ /// The stream or table name.
+ /// The cancellation token.
+ /// A task that returns metadata for the stream or table.
+ Task DescribeSource(string sourceName, CancellationToken cancellationToken = default);
+ }
+}
diff --git a/src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs b/src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs
new file mode 100644
index 0000000..c630a99
--- /dev/null
+++ b/src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs
@@ -0,0 +1,80 @@
+using System;
+using System.Collections.Generic;
+using System.Collections.Immutable;
+
+namespace KsqlDb.Api.Client.Abstractions.Objects
+{
+ public class KSqlArray
+ {
+ private readonly List
-
+
+
+
+
+
diff --git a/src/KsqlDb.Client/Parsers/KObjectParser.cs b/src/KsqlDb.Client/Parsers/KObjectParser.cs
new file mode 100644
index 0000000..e442c14
--- /dev/null
+++ b/src/KsqlDb.Client/Parsers/KObjectParser.cs
@@ -0,0 +1,138 @@
+using System;
+using System.Collections.Generic;
+using System.Text.Json;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using KsqlDb.Api.Client.Exceptions;
+// ReSharper disable HeapView.BoxingAllocation
+
+namespace KsqlDb.Api.Client.Parsers
+{
+ public class KObjectParser
+ {
+ private static readonly char[] _nonPrimitiveTypeElements = {'<', '('};
+ private readonly Func _parse;
+
+ public Type TargetType { get; }
+
+ private KObjectParser(Func parse, Type targetType) => (_parse, TargetType) = (parse, targetType);
+
+ public object Parse(JsonElement value) => _parse(value);
+
+ public static KObjectParser Create(string type)
+ {
+ var primaryType = GetPrimaryTypeName(type);
+ if (primaryType.Equals("STRING", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseString, typeof(string));
+ if (primaryType.Equals("INTEGER", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseInt, typeof(int));
+ if (primaryType.Equals("BIGINT", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseLong, typeof(long));
+ if (primaryType.Equals("DOUBLE", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseDouble, typeof(double));
+ if (primaryType.Equals("BOOLEAN", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(TryParseBoolean, typeof(bool));
+ if (primaryType.Equals("DECIMAL", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(ParseDecimal, typeof(decimal));
+ if (primaryType.Equals("ARRAY", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseArray(j, type), typeof(KSqlArray));
+ if (primaryType.Equals("MAP", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseMap(j, type), typeof(KSqlObject));
+ if (primaryType.Equals("STRUCT", StringComparison.OrdinalIgnoreCase)) return new KObjectParser(j => ParseStruct(j, type), typeof(KSqlObject));
+ throw new NotSupportedException($"The {type} type is not supported.");
+ }
+
+ private static ReadOnlySpan GetPrimaryTypeName(ReadOnlySpan type)
+ {
+ int specialSymbolIndex = type.IndexOfAny(_nonPrimitiveTypeElements);
+ return specialSymbolIndex == -1 ? type : type.Slice(0, specialSymbolIndex);
+ }
+
+ private static object ParseString(JsonElement jsonElement)
+ {
+ if (jsonElement.ValueKind != JsonValueKind.String) return KSqlNull.Instance;
+ object? stringValue = jsonElement.GetString();
+ return stringValue ?? KSqlNull.Instance;
+ }
+
+ private static object ParseInt(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out int output) => element.TryGetInt32(out output));
+
+ private static object ParseLong(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out long output) => element.TryGetInt64(out output));
+
+ private static object ParseDouble(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out double output) => element.TryGetDouble(out output));
+
+ private static object ParseDecimal(JsonElement jsonElement) => ParseNumber(jsonElement, (JsonElement element, out decimal output) => element.TryGetDecimal(out output));
+
+ private delegate bool TryGetNumber(JsonElement jsonElement, out T output);
+ private static object ParseNumber(JsonElement jsonElement, TryGetNumber tryGet) where T : struct
+ {
+ if (jsonElement.ValueKind != JsonValueKind.Number) return KSqlNull.Instance;
+ if (tryGet(jsonElement, out T value)) return value;
+ return KSqlNull.Instance;
+ }
+
+ private static object TryParseBoolean(JsonElement jsonElement) =>
+ jsonElement.ValueKind switch
+ {
+ JsonValueKind.True => true,
+ JsonValueKind.False => false,
+ _ => KSqlNull.Instance
+ };
+
+ private static object ParseArray(JsonElement jsonElement, ReadOnlySpan fullTypeName)
+ {
+ if (jsonElement.ValueKind != JsonValueKind.Array) return KSqlNull.Instance;
+ var itemType = fullTypeName.Slice(6, fullTypeName.Length - 7).Trim();
+ var itemParser = Create(itemType.ToString());
+ var kSqlArray = new KSqlArray();
+ foreach (var itemJsonElement in jsonElement.EnumerateArray())
+ {
+ var item = itemParser.Parse(itemJsonElement);
+ kSqlArray.AddValue(item);
+ }
+
+ return kSqlArray;
+ }
+
+ private static object ParseMap(JsonElement jsonElement, ReadOnlySpan fullTypeName)
+ {
+ if (jsonElement.ValueKind != JsonValueKind.Object) return KSqlNull.Instance;
+ var itemType = fullTypeName.Slice(12, fullTypeName.Length - 13).Trim();
+ var valueParser = Create(itemType.ToString());
+ var kSqlObject = new KSqlObject();
+ foreach (var jsonProperty in jsonElement.EnumerateObject())
+ {
+ var item = valueParser.Parse(jsonProperty.Value);
+ kSqlObject.AddValue(jsonProperty.Name, item);
+ }
+
+ return kSqlObject;
+ }
+
+ private static object ParseStruct(JsonElement jsonElement, ReadOnlySpan fullTypeName)
+ {
+ if (jsonElement.ValueKind != JsonValueKind.Object) return KSqlNull.Instance;
+ var fieldDefinitions = fullTypeName.Slice(7, fullTypeName.Length - 8).Trim();
+ var fieldValueParsers = new Dictionary();
+ int fieldSeparatorIndex;
+ do
+ {
+ fieldSeparatorIndex = fieldDefinitions.IndexOf(',');
+ var fieldDefinition = fieldSeparatorIndex >= 0 ? fieldDefinitions.Slice(0, fieldSeparatorIndex) : fieldDefinitions;
+ int nameAndTypeSeparatorIndex = fieldDefinition.IndexOf(' ');
+ var name = fieldDefinition.Slice(0, nameAndTypeSeparatorIndex).Trim("` ");
+ var type = fieldDefinition.Slice(nameAndTypeSeparatorIndex + 1, fieldDefinition.Length - nameAndTypeSeparatorIndex - 1).Trim();
+ fieldValueParsers.Add(name.ToString(), Create(type.ToString()));
+ fieldDefinitions = fieldDefinitions.Slice(fieldSeparatorIndex + 1, fieldDefinitions.Length - fieldSeparatorIndex - 1).Trim();
+ } while (fieldSeparatorIndex >= 0);
+
+ var kSqlObject = new KSqlObject();
+ foreach (var jsonProperty in jsonElement.EnumerateObject())
+ {
+ if (fieldValueParsers.TryGetValue(jsonProperty.Name, out var valueParser))
+ {
+ var value = valueParser.Parse(jsonProperty.Value);
+ kSqlObject.AddValue(jsonProperty.Name, value);
+ }
+ else
+ {
+ string supportedFields = string.Join(",", fieldValueParsers.Keys);
+ throw new KsqlDbException($"Unable to parse unexpected struct field {jsonProperty.Name}. Expected fields: {supportedFields}");
+ }
+ }
+
+ return kSqlObject;
+ }
+ }
+}
diff --git a/src/KsqlDb.Client/Parsers/QueryResultRowParser.cs b/src/KsqlDb.Client/Parsers/QueryResultRowParser.cs
new file mode 100644
index 0000000..a7ce654
--- /dev/null
+++ b/src/KsqlDb.Client/Parsers/QueryResultRowParser.cs
@@ -0,0 +1,60 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using KsqlDb.Api.Client.Abstractions.QueryResults;
+
+namespace KsqlDb.Api.Client.Parsers
+{
+ internal class QueryResultRowParser
+ {
+ public Dictionary ColumnNameToIndex { get; }
+ public (string, Type)[] ColumnNamesAndTypes { get; }
+ private readonly KObjectParser[] _kObjectParsers;
+
+ public QueryResultRowParser(JsonElement schema)
+ {
+ if (!schema.TryGetProperty("columnNames", out var columnsNamesElement) ||
+ !schema.TryGetProperty("columnTypes", out var columnsTypesElement))
+ {
+ throw new ArgumentException();
+ }
+
+ var columnNamesArray = columnsNamesElement.EnumerateArray().ToArray();
+ var columnTypesArray = columnsTypesElement.EnumerateArray().ToArray();
+
+ if (columnNamesArray.Length != columnTypesArray.Length) throw new ArgumentException();
+ int columnCount = columnNamesArray.Length;
+
+ _kObjectParsers = new KObjectParser[columnCount];
+ ColumnNamesAndTypes = new (string, Type)[columnCount];
+ ColumnNameToIndex = new Dictionary();
+
+ for (int i = 0; i < columnCount; i++)
+ {
+ string columnName = columnNamesArray[i].GetString() ?? throw new Exception();
+ string columnType = columnTypesArray[i].GetString() ?? throw new Exception();
+ ColumnNameToIndex.Add(columnName, i);
+ var columnParser = KObjectParser.Create(columnType);
+ _kObjectParsers[i] = columnParser;
+ ColumnNamesAndTypes[i] = (columnName, columnParser.TargetType);
+ }
+ }
+
+ public QueryResultRow Parse(JsonElement row)
+ {
+ if (row.ValueKind != JsonValueKind.Array) throw new Exception();
+ var values = new KSqlArray();
+ int index = 0;
+ foreach (var value in row.EnumerateArray())
+ {
+ if (index > _kObjectParsers.Length) throw new Exception();
+ var parsedValue = _kObjectParsers[index++].Parse(value);
+ values.AddValue(parsedValue);
+ }
+
+ return new QueryResultRow(values, ColumnNameToIndex, ColumnNamesAndTypes);
+ }
+ }
+}
diff --git a/src/KsqlDb.Client/Serdes/IJsonSerializer.cs b/src/KsqlDb.Client/Serdes/IJsonSerializer.cs
new file mode 100644
index 0000000..50fb3ba
--- /dev/null
+++ b/src/KsqlDb.Client/Serdes/IJsonSerializer.cs
@@ -0,0 +1,71 @@
+using System.IO;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace KsqlDb.Api.Client.Serdes
+{
+ ///
+ /// The JSON serializer abstraction.
+ ///
+ public interface IJsonSerializer
+ {
+ ///
+ /// Convert the provided value to UTF-8 encoded JSON text and write it to the .
+ ///
+ /// A task that represents the asynchronous write operation.
+ /// The UTF-8 to write to.
+ /// The value to convert.
+ /// The which may be used to cancel the write operation.
+ Task SerializeAsync(Stream utf8JsonStream, object value, CancellationToken cancellationToken = default);
+
+ ///
+ /// Convert the provided value into a JSON .
+ ///
+ /// A JSON representation of the value.
+ /// The value to convert.
+ string Serialize(TValue value);
+
+ ///
+ /// Read the UTF-8 encoded text representing a single JSON value into a .
+ /// The Stream will be read to completion.
+ ///
+ /// A representation of the JSON value.
+ /// JSON data to parse.
+ ///
+ /// The which may be used to cancel the read operation.
+ ///
+ ///
+ /// Thrown when the JSON is invalid,
+ /// is not compatible with the JSON,
+ /// or when there is remaining data in the Stream.
+ ///
+ ValueTask DeserializeAsync(Stream utf8JsonStream, CancellationToken cancellationToken = default);
+
+ ///
+ /// Parse the text representing a single JSON value into a .
+ ///
+ /// A representation of the JSON value.
+ /// JSON text to parse.
+ ///
+ /// Thrown if is null.
+ ///
+ ///
+ /// Thrown when the JSON is invalid,
+ /// is not compatible with the JSON,
+ /// or when there is remaining data in the Stream.
+ ///
+ /// Using a is not as efficient as using the
+ /// UTF-8 methods since the implementation natively uses UTF-8.
+ ///
+ TValue Deserialize(string json);
+
+ ///
+ /// Attempts to parse the text representing a single JSON value into a .
+ ///
+ /// JSON text to parse.
+ /// The result type.
+ /// A representation of the JSON value.
+ TValue? TryDeserialize(string? json) where TValue : class;
+ }
+}
diff --git a/src/KsqlDb.Client/Serdes/JsonSerializer.cs b/src/KsqlDb.Client/Serdes/JsonSerializer.cs
new file mode 100644
index 0000000..9fb2883
--- /dev/null
+++ b/src/KsqlDb.Client/Serdes/JsonSerializer.cs
@@ -0,0 +1,65 @@
+using System.IO;
+using System.Text.Json;
+using System.Threading;
+using System.Threading.Tasks;
+using Serializer = System.Text.Json.JsonSerializer;
+
+namespace KsqlDb.Api.Client.Serdes
+{
+ ///
+ /// The JSON serializer for ksqlDB requests and responses.
+ ///
+ public class JsonSerializer : IJsonSerializer
+ {
+ private static readonly JsonSerializerOptions _serializerOptions = new JsonSerializerOptions
+ {
+ PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
+ DictionaryKeyPolicy = JsonNamingPolicy.CamelCase,
+ AllowTrailingCommas = false,
+ IgnoreNullValues = true,
+ ReadCommentHandling = JsonCommentHandling.Disallow,
+ WriteIndented = false,
+ Converters =
+ {
+ new KSqlObjectConverter(),
+ new KSqlArrayConverter(),
+ new KSqlNullConverter()
+ }
+ };
+
+ ///
+ public string Serialize(TValue value) => Serializer.Serialize(value, _serializerOptions);
+
+ ///
+ public Task SerializeAsync(Stream utf8JsonStream, object value, CancellationToken cancellationToken = default) =>
+ Serializer.SerializeAsync(utf8JsonStream, value, value.GetType(), _serializerOptions, CancellationToken.None);
+
+ ///
+ public async ValueTask DeserializeAsync(Stream utf8JsonStream, CancellationToken cancellationToken = default)
+ {
+ var value = await Serializer.DeserializeAsync(utf8JsonStream, _serializerOptions, cancellationToken);
+ return value ?? throw new JsonException($"Unable to deserialize the provided UTF8 JSON stream into {typeof(TValue).FullName}");
+ }
+
+ ///
+ public TValue Deserialize(string json)
+ {
+ var value = Serializer.Deserialize(json, _serializerOptions);
+ return value ?? throw new JsonException($"Unable to deserialize the following JSON into {typeof(TValue).FullName}: {json}");
+ }
+
+ ///
+ public TValue? TryDeserialize(string? json) where TValue : class
+ {
+ if (string.IsNullOrWhiteSpace(json)) return default;
+ try
+ {
+ return Deserialize(json);
+ }
+ catch (JsonException)
+ {
+ return default;
+ }
+ }
+ }
+}
diff --git a/src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs b/src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs
new file mode 100644
index 0000000..70c0287
--- /dev/null
+++ b/src/KsqlDb.Client/Serdes/KSqlArrayConverter.cs
@@ -0,0 +1,15 @@
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using Serializer = System.Text.Json.JsonSerializer;
+
+namespace KsqlDb.Api.Client.Serdes
+{
+ internal class KSqlArrayConverter : JsonConverter
+ {
+ public override KSqlArray Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException();
+
+ public override void Write(Utf8JsonWriter writer, KSqlArray value, JsonSerializerOptions options) => Serializer.Serialize(writer, value.AsReadOnlyList(), options);
+ }
+}
diff --git a/src/KsqlDb.Client/Serdes/KSqlNullConverter.cs b/src/KsqlDb.Client/Serdes/KSqlNullConverter.cs
new file mode 100644
index 0000000..aaf7c42
--- /dev/null
+++ b/src/KsqlDb.Client/Serdes/KSqlNullConverter.cs
@@ -0,0 +1,14 @@
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using KsqlDb.Api.Client.Abstractions.Objects;
+
+namespace KsqlDb.Api.Client.Serdes
+{
+ internal class KSqlNullConverter : JsonConverter
+ {
+ public override KSqlNull Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException();
+
+ public override void Write(Utf8JsonWriter writer, KSqlNull value, JsonSerializerOptions options) => writer.WriteNullValue();
+ }
+}
diff --git a/src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs b/src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs
new file mode 100644
index 0000000..95b90dc
--- /dev/null
+++ b/src/KsqlDb.Client/Serdes/KSqlObjectConverter.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using Serializer = System.Text.Json.JsonSerializer;
+
+namespace KsqlDb.Api.Client.Serdes
+{
+ internal class KSqlObjectConverter : JsonConverter
+ {
+
+ public override KSqlObject Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) => throw new NotImplementedException();
+
+ public override void Write(Utf8JsonWriter writer, KSqlObject value, JsonSerializerOptions options) => Serializer.Serialize(writer, value.AsImmutableDictionary(), options);
+ }
+}
diff --git a/test/CreateTestStreams/Dockerfile b/test/CreateTestStreams/Dockerfile
new file mode 100644
index 0000000..89d3de1
--- /dev/null
+++ b/test/CreateTestStreams/Dockerfile
@@ -0,0 +1,4 @@
+FROM curlimages/curl:7.74.0
+WORKDIR %HOME%/bin
+COPY ./createteststreams.sh .
+CMD sh createteststreams.sh
\ No newline at end of file
diff --git a/test/CreateTestStreams/createteststreams.sh b/test/CreateTestStreams/createteststreams.sh
new file mode 100644
index 0000000..b789329
--- /dev/null
+++ b/test/CreateTestStreams/createteststreams.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+echo "Waiting for ksqlDB to be up and running..."
+timeout 2m sh -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://primary-ksqldb-server:8088/info)" != "200" ]]; do sleep 5; done' || false
+echo "Creating orders_stream..."
+curl -X POST http://primary-ksqldb-server:8088/ksql -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE STREAM orders_stream (ordertime BIGINT, orderid INT, itemid VARCHAR, orderunits DOUBLE, address STRUCT) WITH (KAFKA_TOPIC='orders_topic', VALUE_FORMAT='JSON', PARTITIONS=1, REPLICAS=1);\",\"streamsProperties\": {}}"
+echo "Creating users_table..."
+curl -X POST http://primary-ksqldb-server:8088/ksql -H "Accept: application/vnd.ksql.v1+json" -d "{ \"ksql\": \"CREATE TABLE users_table (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR PRIMARY KEY, interests ARRAY, contactInfo MAP) WITH (kafka_topic='users_topic', value_format='JSON', PARTITIONS=1, REPLICAS=1);\",\"streamsProperties\": {}}"
+echo "All done"
+tail -f /dev/null
\ No newline at end of file
diff --git a/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs b/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs
new file mode 100644
index 0000000..7ca9eb1
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/DescribeTests.cs
@@ -0,0 +1,46 @@
+using System;
+using KsqlDb.Api.Client.Abstractions;
+using Xunit;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+/*
+ public class DescribeTests : IClassFixture
+ {
+ private readonly SourceDescriptionsFixture _fixture;
+
+ public DescribeTests(SourceDescriptionsFixture fixture) => _fixture = fixture;
+
+ [Theory]
+ [InlineData(TestClient.KnownEntities.OrdersStreamName)]
+ [InlineData(TestClient.KnownEntities.UsersTableName)]
+ public void Contains_Statement_Text(string entity)
+ {
+ Assert.Contains($"DESCRIBE {entity}",_fixture.OrdersStreamDescription.SqlStatement);
+ }
+
+ [Theory]
+
+ public void Orders_Stream_Contains_ExpectedFields()
+ {
+
+ }
+
+ public sealed class SourceDescriptionsFixture : IDisposable
+ {
+ public SourceDescription OrdersStreamDescription { get; }
+ public SourceDescription UsersTableDescription { get; }
+
+ public SourceDescriptionsFixture()
+ {
+ OrdersStreamDescription = TestClient.Instance.DescribeSource(TestClient.KnownEntities.OrdersStreamName).GetAwaiter().GetResult();
+ UsersTableDescription = TestClient.Instance.DescribeSource(TestClient.KnownEntities.UsersTableName).GetAwaiter().GetResult();
+ }
+
+ public void Dispose()
+ {
+ }
+ }
+ }
+*/
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj b/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj
index 29c57fe..7ebee02 100644
--- a/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj
+++ b/test/KsqlDb.Client.IntegrationTests/KsqlDb.Client.IntegrationTests.csproj
@@ -3,12 +3,13 @@
netcoreapp3.1;net5.0
win10-x64;linux-x64
+ 9
false
-
-
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
@@ -20,7 +21,7 @@
-
+
diff --git a/test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs b/test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs
new file mode 100644
index 0000000..55a74b9
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/ListStreamsTests.cs
@@ -0,0 +1,34 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+ public class ListStreamsTests
+ {
+ [Fact]
+ public async Task Returns_ORDERS_TOPIC_Stream_In_The_List_Of_Streams()
+ {
+ // Act
+ var streams = await TestClient.Instance.ListStreams();
+
+ // Assert
+ Assert.Contains(streams, s => s.Name == TestClient.KnownEntities.OrdersStreamName
+ && s.Topic == TestClient.KnownEntities.OrdersTopicName
+ && s.KeyFormat == "KAFKA"
+ && s.ValueFormat == "JSON"
+ && s.IsWindowed is null);
+ }
+
+ [Fact]
+ public async Task Throws_TaskCanceledException_When_Cancelled()
+ {
+ // Arrange
+ using var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ // Act / Assert
+ await Assert.ThrowsAsync(() => TestClient.Instance.ListStreams(cts.Token));
+ }
+ }
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs b/test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs
new file mode 100644
index 0000000..2aa27d4
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/ListTablesTests.cs
@@ -0,0 +1,34 @@
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+ public class ListTablesTests
+ {
+ [Fact]
+ public async Task Returns_Users_Table_In_The_List_Of_Table()
+ {
+ // Act
+ var tables = await TestClient.Instance.ListTables();
+
+ // Assert
+ Assert.Contains(tables, s => s.Name == TestClient.KnownEntities.UsersTableName
+ && s.Topic == TestClient.KnownEntities.UsersTopicName
+ && s.KeyFormat == "KAFKA"
+ && s.ValueFormat == "JSON"
+ && s.IsWindowed is false);
+ }
+
+ [Fact]
+ public async Task Throws_TaskCanceledException_When_Cancelled()
+ {
+ // Arrange
+ using var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ // Act / Assert
+ await Assert.ThrowsAsync(() => TestClient.Instance.ListTables(cts.Token));
+ }
+ }
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs b/test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs
new file mode 100644
index 0000000..e6ec52e
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/ListTopicsTests.cs
@@ -0,0 +1,33 @@
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+ public class ListTopicsTests
+ {
+ [Fact]
+ public async Task Returns_Order_Topic_In_The_List_Of_Topics()
+ {
+ // Act
+ var topics = await TestClient.Instance.ListTopics();
+
+ // Assert
+ Assert.Contains(topics, t => t.Topic == TestClient.KnownEntities.OrdersTopicName
+ && t.NumberOfPartitions == 1
+ && t.ReplicasPerPartition.SequenceEqual(new[] {1}));
+ }
+
+ [Fact]
+ public async Task Throws_TaskCanceledException_When_Cancelled()
+ {
+ // Arrange
+ using var cts = new CancellationTokenSource();
+ cts.Cancel();
+
+ // Act / Assert
+ await Assert.ThrowsAsync(() => TestClient.Instance.ListTopics(cts.Token));
+ }
+ }
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs
new file mode 100644
index 0000000..f85bd02
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/StreamInsertsTests.cs
@@ -0,0 +1,128 @@
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.IO;
+using System.Linq;
+using System.Net.Http;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Tasks;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using KsqlDb.Api.Client.Exceptions;
+using Xunit;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+ public class StreamInsertsTests
+ {
+ private const int RowsNumber = 100;
+ private const string PrimitiveStreamName = "PrimitiveStream";
+
+ [Fact]
+ public async Task Inserts_Rows_With_Primitive_Type_Fields()
+ {
+ // Arrange
+ await CreatePrimitiveStream();
+ int[] rowIndices = Enumerable.Range(1, RowsNumber).ToArray();
+ var expectedSequenceNumbers = rowIndices.Select(x => (long)x-1);
+ var dataToInsert = rowIndices.Select(CreatePrimitiveRow).ToArray();
+ var expectedRows = dataToInsert.Select(GetKsqlObjectValuesSortedByFieldName).ToArray();
+ var asyncRows = dataToInsert.ToAsyncEnumerable();
+
+ // Act
+ var acks = TestClient.Instance.StreamInserts(PrimitiveStreamName, asyncRows);
+
+ // Assert
+ var actualSequenceNumbers = (await acks.ToArrayAsync()).Select(x => x.SequenceNumber);
+ Assert.Equal(expectedSequenceNumbers, actualSequenceNumbers);
+ var actualRows = await QueryStream(PrimitiveStreamName, typeof(int), typeof(long), typeof(bool), typeof(double), typeof(string));
+ Assert.Equal(expectedRows, actualRows, new TwoDimensionalArrayEqualityComparer());
+ }
+
+ private static KSqlObject CreatePrimitiveRow(int rowNumber) =>
+ new KSqlObject()
+ .Add("f1", rowNumber * 10)
+ .Add("f2", rowNumber * 10_000L)
+ .Add("f3", rowNumber % 2 == 0)
+ .Add("f4", 10007d / rowNumber)
+ .Add("f5", $"{rowNumber} row");
+
+ private static async Task CreatePrimitiveStream()
+ {
+ try
+ {
+ // Drop the stream if already exists
+ await TestClient.Instance.ExecuteStatement($"DROP STREAM {PrimitiveStreamName};");
+ }
+ catch (KsqlDbException)
+ {
+ // The stream doesn't exist
+ }
+
+ await TestClient.Instance.ExecuteStatement(
+ @$"CREATE STREAM {PrimitiveStreamName} (
+ f1 INTEGER KEY,
+ f2 BIGINT,
+ f3 BOOLEAN,
+ f4 DOUBLE,
+ f5 VARCHAR
+ ) WITH (
+ kafka_topic = '{PrimitiveStreamName}',
+ partitions = 1,
+ value_format = 'json'
+ );");
+ }
+
+ private static async Task QueryStream(string streamName, params Type[] expectedTypes)
+ {
+ using var request = new HttpRequestMessage(HttpMethod.Post, "/query-stream")
+ {
+ Content = new StringContent($"{{ \"sql\": \"select * from {streamName} emit changes limit {RowsNumber};\", \"properties\": {{ \"ksql.streams.auto.offset.reset\": \"earliest\" }} }}", Encoding.UTF8, "application/json"),
+ Version = new Version(2, 0),
+#if NET5_0
+ VersionPolicy = HttpVersionPolicy.RequestVersionOrHigher
+#endif
+ };
+
+ var response = await TestClient.RawHttpClient.SendAsync(request);
+ Debug.Assert(response.IsSuccessStatusCode);
+ await using var responseStream = await response.Content.ReadAsStreamAsync();
+ using var streamReader = new StreamReader(responseStream);
+ _ = await streamReader.ReadLineAsync(); // skip data schema header
+ var rows = new List();
+ while (!streamReader.EndOfStream)
+ {
+ string responseContentLine = await streamReader.ReadLineAsync();
+ var parsedRow = JsonSerializer.Deserialize(responseContentLine!);
+ var processedRow = parsedRow!.Cast().Select((e, i) => JsonSerializer.Deserialize(e.GetRawText(), expectedTypes[i])).ToArray();
+ rows.Add(processedRow);
+ }
+
+ return rows.ToArray();
+ }
+
+ private static object[] GetKsqlObjectValuesSortedByFieldName(KSqlObject kSqlObject) =>
+ kSqlObject.AsImmutableDictionary().OrderBy(x => x.Key).Select(x => x.Value).ToArray();
+
+ private sealed class TwoDimensionalArrayEqualityComparer : IEqualityComparer
+ {
+ public bool Equals(object[][] x, object[][] y) => x!.SequenceEqual(y!, new ArrayEqualityComparer());
+ public int GetHashCode(object[][] obj) => 0;
+ }
+
+ private sealed class ArrayEqualityComparer : IEqualityComparer
+ {
+ public bool Equals(object[] x, object[] y)
+ {
+ if (x!.Length != y!.Length) return false;
+ for (int i = 0; i < x.Length; i++)
+ {
+ if (!x[i].Equals(y[i])) return false;
+ }
+
+ return true;
+ }
+ public int GetHashCode(object[] obj) => 0;
+ }
+ }
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs
new file mode 100644
index 0000000..25648b8
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/StreamQueryTests.cs
@@ -0,0 +1,128 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using KsqlDb.Api.Client.Abstractions.QueryResults;
+using KsqlDb.Api.Client.Exceptions;
+using Xunit;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+ public class StreamQueryTests : IClassFixture
+ {
+ private const int ExpectedRowsCount = 3;
+ private readonly IReadOnlyCollection<(string name, Type type)> _actualColumns;
+ private readonly QueryResultRow[] _actualRows;
+ private readonly string _actualQueryId;
+
+ public StreamQueryTests(StreamQueryFixture fixture) => (_actualColumns, _actualRows, _actualQueryId) = (fixture.Columns, fixture.Rows, fixture.QueryId);
+
+ [Fact]
+ public void Response_Contains_Expected_Number_Of_Result_Rows()
+ {
+ Assert.Equal(ExpectedRowsCount, _actualRows.Length);
+ }
+
+ [Fact]
+ public void Response_Contains_USERID_Values_In_Expected_Format_Column()
+ {
+ string[] actual = _actualRows.Select(r => r.AsArray.GetString(0)).ToArray();
+ Assert.All(actual, u => Assert.StartsWith("User_", u));
+ }
+
+ [Fact]
+ public void Response_Contains_REGISTERTIME_Values_In_Expected_Format_Column()
+ {
+ long[] actual = _actualRows.Select(r => r.AsArray.GetLong(1)).ToArray();
+ Assert.All(actual, t => Assert.InRange(t, 0L, long.MaxValue));
+ }
+
+ [Fact]
+ public void Response_Contains_REGIONID_Values_In_Expected_Format_Column()
+ {
+ string[] actual = _actualRows.Select(r => r.AsArray.GetString(3)).ToArray();
+ Assert.All(actual, u => Assert.StartsWith("Region_", u));
+ }
+
+ [Fact]
+ public void Response_Contains_GENDER_Values_In_Expected_Format_Column()
+ {
+ string[] actual = _actualRows.Select(r => r.AsArray.GetString(2)).ToArray();
+ Assert.All(actual, g => Assert.Matches("MALE|FEMALE|OTHER", g));
+ }
+
+ [Theory]
+ [InlineData(0, 0)]
+ [InlineData(0, 1)]
+ [InlineData(1, 0)]
+ [InlineData(1, 1)]
+ [InlineData(2, 0)]
+ [InlineData(2, 1)]
+ public void Response_Contains_INTEREST_Values_In_Expected_Format_Column(int row, int arrayIndex)
+ {
+ string actual = _actualRows[row].GetKSqlArray("INTERESTS").GetString(arrayIndex);
+ Assert.Matches("News|Travel|Movies|Sport|Game|Travel", actual);
+ }
+
+ [Fact]
+ public void Response_Contains_Different_USERID_Values()
+ {
+ int uniqueValues = _actualRows.Select(r => r.AsArray.GetString(0)).Distinct().Count();
+ Assert.True(uniqueValues > 0);
+ }
+
+ [Fact]
+ public void Response_Contains_Expected_Column_Definitions()
+ {
+ Assert.Collection(_actualColumns,
+ column => Assert.True(column.name == "USERID" && column.type == typeof(string), "USERID"),
+ column => Assert.True(column.name == "REGISTERTIME" && column.type == typeof(long), "REGISTERTIME"),
+ column => Assert.True(column.name == "GENDER" && column.type == typeof(string), "GENDER"),
+ column => Assert.True(column.name == "REGIONID" && column.type == typeof(string), "REGIONID"),
+ column => Assert.True(column.name == "INTERESTS" && column.type == typeof(KSqlArray), "INTERESTS"),
+ column => Assert.True(column.name == "CONTACTINFO" && column.type == typeof(KSqlObject), "CONTACTINFO"));
+ }
+
+ [Fact]
+ public void Response_Contains_Non_Empty_QueryId()
+ {
+ Assert.NotEmpty(_actualQueryId);
+ }
+
+ [Fact]
+ public async Task Throws_When_Supplied_Invalid_Sql()
+ {
+ var exception = await Assert.ThrowsAsync(() => TestClient.Instance.StreamQuery("Invalid sql;"));
+ Assert.NotNull(exception.Body);
+ Assert.Equal(40001, exception.Body.ErrorCode);
+ }
+
+ [Fact]
+ public async Task Throws_When_Queries_Non_Existing_Stream()
+ {
+ var exception = await Assert.ThrowsAsync(() => TestClient.Instance.StreamQuery("SELECT * FROM NONEXISTINGSTREAM EMIT CHANGES LIMIT 1;"));
+ Assert.NotNull(exception.Body);
+ Assert.Equal(40001, exception.Body.ErrorCode);
+ }
+
+ [Collection(nameof(StreamQueryFixture))]
+ public sealed class StreamQueryFixture : IAsyncLifetime
+ {
+ public IReadOnlyCollection<(string name, Type type)> Columns { get; set; }
+ public QueryResultRow[] Rows { get; set; }
+ public string QueryId { get; set; }
+
+ public async Task InitializeAsync()
+ {
+ var properties = new Dictionary { ["ksql.streams.auto.offset.reset"] = "earliest" };
+ var queryResult = await TestClient.Instance.StreamQuery($"SELECT * FROM {TestClient.KnownEntities.UsersTableName} EMIT CHANGES LIMIT {ExpectedRowsCount};", properties);
+ Columns = queryResult.Columns;
+ QueryId = queryResult.QueryId;
+ Rows = await queryResult.Rows.ToArrayAsync();
+ }
+
+ public Task DisposeAsync() => Task.CompletedTask;
+ }
+ }
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/TestClient.cs b/test/KsqlDb.Client.IntegrationTests/TestClient.cs
new file mode 100644
index 0000000..b362f90
--- /dev/null
+++ b/test/KsqlDb.Client.IntegrationTests/TestClient.cs
@@ -0,0 +1,71 @@
+using System;
+using System.Net.Http;
+using System.Threading;
+using System.Threading.Tasks;
+using KsqlDb.Api.Client.KsqlApiV1;
+using KsqlDb.Api.Client.KsqlApiV1.Content;
+using KsqlDb.Api.Client.Serdes;
+using Xunit.Abstractions;
+
+namespace KsqlDb.Client.IntegrationTests
+{
+ internal class TestClient : KsqlDb.Api.Client.Client
+ {
+ public static class KnownEntities
+ {
+ public const string OrdersTopicName = "orders_topic";
+ public const string OrdersStreamName = "ORDERS_STREAM";
+ public const string UsersTopicName = "users_topic";
+ public const string UsersTableName = "USERS_TABLE";
+
+ }
+ public static HttpClient RawHttpClient => new() {BaseAddress = new Uri("http://127.0.0.1:8088")};
+
+ public static KsqlDb.Api.Client.Client Instance => new TestClient();
+
+ private TestClient() : base(CreateHttpClient())
+ {
+ }
+
+ private static KSqlDbHttpClient CreateHttpClient()
+ {
+ var jsonSerializer = new JsonSerializer();
+ return new KSqlDbHttpClient(RawHttpClient, new KsqlV1RequestHttpContentFactory(jsonSerializer), jsonSerializer);
+ }
+
+ public TestClient(ITestOutputHelper output) : base(CreateHttpClientWithLogging(output))
+ {
+
+ }
+
+ public static KSqlDbHttpClient CreateHttpClientWithLogging(ITestOutputHelper output)
+ {
+ var httpClient = new HttpClient(new LoggingHandler(output)) {BaseAddress = new Uri("http://127.0.0.1:8088")};
+ var jsonSerializer = new JsonSerializer();
+ return new KSqlDbHttpClient(httpClient, new KsqlV1RequestHttpContentFactory(jsonSerializer), jsonSerializer);
+ }
+ }
+
+ internal class LoggingHandler : DelegatingHandler
+ {
+ private readonly ITestOutputHelper _output;
+
+ public LoggingHandler(ITestOutputHelper output)
+ {
+ _output = output;
+ InnerHandler = new HttpClientHandler();
+ }
+
+ protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
+ {
+ _output.WriteLine(request.ToString());
+ if (request.Content is not null)
+ {
+ var content = await request.Content.ReadAsStringAsync();
+ _output.WriteLine($"Content: {content}");
+ }
+
+ return await base.SendAsync(request, cancellationToken);
+ }
+ }
+}
diff --git a/test/KsqlDb.Client.IntegrationTests/UnitTest1.cs b/test/KsqlDb.Client.IntegrationTests/UnitTest1.cs
deleted file mode 100644
index e0bc5e9..0000000
--- a/test/KsqlDb.Client.IntegrationTests/UnitTest1.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-using System;
-using Xunit;
-
-namespace KsqlDb.Client.IntegrationTests
-{
- public class UnitTest1
- {
- [Fact]
- public void Test1()
- {
- }
- }
-}
diff --git a/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj b/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj
index 29c57fe..e0fbe48 100644
--- a/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj
+++ b/test/KsqlDb.Client.UnitTests/KsqlDb.Client.UnitTests.csproj
@@ -4,11 +4,12 @@
netcoreapp3.1;net5.0
win10-x64;linux-x64
false
+ latest
-
-
+
+
runtime; build; native; contentfiles; analyzers; buildtransitive
all
@@ -20,7 +21,7 @@
-
+
diff --git a/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs b/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs
new file mode 100644
index 0000000..bb6ab0b
--- /dev/null
+++ b/test/KsqlDb.Client.UnitTests/Parsers/KObjectParserTests.cs
@@ -0,0 +1,241 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text.Json;
+using KsqlDb.Api.Client.Abstractions.Objects;
+using KsqlDb.Api.Client.Parsers;
+using Xunit;
+
+namespace KsqlDb.Client.UnitTests.Parsers
+{
+ public class KObjectParserTests
+ {
+ [Theory]
+ [InlineData("STRING")]
+ [InlineData("String")]
+ [InlineData("INTEGER")]
+ [InlineData("BIGINT")]
+ [InlineData("DOUBLE")]
+ [InlineData("BOOLEAN")]
+ [InlineData("DECIMAL")]
+ [InlineData("ARRAY")]
+ [InlineData("MAP")]
+ [InlineData("ARRAY