Skip to content

Commit

Permalink
smax-lazy: Tweaks to cache access
Browse files Browse the repository at this point in the history
  • Loading branch information
attipaci committed Feb 9, 2025
1 parent 603c426 commit ffc357b
Showing 1 changed file with 65 additions and 30 deletions.
95 changes: 65 additions & 30 deletions src/smax-lazy.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static boolean DestroyMonitorAsync(LazyMonitor *m);
static int GetChannelLookupIndex(const char *channel);
static __inline__ int GetTableIndex(const LazyMonitor *m);
static LazyMonitor *GetMonitorAsync(const char *table, const char *key);
static LazyMonitor *GetSpecificMonitorAsync(const char *table, const char *key);
static LazyMonitor *GetExistingMonitorAsync(const char *table, const char *key);
static void ProcessLazyUpdates(const char *pattern, const char *channel, const char *msg, long length);

/**
Expand Down Expand Up @@ -120,6 +120,7 @@ static void ApplyUpdateAsync(LazyMonitor *update, LazyMonitor *m) {
m->meta = update->meta;
m->updateTime = time(NULL);
m->isCurrent = TRUE;
m->isPending = FALSE;
pthread_mutex_unlock(&dataLock);

// we'll destroy the old data / meta with the update!
Expand All @@ -140,12 +141,11 @@ static void ApplyUpdate(void *arg) {
if(!update) return;

pthread_mutex_lock(&monitorLock);
m = GetSpecificMonitorAsync(update->table, update->key);
m = GetExistingMonitorAsync(update->table, update->key);
pthread_mutex_unlock(&monitorLock);

if(m) {
ApplyUpdateAsync(update, m);
m->isPending = FALSE;
Release(m);
}

Expand Down Expand Up @@ -188,15 +188,16 @@ static LazyMonitor *CreateStaging(const LazyMonitor *m) {
* @param m Pointer to a lazy monitor datum.
* @return X_SUCCESS (0) if successfull or else an error (<0) from smaxPull().
*/
static int UpdateCachedAsync(LazyMonitor *m, boolean background) {
static const char *fn = "UpdateCachedAsync";
static int QueueUpdateAsync(LazyMonitor *m) {
static const char *fn = "QueueUpdateAsync";

LazyMonitor *staging;
XType type;
int status = X_SUCCESS;
void *ptr;
int status = X_SUCCESS;

if(!m) return x_error(X_NULL, EINVAL, fn, "input parameter 'm' is NULL");
if(!m->isPending) return X_SUCCESS;

staging = CreateStaging(m);
if(!staging) return x_trace(fn, NULL, X_NULL);
Expand All @@ -210,20 +211,48 @@ static int UpdateCachedAsync(LazyMonitor *m, boolean background) {
ptr = staging->data;
}

if(background && smaxIsPipelined() && !m->isPending) {
m->isPending = TRUE;
status = smaxQueue(m->table, m->key, type, 1, ptr, staging->meta);
if(!status) {
m->users++;
smaxQueueCallback(ApplyUpdate, staging);
}
m->isPending = TRUE;
status = smaxQueue(m->table, m->key, type, 1, ptr, staging->meta);
if(!status) {
m->users++;
smaxQueueCallback(ApplyUpdate, staging);
}

return X_SUCCESS;
}

/**
* Updates the monitored data in the cache, by pulling from SMA-X. The update is essentially atomic
* as it happens with a single reassignment of a pointer.
*
* @param m Pointer to a lazy monitor datum.
* @return X_SUCCESS (0) if successfull or else an error (<0) from smaxPull().
*/
static int UpdateCachedAsync(LazyMonitor *m) {
static const char *fn = "UpdateCachedAsync";
LazyMonitor *staging;
XType type;
int status = X_SUCCESS;
void *ptr;

if(!m) return x_error(X_NULL, EINVAL, fn, "input parameter 'm' is NULL");

staging = CreateStaging(m);
if(!staging) return x_trace(fn, NULL, X_NULL);

if(m->key) {
type = X_RAW;
ptr = &staging->data;
}
else {
status = smaxPull(m->table, m->key, type, 1, ptr, staging->meta);
if(!status) ApplyUpdateAsync(staging, m);
DestroyMonitorAsync(staging);
type = X_STRUCT;
ptr = staging->data;
}

status = smaxPull(m->table, m->key, type, 1, ptr, staging->meta);
if(!status) ApplyUpdateAsync(staging, m);
DestroyMonitorAsync(staging);

prop_error(fn, status);
return X_SUCCESS;
}
Expand Down Expand Up @@ -306,7 +335,7 @@ static LazyMonitor *GetCreateMonitor(const char *table, const char *key, XType t

pthread_mutex_lock(&monitorLock);

m = GetSpecificMonitorAsync(lazytab, key);
m = GetExistingMonitorAsync(lazytab, key);
if(!m) m = CreateMonitorAsync(lazytab, key, type, withMeta);

pthread_mutex_unlock(&monitorLock);
Expand All @@ -319,7 +348,7 @@ static LazyMonitor *GetCreateMonitor(const char *table, const char *key, XType t
return m;
}

static int FetchData(LazyMonitor *m, XType type, int count, void *value, XMeta *meta) {
static int FetchDataAsync(LazyMonitor *m, XType type, int count, void *value, XMeta *meta) {
static const char *fn = "FetchData";

int status = X_SUCCESS;
Expand All @@ -328,9 +357,9 @@ static int FetchData(LazyMonitor *m, XType type, int count, void *value, XMeta *
// Update monitor to include metadata and pull to set it.
m->meta = (XMeta *) calloc(1, sizeof(XMeta));
x_check_alloc(m->meta);
status = UpdateCachedAsync(m, FALSE);
status = UpdateCachedAsync(m);
}
else if(!m->isCurrent && !m->isCached) status = UpdateCachedAsync(m, FALSE);
else if(!m->isCurrent) status = (m->isCached && smaxIsPipelined()) ? QueueUpdateAsync(m) : UpdateCachedAsync(m);

xvprintf("SMA-X: Lazy pull %s:%s (status=%d)\n", m->table, m->key ? m->key : "", status);

Expand All @@ -348,8 +377,6 @@ static int FetchData(LazyMonitor *m, XType type, int count, void *value, XMeta *
pthread_mutex_unlock(&dataLock);
}

Release(m);

prop_error(fn, status);
return X_SUCCESS;
}
Expand Down Expand Up @@ -377,10 +404,11 @@ int smaxLazyCache(const char *table, const char *key, XType type) {
m = GetCreateMonitor(table, key, type, TRUE);
if(!m) return x_trace("smaxLazyCache", NULL, X_NO_SERVICE);

UpdateCachedAsync(m);
m->isCached = TRUE;
UpdateCachedAsync(m, FALSE);
Release(m);


return X_SUCCESS;
}

Expand Down Expand Up @@ -410,8 +438,9 @@ int smaxGetLazyCached(const char *table, const char *key, XType type, int count,
m = GetCreateMonitor(table, key, type, meta != NULL);
if(!m) return x_trace(fn, NULL, X_NO_SERVICE);

status = FetchData(m, type, count, value, meta);
m->isCached = TRUE; // Set after the first non-mirrored fetch...
status = FetchDataAsync(m, type, count, value, meta);
m->isCached = TRUE; // Set after the first non-cached fetch...
Release(m);

prop_error(fn, status);
return X_SUCCESS;
Expand Down Expand Up @@ -446,13 +475,17 @@ int smaxLazyPull(const char *table, const char *key, XType type, int count, void
static const char *fn = "smaxLazyPull";

LazyMonitor *m;
int status;

if(!value) return x_error(X_NULL, EINVAL, fn, "value is NULL");

m = GetCreateMonitor(table, key, type, meta != NULL);
if(!m) return x_trace(fn, NULL, X_NO_SERVICE);

prop_error(fn, FetchData(m, type, count, value, meta));
status = FetchDataAsync(m, type, count, value, meta);
Release(m);

prop_error(fn, status);
return X_SUCCESS;
}

Expand Down Expand Up @@ -860,7 +893,7 @@ static __inline__ int GetTableIndex(const LazyMonitor *m) {
*
* \sa Release()
*/
static LazyMonitor *GetSpecificMonitorAsync(const char *table, const char *key) {
static LazyMonitor *GetExistingMonitorAsync(const char *table, const char *key) {
LazyMonitor *m;

m = monitorTable[GetLookupIndex(table, key)];
Expand Down Expand Up @@ -891,12 +924,12 @@ static LazyMonitor *GetSpecificMonitorAsync(const char *table, const char *key)
* \sa Release()
*/
static LazyMonitor *GetMonitorAsync(const char *table, const char *key) {
LazyMonitor *m = GetSpecificMonitorAsync(table, key);
LazyMonitor *m = GetExistingMonitorAsync(table, key);
if(!m) {
// Try as struct...
char *id = xGetAggregateID(table, key);
if(!id) return NULL;
m = GetSpecificMonitorAsync(id, NULL);
m = GetExistingMonitorAsync(id, NULL);
free(id);
}
return m;
Expand Down Expand Up @@ -953,7 +986,9 @@ static void ProcessLazyUpdates(const char *pattern, const char *channel, const c
RemoveMonitorAsync(m);
DestroyMonitorAsync(m);
}
else if(m->isCached) UpdateCachedAsync(m, TRUE); // queue for a background update.
else if(m->isCached) {
QueueUpdateAsync(m); // queue for a background update.
}

// We found the match and dealt with it. Done with this particular ID.
break;
Expand Down

0 comments on commit ffc357b

Please sign in to comment.