Skip to content

Commit

Permalink
Add Rails Integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya Chizhanov committed Jul 10, 2022
1 parent 06c0012 commit 7f96b94
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 92 deletions.
11 changes: 9 additions & 2 deletions bin/gruf
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,12 @@ end
load 'config/environment.rb' if defined?(Rails)
require 'gruf'

cli = Gruf::Cli::Executor.new
cli.run
begin
cli = Gruf::Cli::Executor.new
cli.run(ARGV)
rescue => e
raise e if $DEBUG
STDERR.puts e.message
STDERR.puts e.backtrace.join("\n")
exit 1
end
129 changes: 104 additions & 25 deletions lib/gruf/cli/executor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,67 +23,96 @@ module Cli
# Handles execution of the gruf binstub, along with command-line arguments
#
class Executor
include Gruf::Loggable

class NoServicesBoundError < StandardError; end

KILL_SIGNALS = %w[INT TERM QUIT].freeze

# Run CLI inside the current process and shutdown at exit
def self.embed!(args = [])
new(embedded: true).tap do |cli|
cli.run(args)
at_exit { cli.shutdown }
end
end

attr_reader :server, :embedded
alias embedded? embedded

##
# @param [Hash|ARGV]
# @param [Boolean] embedded
# @param [::Gruf::Server|NilClass] server
# @param [Array<Class>|NilClass] services
# @param [Gruf::Hooks::Executor|NilClass] hook_executor
# @param [Logger|NilClass] logger
#
def initialize(
args = ARGV,
embedded: false,
server: nil,
services: nil,
hook_executor: nil,
logger: nil
hook_executor: nil
)
@args = args
setup! # ensure we set some defaults from CLI here so we can allow configuration
@embedded = embedded
@services = services.is_a?(Array) ? services : []
@hook_executor = hook_executor || Gruf::Hooks::Executor.new(hooks: Gruf.hooks&.prepare)
@server = server || Gruf::Server.new(Gruf.server_options)
@logger = logger || Gruf.logger || ::Logger.new($stderr)
@hook_executor = hook_executor
@server = server
end

##
# Run the server
# @param [Hash|ARGV] args
#
def run
exception = nil
# wait to load controllers until last possible second to allow late configuration
::Gruf.autoloaders.load!(controllers_path: Gruf.controllers_path)
# allow lazy registering globally as late as possible, this allows more flexible binstub injections
@services = ::Gruf.services unless @services&.any?
def run(args = [])
@args = args
setup_options! # ensure we set some defaults from CLI here so we can allow configuration
setup!

unless @services.any?
raise NoServicesBoundError,
'No services bound to this gruf process; please bind a service to a Gruf controller ' \
'to start the server successfully'
end
exception = nil

update_proc_title(:starting)
begin
@services.each { |s| @server.add_service(s) }
@hook_executor.call(:before_server_start, server: @server)
@server.start!
@server.start
rescue StandardError => e
exception = e
# Catch the exception here so that we always ensure the post hook runs
# This allows systems wanting to provide external server instrumentation
# the ability to properly handle server failures
@logger.fatal("FATAL ERROR: #{e.message} #{e.backtrace.join("\n")}")
logger.fatal "FATAL ERROR: #{e.message} #{e.backtrace.join("\n")}"
end
update_proc_title(:serving)

if exception
shutdown
raise exception
end

return if embedded?

begin
wait_till_terminated
rescue Interrupt => e
logger.info "[gruf] Stopping... #{e.message}"

shutdown

logger.info '[gruf] Stopped. Good-bye!'
end
end

def shutdown
server&.stop
@hook_executor.call(:after_server_stop, server: @server)
raise exception if exception
update_proc_title(:stopped)
end

private

##
# Setup options for CLI execution and configure Gruf based on inputs
#
def setup!
def setup_options!
opts = parse_options

Gruf.server_binding_url = opts[:host] if opts[:host]
Expand Down Expand Up @@ -114,6 +143,56 @@ def parse_options
end
end
end

##
# Setup Gruf
#
def setup!
@server ||= Gruf::Server.new(Gruf.server_options)
@hook_executor ||= Gruf::Hooks::Executor.new(hooks: Gruf.hooks&.prepare)

# wait to load controllers until last possible second to allow late configuration
::Gruf.autoloaders.load!(controllers_path: Gruf.controllers_path)
# allow lazy registering globally as late as possible, this allows more flexible binstub injections
@services = ::Gruf.services unless @services&.any?
unless @services.any? # rubocop:disable Lint/GuardClause
raise NoServicesBoundError,
'No services bound to this gruf process; please bind a service to a Gruf controller ' \
'to start the server successfully'
end
end

def wait_till_terminated
self_read = setup_signals

readable_io = IO.select([self_read]) # rubocop:disable Lint/IncompatibleIoSelectWithFiberScheduler
signal = readable_io.first[0].gets.strip
raise Interrupt, "SIG#{signal} received"
end

def setup_signals
self_read, self_write = IO.pipe

KILL_SIGNALS.each do |signal|
trap signal do
self_write.puts signal
end
end

self_read
end

##
# Updates proc name/title
#
# @param [Symbol] state
#
# :nocov:
def update_proc_title(state)
Process.setproctitle("gruf #{Gruf::VERSION} -- #{state}")
end

# :nocov:
end
end
end
8 changes: 8 additions & 0 deletions lib/gruf/integrations/rails/railtie.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ class Railtie < ::Rails::Railtie
end
end
end

# Since Rails 6.1
if respond_to?(:server)
server do
require 'gruf'
Gruf::Cli::Executor.embed!
end
end
end
end
end
Expand Down
76 changes: 44 additions & 32 deletions lib/gruf/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ module Gruf
class Server
class ServerAlreadyStartedError < StandardError; end

KILL_SIGNALS = %w[INT TERM QUIT].freeze

include Gruf::Loggable

# @!attribute [r] port
Expand All @@ -44,7 +42,6 @@ def initialize(opts = {})
@interceptors = opts.fetch(:interceptor_registry, Gruf.interceptors)
@interceptors = Gruf::Interceptors::Registry.new unless @interceptors.is_a?(Gruf::Interceptors::Registry)
@services = nil
@started = false
@hostname = opts.fetch(:hostname, Gruf.server_binding_url)
@event_listener_proc = opts.fetch(:event_listener_proc, Gruf.event_listener_proc)
end
Expand Down Expand Up @@ -89,31 +86,55 @@ def server
# Start the gRPC server
#
# :nocov:
def start!
update_proc_title(:starting)
def start
return if running?

server_thread = Thread.new do
logger.info { "[gruf] Starting gruf server at #{@hostname}..." }
server.run_till_terminated_or_interrupted(KILL_SIGNALS)
end
@started = true
update_proc_title(:serving)
server_thread.join
@started = false
raise 'Cannot re-start stopped server' if stopped?

logger.info "[gruf] Starting gruf server at #{@hostname}..."

update_proc_title(:stopped)
logger.info { '[gruf] Goodbye!' }
@server_thread = Thread.new { server.run }

server.wait_till_running
end

# :nocov:

def wait_till_terminated
raise 'Server is not running' unless running?

server_thread.join
end

# Stop gRPC server if it's running
def stop
return unless running?

server.stop

logger.info '[gruf] gRPC server stopped'
end

def running?
return false if @server.nil?

server.running_state == :running
end

def stopped?
return false if @server.nil?

server.running_state == :stopped
end

##
# Add a gRPC service stub to be served by gruf
#
# @param [Class] klass
# @raise [ServerAlreadyStartedError] if the server is already started
#
def add_service(klass)
raise ServerAlreadyStartedError if @started
raise ServerAlreadyStartedError if running?

@services << klass unless services.include?(klass)
end
Expand All @@ -126,7 +147,7 @@ def add_service(klass)
# @raise [ServerAlreadyStartedError] if the server is already started
#
def add_interceptor(klass, opts = {})
raise ServerAlreadyStartedError if @started
raise ServerAlreadyStartedError if running?

@interceptors.use(klass, opts)
end
Expand All @@ -139,7 +160,7 @@ def add_interceptor(klass, opts = {})
# @param [Hash] opts A hash of options for the interceptor
#
def insert_interceptor_before(before_class, interceptor_class, opts = {})
raise ServerAlreadyStartedError if @started
raise ServerAlreadyStartedError if running?

@interceptors.insert_before(before_class, interceptor_class, opts)
end
Expand All @@ -152,7 +173,7 @@ def insert_interceptor_before(before_class, interceptor_class, opts = {})
# @param [Hash] opts A hash of options for the interceptor
#
def insert_interceptor_after(after_class, interceptor_class, opts = {})
raise ServerAlreadyStartedError if @started
raise ServerAlreadyStartedError if running?

@interceptors.insert_after(after_class, interceptor_class, opts)
end
Expand All @@ -172,7 +193,7 @@ def list_interceptors
# @param [Class] klass
#
def remove_interceptor(klass)
raise ServerAlreadyStartedError if @started
raise ServerAlreadyStartedError if running?

@interceptors.remove(klass)
end
Expand All @@ -181,13 +202,15 @@ def remove_interceptor(klass)
# Clear the interceptor registry of interceptors
#
def clear_interceptors
raise ServerAlreadyStartedError if @started
raise ServerAlreadyStartedError if running?

@interceptors.clear
end

private

attr_reader :server_thread

##
# @return [Array<Class>]
#
Expand Down Expand Up @@ -216,19 +239,8 @@ def ssl_credentials
certs = [nil, [{ private_key: private_key, cert_chain: cert_chain }], false]
GRPC::Core::ServerCredentials.new(*certs)
end
# :nocov:

##
# Updates proc name/title
#
# @param [Symbol] state
#
# :nocov:
def update_proc_title(state)
Process.setproctitle("gruf #{Gruf::VERSION} -- #{state}")
end
# :nocov:
#

##
# Handle thread-safe access to the server
Expand Down
Loading

0 comments on commit 7f96b94

Please sign in to comment.