Skip to content

Commit fad9483

Browse files
committed
Fix a race condition in PUB/SUB and other async-reply commands where the client could be freed before our handler executed on the client thread. When this occurred, the handler was left holding a dangling client pointer.
1 parent 6b6e6f6 commit fad9483

File tree

6 files changed

+113
-74
lines changed

6 files changed

+113
-74
lines changed

.vscode/settings.json

Lines changed: 0 additions & 56 deletions
This file was deleted.

src/ae.cpp

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,36 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
191191
}
192192
}
193193

194+
// Unlike write() this is an all-or-nothing operation: once any bytes have been
// written we are committed and will keep retrying (blocking on EAGAIN) until the
// whole buffer is sent.
//
// Returns:
//   cb        - the full buffer was written
//   0         - nothing written and the fd would block (EAGAIN before any progress)
//   partial   - the kernel reported 0 bytes writable mid-stream (can't make progress)
//   negative  - a hard write() error (errno set by write())
ssize_t safe_write(int fd, const void *pv, size_t cb)
{
    const char *pcb = (const char*)pv;
    ssize_t written = 0;
    do
    {
        ssize_t rval = write(fd, pcb, cb);
        if (rval > 0)
        {
            pcb += rval;
            cb -= rval;
            written += rval;
        }
        else if (rval == 0)
        {
            // No error, but no progress either. NOTE: errno is not meaningful
            // here (write() only sets it on -1), so check this before errno.
            return written;
        }
        else if (errno == EINTR)
        {
            continue;   // interrupted by a signal before any bytes moved; just retry
        }
        else if (errno == EAGAIN)
        {
            if (written == 0)
                break;  // nothing sent yet, caller may retry the whole message later
            // if we've already written something then we're committed so keep trying
        }
        else
        {
            return rval;    // hard error; errno describes it
        }
    } while (cb);
    return written;
}
223+
194224
int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
195225
aeFileProc *proc, void *clientData, int fSynchronous)
196226
{
@@ -212,9 +242,10 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
212242
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::defer_lock);
213243
if (fSynchronous)
214244
cmd.pctl->mutexcv.lock();
215-
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
245+
auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
216246
if (size != sizeof(cmd))
217247
{
248+
AE_ASSERT(size == sizeof(cmd) || size <= 0);
218249
AE_ASSERT(errno == EAGAIN);
219250
ret = AE_ERR;
220251
}

src/debug.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ typedef ucontext_t sigcontext_t;
5555
#endif
5656
#endif
5757

58+
bool g_fInCrash = false;
59+
5860
/* ================================= Debugging ============================== */
5961

6062
/* Compute the sha1 of string at 's' with 'len' bytes long.
@@ -1356,6 +1358,7 @@ void dumpX86Calls(void *addr, size_t len) {
13561358

13571359
void sigsegvHandler(int sig, siginfo_t *info, void *secret) {
13581360
ucontext_t *uc = (ucontext_t*) secret;
1361+
g_fInCrash = true;
13591362
void *eip = getMcontextEip(uc);
13601363
sds infostring, clients;
13611364
struct sigaction act;

src/networking.cpp

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ client *createClient(int fd, int iel) {
174174
c->bufAsync = NULL;
175175
c->buflenAsync = 0;
176176
c->bufposAsync = 0;
177+
c->casyncOpsPending = 0;
177178
memset(c->uuid, 0, UUID_BINARY_LEN);
178179

179180
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
@@ -1003,7 +1004,6 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) {
10031004
serverLog(LL_WARNING,
10041005
"Error registering fd event for the new client: %s (fd=%d)",
10051006
strerror(errno),fd);
1006-
close(fd); /* May be already closed, just ignore errors */
10071007
return;
10081008
}
10091009

@@ -1266,17 +1266,17 @@ void unlinkClient(client *c) {
12661266
}
12671267
}
12681268

1269-
void freeClient(client *c) {
1269+
bool freeClient(client *c) {
12701270
listNode *ln;
12711271
serverAssert(c->fd == -1 || GlobalLocksAcquired());
12721272
AssertCorrectThread(c);
12731273
std::unique_lock<decltype(c->lock)> ulock(c->lock);
12741274

12751275
/* If a client is protected, yet we need to free it right now, make sure
12761276
* to at least use asynchronous freeing. */
1277-
if (c->flags & CLIENT_PROTECTED) {
1277+
if (c->flags & CLIENT_PROTECTED || c->casyncOpsPending) {
12781278
freeClientAsync(c);
1279-
return;
1279+
return false;
12801280
}
12811281

12821282
/* If it is our master that's beging disconnected we should make sure
@@ -1291,7 +1291,7 @@ void freeClient(client *c) {
12911291
CLIENT_BLOCKED)))
12921292
{
12931293
replicationCacheMaster(MasterInfoFromClient(c), c);
1294-
return;
1294+
return false;
12951295
}
12961296
}
12971297

@@ -1370,6 +1370,7 @@ void freeClient(client *c) {
13701370
ulock.unlock();
13711371
fastlock_free(&c->lock);
13721372
zfree(c);
1373+
return true;
13731374
}
13741375

13751376
/* Schedule a client to free it at a safe time in the serverCron() function.
@@ -1382,28 +1383,37 @@ void freeClientAsync(client *c) {
13821383
* may access the list while Redis uses I/O threads. All the other accesses
13831384
* are in the context of the main thread while the other threads are
13841385
* idle. */
1385-
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
1386+
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // check without the lock first
13861387
std::lock_guard<decltype(c->lock)> clientlock(c->lock);
13871388
AeLocker lock;
13881389
lock.arm(c);
1390+
if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; // race condition after we acquire the lock
13891391
c->flags |= CLIENT_CLOSE_ASAP;
13901392
listAddNodeTail(g_pserver->clients_to_close,c);
13911393
}
13921394

13931395
void freeClientsInAsyncFreeQueue(int iel) {
1396+
serverAssert(GlobalLocksAcquired());
13941397
listIter li;
13951398
listNode *ln;
13961399
listRewind(g_pserver->clients_to_close,&li);
13971400

1398-
while((ln = listNext(&li))) {
1401+
// Store the clients in a temp vector since freeClient will modify this list
1402+
std::vector<client*> vecclientsFree;
1403+
while((ln = listNext(&li)))
1404+
{
13991405
client *c = (client*)listNodeValue(ln);
1400-
if (c->iel != iel)
1401-
continue; // wrong thread
1406+
if (c->iel == iel)
1407+
{
1408+
vecclientsFree.push_back(c);
1409+
listDelNode(g_pserver->clients_to_close, ln);
1410+
}
1411+
}
14021412

1413+
for (client *c : vecclientsFree)
1414+
{
14031415
c->flags &= ~CLIENT_CLOSE_ASAP;
14041416
freeClient(c);
1405-
listDelNode(g_pserver->clients_to_close,ln);
1406-
listRewind(g_pserver->clients_to_close,&li);
14071417
}
14081418
}
14091419

@@ -1551,6 +1561,15 @@ void ProcessPendingAsyncWrites()
15511561
std::lock_guard<decltype(c->lock)> lock(c->lock);
15521562

15531563
serverAssert(c->fPendingAsyncWrite);
1564+
if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
1565+
{
1566+
c->bufposAsync = 0;
1567+
c->buflenAsync = 0;
1568+
zfree(c->bufAsync);
1569+
c->bufAsync = nullptr;
1570+
c->fPendingAsyncWrite = FALSE;
1571+
continue;
1572+
}
15541573

15551574
// TODO: Append to end of reply block?
15561575

@@ -1587,8 +1606,36 @@ void ProcessPendingAsyncWrites()
15871606
continue;
15881607

15891608
asyncCloseClientOnOutputBufferLimitReached(c);
1590-
if (aeCreateRemoteFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR)
1591-
continue; // We can retry later in the cron
1609+
if (c->flags & CLIENT_CLOSE_ASAP)
1610+
continue; // we will never write this so don't post an op
1611+
1612+
std::atomic_thread_fence(std::memory_order_seq_cst);
1613+
1614+
if (c->casyncOpsPending == 0)
1615+
{
1616+
if (FCorrectThread(c))
1617+
{
1618+
prepareClientToWrite(c, false); // queue an event
1619+
}
1620+
else
1621+
{
1622+
// We need to start the write on the client's thread
1623+
if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{
1624+
// Install a write handler. Don't do the actual write here since we don't want
1625+
// to duplicate the throttling and safety mechanisms of the normal write code
1626+
std::lock_guard<decltype(c->lock)> lock(c->lock);
1627+
serverAssert(c->casyncOpsPending > 0);
1628+
c->casyncOpsPending--;
1629+
aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, AE_WRITABLE|AE_WRITE_THREADSAFE, sendReplyToClient, c);
1630+
}, false) == AE_ERR
1631+
)
1632+
{
1633+
// Posting the function failed
1634+
continue; // We can retry later in the cron
1635+
}
1636+
++c->casyncOpsPending; // race is handled by the client lock in the lambda
1637+
}
1638+
}
15921639
}
15931640
}
15941641

@@ -1628,13 +1675,15 @@ int handleClientsWithPendingWrites(int iel) {
16281675
std::unique_lock<decltype(c->lock)> lock(c->lock);
16291676

16301677
/* Try to write buffers to the client socket. */
1631-
if (writeToClient(c->fd,c,0) == C_ERR) {
1678+
if (writeToClient(c->fd,c,0) == C_ERR)
1679+
{
16321680
if (c->flags & CLIENT_CLOSE_ASAP)
16331681
{
16341682
lock.release(); // still locked
16351683
AeLocker ae;
16361684
ae.arm(c);
1637-
freeClient(c); // writeToClient will only async close, but there's no need to wait
1685+
if (!freeClient(c)) // writeToClient will only async close, but there's no need to wait
1686+
c->lock.unlock(); // if we just got put on the async close list, then we need to remove the lock
16381687
}
16391688
continue;
16401689
}

src/pubsub.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ int clientSubscriptionsCount(client *c) {
143143
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
144144
* 0 if the client was already subscribed to that channel. */
145145
int pubsubSubscribeChannel(client *c, robj *channel) {
146+
serverAssert(GlobalLocksAcquired());
147+
serverAssert(c->lock.fOwnLock());
146148
dictEntry *de;
147149
list *clients = NULL;
148150
int retval = 0;
@@ -202,6 +204,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
202204

203205
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
204206
int pubsubSubscribePattern(client *c, robj *pattern) {
207+
serverAssert(GlobalLocksAcquired());
205208
int retval = 0;
206209

207210
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
@@ -244,6 +247,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
244247
/* Unsubscribe from all the channels. Return the number of channels the
245248
* client was subscribed to. */
246249
int pubsubUnsubscribeAllChannels(client *c, int notify) {
250+
serverAssert(GlobalLocksAcquired());
247251
dictIterator *di = dictGetSafeIterator(c->pubsub_channels);
248252
dictEntry *de;
249253
int count = 0;
@@ -262,6 +266,7 @@ int pubsubUnsubscribeAllChannels(client *c, int notify) {
262266
/* Unsubscribe from all the patterns. Return the number of patterns the
263267
* client was subscribed from. */
264268
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
269+
serverAssert(GlobalLocksAcquired());
265270
listNode *ln;
266271
listIter li;
267272
int count = 0;
@@ -278,6 +283,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
278283

279284
/* Publish a message */
280285
int pubsubPublishMessage(robj *channel, robj *message) {
286+
serverAssert(GlobalLocksAcquired());
281287
int receivers = 0;
282288
dictEntry *de;
283289
listNode *ln;
@@ -293,6 +299,8 @@ int pubsubPublishMessage(robj *channel, robj *message) {
293299
listRewind(list,&li);
294300
while ((ln = listNext(&li)) != NULL) {
295301
client *c = reinterpret_cast<client*>(ln->value);
302+
if (c->flags & CLIENT_CLOSE_ASAP) // avoid blocking if the write will be ignored
303+
continue;
296304
fastlock_lock(&c->lock);
297305
addReplyPubsubMessage(c,channel,message);
298306
fastlock_unlock(&c->lock);
@@ -311,6 +319,8 @@ int pubsubPublishMessage(robj *channel, robj *message) {
311319
(char*)ptrFromObj(channel),
312320
sdslen(szFromObj(channel)),0))
313321
{
322+
if (pat->pclient->flags & CLIENT_CLOSE_ASAP)
323+
continue;
314324
fastlock_lock(&pat->pclient->lock);
315325
addReplyPubsubPatMessage(pat->pclient,
316326
pat->pattern,channel,message);

src/server.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ typedef struct client {
925925
time_t lastinteraction; /* Time of the last interaction, used for timeout */
926926
time_t obuf_soft_limit_reached_time;
927927
std::atomic<int> flags; /* Client flags: CLIENT_* macros. */
928+
int casyncOpsPending;
928929
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
929930
int authenticated; /* Needed when the default user requires auth. */
930931
int replstate; /* Replication state if this is a slave. */
@@ -1694,7 +1695,7 @@ void redisSetProcTitle(const char *title);
16941695
/* networking.c -- Networking and Client related operations */
16951696
client *createClient(int fd, int iel);
16961697
void closeTimedoutClients(void);
1697-
void freeClient(client *c);
1698+
bool freeClient(client *c);
16981699
void freeClientAsync(client *c);
16991700
void resetClient(client *c);
17001701
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask);
@@ -2503,9 +2504,10 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len);
25032504
int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags);
25042505

25052506
int moduleGILAcquiredByModule(void);
2507+
extern bool g_fInCrash;
25062508
static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate
25072509
{
2508-
return aeThreadOwnsLock() || moduleGILAcquiredByModule();
2510+
return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash;
25092511
}
25102512

25112513
inline int ielFromEventLoop(const aeEventLoop *eventLoop)

0 commit comments

Comments
 (0)