From 8e2e449f83b675be37eddacd69294146b2eb7db1 Mon Sep 17 00:00:00 2001 From: Eric Blankenhorn Date: Sat, 4 Aug 2018 16:20:32 -0500 Subject: [PATCH] Max QoS, Retain Avail, Reason Strings, and Server Disconnect --- examples/mqttclient/mqttclient.c | 20 +++++- src/mqtt_client.c | 114 +++++++++++++++++++++++-------- src/mqtt_packet.c | 39 ++++++++++- wolfmqtt/mqtt_client.h | 8 ++- wolfmqtt/mqtt_packet.h | 1 + wolfmqtt/mqtt_types.h | 2 +- 6 files changed, 149 insertions(+), 35 deletions(-) diff --git a/examples/mqttclient/mqttclient.c b/examples/mqttclient/mqttclient.c index 5a3e0be9b..ac2a4fbd9 100644 --- a/examples/mqttclient/mqttclient.c +++ b/examples/mqttclient/mqttclient.c @@ -121,15 +121,18 @@ static int mqtt_property_cb(MqttClient *client, MqttProp *head, void *ctx) prop->data_str.str, prop->data_str.len); break; + case MQTT_PROP_SUBSCRIPTION_ID_AVAIL: ((MQTTCtx*)client->ctx)->subId_not_avail = prop->data_byte == 0; break; + case MQTT_PROP_TOPIC_ALIAS_MAX: ((MQTTCtx*)client->ctx)->topic_alias_max = (((MQTTCtx*)client->ctx)->topic_alias_max < prop->data_short) ? ((MQTTCtx*)client->ctx)->topic_alias_max : prop->data_short; break; + case MQTT_PROP_MAX_PACKET_SZ: if ((prop->data_int > 0) && (prop->data_int <= MQTT_PACKET_SZ_MAX)) @@ -143,9 +146,23 @@ static int mqtt_property_cb(MqttClient *client, MqttProp *head, void *ctx) rc = MQTT_CODE_ERROR_PROPERTY; } break; + case MQTT_PROP_SERVER_KEEP_ALIVE: ((MQTTCtx*)client->ctx)->keep_alive_sec = prop->data_short; break; + + case MQTT_PROP_MAX_QOS: + client->max_qos = prop->data_byte; + break; + + case MQTT_PROP_RETAIN_AVAIL: + client->retain_avail = prop->data_byte; + break; + + case MQTT_PROP_REASON_STR: + PRINTF("Reason String: %s", prop->data_str.str); + break; + case MQTT_PROP_PLAYLOAD_FORMAT_IND: case MQTT_PROP_MSG_EXPIRY_INTERVAL: case MQTT_PROP_CONTENT_TYPE: @@ -156,9 +173,6 @@ static int mqtt_property_cb(MqttClient *client, MqttProp *head, void *ctx) case MQTT_PROP_TOPIC_ALIAS: case MQTT_PROP_TYPE_MAX: case MQTT_PROP_RECEIVE_MAX: - case MQTT_PROP_MAX_QOS: - case MQTT_PROP_RETAIN_AVAIL: - case MQTT_PROP_REASON_STR: case MQTT_PROP_USER_PROP: case MQTT_PROP_WILDCARD_SUB_AVAIL: case MQTT_PROP_SHARED_SUBSCRIPTION_AVAIL: diff --git a/src/mqtt_client.c b/src/mqtt_client.c index f45cfda53..54818d8f9 100644 --- a/src/mqtt_client.c +++ b/src/mqtt_client.c @@ -48,20 +48,24 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (p_decode) { p_connect_ack = (MqttConnectAck*)p_decode; } +#ifdef WOLFMQTT_V5 + p_connect_ack->props = NULL; +#endif rc = MqttDecode_ConnectAck(client->rx_buf, client->packet.buf_len, p_connect_ack); -#ifdef WOLFMQTT_PROPERTY_CB if (rc > 0) { +#ifdef WOLFMQTT_PROPERTY_CB /* Check for properties set by the server */ if (client->property_cb) { rc = client->property_cb(client, p_connect_ack->props, client->disconnect_ctx); - - /* Free the properties */ - MqttProps_Free(p_connect_ack->props); } - } #endif +#ifdef WOLFMQTT_V5 + /* Free the properties */ + MqttProps_Free(p_connect_ack->props); +#endif + } break; } case MQTT_PACKET_TYPE_PUBLISH: @@ -69,6 +73,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, byte msg_done; if (msg->buffer_new) { +#ifdef WOLFMQTT_V5 + msg->props = NULL; +#endif /* Decode publish message */ rc = MqttDecode_Publish(client->rx_buf, client->packet.buf_len, msg); if (rc <= 0) { @@ -79,10 +86,11 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (client->property_cb) { rc = client->property_cb(client, msg->props, client->disconnect_ctx); - - /* Free the properties */ - MqttProps_Free(msg->props); } +#endif +#ifdef WOLFMQTT_V5 + /* Free the properties */ + MqttProps_Free(msg->props); #endif } @@ -184,7 +192,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (p_decode) { p_publish_resp = (MqttPublishResp*)p_decode; } - +#ifdef WOLFMQTT_V5 + p_publish_resp->props = NULL; +#endif /* Decode publish response message */ rc = MqttDecode_PublishResp(client->rx_buf, client->packet.buf_len, msg->type, p_publish_resp); @@ -198,11 +208,12 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (client->property_cb) { rc = client->property_cb(client, p_publish_resp->props, client->disconnect_ctx); - - /* Free the properties */ - MqttProps_Free(p_publish_resp->props); } #endif +#ifdef WOLFMQTT_V5 + /* Free the properties */ + MqttProps_Free(p_publish_resp->props); +#endif /* If Qos then send response */ if (msg->type == MQTT_PACKET_TYPE_PUBLISH_REC || @@ -231,6 +242,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (p_decode) { p_subscribe_ack = (MqttSubscribeAck*)p_decode; } +#ifdef WOLFMQTT_V5 + p_subscribe_ack->props = NULL; +#endif rc = MqttDecode_SubscribeAck(client->rx_buf, client->packet.buf_len, p_subscribe_ack); if (rc <= 0) { @@ -243,11 +257,12 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (client->property_cb) { rc = client->property_cb(client, p_subscribe_ack->props, client->disconnect_ctx); - - /* Free the properties */ - MqttProps_Free(p_subscribe_ack->props); } #endif +#ifdef WOLFMQTT_V5 + /* Free the properties */ + MqttProps_Free(p_subscribe_ack->props); +#endif break; } @@ -260,6 +275,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (p_decode) { p_unsubscribe_ack = (MqttUnsubscribeAck*)p_decode; } +#ifdef WOLFMQTT_V5 + p_unsubscribe_ack->props = NULL; +#endif rc = MqttDecode_UnsubscribeAck(client->rx_buf, client->packet.buf_len, p_unsubscribe_ack); if (rc <= 0) { @@ -272,12 +290,12 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, if (client->property_cb) { rc = client->property_cb(client, p_unsubscribe_ack->props, client->disconnect_ctx); - - /* Free the properties */ - MqttProps_Free(p_unsubscribe_ack->props); } #endif - +#ifdef WOLFMQTT_V5 + /* Free the properties */ + MqttProps_Free(p_unsubscribe_ack->props); +#endif break; } case MQTT_PACKET_TYPE_PING_RESP: @@ -291,21 +309,45 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg, { MqttAuth auth, *p_auth = &auth; + p_auth->props = NULL; /* Decode authorization */ rc = MqttDecode_Auth(client->rx_buf, client->packet.buf_len, p_auth); -#ifdef WOLFMQTT_PROPERTY_CB if (rc > 0) { +#ifdef WOLFMQTT_PROPERTY_CB /* Check for properties set by the server */ if (client->property_cb) { rc = client->property_cb(client, p_auth->props, client->disconnect_ctx); - - /* Free the properties */ - MqttProps_Free(p_auth->props); } +#endif + /* Free the properties */ + MqttProps_Free(p_auth->props); } + + break; + } + + case MQTT_PACKET_TYPE_DISCONNECT: + { + MqttDisconnect disc, *p_disc = &disc; + + p_disc->props = NULL; + + /* Decode disconnect */ + rc = MqttDecode_Disconnect(client->rx_buf, client->packet.buf_len, + p_disc); + if (rc > 0) { +#ifdef WOLFMQTT_PROPERTY_CB + /* Check for properties set by the server */ + if (client->property_cb) { + rc = client->property_cb(client, p_disc->props, + client->disconnect_ctx); + } #endif + /* Free the properties */ + MqttProps_Free(p_disc->props); + } break; } @@ -448,6 +490,10 @@ int MqttClient_Init(MqttClient *client, MqttNet* net, client->rx_buf = rx_buf; client->rx_buf_len = rx_buf_len; client->cmd_timeout_ms = cmd_timeout_ms; +#ifdef WOLFMQTT_V5 + client->max_qos = MQTT_QOS_2; + client->retain_avail = 1; +#endif /* Init socket */ rc = MqttSocket_Init(client, net); @@ -456,13 +502,13 @@ int MqttClient_Init(MqttClient *client, MqttNet* net, } #ifdef WOLFMQTT_DISCONNECT_CB -int MqttClient_SetDisconnectCallback(MqttClient *client, MqttDisconnectCb cb, +int MqttClient_SetDisconnectCallback(MqttClient *client, MqttDisconnectCb discCb, void* ctx) { if (client == NULL) return MQTT_CODE_ERROR_BAD_ARG; - client->disconnect_cb = cb; + client->disconnect_cb = discCb; client->disconnect_ctx = ctx; return MQTT_CODE_SUCCESS; @@ -470,13 +516,13 @@ int MqttClient_SetDisconnectCallback(MqttClient *client, MqttDisconnectCb cb, #endif #ifdef WOLFMQTT_PROPERTY_CB -int MqttClient_SetPropertyCallback(MqttClient *client, MqttPropertyCb cb, +int MqttClient_SetPropertyCallback(MqttClient *client, MqttPropertyCb propCb, void* ctx) { if (client == NULL) return MQTT_CODE_ERROR_BAD_ARG; - client->property_cb = cb; + client->property_cb = propCb; client->property_ctx = ctx; return MQTT_CODE_SUCCESS; @@ -559,6 +605,15 @@ int MqttClient_Publish(MqttClient *client, MqttPublish *publish) return MQTT_CODE_ERROR_BAD_ARG; } +#ifdef WOLFMQTT_V5 + /* Validate publish request against server properties */ + if ((publish->qos > client->max_qos) || + ((publish->retain == 1) && (client->retain_avail == 0))) + { + return MQTT_CODE_ERROR_SERVER_PROP; + } +#endif + switch (publish->stat) { case MQTT_MSG_BEGIN: @@ -853,6 +908,11 @@ const char* MqttClient_ReturnCodeToString(int return_code) return "Error (Memory)"; case MQTT_CODE_ERROR_STAT: return "Error (State)"; + case MQTT_CODE_ERROR_PROPERTY: + return "Error (Property)"; + case MQTT_CODE_ERROR_SERVER_PROP: + return "Error (Server Property)"; + } return "Unknown"; } diff --git a/src/mqtt_packet.c b/src/mqtt_packet.c index 8ec0e299d..666addda4 100755 --- a/src/mqtt_packet.c +++ b/src/mqtt_packet.c @@ -1350,6 +1350,43 @@ int MqttEncode_Disconnect(byte *tx_buf, int tx_buf_len, MqttDisconnect* disconne } #ifdef WOLFMQTT_V5 +int MqttDecode_Disconnect(byte *rx_buf, int rx_buf_len, MqttDisconnect *disc) +{ + int header_len, remain_len; + byte *rx_payload; + word32 props_len = 0; + + /* Validate required arguments */ + if ((rx_buf == NULL) || (rx_buf_len <= 0) || (disc == NULL)) { + return MQTT_CODE_ERROR_BAD_ARG; + } + + /* Decode fixed header */ + header_len = MqttDecode_FixedHeader(rx_buf, rx_buf_len, &remain_len, + MQTT_PACKET_TYPE_DISCONNECT, NULL, NULL, NULL); + if (header_len < 0) { + return header_len; + } + rx_payload = &rx_buf[header_len]; + + if (remain_len > 0) { + /* Decode variable header */ + disc->reason_code = *rx_payload++; + + if (remain_len > 1) { + /* Decode Length of Properties */ + rx_payload += MqttDecode_Vbi(rx_payload, &props_len); + if (props_len > 0) { + /* Decode the AUTH Properties */ + rx_payload += MqttDecode_Props(MQTT_PACKET_TYPE_DISCONNECT, &disc->props, + rx_payload, props_len); + } + } + } + /* Return total length of packet */ + return header_len + remain_len; +} + int MqttEncode_Auth(byte *tx_buf, int tx_buf_len, MqttAuth *auth) { int header_len, remain_len = 0; @@ -1541,7 +1578,7 @@ int MqttPacket_Write(MqttClient *client, byte* tx_buf, int tx_buf_len) if ((client->packet_sz_max > 0) && (tx_buf_len > (int)client->packet_sz_max)) { - rc = MQTT_CODE_ERROR_PAK_SIZE; + rc = MQTT_CODE_ERROR_SERVER_PROP; } else #endif diff --git a/wolfmqtt/mqtt_client.h b/wolfmqtt/mqtt_client.h index 73939b3e9..56bd1c03c 100644 --- a/wolfmqtt/mqtt_client.h +++ b/wolfmqtt/mqtt_client.h @@ -129,6 +129,8 @@ typedef struct _MqttClient { #ifdef WOLFMQTT_V5 word32 packet_sz_max; /* Server property */ + byte max_qos; /* Server property */ + byte retain_avail; /* Server property */ #endif #ifdef WOLFMQTT_DISCONNECT_CB @@ -178,7 +180,7 @@ WOLFMQTT_API int MqttClient_Init( */ WOLFMQTT_API int MqttClient_SetDisconnectCallback( MqttClient *client, - MqttDisconnectCb cb, + MqttDisconnectCb discb, void* ctx); #endif @@ -186,14 +188,14 @@ WOLFMQTT_API int MqttClient_SetDisconnectCallback( /*! \brief Sets a property callback with custom context * \param client Pointer to MqttClient structure (uninitialized is okay) - * \param disCb Pointer to property callback function + * \param propCb Pointer to property callback function * \param ctx Pointer to your own context * \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG (see enum MqttPacketResponseCodes) */ WOLFMQTT_API int MqttClient_SetPropertyCallback( MqttClient *client, - MqttPropertyCb cb, + MqttPropertyCb propCb, void* ctx); #endif diff --git a/wolfmqtt/mqtt_packet.h b/wolfmqtt/mqtt_packet.h index dc5f1b45c..db7779d64 100644 --- a/wolfmqtt/mqtt_packet.h +++ b/wolfmqtt/mqtt_packet.h @@ -558,6 +558,7 @@ WOLFMQTT_LOCAL int MqttEncode_Disconnect(byte *tx_buf, int tx_buf_len, MqttDisconnect* disconnect); #ifdef WOLFMQTT_V5 +int MqttDecode_Disconnect(byte *rx_buf, int rx_buf_len, MqttDisconnect *disc); WOLFMQTT_LOCAL int MqttDecode_Auth(byte *rx_buf, int rx_buf_len, MqttAuth *auth); WOLFMQTT_LOCAL int MqttEncode_Auth(byte *tx_buf, int tx_buf_len, diff --git a/wolfmqtt/mqtt_types.h b/wolfmqtt/mqtt_types.h index 3a00f67ff..91451fa07 100644 --- a/wolfmqtt/mqtt_types.h +++ b/wolfmqtt/mqtt_types.h @@ -104,7 +104,7 @@ enum MqttPacketResponseCodes { MQTT_CODE_ERROR_MEMORY = -9, MQTT_CODE_ERROR_STAT = -10, MQTT_CODE_ERROR_PROPERTY = -11, - MQTT_CODE_ERROR_PAK_SIZE = -12, + MQTT_CODE_ERROR_SERVER_PROP = -12, MQTT_CODE_CONTINUE = -101, MQTT_CODE_STDIN_WAKE = -102,