Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Semaphore now able to be interrupted #380

Merged
merged 4 commits into from
Feb 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions thread/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,101 @@ TEST(intrusive_list, split) {

}

TEST(interrupt, mutex) {
photon::mutex mtx(0);
// lock first
mtx.lock();
auto th = photon::CURRENT;
int reason = rand();
while (reason == 0) reason = rand();
photon::thread_create11([th, reason]() {
// any errno except 0 is able to stop waiting
photon::thread_interrupt(th, reason);
});
// this time will goto sleep
auto ret = mtx.lock();
ERRNO err;
EXPECT_EQ(-1, ret);
EXPECT_EQ(reason, err.no);
mtx.unlock();
}

TEST(interrupt, condition_variable) {
photon::condition_variable cond;
auto th = photon::CURRENT;
int reason = rand();
while (reason == 0) reason = rand();
photon::thread_create11([th, reason]() {
// any errno except 0 is able to stop waiting
photon::thread_interrupt(th, reason);
});
auto ret = cond.wait_no_lock();
ERRNO err;
EXPECT_EQ(-1, ret);
EXPECT_EQ(reason, err.no);
}

TEST(interrupt, semaphore) {
photon::semaphore sem(0);
auto th = photon::CURRENT;
int reason = rand();
while (reason == 0) reason = rand();
photon::thread_create11([th, reason]() {
// any errno except 0 is able to stop waiting
photon::thread_interrupt(th, reason);
});
auto ret = sem.wait_interruptible(1); // nobody
ERRNO err;
EXPECT_EQ(-1, ret);
EXPECT_EQ(reason, err.no);
}


TEST(condition_variable, pred) {
photon::condition_variable cond;
int flag = 0;
photon::thread_create11([&cond, &flag]() {
// any errno except 0 is able to stop waiting
flag = 1;
cond.notify_one();
// first notify should not wake up condition variable
photon::thread_usleep(1000 * 1000);
flag = 2;
cond.notify_one();

});
auto ret = cond.wait_no_lock([&flag](){ return flag == 2;});
EXPECT_EQ(0, ret);
EXPECT_EQ(2, flag);
ret = cond.wait_no_lock([&flag](){ return flag == 3; }, 1000);
EXPECT_EQ(-1, ret);
EXPECT_EQ(ETIMEDOUT, errno);
flag = 0;
photon::mutex mtx;
SCOPED_LOCK(mtx);
photon::thread_create11([&cond, &flag, &mtx]() {
// any errno except 0 is able to stop waiting
{
SCOPED_LOCK(mtx);
flag = 1;
cond.notify_one();
}
// first notify should not wake up condition variable
photon::thread_usleep(1000 * 1000);
{
SCOPED_LOCK(mtx);
flag = 2;
cond.notify_one();
}
});
ret = cond.wait(mtx, [&flag](){ return flag == 2;});
EXPECT_EQ(0, ret);
EXPECT_EQ(2, flag);
ret = cond.wait(mtx, [&flag](){ return flag == 3; }, 1000);
EXPECT_EQ(-1, ret);
EXPECT_EQ(ETIMEDOUT, errno);
}

int main(int argc, char** arg)
{
::testing::InitGoogleTest(&argc, arg);
Expand Down
17 changes: 10 additions & 7 deletions thread/thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ R"(
*perrno = ETIMEDOUT;
return -1;
}
return (*perrno == ECANCELED) ? 0 : -1;
return (*perrno == -1) ? 0 : -1;
}
int waitq::wait(Timeout timeout)
{
Expand Down Expand Up @@ -1591,7 +1591,7 @@ R"(
ScopedLockHead h(m);
m->owner.store(h);
if (h)
prelocked_thread_interrupt(h, ECANCELED);
prelocked_thread_interrupt(h, -1);
}
static void mutex_unlock(void* m_)
{
Expand Down Expand Up @@ -1672,19 +1672,22 @@ R"(
{
return cvar_do_wait((thread_list*)&q, m, timeout, spinlock_lock, spinlock_unlock);
}
int semaphore::wait(uint64_t count, Timeout timeout)
int semaphore::wait_interruptible(uint64_t count, Timeout timeout)
{
if (count == 0) return 0;
splock.lock();
CURRENT->semaphore_count = count;
int ret = 0;
while (!try_substract(count)) {
ret = waitq::wait_defer(timeout, spinlock_unlock, &splock);
ERRNO err;
splock.lock();
if (ret < 0 && errno == ETIMEDOUT) {
if (ret < 0) {
CURRENT->semaphore_count = 0;
try_resume(); // when timeout, we need to try
splock.unlock(); // to resume next thread(s) in q
// when timeout, we need to try to resume next thread(s) in q
if (err.no == ETIMEDOUT) try_resume();
splock.unlock();
errno = err.no;
return ret;
}
}
Expand All @@ -1704,7 +1707,7 @@ R"(
if (qfcount > cnt) break;
cnt -= qfcount;
qfcount = 0;
prelocked_thread_interrupt(th, ECANCELED);
prelocked_thread_interrupt(th, -1);
}
}
bool semaphore::try_substract(uint64_t count)
Expand Down
41 changes: 37 additions & 4 deletions thread/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ namespace photon
protected:
int wait(Timeout timeout = {});
int wait_defer(Timeout Timeout, void(*defer)(void*), void* arg);
void resume(thread* th, int error_number = ECANCELED); // `th` must be waiting in this waitq!
int resume_all(int error_number = ECANCELED);
thread* resume_one(int error_number = ECANCELED);
void resume(thread* th, int error_number = -1); // `th` must be waiting in this waitq!
int resume_all(int error_number = -1);
thread* resume_one(int error_number = -1);
waitq() = default;
Copy link
Collaborator

@beef9999 beef9999 Feb 28, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it comes from the inside of the synchronization primitives due to technical reasons, we use -1 to distinguish from user generated ordinary error numbers (>0). And 0 has already been used for another scenario.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the default behavior of the default value -1 ? Documents should be adjusted in:

https://github.com/alibaba/PhotonLibOS/blob/main/doc/docs/api/thread.md#thread_interrupt

https://github.com/alibaba/PhotonLibOS/blob/main/doc/i18n/cn/docusaurus-plugin-content-docs/current/api/thread.md#thread_interrupt

Those documents are about photon::thread_interrupt. The default errno in photon::thread_interrupt is not changed, so it still EINTR, nothing has changed in document.

Here is all about method waitq::resume, which is protected method, not a part of API.

waitq(const waitq& rhs) = delete; // not allowed to copy construct
waitq(waitq&& rhs) = delete;
Expand Down Expand Up @@ -362,17 +362,50 @@ namespace photon
{
return waitq::wait(timeout);
}
template <typename LOCK, typename PRED,
typename = decltype(std::declval<PRED>()())>
int wait(LOCK&& lock, PRED&& pred, Timeout timeout = {}) {
return do_wait_pred(
[&] { return wait(std::forward<LOCK>(lock), timeout); },
std::forward<PRED>(pred), timeout);
}
template <typename PRED,
typename = decltype(std::declval<PRED>()())>
int wait_no_lock(PRED&& pred, Timeout timeout = {}) {
return do_wait_pred(
[&] { return wait_no_lock(timeout); },
std::forward<PRED>(pred), timeout);
}
thread* signal() { return resume_one(); }
thread* notify_one() { return resume_one(); }
int notify_all() { return resume_all(); }
int broadcast() { return resume_all(); }
protected:
template<typename DO_WAIT, typename PRED>
int do_wait_pred(DO_WAIT&& do_wait, PRED&& pred, Timeout timeout) {
int ret = 0;
int err = ETIMEDOUT;
while (!pred() && !timeout.expired()) {
ret = do_wait();
err = errno;
}
errno = err;
return ret;
}
};

class semaphore : protected waitq
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a wait_uninterruptible()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

{
public:
explicit semaphore(uint64_t count = 0) : m_count(count) { }
int wait(uint64_t count, Timeout timeout = {});
int wait(uint64_t count, Timeout timeout = {}) {
int ret = 0;
do {
ret = wait_interruptible(count, timeout);
} while (ret < 0 && (errno != ESHUTDOWN && errno != ETIMEDOUT));
return ret;
}
int wait_interruptible(uint64_t count, Timeout timeout = {});
int signal(uint64_t count)
{
if (count == 0) return 0;
Expand Down
Loading