Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft version (work in progress) #4

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/dotnet.yml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ jobs:
path: |
test/docker-compose.yml
test/.env
test/CreateTestStreams

unit-tests:
name: Unit tests
Expand Down Expand Up @@ -123,8 +124,10 @@ jobs:
with:
name: docker-compose
- name: Start ksqlDB in docker
run: docker-compose up -d --scale additional-ksqldb-server=0
run: docker-compose up -d --build
- name: Wait until ksqlDB is up and running
run: timeout 2m bash -c 'while [[ "$(curl -s -o /dev/null -w ''%{http_code}'' http://127.0.0.1:8088/info)" != "200" ]]; do sleep 5; done' || false
- name: Run test
run: dotnet test KsqlDb.Client.IntegrationTests.dll
env:
DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_HTTP2UNENCRYPTEDSUPPORT: 1
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<Authors>[email protected]</Authors>
<Authors>Alex Basiuk</Authors>
<Copyright>© Alex Basiuk</Copyright>
<Product>KsqlDb.Client</Product>
<Description>ksqlDB Client.</Description>
Expand Down
2 changes: 2 additions & 0 deletions Directory.Build.targets
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
<Project>
<ItemGroup>

<PackageReference Update="System.Linq.Async" Version="[5.0.0,6.0.0)" />

<!-- Testing/Samples-->
<PackageReference Update="Microsoft.NET.Test.Sdk" Version="16.7.1"/>
<PackageReference Update="xunit" Version="2.4.1"/>
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
=====================================================

![Build](https://github.com/alex-basiuk/ksqlDB-client-dotnet/workflows/Build/badge.svg)


In development scenarios when server and client have a priori knowledge that both will speak HTTP/2 unencrypted, you may establish an HTTP/2 connection over cleartext by setting an AppContext switch or an environment variable (DOTNET_SYSTEM_NET_HTTP_SOCKETSHTTPHANDLER_HTTP2UNENCRYPTEDSUPPORT=1).
`AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);`
27 changes: 27 additions & 0 deletions src/KsqlDb.Client/Abstractions/BatchedQueryResults.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using System.Collections.Generic;
using KsqlDb.Api.Client.Abstractions.QueryResults;

namespace KsqlDb.Api.Client.Abstractions
{
/// <summary>
/// A batches query result.
/// </summary>
public class BatchedQueryResults
{
/// <summary>
/// The result rows async enumerator.
/// </summary>
IReadOnlyCollection<QueryResultRow> ResultRows { get; }

/// <summary>
/// The Id of the underlying push query if applicable.
/// </summary>
public string? QueryId { get; }

public BatchedQueryResults(IReadOnlyCollection<QueryResultRow> resultRows, string? queryId)
{
ResultRows = resultRows;
QueryId = queryId;
}
}
}
15 changes: 15 additions & 0 deletions src/KsqlDb.Client/Abstractions/ColumnType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
namespace KsqlDb.Api.Client.Abstractions
{
public enum ColumnType
{
String,
Integer,
Bigint,
Double,
Boolean,
Decimal,
Array,
Map,
Struct
}
}
38 changes: 38 additions & 0 deletions src/KsqlDb.Client/Abstractions/ExecuteStatementResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using KsqlDb.Api.Client.Abstractions;

namespace KSQL.API.Client
{
/// <summary>
/// The result of the <see cref="IClient.ExecuteStatement"/> method.
/// </summary>
public class ExecuteStatementResult
{
/// <summary>
/// Returns the ID of a newly started persistent query, if applicable. The return value is empty
/// for all statements other than 'CREATE ... AS * SELECT' and 'INSERT * INTO ... AS SELECT'
/// statements, as only these statements start persistent queries. For statements that start
/// persistent queries, the return value may still be empty if either:
/// <list type="bullet">
/// <item>
/// <description>
/// The statement was not executed on the server by the time the server response was sent.
/// This typically does not happen under normal server operation, but may happen if the ksqlDB
/// server's command runner thread is stuck, or if the configured value for <code>ksql.server.command.response.timeout.ms</code>
/// is too low.
/// </description>
/// </item>
/// <item>
/// <description>
/// The ksqlDB server version is lower than 0.11.0.
/// </description>
/// </item>
/// </list>
/// </summary>
string? QueryId;

public ExecuteStatementResult(string? queryId)
{
QueryId = queryId;
}
}
}
32 changes: 32 additions & 0 deletions src/KsqlDb.Client/Abstractions/FieldInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using KsqlDb.Api.Client.Abstractions;

namespace KSQL.API.Client
{
/// <summary>
/// A field/column of a ksqlDB stream/table.
/// </summary>
public class FieldInfo
{
/// <summary>
/// The field name.
/// </summary>
public string Name { get; }

/// <summary>
/// The field type.
/// </summary>
public ColumnType Type { get; }

/// <summary>
/// Whether this field is a key field, rather than a value field.
/// </summary>
public bool IsKey { get; }

public FieldInfo(string name, ColumnType type, bool isKey)
{
Name = name;
Type = type;
IsKey = isKey;
}
}
}
146 changes: 146 additions & 0 deletions src/KsqlDb.Client/Abstractions/IClient.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using KSQL.API.Client;
using KsqlDb.Api.Client.Abstractions.Objects;
using KsqlDb.Api.Client.Abstractions.QueryResults;

namespace KsqlDb.Api.Client.Abstractions
{
public interface IClient
{
/// <summary>
/// Executes a pull or push query supplied in the <paramref name="sql"/> and streams the result.
/// </summary>
/// <param name="sql">The query statement.</param>
/// <param name="cancellationToken">The cancellation token that cancels the operation.</param>
/// <returns>
/// A task that completes when the server response is received.
/// </returns>
Task<StreamedQueryResult> StreamQuery(string sql, CancellationToken cancellationToken = default);

/// <summary>
/// Executes a pull or push query supplied in the <paramref name="sql"/> and streams the result.
/// </summary>
/// <param name="sql">The query statement.</param>
/// <param name="properties">The query properties.</param>
/// <param name="cancellationToken">The cancellation token that cancels the operation.</param>
/// <returns>
/// A task that completes when the server response is received.
/// </returns>
Task<StreamedQueryResult> StreamQuery(string sql, IDictionary<string, object> properties, CancellationToken cancellationToken = default);

/// <summary>
/// Executes a pull or push query supplied in the <paramref name="sql"/> and returns the result as a single batch or rows.
/// </summary>
/// <param name="sql">The query statement.</param>
/// <param name="cancellationToken">The cancellation token that cancels the operation.</param>
/// <returns>
/// A task that completes when the server response is received.
/// </returns>
Task<BatchedQueryResults> BatchQuery(string sql, CancellationToken cancellationToken = default);

/// <summary>
/// Executes a pull or push query supplied in the <paramref name="sql"/> and returns the result as a single batch or rows.
/// </summary>
/// <param name="sql">The query statement.</param>
/// <param name="properties">The query properties.</param>
/// <param name="cancellationToken">The cancellation token that cancels the operation.</param>
/// <returns>
/// A task that completes when the server response is received.
/// </returns>
Task<BatchedQueryResults> BatchQuery(string sql, IDictionary<string, object> properties, CancellationToken cancellationToken = default);

/// <summary>
/// Terminates a push query with the specified <paramref name="queryId"/>,
/// </summary>
/// <param name="queryId">The Id of the query to terminate.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes once the server response is received.</returns>
Task TerminatePushQuery(string queryId, CancellationToken cancellationToken = default);

/// <summary>
/// Inserts a new <paramref name="row"/> into the <param name="streamName"></param>
/// </summary>
/// <param name="streamName">The target stream name.</param>
/// <param name="row">The row to be inserted into the stream.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when the operation is completed.</returns>
Task<RowInsertAcknowledgment> InsertRow(string streamName, KSqlObject row, CancellationToken cancellationToken = default);

/// <summary>
/// Inserts the asynchronous stream of rows <paramref name="rows"/> in to the stream <paramref name="streamName"/>.
/// </summary>
/// <param name="streamName">The target stream name.</param>
/// <param name="rows">The asynchronous stream of rows.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The asynchronous stream of row acknowledgments.</returns>
IAsyncEnumerable<RowInsertAcknowledgment> StreamInserts(string streamName, IAsyncEnumerable<KSqlObject> rows, CancellationToken cancellationToken = default);

/// <summary>
/// Sends a DDL/DML statement to the ksqlDB server.
/// This method supports the following statements:
/// - 'CREATE';
/// - 'CREATE ... AS SELECT';
/// - 'DROP', 'TERMINATE';
/// - 'INSERT INTO ... AS SELECT'.
/// Each request should contain exactly one statement. Requests that contain multiple statements will be rejected by the client.
/// </summary>
/// <param name="sql">The SQL statement to be executed.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when the operation is completed.</returns>
Task<ExecuteStatementResult> ExecuteStatement(string sql, CancellationToken cancellationToken = default);

/// <summary>
/// Sends a DDL/DML statement to the ksqlDB server.
/// This method supports the following statements:
/// - 'CREATE';
/// - 'CREATE ... AS SELECT';
/// - 'DROP', 'TERMINATE';
/// - 'INSERT INTO ... AS SELECT'.
/// Each request should contain exactly one statement. Requests that contain multiple statements will be rejected by the client.
/// </summary>
/// <param name="sql">The SQL statement to be executed.</param>
/// <param name="properties">The statement properties.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that completes when the operation is completed.</returns>
Task<ExecuteStatementResult> ExecuteStatement(string sql, IDictionary<string, object> properties, CancellationToken cancellationToken = default);

/// <summary>
/// Returns the list of ksqlDB streams from the ksqlDB server's metastore.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns a collection of streams when completes.</returns>
Task<IReadOnlyCollection<StreamOrTableInfo>> ListStreams(CancellationToken cancellationToken = default);

/// <summary>
/// Returns the list of ksqlDB tables from the ksqlDB server's metastore.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns a collection of tables when completes.</returns>
Task<IReadOnlyCollection<StreamOrTableInfo>> ListTables(CancellationToken cancellationToken = default);

/// <summary>
/// Returns the list of Kafka topics available for use with ksqlDB.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns a collection of topics when completes.</returns>
Task<IReadOnlyCollection<TopicInfo>> ListTopics(CancellationToken cancellationToken = default);

/// <summary>
/// Returns the list of queries currently running on the ksqlDB server.
/// </summary>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns a collection of queries when completes.</returns>
Task<IReadOnlyCollection<QueryInfo>> ListQueries(CancellationToken cancellationToken = default);

/// <summary>
/// Returns metadata about the ksqlDB stream or table of the provided name.
/// </summary>
/// <param name="sourceName">The stream or table name.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>A task that returns metadata for the stream or table.</returns>
Task<SourceDescription> DescribeSource(string sourceName, CancellationToken cancellationToken = default);
}
}
80 changes: 80 additions & 0 deletions src/KsqlDb.Client/Abstractions/Objects/KSqlArray.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;

namespace KsqlDb.Api.Client.Abstractions.Objects
{
public class KSqlArray
{
private readonly List<object> _items;

public KSqlArray() : this(new List<object>())
{
}

private KSqlArray(List<object> items) => _items = items;

public int Count => _items.Count;

/// <summary>
///
/// </summary>
/// <param name="index"></param>
/// <exception cref="ArgumentOutOfRangeException"></exception>
public object this[int index] => _items[index];

/// <summary>
/// Returns the value at a specified index as a <see cref="string"/>.
/// </summary>
/// <param name="index">The index.</param>
/// <returns>The value at <paramref name="index"/>.</returns>
/// <exception cref="ArgumentOutOfRangeException">If the <paramref name="index"/> is invalid.</exception>
/// <exception cref="InvalidCastException">If the value is not a <see cref="string"/>.</exception>
public string GetString(int index) => GetValue<string>(index);
public int GetInteger(int index) => GetValue<int>(index);
public long GetLong(int index) => GetValue<long>(index);
public double GetDouble(int index) => GetValue<double>(index);
public decimal GetDecimal(int index) => GetValue<decimal>(index);
public KSqlArray GetKSqlArray(int index) => GetValue<KSqlArray>(index);
public KSqlObject GetKSqlObject(int index) => GetValue<KSqlObject>(index);
public bool IsNull(int index) => _items[index] is KSqlNull;

public bool Remove(object value) => _items.Remove(value);
public void RemoveAt(int index) => _items.RemoveAt(index);

public KSqlArray Add(string value) => AddValue(value);
public KSqlArray Add(int value) => AddValue(value);
public KSqlArray Add(long value) => AddValue(value);
public KSqlArray Add(double value) => AddValue(value);
public KSqlArray Add(decimal value) => AddValue(value);
public KSqlArray Add(KSqlArray value) => AddValue(value);
public KSqlArray Add(KSqlObject value) => AddValue(value);
public KSqlArray AddNull() => AddValue(KSqlNull.Instance);
public KSqlArray AddRange(KSqlArray array)
{
_items.AddRange(array._items);
return this;
}

/// <summary>
/// Shallow copy.
/// </summary>
/// <returns></returns>
public KSqlArray Copy() => new KSqlArray(_items);

public IReadOnlyList<object> AsReadOnlyList() => ImmutableArray.CreateRange(_items);

private T GetValue<T>(int index)
{
var item = _items[index];
if (_items[index] is T value) return value;
throw new InvalidCastException($"The value at index {index} is not a {typeof(T).FullName}, it's a {item.GetType().FullName}");
}

internal KSqlArray AddValue(object value)
{
_items.Add(value);
return this;
}
}
}
Loading