@@ -125,6 +125,7 @@ struct mqtt_sock_s {
125
125
nni_stat_item msg_send_drop ;
126
126
nni_stat_item msg_recv_drop ;
127
127
nni_stat_item msg_bytes_cached ;
128
+ nni_stat_item msg_sqlite_cached ;
128
129
#endif
129
130
};
130
131
@@ -206,17 +207,26 @@ mqtt_sock_init(void *arg, nni_sock *sock)
206
207
nni_stat_init (& s -> msg_recv_drop , & msg_recv_drop );
207
208
static const nni_stat_info msg_bytes_cached = {
208
209
.si_name = "mqtt_msg_bytes_cached" ,
209
- .si_desc = "cached msg payload size " ,
210
+ .si_desc = "total payload size of cached msg in memory " ,
210
211
.si_type = NNG_STAT_COUNTER ,
211
212
.si_unit = NNG_UNIT_BYTES ,
212
213
.si_atomic = true,
213
214
};
214
215
nni_stat_init (& s -> msg_bytes_cached , & msg_bytes_cached );
216
+ static const nni_stat_info msg_sqlite_cached = {
217
+ .si_name = "mqtt_msg_sqlite_cached" ,
218
+ .si_desc = "count all cached msg in sqlite" ,
219
+ .si_type = NNG_STAT_COUNTER ,
220
+ .si_unit = NNG_UNIT_BYTES ,
221
+ .si_atomic = true,
222
+ };
223
+ nni_stat_init (& s -> msg_sqlite_cached , & msg_sqlite_cached );
215
224
nni_sock_add_stat (s -> nsock , & s -> mqtt_reconnect );
216
225
nni_sock_add_stat (s -> nsock , & s -> msg_resend );
217
226
nni_sock_add_stat (s -> nsock , & s -> msg_send_drop );
218
227
nni_sock_add_stat (s -> nsock , & s -> msg_recv_drop );
219
228
nni_sock_add_stat (s -> nsock , & s -> msg_bytes_cached );
229
+ nni_sock_add_stat (s -> nsock , & s -> msg_sqlite_cached );
220
230
#endif
221
231
}
222
232
@@ -322,9 +332,9 @@ mqtt_sock_set_cached_byte(void *arg, const void *buf, size_t sz, nni_type t)
322
332
if ((rv = nni_copyin_int (& len , buf , sz ,
323
333
NANO_MAX_PACKET_SIZE_NEG , NANO_MAX_PACKET_SIZE , t )) == 0 ) {
324
334
#ifdef NNG_ENABLE_STATS
325
- if (len > 0 )
335
+ if (len > 0 ) {
326
336
nni_stat_inc (& s -> msg_bytes_cached , len );
327
- else if (len < 0 ) {
337
+ } else if (len < 0 ) {
328
338
len = - len ;
329
339
nni_stat_dec (& s -> msg_bytes_cached , len );
330
340
}
@@ -581,7 +591,8 @@ mqtt_send_msg(nni_aio *aio, mqtt_ctx_t *arg, mqtt_sock_t *s)
581
591
msg = sqlite_get_cache_msg (sqlite );
582
592
#ifdef NNG_ENABLE_STATS
583
593
nni_stat_inc (& s -> msg_resend , 1 );
584
- nni_stat_dec (& s -> msg_bytes_cached , nng_msg_len (msg ));
594
+ nni_stat_set_value (& s -> msg_sqlite_cached ,
595
+ sqlite_get_cache_msg_count (s -> sqlite_opt ));
585
596
#endif
586
597
}
587
598
#endif
@@ -776,8 +787,9 @@ mqtt_pipe_close(void *arg)
776
787
// flush to disk
777
788
if (!nni_lmq_empty (& p -> send_messages )) {
778
789
log_info ("cached msg into sqlite" );
779
- sqlite_flush_lmq (
780
- mqtt_sock_get_sqlite_option (s ), & p -> send_messages );
790
+ sqlite_flush_lmq (s -> sqlite_opt , & p -> send_messages );
791
+ nni_stat_set_value (& s -> msg_sqlite_cached ,
792
+ sqlite_get_cache_msg_count (s -> sqlite_opt ));
781
793
}
782
794
#endif
783
795
@@ -924,9 +936,11 @@ mqtt_timer_cb(void *arg)
924
936
if (NULL != (msg = sqlite_get_cache_msg (sqlite ))) {
925
937
p -> busy = true;
926
938
nni_aio_set_msg (& p -> send_aio , msg );
939
+ log_info ("resend sqlite msg" );
927
940
#ifdef NNG_ENABLE_STATS
928
941
nni_stat_inc (& s -> msg_resend , 1 );
929
- nni_stat_dec (& s -> msg_bytes_cached , nng_msg_len (msg ));
942
+ nni_stat_set_value (& s -> msg_sqlite_cached ,
943
+ sqlite_get_cache_msg_count (sqlite ));
930
944
#endif
931
945
nni_pipe_send (p -> pipe , & p -> send_aio );
932
946
nni_mtx_unlock (& s -> mtx );
@@ -1531,11 +1545,12 @@ mqtt_ctx_send(void *arg, nni_aio *aio)
1531
1545
nni_lmq_put (& sqlite -> offline_cache , msg );
1532
1546
if (nni_lmq_full (& sqlite -> offline_cache )) {
1533
1547
sqlite_flush_offline_cache (sqlite );
1534
- }
1535
- nni_mtx_unlock (& s -> mtx );
1536
1548
#ifdef NNG_ENABLE_STATS
1537
- nni_stat_inc (& s -> msg_bytes_cached , nng_msg_len (msg ));
1549
+ nni_stat_set_value (& s -> msg_sqlite_cached ,
1550
+ sqlite_get_cache_msg_count (sqlite ));
1538
1551
#endif
1552
+ }
1553
+ nni_mtx_unlock (& s -> mtx );
1539
1554
nni_aio_set_msg (aio , NULL );
1540
1555
nni_aio_finish_error (aio , NNG_ECLOSED );
1541
1556
return ;
0 commit comments