-
Notifications
You must be signed in to change notification settings - Fork 2
/
scheduler.h
197 lines (166 loc) · 6.58 KB
/
scheduler.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
* A library for working with DVSI's AMBE vocoder chips
*
* Copyright (C) 2019-2020 Internet Real-Time Lab, Columbia University
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
#pragma once
#include <sys/types.h>
#include <thread>
#include <future>
#include <optional>
#include <unordered_map>
#include "device.h"
#include "queue.h"
#include "packet.h"
using namespace std;
namespace ambe {
// Forward declarations of the device types the schedulers hold references to.
class TaggingDevice;
class FifoDevice;

/** Callable that receives a packet; schedulers accept one per asynchronous request. */
using ResponseCallback = function<void (const Packet& packet)>;

/** A packet bundled with an optional response callback; the element type of the scheduler queues. */
using State = tuple<Packet, optional<ResponseCallback>>;
/**
* AMBE request scheduler base class
*
* The AMBE chip sends a response upon each request it receives. Since
* requests and responses carry no data that would allow correlating them,
* the AMBE chip must send responses in the same order in which it received
* the requests. Processing a request takes a non-trivial amount of time and
* there is an input buffer with limited size in the chip.
*
* An AMBE request scheduler determines the order in which requests are sent
* to the AMBE chip. The scheduler's goal is to maximize utilization of all
* channels within the chip, while keeping the input buffer size manageable.
* The scheduler buffers requests from clients and determines the best order
* in which the requests can be sent to the AMBE chip.
*/
class Scheduler {
public:
    /** Virtual so that a concrete scheduler can be destroyed through a base pointer. */
    virtual ~Scheduler() = default;

    /**
     * Start the scheduler
     *
     * Implementations that rely on a background thread create it here.
     * This is also the place where the scheduler subscribes to responses
     * coming from the device. The base implementation does nothing.
     */
    virtual void start() {}

    /**
     * Stop the scheduler
     *
     * Implementations stop any thread created in start() and unsubscribe
     * from the AMBE device here. The base implementation does nothing.
     *
     * Stopping is meant to be clean: requests submitted before the stop
     * request are expected to finish before the scheduler terminates.
     */
    virtual void stop() {}

    /**
     * Submit a request to the device, receive the response via a future
     *
     * The returned future is fulfilled once the device has produced a
     * response for the request. If the request cannot be sent to the
     * device, the future is fulfilled with an exception instead.
     *
     * Every implementation must be non-blocking. In particular, it must
     * not wait for the packet to be written to the device; the write has
     * to happen in the background.
     *
     * NOTE: This method cannot be used for requests to which the AMBE
     * dongle generates no response (there are a couple).
     */
    virtual future<Packet> submit(const Packet& packet);

    /**
     * Submit a request to the device together with a response callback
     *
     * Pure virtual: every concrete scheduler has to provide it.
     *
     * NOTE(review): presumably the callback is invoked with the device's
     * response packet, possibly on a scheduler or device thread - confirm
     * against the implementations.
     */
    virtual void submitAsync(const Packet& packet, ResponseCallback callback) = 0;
};
/**
* A simple First In First Out (FIFO) request scheduler
*
* This class implements the simplest possible request scheduler for AMBE
* devices. The scheduler sends packets to the device in the order in which
* they arrive and it assumes that the AMBE device will generate responses in
* the same order. Internally, the scheduler keeps a queue of submitted
* requests and whenever a packet is received from the device, it is
* associated with the request packet at the front of the queue.
*
* The future value may be satisfied with an exception if the scheduler
* fails to write the packet to the device.
*/
class FifoScheduler final : public Scheduler {
public:
FifoScheduler(TaggingDevice& device);
virtual void start() override;
virtual void stop() override;
void submitAsync(const Packet& packet, ResponseCallback callback) override;
private:
void recv(int32_t tag, const string& packet);
TaggingDevice& device;
int32_t tag;
std::mutex mutex;
unordered_map<int32_t, ResponseCallback> submitted;
bool quit;
promise<void> terminated;
};
/**
* A request scheduler for AMBE devices with multiple channels (pipelines)
*
* This is a request scheduler for DVSI's AMBE-3000 and AMBE-3003 devices.
* These devices have two independent CPU cores per channel (2 and 6 CPU
* cores respectively).
*
* This request scheduler maintains one queue per CPU core, plus one extra
* queue for device control requests. Incoming packets are assigned the
* queue corresponding to their channel and type of operation (compression /
* decompression). When selecting the next request to send to the device,
* the scheduler picks a queue to maximize the utilization of all CPU cores,
* making sure that the input buffer remains filled but not overloaded.
*
* Control requests that operate on the entire device (as opposed to a
* single channel) are prioritized and sent to the device as soon as space
* in the device's input buffer is available.
*/
class MultiQueueScheduler final : public Scheduler {
public:
    static const unsigned int queues_per_channel = 2;
    static const unsigned int max_channels = 3;

    MultiQueueScheduler(FifoDevice& device, unsigned int channels);

    void start() override;
    void stop() override;
    void submitAsync(const Packet& packet, ResponseCallback callback) override;

private:
    // NOTE(review): presumably the handler for raw responses coming from the
    // device, and the body of the `runner` thread, respectively - confirm
    // against the implementation file.
    void recv(const string& packet);
    void run();

    unsigned int queued() const;

    // NOTE(review): queueIndex() is signed while typeIndex() is unsigned;
    // presumably a negative value marks "no channel queue" - confirm.
    int queueIndex(const Packet& request) const;
    unsigned int typeIndex(const Packet& request) const;
    bool canSend(const Packet& request) const;

    // The device this scheduler is bound to (not owned; held by reference).
    FifoDevice& device;

    thread runner;
    SyncQueue<State> process;

    // Dedicated high-priority queue holding AMBE device control requests.
    queue<State> device_queue;

    unsigned int channels;
    vector<queue<State>> channel_queue;

    // Requests already handed to the AMBE device whose responses have not
    // arrived yet.
    queue<State> submitted;

    // Per-packet-type count of the requests sitting on the submitted queue.
    // All counters start at zero.
    array<unsigned int, static_cast<int>(PacketType::MAX)> submitted_by_type{};

    // Per-channel-queue count of the requests sitting on the submitted
    // queue.
    vector<unsigned int> submitted_by_queue;
};
}