Nats support #114

Open
zykovkirill opened this issue Mar 20, 2023 · 10 comments

@zykovkirill

zykovkirill commented Mar 20, 2023

Will there be NATS support in the future?
For example:

using System;
using System.Threading.Tasks;
using JsonRpc.Core;
using JsonRpc.Router;
using JsonRpc.Router.Abstractions;
using JsonRpc.Router.Defaults;

namespace TecNotificationProcessing.NatsProcessing
{
    public interface IJRpcService
    {

        Task<RpcResponse> ProcessRequestAsync(RpcRequest request);
    }

    public class JRpcService : IJRpcService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly IRpcInvoker _invoker;
        private readonly IRouteContext _routeContext;

        public JRpcService(IRpcRouteProvider routeProvider, IServiceProvider serviceProvider, IRpcInvoker invoker)
        {
            _invoker = invoker ?? throw new ArgumentNullException(nameof(invoker));
            _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
            _routeContext = new DefaultRouteContext(null, null, routeProvider ?? throw new ArgumentNullException(nameof(routeProvider)));
        }
        public async Task<RpcResponse> ProcessRequestAsync(RpcRequest request)
        {
            return await _invoker.InvokeRequestAsync(request, RpcPath.Default, _routeContext, _serviceProvider);
        }
    }
}


@Gekctek
Collaborator

Gekctek commented Mar 20, 2023

@zykovkirill Can you elaborate? I'm not familiar with NATS.

@amoraller
Contributor

amoraller commented Mar 21, 2023

We have an alternative channel for JSON-RPC.
We want to resolve the IRpcRequestHandler interface and invoke Task<bool> HandleRequestAsync(Stream requestBody, Stream responseBody); to process the JSON-RPC protocol.
I think IRpcRequestHandler and the other interfaces should be public so they can be used without the ASP.NET Core infrastructure.

@zykovkirill
Author

I managed to use NATS with the help of reflection:

public async Task<byte[]> ProcessRequestAsync(byte[] requestBytes)
{
    string path = "jrpcapi";
    var routeContext = new RpcContext(_serviceProvider, path);

    using MemoryStream requestStream = new MemoryStream(requestBytes);
    using MemoryStream responseStream = new MemoryStream();

    using var scope = _serviceProvider.CreateScope();

    // _reue is declared outside this snippet; it is assumed to hold the types of the router assembly.
    // The interfaces are not public, so they are looked up by full name.
    var handlerType = _reue.FirstOrDefault(c => c.FullName == "EdjCase.JsonRpc.Router.Abstractions.IRpcRequestHandler");
    var accessorType = _reue.FirstOrDefault(c => c.FullName == "EdjCase.JsonRpc.Router.Abstractions.IRpcContextAccessor");

    // Set the RPC context by invoking IRpcContextAccessor.Set through reflection.
    var contextAccessor = _serviceProvider.GetRequiredService(accessorType);
    var setMethod = accessorType.GetMethods().FirstOrDefault(m => m.Name == "Set");
    setMethod.Invoke(contextAccessor, new object[] { routeContext });

    // Instance is also declared outside this snippet; it is assumed to be the resolved IRpcRequestHandler.
    var handleMethod = handlerType.GetMethods().FirstOrDefault(m => m.Name == "HandleRequestAsync");
    await (Task<bool>)handleMethod.Invoke(Instance, new object[] { requestStream, responseStream });

    responseStream.Position = 0L;
    return responseStream.ToArray();
}

using var sub = await _natsConnection.SubscribeRequestAsync<byte[], byte[]>(_config.Value.NatsChannel, _jRpcService.ProcessRequestAsync);

@zykovkirill
Author

zykovkirill commented Mar 21, 2023

@zykovkirill Can you elaborate, I'm not familiar with Nats.

In short, NATS is a data bus: https://docs.nats.io/nats-concepts/what-is-nats
I use this library in my project.
This one is the most popular (C#).
https://stackoverflow.com/questions/63418503/does-grpc-vs-nats-or-kafka-make-any-sense

Similar question #37

@zykovkirill
Author

We have an alternative channel for JSON-RPC. We want to resolve the IRpcRequestHandler interface and invoke its Task<bool> HandleRequestAsync(Stream requestBody, Stream responseBody); to process the JSON-RPC protocol. I think IRpcRequestHandler and the other interfaces should be public so they can be used without the ASP.NET Core infrastructure.

I agree with you

@zykovkirill
Author

zykovkirill commented Mar 21, 2023

Example

using EdjCase.JsonRpc.Router.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.IO;
using System.Threading.Tasks;

namespace EdjCase.JsonRpc.Router
{
	/// <summary>
	/// Custom Invoker 
	/// </summary>
	public class RpcCustomInvoker
	{
		private readonly IServiceProvider serviceProvider;
		public RpcCustomInvoker(IServiceProvider services)
		{
			this.serviceProvider = services ?? throw new ArgumentNullException(nameof(services));
		}

		public async Task<byte[]> ProcessRequestAsync(byte[] request, string controllerName)
		{

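			// Strip only the trailing "Controller" suffix; IndexOf would cut at the first occurrence instead.
			const string suffix = "Controller";
			if (controllerName.EndsWith(suffix, StringComparison.Ordinal))
			{
				controllerName = controllerName.Substring(0, controllerName.Length - suffix.Length);
			}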

			var routeContext = new RpcContext(this.serviceProvider, controllerName);

			using MemoryStream requestStream = new MemoryStream(request);
			using MemoryStream responseStream = new MemoryStream();

			using var scope = this.serviceProvider.CreateScope();
			var rpcContextAccessor = scope.ServiceProvider.GetRequiredService<IRpcContextAccessor>();
			rpcContextAccessor.Set(routeContext);

			var rpcRequestHandler = scope.ServiceProvider.GetRequiredService<IRpcRequestHandler>();
			await rpcRequestHandler.HandleRequestAsync(requestStream, responseStream);

			responseStream.Position = 0L;
			return responseStream.ToArray();

		}

	}
}

NATS service

using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using AlterNats;
using Core;
using EdjCase.JsonRpc.Router;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace Test
{
    public class NatsService : BackgroundService
    {
        private readonly ILogger _logger;
        private readonly NatsConnection _natsConnection;
        private readonly IOptions<NatsConfiguration> _config;
        private readonly RpcCustomInvoker _invoker;
        public NatsService(ILogger<NatsService> logger,
            IOptions<NatsConfiguration> config,
            INatsCommand command,
            RpcCustomInvoker invoker)
        {
            _config = config;
            _logger = logger;
            _natsConnection = (NatsConnection)command;
            _invoker = invoker;
        }
        private const string Error = "Error while working with NATS";
        public bool HealthState { get; private set; } = false;

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            var connectionDisconnected = Observable.FromEventPattern<string>(a => _natsConnection.ConnectionDisconnected += a, a => _natsConnection.ConnectionDisconnected -= a).Select(p => Unit.Default);
            var reconnectFailed = Observable.FromEventPattern<string>(a => _natsConnection.ReconnectFailed += a, a => _natsConnection.ReconnectFailed -= a).Select(p => Unit.Default);

            using var _1 = reconnectFailed.Merge(connectionDisconnected).Subscribe(_ => HealthState = false);

            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {

                    using var sub = await _natsConnection.SubscribeRequestAsync<byte[], byte[]>(_config.Value.NatsChannel, Handle);
                    HealthState = true;
                    await Observable.Merge(Observable.FromAsync(() => stoppingToken.AsTaskAsync())).FirstOrDefaultAsync();

                }
                catch (Exception ex) when (!stoppingToken.IsCancellationRequested)
                {
                    _logger.LogError(ex, Error);
                    await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
                }
            }
        }

        /// <summary>
        /// Request handler
        /// </summary>
        /// <param name="rpcRequest">The request</param>
        private async Task<byte[]> Handle(byte[] rpcRequest)
        {
            // Log the payload size; interpolating the byte[] itself would only print "System.Byte[]".
            _logger.LogInformation("Request received - {Length} bytes", rpcRequest.Length);
            return await _invoker.ProcessRequestAsync(rpcRequest, nameof(JrpcApiController));
        }

    }
}

@Gekctek
Collaborator

Gekctek commented Mar 21, 2023

I have put up a PR for a potential fix to help out your situation in #115
Take a look and let me know what you think @zykovkirill

Gekctek reopened this Mar 21, 2023

@zykovkirill
Author

I have put up a PR for a potential fix to help out your situation in #115 Take a look and let me know what you think @zykovkirill

It works well, thanks

@zykovkirill
Author

In the near future I will put together a question about integrating a NATS client into EdjCase.JsonRpc.Client. Preliminary questions:

  1. Make the RpcClient constructor public.
  2. Make IRpcTransportClient public.
  3. Analyze the code for flexibility.
  4. Make alternative implementations of NatsClientBuilder, NatsRpcTransportClient.

@Gekctek
Collaborator

Gekctek commented Mar 23, 2023

@zykovkirill
Version 5.1.5 has the PR changes in it

For your questions, if you can supply me with some code for a NatsClientBuilder and NatsRpcTransportClient, I can integrate them. I'm just not familiar with NATS. Then we can go back and forth on any feedback.
