Skip to content

Commit eb8afc5

Browse files
Split Executor header and source files into corresponding class files
1 parent 5ede61b commit eb8afc5

File tree

16 files changed

+223
-167
lines changed

16 files changed

+223
-167
lines changed

.cspell.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"Bodyless", "HTTPGET", "ratelimiter", "Ratelimiter", "STDMETHODCALLTYPE", "CANTSAVE", "OLECHAR", "DISPID",
1212
"UNKNOWNNAME", "DISPPARAMS", "XMLHTTP", "comptr", "Metadataservice", "Streamfn", "HWAVEOUT", "matdesc",
1313
"Presigner", "xindex", "errortype", "waveout", "WAVEOUTCAPSA", "ALLOWSYNC", "WAVEHDR", "MMSYSERR",
14-
"WAVEFORMATEX", "Unprepare", "DDISABLE_IMDSV1",
14+
"WAVEFORMATEX", "Unprepare", "DDISABLE_IMDSV1", "threadpool",
1515
// AWS general
1616
"Arns", "AMZN", "amzn", "Paulo", "Ningxia", "ISOB", "isob", "AWSXML", "IMDSV",
1717
// AWS Signature
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#pragma once
7+
8+
#include <aws/core/utils/threading/Executor.h>
9+
10+
#include <aws/core/utils/memory/stl/AWSMap.h>
11+
12+
#include <atomic>
13+
#include <functional>
14+
#include <mutex>
15+
#include <thread>
16+
17+
namespace Aws
18+
{
19+
namespace Utils
20+
{
21+
namespace Threading
22+
{
23+
/**
24+
* Default Executor implementation. Simply spawns a thread and detaches it.
25+
*/
26+
class AWS_CORE_API DefaultExecutor : public Executor
27+
{
28+
public:
29+
DefaultExecutor() : m_state(State::Free) {}
30+
~DefaultExecutor();
31+
32+
void WaitUntilStopped() override;
33+
protected:
34+
enum class State
35+
{
36+
Free, Locked, Shutdown
37+
};
38+
bool SubmitToThread(std::function<void()>&&) override;
39+
void Detach(std::thread::id id);
40+
std::atomic<State> m_state;
41+
Aws::UnorderedMap<std::thread::id, std::thread> m_threads;
42+
};
43+
} // namespace Threading
44+
} // namespace Utils
45+
} // namespace Aws

src/aws-cpp-sdk-core/include/aws/core/utils/threading/Executor.h

+8-83
Original file line numberDiff line numberDiff line change
@@ -4,25 +4,19 @@
44
*/
55

66
#pragma once
7+
#if !defined(AWS_EXECUTOR_H)
8+
#define AWS_EXECUTOR_H
79

810
#include <aws/core/Core_EXPORTS.h>
9-
#include <aws/core/utils/memory/stl/AWSQueue.h>
10-
#include <aws/core/utils/memory/stl/AWSVector.h>
11-
#include <aws/core/utils/memory/stl/AWSMap.h>
12-
#include <aws/core/utils/threading/Semaphore.h>
11+
1312
#include <functional>
14-
#include <future>
15-
#include <mutex>
16-
#include <atomic>
1713

1814
namespace Aws
1915
{
2016
namespace Utils
2117
{
2218
namespace Threading
2319
{
24-
class ThreadTask;
25-
2620
/**
2721
* Interface for implementing an Executor, to implement a custom thread execution strategy, inherit from this class
2822
* and override SubmitToThread().
@@ -59,80 +53,11 @@ namespace Aws
5953
*/
6054
virtual bool SubmitToThread(std::function<void()>&&) = 0;
6155
};
62-
63-
64-
/**
65-
* Default Executor implementation. Simply spawns a thread and detaches it.
66-
*/
67-
class AWS_CORE_API DefaultExecutor : public Executor
68-
{
69-
public:
70-
DefaultExecutor() : m_state(State::Free) {}
71-
~DefaultExecutor();
72-
73-
void WaitUntilStopped() override;
74-
protected:
75-
enum class State
76-
{
77-
Free, Locked, Shutdown
78-
};
79-
bool SubmitToThread(std::function<void()>&&) override;
80-
void Detach(std::thread::id id);
81-
std::atomic<State> m_state;
82-
Aws::UnorderedMap<std::thread::id, std::thread> m_threads;
83-
};
84-
85-
enum class OverflowPolicy
86-
{
87-
QUEUE_TASKS_EVENLY_ACROSS_THREADS,
88-
REJECT_IMMEDIATELY
89-
};
90-
91-
/**
92-
* Thread Pool Executor implementation.
93-
*/
94-
class AWS_CORE_API PooledThreadExecutor : public Executor
95-
{
96-
public:
97-
PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS);
98-
~PooledThreadExecutor();
99-
100-
/**
101-
* Rule of 5 stuff.
102-
* Don't copy or move
103-
*/
104-
PooledThreadExecutor(const PooledThreadExecutor&) = delete;
105-
PooledThreadExecutor& operator =(const PooledThreadExecutor&) = delete;
106-
PooledThreadExecutor(PooledThreadExecutor&&) = delete;
107-
PooledThreadExecutor& operator =(PooledThreadExecutor&&) = delete;
108-
109-
/**
110-
* Call to ensure the threadpool can be safely destroyed. It blocks until all threads finished.
111-
*/
112-
void WaitUntilStopped() override;
113-
114-
protected:
115-
bool SubmitToThread(std::function<void()>&&) override;
116-
117-
private:
118-
Aws::Queue<std::function<void()>*> m_tasks;
119-
mutable std::mutex m_queueLock;
120-
Aws::Utils::Threading::Semaphore m_sync;
121-
Aws::Vector<ThreadTask*> m_threadTaskHandles;
122-
size_t m_poolSize = 0;
123-
OverflowPolicy m_overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS;
124-
bool m_stopped{false};
125-
126-
/**
127-
* Once you call this, you are responsible for freeing the memory pointed to by task.
128-
*/
129-
std::function<void()>* PopTask();
130-
bool HasTasks() const;
131-
132-
friend class ThreadTask;
133-
};
134-
135-
13656
} // namespace Threading
13757
} // namespace Utils
13858
} // namespace Aws
59+
60+
// TODO: remove on a next minor API bump from 1.11.x
61+
#endif // !defined(AWS_EXECUTOR_H)
62+
#include <aws/core/utils/threading/DefaultExecutor.h>
63+
#include <aws/core/utils/threading/PooledThreadExecutor.h>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#pragma once
7+
8+
#include <aws/core/utils/threading/Executor.h>
9+
10+
#include <aws/core/utils/memory/stl/AWSQueue.h>
11+
#include <aws/core/utils/memory/stl/AWSVector.h>
12+
#include <aws/core/utils/threading/Semaphore.h>
13+
#include <functional>
14+
#include <mutex>
15+
#include <atomic>
16+
17+
namespace Aws
18+
{
19+
namespace Utils
20+
{
21+
namespace Threading
22+
{
23+
class ThreadTask;
24+
25+
enum class OverflowPolicy
26+
{
27+
QUEUE_TASKS_EVENLY_ACROSS_THREADS,
28+
REJECT_IMMEDIATELY
29+
};
30+
31+
/**
32+
* Thread Pool Executor implementation.
33+
*/
34+
class AWS_CORE_API PooledThreadExecutor : public Executor
35+
{
36+
public:
37+
PooledThreadExecutor(size_t poolSize, OverflowPolicy overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS);
38+
~PooledThreadExecutor();
39+
40+
/**
41+
* Rule of 5 stuff.
42+
* Don't copy or move
43+
*/
44+
PooledThreadExecutor(const PooledThreadExecutor&) = delete;
45+
PooledThreadExecutor& operator =(const PooledThreadExecutor&) = delete;
46+
PooledThreadExecutor(PooledThreadExecutor&&) = delete;
47+
PooledThreadExecutor& operator =(PooledThreadExecutor&&) = delete;
48+
49+
/**
50+
* Call to ensure the threadpool can be safely destroyed. It blocks until all threads finished.
51+
*/
52+
void WaitUntilStopped() override;
53+
54+
protected:
55+
bool SubmitToThread(std::function<void()>&&) override;
56+
57+
private:
58+
Aws::Queue<std::function<void()>*> m_tasks;
59+
mutable std::mutex m_queueLock;
60+
Aws::Utils::Threading::Semaphore m_sync;
61+
Aws::Vector<ThreadTask*> m_threadTaskHandles;
62+
size_t m_poolSize = 0;
63+
OverflowPolicy m_overflowPolicy = OverflowPolicy::QUEUE_TASKS_EVENLY_ACROSS_THREADS;
64+
bool m_stopped{false};
65+
66+
/**
67+
* Once you call this, you are responsible for freeing the memory pointed to by task.
68+
*/
69+
std::function<void()>* PopTask();
70+
bool HasTasks() const;
71+
72+
friend class ThreadTask;
73+
};
74+
} // namespace Threading
75+
} // namespace Utils
76+
} // namespace Aws
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#include <aws/core/utils/threading/DefaultExecutor.h>
7+
#include <aws/core/utils/threading/ThreadTask.h>
8+
9+
#include <cassert>
10+
11+
using namespace Aws::Utils::Threading;
12+
13+
bool DefaultExecutor::SubmitToThread(std::function<void()>&& fx)
14+
{
15+
// Generalized lambda capture is C++14, using std::bind as a workaround to force moving fx (instead of copying)
16+
std::function<void()> main = std::bind(
17+
[this](std::function<void()>& storedFx)
18+
{
19+
storedFx();
20+
Detach(std::this_thread::get_id());
21+
},
22+
std::move(fx)
23+
);
24+
25+
State expected;
26+
do
27+
{
28+
expected = State::Free;
29+
if(m_state.compare_exchange_strong(expected, State::Locked))
30+
{
31+
std::thread t(std::move(main));
32+
const auto id = t.get_id(); // copy the id before we std::move the thread
33+
m_threads.emplace(id, std::move(t));
34+
m_state = State::Free;
35+
return true;
36+
}
37+
}
38+
while(expected != State::Shutdown);
39+
return false;
40+
}
41+
42+
void DefaultExecutor::Detach(std::thread::id id)
43+
{
44+
State expected;
45+
do
46+
{
47+
expected = State::Free;
48+
if(m_state.compare_exchange_strong(expected, State::Locked))
49+
{
50+
auto it = m_threads.find(id);
51+
assert(it != m_threads.end());
52+
it->second.detach();
53+
m_threads.erase(it);
54+
m_state = State::Free;
55+
return;
56+
}
57+
}
58+
while(expected != State::Shutdown);
59+
}
60+
61+
void DefaultExecutor::WaitUntilStopped()
62+
{
63+
auto expected = State::Free;
64+
while(!m_state.compare_exchange_strong(expected, State::Shutdown))
65+
{
66+
//spin while currently detaching threads finish
67+
assert(expected == State::Locked);
68+
expected = State::Free;
69+
}
70+
}
71+
72+
DefaultExecutor::~DefaultExecutor()
73+
{
74+
WaitUntilStopped();
75+
76+
auto it = m_threads.begin();
77+
while(!m_threads.empty())
78+
{
79+
it->second.join();
80+
it = m_threads.erase(it);
81+
}
82+
}

0 commit comments

Comments
 (0)