diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 96aee156..3aee5147 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -568,6 +568,28 @@ bool PubSubClient::beginPublish(const char* topic, size_t plength, uint8_t qos, } this->_qos = qos; // save the QoS for later endPublish() operation // check if the header and the topic (including 2 length bytes) fit into the buffer + +#if MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE > 0 + int total_msg_len = MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 + plength; // header + topic length + topic + payload + if (total_msg_len > this->_client->availableForWrite()) { + for (uint16_t i = 0; i < MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE; i++) + { + if (this->_client->availableForWrite() > total_msg_len) + { + break; // Enough space in the client(socket) buffer + } else + { + // Not enough space in the client(socket) buffer + // so let's empty the buffer first + _client->flush(); + this->loop(); + + delay(1); // Give some time to the client to flush + } + } + } +#endif + if (connected() && MQTT_MAX_HEADER_SIZE + strlen(topic) + 2 <= this->bufferSize) { // first write the topic at the end of the maximal variable header (MQTT_MAX_HEADER_SIZE) to the buffer size_t topicLen = writeString(topic, this->buffer, MQTT_MAX_HEADER_SIZE, this->bufferSize) - MQTT_MAX_HEADER_SIZE; @@ -749,6 +771,26 @@ bool PubSubClient::subscribe(const char* topic, uint8_t qos) { length = writeNextMsgId(buffer, length, this->bufferSize); // buffer size is checked before length = writeString(topic, this->buffer, length, this->bufferSize); this->buffer[length++] = qos; + +#if MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE > 0 + if (int(length) > this->_client->availableForWrite()) { + for (uint16_t i = 0; i < MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE; i++) + { + if (this->_client->availableForWrite() > int(length)) + { + break; // Enough space in the client(socket) buffer + } else + { + // Not enough space in the client(socket) buffer + // so let's empty the buffer first + _client->flush(); + this->loop(); + + delay(1); // Give some time to the client to flush + } + } + } +#endif return write(MQTTSUBSCRIBE | MQTT_QOS_GET_HDR(MQTT_QOS1), this->buffer, length - MQTT_MAX_HEADER_SIZE); } return false; diff --git a/src/PubSubClient.h b/src/PubSubClient.h index 2e884299..de8fa6bd 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -78,6 +78,18 @@ #undef MQTT_MAX_TRANSFER_SIZE #endif +/** + * @brief Sets the maximum number of times the network client is checked for the amount of bytes is available to be written. + * Some hardware has a work buffer limit. Before a message to the MQTT server is sent in 'publish' or 'subscribe' + * the ehternet client is checked weather is has enough buffer to be able to send the entire message. + * Use 50 on hardware such as the Teensy 4.1 with lwip/QNEthernet lib. + * @note Defaults to undefined, which does not use the client function 'availableForWrite' at all. + */ +#ifndef MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE // just a hack that it gets shown in Doxygen +#define MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE 50 +#undef MQTT_MAX_RETRY_FOR_AVAILABLE_FOR_WRITE +#endif + /** * @defgroup group_state state() result * @brief These values indicate the current PubSubClient::state() of the client.