Skip to content

Commit

Permalink
first branch
Browse files Browse the repository at this point in the history
  • Loading branch information
oracleloyall committed Apr 26, 2018
1 parent 8fb2650 commit cffccbf
Show file tree
Hide file tree
Showing 39 changed files with 4,146 additions and 0 deletions.
Binary file added a.out
Binary file not shown.
192 changes: 192 additions & 0 deletions block_object.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#include "block_object.h"
#include "scheduler.h"
#include "error.h"
#include <unistd.h>
#include <mutex>
#include <limits>

namespace co
{

BlockObject::BlockObject(std::size_t init_wakeup, std::size_t max_wakeup)
: wakeup_(init_wakeup), max_wakeup_(max_wakeup)
{
DebugPrint(dbg_syncblock, "BlockObject::BlockObject this=%p", this);
}

BlockObject::~BlockObject()
{
if (!lock_.try_lock()) {
ThrowError(eCoErrorCode::ec_block_object_locked);
}

if (!wait_queue_.empty()) {
ThrowError(eCoErrorCode::ec_block_object_waiting);
}

DebugPrint(dbg_syncblock, "BlockObject::~BlockObject this=%p", this);
}

void BlockObject::CoBlockWait()
{
if (!g_Scheduler.IsCoroutine()) {
while (!TryBlockWait()) usleep(10 * 1000);
return ;
}

std::unique_lock<LFLock> lock(lock_);
if (wakeup_ > 0) {
DebugPrint(dbg_syncblock, "wait immedaitely done.");
--wakeup_;
return ;
}
lock.unlock();

Task* tk = g_Scheduler.GetCurrentTask();
tk->block_ = this;
tk->state_ = TaskState::sys_block;
tk->block_timeout_ = MininumTimeDurationType::zero();
tk->is_block_timeout_ = false;
++ tk->block_sequence_;
DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
g_Scheduler.CoYield();
}

bool BlockObject::CoBlockWaitTimed(MininumTimeDurationType timeo)
{
auto begin = std::chrono::steady_clock::now();
if (!g_Scheduler.IsCoroutine()) {
while (!TryBlockWait() &&
std::chrono::duration_cast<MininumTimeDurationType>
(std::chrono::steady_clock::now() - begin) < timeo)
usleep(10 * 1000);
return false;
}

std::unique_lock<LFLock> lock(lock_);
if (wakeup_ > 0) {
DebugPrint(dbg_syncblock, "wait immedaitely done.");
--wakeup_;
return true;
}
lock.unlock();

Task* tk = g_Scheduler.GetCurrentTask();
tk->block_ = this;
tk->state_ = TaskState::sys_block;
++tk->block_sequence_;
tk->block_timeout_ = timeo;
tk->is_block_timeout_ = false;
DebugPrint(dbg_syncblock, "wait to switch. task(%s)", tk->DebugInfo());
g_Scheduler.CoYield();

return !tk->is_block_timeout_;
}

bool BlockObject::TryBlockWait()
{
std::unique_lock<LFLock> lock(lock_);
if (wakeup_ == 0)
return false;

--wakeup_;
DebugPrint(dbg_syncblock, "try wait success.");
return true;
}

bool BlockObject::Wakeup()
{
std::unique_lock<LFLock> lock(lock_);
Task* tk = wait_queue_.pop();
if (!tk) {
if (wakeup_ >= max_wakeup_) {
DebugPrint(dbg_syncblock, "wakeup failed.");
return false;
}

++wakeup_;
DebugPrint(dbg_syncblock, "wakeup to %lu.", (long unsigned)wakeup_);
return true;
}
lock.unlock();

tk->block_ = nullptr;
if (tk->block_timer_) { // block cancel timer蹇呴』鍦╨ock涔嬪, 鍥犱负閲岄潰浼歭ock
if (g_Scheduler.BlockCancelTimer(tk->block_timer_))
tk->DecrementRef();
tk->block_timer_.reset();
}
DebugPrint(dbg_syncblock, "wakeup task(%s).", tk->DebugInfo());
g_Scheduler.AddTaskRunnable(tk);
return true;
}
void BlockObject::CancelWait(Task* tk, uint32_t block_sequence, bool in_timer)
{
std::unique_lock<LFLock> lock(lock_);
if (tk->block_ != this) {
DebugPrint(dbg_syncblock, "cancelwait task(%s) failed. tk->block_ is not this!", tk->DebugInfo());
return;
}

if (tk->block_sequence_ != block_sequence) {
DebugPrint(dbg_syncblock, "cancelwait task(%s) failed. tk->block_sequence_ = %u, block_sequence = %u.",
tk->DebugInfo(), tk->block_sequence_, block_sequence);
return;
}

if (!wait_queue_.erase(tk)) {
DebugPrint(dbg_syncblock, "cancelwait task(%s) erase failed.", tk->DebugInfo());
return;
}
lock.unlock();

tk->block_ = nullptr;
if (!in_timer && tk->block_timer_) { // block cancel timer蹇呴』鍦╨ock涔嬪, 鍥犱负閲岄潰浼歭ock
g_Scheduler.BlockCancelTimer(tk->block_timer_);
tk->block_timer_.reset();
}
tk->is_block_timeout_ = true;
DebugPrint(dbg_syncblock, "cancelwait task(%s).", tk->DebugInfo());
g_Scheduler.AddTaskRunnable(tk);
}

bool BlockObject::IsWakeup()
{
std::unique_lock<LFLock> lock(lock_);
return wakeup_ > 0;
}

bool BlockObject::AddWaitTask(Task* tk)
{
std::unique_lock<LFLock> lock(lock_);
if (wakeup_) {
--wakeup_;
return false;
}

wait_queue_.push(tk);
DebugPrint(dbg_syncblock, "add wait task(%s). timeout=%ld", tk->DebugInfo(),
(long int)std::chrono::duration_cast<std::chrono::milliseconds>(tk->block_timeout_).count());

// 甯﹁秴鏃剁殑, 澧炲姞瀹氭椂鍣�
if (MininumTimeDurationType::zero() != tk->block_timeout_) {
uint32_t seq = tk->block_sequence_;
tk->IncrementRef();
tk->block_timer_ = g_Scheduler.ExpireAt(tk->block_timeout_, [=]{
// 姝ゅ畾鏃跺櫒瓒呰繃block_object鐢熷懡鏈熸椂浼歝rash鎴栨閿�,
// 鎵�浠akeup鎴朿ancelwait鏃朵竴瀹氳kill姝ゅ畾鏃跺櫒
if (tk->block_sequence_ == seq) {
DebugPrint(dbg_syncblock,
"wait timeout, will cancelwait task(%s). this=%p, tk->block_=%p, seq=%u, timeout=%ld",
tk->DebugInfo(), this, tk->block_, seq,
(long int)std::chrono::duration_cast<std::chrono::milliseconds>(tk->block_timeout_).count());
this->CancelWait(tk, seq, true);
}
tk->DecrementRef();
});
}

return true;
}

} //namespace co
59 changes: 59 additions & 0 deletions block_object.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#pragma once
#include "task.h"

namespace co
{

// 淇″彿绠$悊瀵硅薄
// @绾跨▼瀹夊叏

class BlockObject
{
protected:
friend class Processer;
std::size_t wakeup_; // 褰撳墠淇″彿鏁伴噺
std::size_t max_wakeup_; // 鍙互绉疮鐨勪俊鍙锋暟閲忎笂闄�
TSQueue<Task, false> wait_queue_; // 绛夊緟淇″彿鐨勫崗绋嬮槦鍒�
LFLock lock_;

public:
explicit BlockObject(std::size_t init_wakeup = 0, std::size_t max_wakeup = -1);
~BlockObject();

// 闃诲寮忕瓑寰呬俊鍙�
void CoBlockWait();

// 甯﹁秴鏃剁殑闃诲寮忕瓑寰呬俊鍙�
// @returns: 鏄惁鎴愬姛绛夊埌淇″彿
bool CoBlockWaitTimed(MininumTimeDurationType timeo);

template <typename R, typename P>
bool CoBlockWaitTimed(std::chrono::duration<R, P> duration)
{
return CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>(duration));
}

template <typename Clock, typename Dur>
bool CoBlockWaitTimed(std::chrono::time_point<Clock, Dur> const& deadline)
{
auto now = Clock::now();
if (deadline < now)
return CoBlockWaitTimed(MininumTimeDurationType(0));

return CoBlockWaitTimed(std::chrono::duration_cast<MininumTimeDurationType>
(deadline - Clock::now()));
}

bool TryBlockWait();

bool Wakeup();

bool IsWakeup();

private:
void CancelWait(Task* tk, uint32_t block_sequence, bool in_timer = false);

bool AddWaitTask(Task* tk);
};

} //namespace co
Loading

0 comments on commit cffccbf

Please sign in to comment.