-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathudpcom.hpp
267 lines (198 loc) · 8.99 KB
/
udpcom.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
/*
Socle - Socket Library Ecosystem
Copyright (c) 2014, Ales Stibal <[email protected]>, All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 3.0 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library.
*/
#ifndef UDPCOM_HPP
# define UDPCOM_HPP
#include <string>
#include <array>
#include <optional>
#include <cstring>
#include <ctime>
#include <csignal>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <unistd.h>
#include <buffer.hpp>
#include <log/logger.hpp>
#include <basecom.hpp>
#include <baseproxy.hpp>
#include <linux/ipv6.h>
// If including <linux/ipv6.h> fails, uncomment the constants below as a workaround.
// Their values are not expected to change, but they eventually might -- you have been warned.
//#define IPV6_ORIGDSTADDR 74
//#define IPV6_RECVORIGDSTADDR IPV6_ORIGDSTADDR
// One UDP datagram-pool entry: the destination/source address pair, an optional socket,
// the associated host context pointer and a small fixed-size queue of received payloads.
struct Datagram {
    Datagram() = default;

    // std::mutex is not copyable, so copying is written out by hand: every data member is
    // copied while rx_queue_lock stays freshly default-constructed in the new object.
    // Note that r.rx_queue_lock is NOT taken while r.rx_queue is being copied.
    Datagram(Datagram const& r)
        : dst(r.dst),
          src(r.src),
          socket_left(r.socket_left),
          reuse(r.reuse),
          cx(r.cx),
          rx_queue(r.rx_queue) {}

    Datagram& operator=(Datagram const& r) {
        assign(r);
        return *this;
    }

    // Copy all data members from `r`; the mutex of either object is neither copied nor locked.
    void assign(Datagram const& r) {
        // Self-assignment is a no-op: avoids copying the queued buffers onto themselves.
        if (this == &r) {
            return;
        }

        dst = r.dst;
        src = r.src;
        socket_left = r.socket_left;
        reuse = r.reuse;
        cx = r.cx;
        rx_queue = r.rx_queue;
    }

    sockaddr_storage dst{};
    sockaddr_storage src{};
    std::optional<int> socket_left;

    // Set this to true if there is e.g. a clash and a closed CX/Com should not trigger
    // removal of this entry from the pool: com()->close() will otherwise erase it.
    // It is a toggle: whenever used, it should be set back to false so that the entry
    // can be deleted at some later point.
    bool reuse = false;

    baseHostCX* cx = nullptr;

    // Fixed number of receive slots; an empty buffer marks a free slot (see enqueue()).
    std::array<buffer,5> rx_queue;
    mutable std::mutex rx_queue_lock;

    // Sum of sizes of all non-empty buffers in rx_queue. Does not take rx_queue_lock.
    size_t queue_bytes() const {
        size_t elem_bytes = 0;

        for (auto const& r : rx_queue) {
            if (!r.empty()) {
                elem_bytes += r.size();
            }
        }
        return elem_bytes;
    }

    // Same as queue_bytes(), but holds rx_queue_lock for the duration of the scan.
    size_t queue_bytes_l() const {
        auto l_ = std::scoped_lock(rx_queue_lock);
        return queue_bytes();
    }

    // True when no payload bytes are queued (unlocked variant).
    bool empty() const {
        return (queue_bytes() == 0);
    }

    // True when no payload bytes are queued (locking variant).
    bool empty_l() const {
        return (queue_bytes_l() == 0);
    }

    // Append `len` bytes from `data` into the first empty slot of rx_queue.
    // Returns `len` on success, or 0 when every slot is already occupied (data is dropped).
    // Does not take rx_queue_lock.
    size_t enqueue(unsigned char* data, size_t len) {
        for (auto& elem : rx_queue) {
            if (elem.empty()) {
                elem.append(data, len);
                return len;
            }
        }
        return 0;
    }

    // --- source address views -------------------------------------------------------------
    // The sockaddr_storage is reinterpreted as the family-specific structure; callers are
    // expected to check src_ipv4()/src_ipv6() (or src_family()) before picking an accessor.
    // Ports are returned exactly as stored in the structure, without ntohs() conversion.
    sockaddr_in* src_sockaddr_in() { return reinterpret_cast<sockaddr_in*>(&src); }
    sockaddr_in6* src_sockaddr_in6() { return reinterpret_cast<sockaddr_in6*>(&src); }
    bool src_ipv4() const { return src.ss_family == AF_INET; }
    bool src_ipv6() const { return src.ss_family == AF_INET6; }
    in_addr& src_in_addr4() { return src_sockaddr_in()->sin_addr; }
    in6_addr& src_in_addr6() { return src_sockaddr_in6()->sin6_addr; }
    unsigned short src_port4() { return src_sockaddr_in()->sin_port; }
    unsigned short src_port6() { return src_sockaddr_in6()->sin6_port; }
    sa_family_t src_family() const { return src.ss_family; }

    // --- destination address views --------------------------------------------------------
    // Mirror of the source accessors above, operating on `dst`.
    sockaddr_in* dst_sockaddr_in() { return reinterpret_cast<sockaddr_in*>(&dst); }
    sockaddr_in6* dst_sockaddr_in6() { return reinterpret_cast<sockaddr_in6*>(&dst); }
    bool dst_ipv4() const { return dst.ss_family == AF_INET; }
    bool dst_ipv6() const { return dst.ss_family == AF_INET6; }
    in_addr& dst_in_addr4() { return dst_sockaddr_in()->sin_addr; }
    in6_addr& dst_in_addr6() { return dst_sockaddr_in6()->sin6_addr; }
    unsigned short dst_port4() { return dst_sockaddr_in()->sin_port; }
    unsigned short dst_port6() { return dst_sockaddr_in6()->sin6_port; }
    sa_family_t dst_family() const { return dst.ss_family; }
};
// Shared state for datagram handling: a table of received Datagram entries plus the set of
// virtual sockets that currently have data to read. All members are public.
class DatagramCom {
public:
// Recursive mutex exposed next to the containers below.
// NOTE(review): presumably guards datagrams_received and in_virt_set -- confirm against users.
std::recursive_mutex lock;
// Received datagram entries, shared-owned, indexed by a 64-bit key.
// NOTE(review): how the key is derived is not visible in this header -- see the implementation.
std::map<uint64_t,std::shared_ptr<Datagram>> datagrams_received;
// set with all virtual sockets which have data to read
epoll::set_type in_virt_set;
};
// UDP flavour of baseCom. Besides the usual socket operations it declares access to a
// DatagramCom store (per-instance and static), a "pool" read/write path, and a static
// cache of already opened connection sockets (see ConnectionsCache).
class UDPCom : public virtual baseCom {
    // create on demand
    static inline std::shared_ptr<DatagramCom> datagram_com_static_;
    mutable std::shared_ptr<DatagramCom> datagram_com_;

public:
    // if someone needs external access, create reference!
    static std::shared_ptr<DatagramCom> datagram_com_static();
    std::shared_ptr<DatagramCom> datagram_com() const;

    UDPCom();

    void init(baseHostCX* owner) override;

    // Returns a newly allocated, default-constructed UDPCom; the caller receives the raw pointer.
    baseCom* replicate() override { return new UDPCom(); }

    int connect(const char* host, const char* port) override;
    int bind(unsigned short port) override;
    // Binding to a path is not supported by this Com: always returns -1.
    int bind([[maybe_unused]] const char* path) override { return -1; }
    int accept(int sockfd, sockaddr* addr, socklen_t* addrlen_) override;

    int translate_socket(int vsock) const override;

    bool in_readset(int s) override;
    bool in_writeset(int s) override;
    virtual bool in_exset(int s);
    int poll() override;

    ssize_t read(int _fd, void* _buf, size_t _n, int _flags) override;
    virtual int read_from_pool(int _fd, void* _buf, size_t _n, int _flags);

    // Thin forwarder to the system ::recv().
    virtual ssize_t recv(int _fd, void* _buf, size_t _n, int _flags) { return ::recv(_fd, _buf, _n, _flags); }

    // read() with MSG_PEEK added to the caller's flags.
    // The flags are widened through unsigned int (never narrowed) for the bitwise OR, so
    // caller-supplied flags above the low byte (on Linux e.g. MSG_WAITALL = 0x100 or
    // MSG_NOSIGNAL = 0x4000) are passed on to read() intact.
    ssize_t peek(int _fd, void* _buf, size_t _n, int _flags) override {
        auto const peek_flags = static_cast<unsigned int>(_flags) | static_cast<unsigned int>(MSG_PEEK);
        return read(_fd, _buf, _n, static_cast<int>(peek_flags));
    }

    ssize_t write(int _fd, const void* _buf, size_t _n, int _flags) override;
    virtual ssize_t write_to_pool(int _fd, const void* _buf, size_t _n, int _flags);

    int kill_socket(int fd);
    size_t kill_and_deref_from_connnect(std::string const& key);
    int remove_datagram_entry(int fd);

    void shutdown(int _fd) override;
    // Intentionally empty override.
    void cleanup() override {}

    bool is_connected(int s) override;
    virtual bool resolve_nonlocal_socket(int sock);
    bool resolve_socket(bool source, int s, std::string* target_host, std::string* target_port, sockaddr_storage* target_storage) override;

    struct embryon {
        // Is it a new connection? If non-zero, we should look in the datagram store before
        // reading real sockets. After all early datagram data are processed, it should be
        // set to 0 and the store should not be read anymore.
        uint32_t id = 0;
        // Should we read from the pool, or is it already depleted? Cached value, so the
        // pool does not have to be checked again.
        bool pool_depleted = false;
    };

    // Copy of the current embryonic state.
    embryon embryonics() const { return embryonics_; }
    // Mutable access to the current embryonic state.
    embryon& embryonics() { return embryonics_; }
    // Replace the embryonic state with {n, p} and return the previous value.
    embryon embryonics(uint32_t n, bool p) {
        auto tmp = embryonics_;
        embryonics_ = { .id = n, .pool_depleted = p };
        return tmp;
    }

protected:
    embryon embryonics_ = {0, false};

    // Parameters used for the bind socket; defaults are IPv6 / datagram / UDP.
    unsigned int bind_sock_family = AF_INET6;
    int bind_sock_type = SOCK_DGRAM;
    int bind_sock_protocol = IPPROTO_UDP;

    sockaddr_storage udpcom_addr {};
    socklen_t udpcom_addrlen {0};

public:
    // Connection socket pool
    //
    // If a connection from the same source IP:PORT is already in place, a transparent bind
    // to that source IP:PORT fails, delaying DNS resolution.
    // This connection database maintains opened sockets, which will be reused.
    // Since we don't want one Com to close a socket opened by another Com,
    // the value is implemented as a pair of <fd,refcount>.
    struct ConnectionsCache {
        ConnectionsCache(UDPCom& slf): self(slf) {}

        UDPCom& self;                       // the UDPCom this cache handle belongs to
        std::optional<std::string> my_key;  // unset until a key is assigned

        std::optional<std::string> gen_cache_key(int _fd);
        std::optional<std::string> gen_cache_key(const char* host, const char* port);

        using fd_counter_type = std::pair<int,int>;
        using map_type = mp::map<std::string, fd_counter_type, std::less<>>;

        // One cache and one lock shared by all instances (static inline members).
        static inline map_type cache;
        static inline std::recursive_mutex lock;
    };

    ConnectionsCache connections;

    // allow older kernels to use UDP -- we have to set bind_sock_family to IPv4 variant
    static inline unsigned int default_sock_family = AF_INET6;

    // The verbosity argument is ignored; the result is always c_type().
    std::string to_string([[maybe_unused]] int verbosity) const override { return c_type(); }
    std::string shortname() const override { static std::string s("udp"); return s; }

    TYPENAME_OVERRIDE("UDPCom")
    DECLARE_LOGGING(to_string)

private:
    logan_lite log {"com.udp"};
};
#endif