-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Tmp/tokio 1.0 #28
Draft
koba-e964
wants to merge
17
commits into
master
Choose a base branch
from
tmp/tokio-1.0
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+368
−435
Draft
Tmp/tokio 1.0 #28
Changes from 16 commits
Commits
Show all changes
17 commits
Select commit
Hold shift + click to select a range
f25f152
oneshot delegates to futures:0.1 (not working on its own)
koba-e964 5011f47
mpsc delegates to futures:0.1, remove sync_channel (not working on it…
koba-e964 0c3594e
executor delegates to futures:0.3 (still not working)
koba-e964 8567cb0
Finally, it works!
koba-e964 43d7c92
Remove functions in tcp
koba-e964 6872b3b
Remove functions from udp
koba-e964 eff770d
timer calls tokio's timer (not confirmed to work)
koba-e964 9a5e577
timer test works
koba-e964 ac82575
Remove unused functions, fix warnings
koba-e964 461a1d5
Add Send
koba-e964 a2b5052
Add test
koba-e964 9cdaa25
Why (Connection refused)?
koba-e964 ba15926
Test works!
koba-e964 a9cf714
Thank God r/w succeeds
koba-e964 d6f93e0
Remove debug print
koba-e964 79d7db3
tokio 1.0
koba-e964 14f8612
Remove unnecessary functions
koba-e964 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved. | ||
// See the LICENSE file at the top-level directory of this distribution. | ||
|
||
use futures::Future; | ||
use futures03::compat::Future01CompatExt; | ||
use futures03::executor::{LocalPool as LocalPool03, LocalSpawner as LocalSpawner03}; | ||
use futures03::task::{FutureObj as FutureObj03, Spawn as _}; | ||
use futures03::FutureExt; | ||
use std::io; | ||
|
||
use super::Executor; | ||
use crate::fiber::Spawn; | ||
|
||
/// An executor that executes spawned fibers and I/O event polling on current thread. | ||
/// | ||
/// # Examples | ||
/// | ||
/// An example to calculate fibonacci numbers: | ||
/// | ||
/// ``` | ||
/// # extern crate fibers; | ||
/// # extern crate futures; | ||
/// use fibers::{Spawn, Executor, InPlaceExecutor}; | ||
/// use futures::{Async, Future}; | ||
/// | ||
/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> { | ||
/// if n < 2 { | ||
/// Box::new(futures::finished(n)) | ||
/// } else { | ||
/// let f0 = handle.spawn_monitor(fib(n - 1, handle.clone())); | ||
/// let f1 = handle.spawn_monitor(fib(n - 2, handle.clone())); | ||
/// Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ())) | ||
/// } | ||
/// } | ||
/// | ||
/// let mut executor = InPlaceExecutor::new().unwrap(); | ||
/// let mut monitor = executor.spawn_monitor(fib(7, executor.handle())); | ||
/// loop { | ||
/// if let Async::Ready(answer) = monitor.poll().unwrap() { | ||
/// assert_eq!(answer, 13); | ||
/// return; | ||
/// } else { | ||
/// executor.run_once().unwrap(); | ||
/// } | ||
/// } | ||
/// ``` | ||
#[derive(Debug)] | ||
pub struct InPlaceExecutor { | ||
pool: LocalPool03, | ||
} | ||
impl InPlaceExecutor { | ||
/// Creates a new instance of `InPlaceExecutor`. | ||
pub fn new() -> io::Result<Self> { | ||
let pool = LocalPool03::new(); | ||
Ok(InPlaceExecutor { pool }) | ||
} | ||
} | ||
impl Executor for InPlaceExecutor { | ||
type Handle = InPlaceExecutorHandle; | ||
fn handle(&self) -> Self::Handle { | ||
InPlaceExecutorHandle { | ||
spawner: self.pool.spawner(), | ||
} | ||
} | ||
fn run_once(&mut self) -> io::Result<()> { | ||
Ok(()) | ||
} | ||
} | ||
impl Spawn for InPlaceExecutor { | ||
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) { | ||
self.handle().spawn_boxed(fiber) | ||
} | ||
} | ||
|
||
/// A handle of an `InPlaceExecutor` instance. | ||
#[derive(Debug, Clone)] | ||
pub struct InPlaceExecutorHandle { | ||
spawner: LocalSpawner03, | ||
} | ||
|
||
// TODO: don't rely on this | ||
unsafe impl Send for InPlaceExecutorHandle {} | ||
impl Spawn for InPlaceExecutorHandle { | ||
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) { | ||
let future03 = fiber.compat().map(|_result| ()); | ||
let futureobj03: FutureObj03<()> = Box::new(future03).into(); | ||
// TODO: proper error handlings | ||
self.spawner.spawn_obj(futureobj03).unwrap(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
// Copyright (c) 2016 DWANGO Co., Ltd. All Rights Reserved. | ||
// See the LICENSE file at the top-level directory of this distribution. | ||
|
||
use futures::Future; | ||
use futures03::compat::Future01CompatExt; | ||
use futures03::FutureExt; | ||
use std::io; | ||
use std::sync::Arc; | ||
use tokio::runtime::Runtime as TokioRuntime; | ||
|
||
use super::Executor; | ||
use crate::fiber::Spawn; | ||
|
||
/// An executor that executes spawned fibers on pooled threads. | ||
/// | ||
/// # Examples | ||
/// | ||
/// An example to calculate fibonacci numbers: | ||
/// | ||
/// ``` | ||
/// # extern crate fibers; | ||
/// # extern crate futures; | ||
/// use fibers::{Spawn, Executor, ThreadPoolExecutor}; | ||
/// use futures::{Async, Future}; | ||
/// | ||
/// fn fib<H: Spawn + Clone>(n: usize, handle: H) -> Box<dyn Future<Item=usize, Error=()> + Send> { | ||
/// if n < 2 { | ||
/// Box::new(futures::finished(n)) | ||
/// } else { | ||
/// let f0 = handle.spawn_monitor(fib(n - 1, handle.clone())); | ||
/// let f1 = handle.spawn_monitor(fib(n - 2, handle.clone())); | ||
/// Box::new(f0.join(f1).map(|(a0, a1)| a0 + a1).map_err(|_| ())) | ||
/// } | ||
/// } | ||
/// | ||
/// let mut executor = ThreadPoolExecutor::new().unwrap(); | ||
/// let monitor = executor.spawn_monitor(fib(7, executor.handle())); | ||
/// let answer = executor.run_fiber(monitor).unwrap(); | ||
/// assert_eq!(answer, Ok(13)); | ||
/// ``` | ||
#[derive(Debug)] | ||
pub struct ThreadPoolExecutor { | ||
pool: Arc<TokioRuntime>, | ||
} | ||
impl ThreadPoolExecutor { | ||
/// Creates a new instance of `ThreadPoolExecutor`. | ||
/// | ||
/// This is equivalent to `ThreadPoolExecutor::with_thread_count(num_cpus::get() * 2)`. | ||
pub fn new() -> io::Result<Self> { | ||
Self::with_thread_count(num_cpus::get() * 2) | ||
} | ||
|
||
/// Creates a new instance of `ThreadPoolExecutor` with the specified size of thread pool. | ||
/// | ||
/// # Implementation Details | ||
/// | ||
/// Note that current implementation is very naive and | ||
/// should be improved in future releases. | ||
/// | ||
/// Internally, `count` threads are assigned to each of | ||
/// the scheduler (i.e., `fibers::fiber::Scheduler`) and | ||
/// the I/O poller (i.e., `fibers::io::poll::Poller`). | ||
/// | ||
/// When `spawn` function is called, the executor will assign a scheduler (thread) | ||
/// for the fiber in simple round robin fashion. | ||
/// | ||
/// If any of those threads are aborted, the executor will return an error as | ||
/// a result of `run_once` method call after that. | ||
pub fn with_thread_count(count: usize) -> io::Result<Self> { | ||
assert!(count > 0); | ||
let pool = tokio::runtime::Builder::new_multi_thread() | ||
.enable_all() | ||
.worker_threads(count) | ||
.build()?; | ||
Ok(Self { | ||
pool: Arc::new(pool), | ||
}) | ||
} | ||
} | ||
impl Executor for ThreadPoolExecutor { | ||
type Handle = ThreadPoolExecutorHandle; | ||
fn handle(&self) -> Self::Handle { | ||
ThreadPoolExecutorHandle { | ||
pool: self.pool.clone(), | ||
} | ||
} | ||
/// Does nothing. Futures are automatically polled. | ||
fn run_once(&mut self) -> io::Result<()> { | ||
Ok(()) | ||
} | ||
/// Runs until the future is ready. | ||
fn run_future<F: Future>(&mut self, future: F) -> io::Result<Result<F::Item, F::Error>> { | ||
Ok(self.pool.block_on(future.compat())) | ||
} | ||
|
||
/// Runs infinitely until an error happens. | ||
fn run(self) -> io::Result<()> { | ||
// In this impl, run should never be called. | ||
unreachable!("Don't call run directly!"); | ||
} | ||
} | ||
impl Spawn for ThreadPoolExecutor { | ||
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) { | ||
self.handle().spawn_boxed(fiber) | ||
} | ||
} | ||
|
||
/// A handle of a `ThreadPoolExecutor` instance. | ||
#[derive(Debug, Clone)] | ||
pub struct ThreadPoolExecutorHandle { | ||
pool: Arc<TokioRuntime>, | ||
} | ||
impl Spawn for ThreadPoolExecutorHandle { | ||
fn spawn_boxed(&self, fiber: Box<dyn Future<Item = (), Error = ()> + Send>) { | ||
let future03 = fiber.compat().map(|_result| ()); | ||
self.pool.spawn(future03); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,23 +78,6 @@ impl TcpListener { | |
pub fn local_addr(&self) -> io::Result<SocketAddr> { | ||
self.handle.inner().local_addr() | ||
} | ||
|
||
/// Get the value of the `SO_ERROR` option on this socket. | ||
/// | ||
/// This will retrieve the stored error in the underlying socket, | ||
/// clearing the field in the process. | ||
/// This can be useful for checking errors between calls. | ||
pub fn take_error(&self) -> io::Result<Option<io::Error>> { | ||
self.handle.inner().take_error() | ||
} | ||
|
||
/// Calls `f` with the reference to the inner socket. | ||
pub fn with_inner<F, T>(&self, f: F) -> T | ||
where | ||
F: FnOnce(&MioTcpListener) -> T, | ||
{ | ||
f(&*self.handle.inner()) | ||
} | ||
} | ||
impl fmt::Debug for TcpListener { | ||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
|
@@ -463,3 +446,60 @@ impl Future for ConnectInner { | |
} | ||
} | ||
} | ||
|
||
mod tests { | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
|
||
#[test] | ||
fn async_works() { | ||
use crate::ThreadPoolExecutor; | ||
use crate::{Executor, Spawn}; | ||
use futures::Future; | ||
use futures03::TryFutureExt; | ||
use std::net::SocketAddr; | ||
use std::sync::Arc; | ||
use std::time::Duration; | ||
let mut exec = ThreadPoolExecutor::new().unwrap(); | ||
let addr: SocketAddr = "127.0.0.1:2525".parse().unwrap(); | ||
let flag = Arc::new(AtomicBool::new(false)); | ||
let flag_cp = flag.clone(); | ||
let fut_listen_03 = async move { | ||
let listener = tokio::net::TcpListener::bind(addr).await.unwrap(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. このファイルが提供している struct とかをテストしてるわけじゃなさそうなんですけど, このテストはここにあったほうが良いのでしょうか? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 確かにそうなのですが、他に適切な置き場所もなく、という感じです。 |
||
let (conn, addr) = listener.accept().await.unwrap(); | ||
eprintln!("Connected! addr = {:?}", addr); | ||
let mut buf = vec![0; 10]; | ||
for i in 0..10 { | ||
let read = conn.try_read(&mut buf); | ||
eprintln!("read = {:?}, buf = {:?}", read, buf); | ||
if read.is_ok() { | ||
break; | ||
} | ||
std::thread::sleep(Duration::from_secs(1)); | ||
} | ||
flag_cp.store(true, Ordering::SeqCst); | ||
Ok::<(), std::io::Error>(()) | ||
}; | ||
let fut_listen = Box::pin(fut_listen_03).compat(); | ||
let fut_conn_03 = async move { | ||
eprintln!("connecting..."); | ||
let str = tokio::net::TcpStream::connect(addr).await.unwrap(); | ||
for i in 0..10 { | ||
let res = str.try_write(&[68, 69, 70, 71]); | ||
eprintln!("write = {:?}", res); | ||
if res.is_ok() { | ||
break; | ||
} | ||
} | ||
Ok::<(), std::io::Error>(()) | ||
}; | ||
let fut = Box::pin(fut_conn_03).compat(); | ||
exec.spawn(fut_listen.map_err(|e| panic!("Spawn failed: server {}", e))); | ||
|
||
std::thread::sleep(Duration::from_secs(1)); | ||
|
||
exec.run_future(fut.map_err(|e| panic!("Spawn failed: client {}", e))) | ||
.unwrap() | ||
.unwrap(); | ||
while !flag.load(Ordering::SeqCst) {} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
サポートできないなら fibers-rs のメジャーバージョンを変更し、このメソッドも API から削除してしまってはどうでしょう。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
そうします。よく考えたらいずれにせよ、io, net, fiber::schedule モジュールは消してしまうので、メジャーバージョンの更新は必須でした。