Skip to content

Commit

Permalink
Max QoS, Retain Avail, Reason Strings, and Server Disconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
embhorn committed Aug 4, 2018
1 parent d37c130 commit 8e2e449
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 35 deletions.
20 changes: 17 additions & 3 deletions examples/mqttclient/mqttclient.c
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,18 @@ static int mqtt_property_cb(MqttClient *client, MqttProp *head, void *ctx)
prop->data_str.str,
prop->data_str.len);
break;

case MQTT_PROP_SUBSCRIPTION_ID_AVAIL:
((MQTTCtx*)client->ctx)->subId_not_avail =
prop->data_byte == 0;
break;

case MQTT_PROP_TOPIC_ALIAS_MAX:
((MQTTCtx*)client->ctx)->topic_alias_max =
(((MQTTCtx*)client->ctx)->topic_alias_max < prop->data_short) ?
((MQTTCtx*)client->ctx)->topic_alias_max : prop->data_short;
break;

case MQTT_PROP_MAX_PACKET_SZ:
if ((prop->data_int > 0) &&
(prop->data_int <= MQTT_PACKET_SZ_MAX))
Expand All @@ -143,9 +146,23 @@ static int mqtt_property_cb(MqttClient *client, MqttProp *head, void *ctx)
rc = MQTT_CODE_ERROR_PROPERTY;
}
break;

case MQTT_PROP_SERVER_KEEP_ALIVE:
((MQTTCtx*)client->ctx)->keep_alive_sec = prop->data_short;
break;

case MQTT_PROP_MAX_QOS:
client->max_qos = prop->data_byte;
break;

case MQTT_PROP_RETAIN_AVAIL:
client->retain_avail = prop->data_byte;
break;

case MQTT_PROP_REASON_STR:
PRINTF("Reason String: %s", prop->data_str.str);
break;

case MQTT_PROP_PLAYLOAD_FORMAT_IND:
case MQTT_PROP_MSG_EXPIRY_INTERVAL:
case MQTT_PROP_CONTENT_TYPE:
Expand All @@ -156,9 +173,6 @@ static int mqtt_property_cb(MqttClient *client, MqttProp *head, void *ctx)
case MQTT_PROP_TOPIC_ALIAS:
case MQTT_PROP_TYPE_MAX:
case MQTT_PROP_RECEIVE_MAX:
case MQTT_PROP_MAX_QOS:
case MQTT_PROP_RETAIN_AVAIL:
case MQTT_PROP_REASON_STR:
case MQTT_PROP_USER_PROP:
case MQTT_PROP_WILDCARD_SUB_AVAIL:
case MQTT_PROP_SHARED_SUBSCRIPTION_AVAIL:
Expand Down
114 changes: 87 additions & 27 deletions src/mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,34 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (p_decode) {
p_connect_ack = (MqttConnectAck*)p_decode;
}
#ifdef WOLFMQTT_V5
p_connect_ack->props = NULL;
#endif
rc = MqttDecode_ConnectAck(client->rx_buf, client->packet.buf_len,
p_connect_ack);
#ifdef WOLFMQTT_PROPERTY_CB
if (rc > 0) {
#ifdef WOLFMQTT_PROPERTY_CB
/* Check for properties set by the server */
if (client->property_cb) {
rc = client->property_cb(client, p_connect_ack->props,
client->disconnect_ctx);

/* Free the properties */
MqttProps_Free(p_connect_ack->props);
}
}
#endif
#ifdef WOLFMQTT_V5
/* Free the properties */
MqttProps_Free(p_connect_ack->props);
#endif
}
break;
}
case MQTT_PACKET_TYPE_PUBLISH:
{
byte msg_done;

if (msg->buffer_new) {
#ifdef WOLFMQTT_V5
msg->props = NULL;
#endif
/* Decode publish message */
rc = MqttDecode_Publish(client->rx_buf, client->packet.buf_len, msg);
if (rc <= 0) {
Expand All @@ -79,10 +86,11 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (client->property_cb) {
rc = client->property_cb(client, msg->props,
client->disconnect_ctx);

/* Free the properties */
MqttProps_Free(msg->props);
}
#endif
#ifdef WOLFMQTT_V5
/* Free the properties */
MqttProps_Free(msg->props);
#endif
}

Expand Down Expand Up @@ -184,7 +192,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (p_decode) {
p_publish_resp = (MqttPublishResp*)p_decode;
}

#ifdef WOLFMQTT_V5
p_publish_resp->props = NULL;
#endif
/* Decode publish response message */
rc = MqttDecode_PublishResp(client->rx_buf, client->packet.buf_len,
msg->type, p_publish_resp);
Expand All @@ -198,11 +208,12 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (client->property_cb) {
rc = client->property_cb(client, p_publish_resp->props,
client->disconnect_ctx);

/* Free the properties */
MqttProps_Free(p_publish_resp->props);
}
#endif
#ifdef WOLFMQTT_V5
/* Free the properties */
MqttProps_Free(p_publish_resp->props);
#endif

/* If Qos then send response */
if (msg->type == MQTT_PACKET_TYPE_PUBLISH_REC ||
Expand Down Expand Up @@ -231,6 +242,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (p_decode) {
p_subscribe_ack = (MqttSubscribeAck*)p_decode;
}
#ifdef WOLFMQTT_V5
p_subscribe_ack->props = NULL;
#endif
rc = MqttDecode_SubscribeAck(client->rx_buf, client->packet.buf_len,
p_subscribe_ack);
if (rc <= 0) {
Expand All @@ -243,11 +257,12 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (client->property_cb) {
rc = client->property_cb(client, p_subscribe_ack->props,
client->disconnect_ctx);

/* Free the properties */
MqttProps_Free(p_subscribe_ack->props);
}
#endif
#ifdef WOLFMQTT_V5
/* Free the properties */
MqttProps_Free(p_subscribe_ack->props);
#endif

break;
}
Expand All @@ -260,6 +275,9 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (p_decode) {
p_unsubscribe_ack = (MqttUnsubscribeAck*)p_decode;
}
#ifdef WOLFMQTT_V5
p_unsubscribe_ack->props = NULL;
#endif
rc = MqttDecode_UnsubscribeAck(client->rx_buf, client->packet.buf_len,
p_unsubscribe_ack);
if (rc <= 0) {
Expand All @@ -272,12 +290,12 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
if (client->property_cb) {
rc = client->property_cb(client, p_unsubscribe_ack->props,
client->disconnect_ctx);

/* Free the properties */
MqttProps_Free(p_unsubscribe_ack->props);
}
#endif

#ifdef WOLFMQTT_V5
/* Free the properties */
MqttProps_Free(p_unsubscribe_ack->props);
#endif
break;
}
case MQTT_PACKET_TYPE_PING_RESP:
Expand All @@ -291,21 +309,45 @@ static int MqttClient_HandlePayload(MqttClient* client, MqttMessage* msg,
{
MqttAuth auth, *p_auth = &auth;

p_auth->props = NULL;
/* Decode authorization */
rc = MqttDecode_Auth(client->rx_buf, client->packet.buf_len, p_auth);

#ifdef WOLFMQTT_PROPERTY_CB
if (rc > 0) {
#ifdef WOLFMQTT_PROPERTY_CB
/* Check for properties set by the server */
if (client->property_cb) {
rc = client->property_cb(client, p_auth->props,
client->disconnect_ctx);

/* Free the properties */
MqttProps_Free(p_auth->props);
}
#endif
/* Free the properties */
MqttProps_Free(p_auth->props);
}

break;
}

case MQTT_PACKET_TYPE_DISCONNECT:
{
MqttDisconnect disc, *p_disc = &disc;

p_disc->props = NULL;

/* Decode disconnect */
rc = MqttDecode_Disconnect(client->rx_buf, client->packet.buf_len,
p_disc);
if (rc > 0) {
#ifdef WOLFMQTT_PROPERTY_CB
/* Check for properties set by the server */
if (client->property_cb) {
rc = client->property_cb(client, p_disc->props,
client->disconnect_ctx);
}
#endif
/* Free the properties */
MqttProps_Free(p_disc->props);
}

break;
}
Expand Down Expand Up @@ -448,6 +490,10 @@ int MqttClient_Init(MqttClient *client, MqttNet* net,
client->rx_buf = rx_buf;
client->rx_buf_len = rx_buf_len;
client->cmd_timeout_ms = cmd_timeout_ms;
#ifdef WOLFMQTT_V5
client->max_qos = MQTT_QOS_2;
client->retain_avail = 1;
#endif

/* Init socket */
rc = MqttSocket_Init(client, net);
Expand All @@ -456,27 +502,27 @@ int MqttClient_Init(MqttClient *client, MqttNet* net,
}

#ifdef WOLFMQTT_DISCONNECT_CB
int MqttClient_SetDisconnectCallback(MqttClient *client, MqttDisconnectCb cb,
int MqttClient_SetDisconnectCallback(MqttClient *client, MqttDisconnectCb discCb,
void* ctx)
{
if (client == NULL)
return MQTT_CODE_ERROR_BAD_ARG;

client->disconnect_cb = cb;
client->disconnect_cb = discCb;
client->disconnect_ctx = ctx;

return MQTT_CODE_SUCCESS;
}
#endif

#ifdef WOLFMQTT_PROPERTY_CB
int MqttClient_SetPropertyCallback(MqttClient *client, MqttPropertyCb cb,
int MqttClient_SetPropertyCallback(MqttClient *client, MqttPropertyCb propCb,
void* ctx)
{
if (client == NULL)
return MQTT_CODE_ERROR_BAD_ARG;

client->property_cb = cb;
client->property_cb = propCb;
client->property_ctx = ctx;

return MQTT_CODE_SUCCESS;
Expand Down Expand Up @@ -559,6 +605,15 @@ int MqttClient_Publish(MqttClient *client, MqttPublish *publish)
return MQTT_CODE_ERROR_BAD_ARG;
}

#ifdef WOLFMQTT_V5
/* Validate publish request against server properties */
if ((publish->qos > client->max_qos) ||
((publish->retain == 1) && (client->retain_avail == 0)))
{
return MQTT_CODE_ERROR_SERVER_PROP;
}
#endif

switch (publish->stat)
{
case MQTT_MSG_BEGIN:
Expand Down Expand Up @@ -853,6 +908,11 @@ const char* MqttClient_ReturnCodeToString(int return_code)
return "Error (Memory)";
case MQTT_CODE_ERROR_STAT:
return "Error (State)";
case MQTT_CODE_ERROR_PROPERTY:
return "Error (Property)";
case MQTT_CODE_ERROR_SERVER_PROP:
return "Error (Server Property)";

}
return "Unknown";
}
Expand Down
39 changes: 38 additions & 1 deletion src/mqtt_packet.c
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,43 @@ int MqttEncode_Disconnect(byte *tx_buf, int tx_buf_len, MqttDisconnect* disconne
}

#ifdef WOLFMQTT_V5
int MqttDecode_Disconnect(byte *rx_buf, int rx_buf_len, MqttDisconnect *disc)
{
int header_len, remain_len;
byte *rx_payload;
word32 props_len = 0;

/* Validate required arguments */
if ((rx_buf == NULL) || (rx_buf_len <= 0) || (disc == NULL)) {
return MQTT_CODE_ERROR_BAD_ARG;
}

/* Decode fixed header */
header_len = MqttDecode_FixedHeader(rx_buf, rx_buf_len, &remain_len,
MQTT_PACKET_TYPE_DISCONNECT, NULL, NULL, NULL);
if (header_len < 0) {
return header_len;
}
rx_payload = &rx_buf[header_len];

if (remain_len > 0) {
/* Decode variable header */
disc->reason_code = *rx_payload++;

if (remain_len > 1) {
/* Decode Length of Properties */
rx_payload += MqttDecode_Vbi(rx_payload, &props_len);
if (props_len > 0) {
/* Decode the AUTH Properties */
rx_payload += MqttDecode_Props(MQTT_PACKET_TYPE_DISCONNECT, &disc->props,
rx_payload, props_len);
}
}
}
/* Return total length of packet */
return header_len + remain_len;
}

int MqttEncode_Auth(byte *tx_buf, int tx_buf_len, MqttAuth *auth)
{
int header_len, remain_len = 0;
Expand Down Expand Up @@ -1541,7 +1578,7 @@ int MqttPacket_Write(MqttClient *client, byte* tx_buf, int tx_buf_len)
if ((client->packet_sz_max > 0) && (tx_buf_len >
(int)client->packet_sz_max))
{
rc = MQTT_CODE_ERROR_PAK_SIZE;
rc = MQTT_CODE_ERROR_SERVER_PROP;
}
else
#endif
Expand Down
8 changes: 5 additions & 3 deletions wolfmqtt/mqtt_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ typedef struct _MqttClient {

#ifdef WOLFMQTT_V5
word32 packet_sz_max; /* Server property */
byte max_qos; /* Server property */
byte retain_avail; /* Server property */
#endif

#ifdef WOLFMQTT_DISCONNECT_CB
Expand Down Expand Up @@ -178,22 +180,22 @@ WOLFMQTT_API int MqttClient_Init(
*/
WOLFMQTT_API int MqttClient_SetDisconnectCallback(
MqttClient *client,
MqttDisconnectCb cb,
MqttDisconnectCb discb,
void* ctx);
#endif

#ifdef WOLFMQTT_PROPERTY_CB
/*! \brief Sets a property callback with custom context
* \param client Pointer to MqttClient structure
(uninitialized is okay)
* \param disCb Pointer to property callback function
* \param propCb Pointer to property callback function
* \param ctx Pointer to your own context
* \return MQTT_CODE_SUCCESS or MQTT_CODE_ERROR_BAD_ARG
(see enum MqttPacketResponseCodes)
*/
WOLFMQTT_API int MqttClient_SetPropertyCallback(
MqttClient *client,
MqttPropertyCb cb,
MqttPropertyCb propCb,
void* ctx);
#endif

Expand Down
Loading

0 comments on commit 8e2e449

Please sign in to comment.