Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tmp/tokio 1.0 #28

Draft
wants to merge 17 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ travis-ci = {repository = "dwango/fibers-rs"}
[dependencies]
mio = "0.6"
futures = "0.1"
futures03 = {package = "futures", version = "0.3", features = ["thread-pool", "compat"]}
tokio = {version = "1.0", features = ["time", "rt", "rt-multi-thread", "net", "io-util"]}
splay_tree = "0.2"
num_cpus = "1"
nbchan = "0.1"
pin-project = "1.0.1"

[dev-dependencies]
clap = "2"
Expand Down
90 changes: 90 additions & 0 deletions src/executor/futures_in_place.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.

use futures::Future;
use futures03::compat::Future01CompatExt;
use futures03::executor::{LocalPool as LocalPool03, LocalSpawner as LocalSpawner03};
use futures03::task::{FutureObj as FutureObj03, Spawn as _};
use futures03::FutureExt;
use std::io;

use super::Executor;
use crate::fiber::Spawn;

/// An executor that executes spawned fibers and I/O event polling on current thread.
///
/// # Examples
///
/// An example to calculate fibonacci numbers:
///
/// ```
/// # extern crate fibers;
/// # extern crate futures;
/// use fibers::{Spawn, Executor, InPlaceExecutor};
/// use futures::{Async, Future};
///
/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
/// if n < 2 {
/// Box::new(futures::finished(n))
/// } else {
/// let f0 = handle.spawn_monitor(fib(n - 1, handle.clone()));
/// let f1 = handle.spawn_monitor(fib(n - 2, handle.clone()));
/// Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
/// }
/// }
///
/// let mut executor = InPlaceExecutor::new().unwrap();
/// let mut monitor = executor.spawn_monitor(fib(7, executor.handle()));
/// loop {
/// if let Async::Ready(answer) = monitor.poll().unwrap() {
/// assert_eq!(answer, 13);
/// return;
/// } else {
/// executor.run_once().unwrap();
/// }
/// }
/// ```
#[derive(Debug)]
pub struct InPlaceExecutor {
pool: LocalPool03,
}
impl InPlaceExecutor {
/// Creates a new instance of `InPlaceExecutor`.
pub fn new() -> io::Result<Self> {
let pool = LocalPool03::new();
Ok(InPlaceExecutor { pool })
}
}
impl Executor for InPlaceExecutor {
type Handle = InPlaceExecutorHandle;
fn handle(&self) -> Self::Handle {
InPlaceExecutorHandle {
spawner: self.pool.spawner(),
}
}
fn run_once(&mut self) -> io::Result<()> {
Ok(())
}
}
impl Spawn for InPlaceExecutor {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
self.handle().spawn_boxed(fiber)
}
}

/// A handle of an `InPlaceExecutor` instance.
#[derive(Debug, Clone)]
pub struct InPlaceExecutorHandle {
spawner: LocalSpawner03,
}

// TODO: don't rely on this
unsafe impl Send for InPlaceExecutorHandle {}
impl Spawn for InPlaceExecutorHandle {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
let future03 = fiber.compat().map(|_result| ());
let futureobj03: FutureObj03<()> = Box::new(future03).into();
// TODO: proper error handlings
self.spawner.spawn_obj(futureobj03).unwrap();
}
}
118 changes: 118 additions & 0 deletions src/executor/futures_thread_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved.
// See the LICENSE file at the top-level directory of this distribution.

use futures::Future;
use futures03::compat::Future01CompatExt;
use futures03::FutureExt;
use std::io;
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;

use super::Executor;
use crate::fiber::Spawn;

/// An executor that executes spawned fibers on pooled threads.
///
/// # Examples
///
/// An example to calculate fibonacci numbers:
///
/// ```
/// # extern crate fibers;
/// # extern crate futures;
/// use fibers::{Spawn, Executor, ThreadPoolExecutor};
/// use futures::{Async, Future};
///
/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> {
/// if n < 2 {
/// Box::new(futures::finished(n))
/// } else {
/// let f0 = handle.spawn_monitor(fib(n - 1, handle.clone()));
/// let f1 = handle.spawn_monitor(fib(n - 2, handle.clone()));
/// Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ()))
/// }
/// }
///
/// let mut executor = ThreadPoolExecutor::new().unwrap();
/// let monitor = executor.spawn_monitor(fib(7, executor.handle()));
/// let answer = executor.run_fiber(monitor).unwrap();
/// assert_eq!(answer, Ok(13));
/// ```
#[derive(Debug)]
pub struct ThreadPoolExecutor {
pool: Arc<TokioRuntime>,
}
impl ThreadPoolExecutor {
/// Creates a new instance of `ThreadPoolExecutor`.
///
/// This is equivalent to `ThreadPoolExecutor::with_thread_count(num_cpus::get() * 2)`.
pub fn new() -> io::Result<Self> {
Self::with_thread_count(num_cpus::get() * 2)
}

/// Creates a new instance of `ThreadPoolExecutor` with the specified size of thread pool.
///
/// # Implementation Details
///
/// Note that current implementation is very naive and
/// should be improved in future releases.
///
/// Internally, `count` threads are assigned to each of
/// the scheduler (i.e., `fibers::fiber::Scheduler`) and
/// the I/O poller (i.e., `fibers::io::poll::Poller`).
///
/// When `spawn` function is called, the executor will assign a scheduler (thread)
/// for the fiber in simple round robin fashion.
///
/// If any of those threads are aborted, the executor will return an error as
/// a result of `run_once` method call after that.
pub fn with_thread_count(count: usize) -> io::Result<Self> {
assert!(count > 0);
let pool = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.worker_threads(count)
.build()?;
Ok(Self {
pool: Arc::new(pool),
})
}
}
impl Executor for ThreadPoolExecutor {
type Handle = ThreadPoolExecutorHandle;
fn handle(&self) -> Self::Handle {
ThreadPoolExecutorHandle {
pool: self.pool.clone(),
}
}
/// Does nothing. Futures are automatically polled.
fn run_once(&mut self) -> io::Result<()> {
Ok(())
}
/// Runs until the future is ready.
fn run_future<F: Future>(&mut self, future: F) -> io::Result<Result<F::Item, F::Error>> {
Ok(self.pool.block_on(future.compat()))
}

/// Runs infinitely until an error happens.
fn run(self) -> io::Result<()> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

サポートできないなら fibers-rs のメジャーバージョンを変更し、このメソッドも API から削除してしまってはどうでしょう。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

そうします。よく考えたらいずれにせよ、io, net, fiber::schedule モジュールは消してしまうので、メジャーバージョンの更新は必須でした。

// In this impl, run should never be called.
unreachable!("Don't call run directly!");
}
}
impl Spawn for ThreadPoolExecutor {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
self.handle().spawn_boxed(fiber)
}
}

/// A handle of a `ThreadPoolExecutor` instance.
#[derive(Debug, Clone)]
pub struct ThreadPoolExecutorHandle {
pool: Arc<TokioRuntime>,
}
impl Spawn for ThreadPoolExecutorHandle {
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) {
let future03 = fiber.compat().map(|_result| ());
self.pool.spawn(future03);
}
}
8 changes: 4 additions & 4 deletions src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
use futures::{Async, Future};
use std::io;

pub use self::in_place::{InPlaceExecutor, InPlaceExecutorHandle};
pub use self::thread_pool::{ThreadPoolExecutor, ThreadPoolExecutorHandle};
pub use self::futures_in_place::{InPlaceExecutor, InPlaceExecutorHandle};
pub use self::futures_thread_pool::{ThreadPoolExecutor, ThreadPoolExecutorHandle};

use crate::fiber::Spawn;
use crate::sync::oneshot::{Monitor, MonitorError};

mod in_place;
mod thread_pool;
mod futures_in_place;
mod futures_thread_pool;

/// The `Executor` trait allows for spawning and executing fibers.
pub trait Executor: Sized {
Expand Down
Loading