Paho.mqtt-c master asynchronous subscription keep-alive issue #215

Open
prashanthk232 opened this issue Dec 10, 2020 · 3 comments
Comments

@prashanthk232

When I subscribe asynchronously in MQTT, control does not come back to my main() function. It stays in the keep-alive function loop. After subscribing I want to run some tasks in my application, but control is still stuck in the keep-alive function. Can anyone please tell me how to run my application after subscribing?

@Boukibouk

Hello @prashanthk232,
Could you please give more information (a code example)?
What do you mean by asynchronous subscribe?

  • Do you send the SUBSCRIBE packet and check for the SUBACK later (in Yield or in the main task function)?

In my application I saw similar behavior (poor network quality). The implementation was something like this:
Separate the network receive buffer (socket) from the MQTT receive buffer.
A periodic task copies the data received on the socket into the network receive buffer, then
calls the function MQTTPacket_readnb() (which has a timeout inside).
If a data frame is complete (check the MQTTPacket_readnb() return value), I store the completed frame in the MQTT receive buffer.
If the data frame in the network receive buffer is not yet complete, MQTTPacket_readnb() will let you know.
Note: a timeout mechanism is necessary to avoid getting locked.

You only call the Yield function once your frame is complete.
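The buffer split described above can be sketched roughly as follows. This is only an illustration, not the code from my application and not Paho API: `NetBuffer`, `net_append`, `frame_length` and `extract_frame` are hypothetical names, and the 256-byte size is an arbitrary assumption. The only protocol fact it relies on is the MQTT fixed header (one type byte followed by a 1 to 4 byte variable-length "remaining length").

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define NET_BUF_SIZE 256 /* arbitrary size chosen for this sketch */

/* Hypothetical network receive buffer, filled by the periodic socket task. */
typedef struct {
    unsigned char data[NET_BUF_SIZE];
    size_t len; /* bytes accumulated so far */
} NetBuffer;

/* Append bytes read from the socket. Returns 0 on success, -1 if they do not fit. */
static int net_append(NetBuffer *net, const unsigned char *src, size_t n)
{
    assert(net != NULL && src != NULL);
    if (n > NET_BUF_SIZE - net->len)
        return -1;
    memcpy(net->data + net->len, src, n);
    net->len += n;
    return 0;
}

/* Total length of the frame starting at buf:
 * > 0 once the fixed header is fully decoded, 0 if more header bytes are
 * needed, -1 if the remaining-length field is longer than 4 bytes. */
static long frame_length(const unsigned char *buf, size_t len)
{
    size_t remaining = 0, multiplier = 1, i;

    for (i = 1; i < len && i <= 4; i++) {
        remaining += (size_t)(buf[i] & 0x7F) * multiplier;
        if ((buf[i] & 0x80) == 0)
            return (long)(1 + i + remaining);
        multiplier *= 128;
    }
    return (i > 4) ? -1 : 0;
}

/* Move one complete frame from the network buffer into the MQTT buffer.
 * Returns the frame length, 0 if the frame is not complete yet, -1 on error.
 * It never blocks, so the caller can return to its own work right away. */
static long extract_frame(NetBuffer *net, unsigned char *out, size_t outlen)
{
    long total;

    assert(net != NULL && out != NULL);
    total = frame_length(net->data, net->len);
    if (total <= 0)
        return total;
    if ((size_t)total > outlen)
        return -1;
    if ((size_t)total > net->len)
        return 0;
    memcpy(out, net->data, (size_t)total);
    net->len -= (size_t)total;
    memmove(net->data, net->data + total, net->len);
    return total;
}
```

With something like this, the periodic task calls `net_append()` with whatever the socket delivered, and Yield is only called when `extract_frame()` returns a positive length.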

For the keep-alive, I moved it out of the cycle function and handle it separately with the same asynchronous mechanism.

I could share the code if needed.

Cheers
Yacine

@prashanthk232
Author

Thank you 😊 please share it

@Boukibouk

Below is the implementation of the function MQTTPacket_readnb().

The MQTTTransport structure now includes a timer and a callback function that resets the input buffer data in case of a timeout.

```c
typedef struct {
	void (*ResetReceivedDataTimeout)(void);
	Timer timer;
	int (*getfn)(void *, unsigned char*, int); /* must return -1 for error, 0 for call again, or the number of bytes read */
	void *sck;	/* pointer to whatever the system may use to identify the transport */
	int multiplier;
	int rem_len;
	int len;
	char state;
}MQTTTransport;
```
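For reference, a `getfn` callback that follows the contract in the comment above (-1 for error, 0 for "call again", otherwise the number of bytes read) could look roughly like this. It is a sketch that reads from an in-memory queue instead of a real socket; `RxQueue` and `rxqueue_getfn` are hypothetical names, and the `failed` flag is an assumed way for the socket layer to report a hard error.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical receive queue that the transport's sck pointer refers to. */
typedef struct {
    const unsigned char *data; /* bytes already received from the network */
    int len;                   /* number of valid bytes in data */
    int pos;                   /* next byte to hand out */
    int failed;                /* set by the socket layer on a hard error */
} RxQueue;

/* Matches int (*getfn)(void *, unsigned char *, int):
 * returns -1 on error, 0 when no data is available yet (call again later),
 * or the number of bytes copied into buf. It never waits for data. */
static int rxqueue_getfn(void *sck, unsigned char *buf, int count)
{
    RxQueue *q = (RxQueue *)sck;
    int avail;

    if (q == NULL || buf == NULL || count < 0 || q->failed)
        return -1;
    assert(q->pos <= q->len);
    avail = q->len - q->pos;
    if (avail == 0)
        return 0;
    if (count > avail)
        count = avail; /* short read: the caller keeps track of what is left */
    memcpy(buf, q->data + q->pos, (size_t)count);
    q->pos += count;
    return count;
}
```

Returning 0 instead of waiting is what lets the caller go back to the application between polls.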

```c
int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp)
{
	int rc = -1, frc;
	MQTTHeader header ={ 0 };

	switch (trp->state) {
	default:
		trp->state = 0;
		/*FALLTHROUGH*/
	case 0:
		/* read the header byte.  This has the packet type in it */
		if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1)
			goto exit;
		if (frc == 0)
			return 0;
		trp->len = 0;
		++trp->state;
		/*FALLTHROUGH*/
		/* read the remaining length.  This is variable in itself */
	case 1:
		if ((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR)
			goto exit;
		if (frc == 0)
			return 0;
		trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
		if ((trp->rem_len + trp->len) > buflen)
			goto exit;
		++trp->state;
		TimerInit(&(trp->timer));
		TimerCountdownMS(&(trp->timer), 120000);
		/*FALLTHROUGH*/
	case 2:
		if (trp->rem_len) {
			if (!TimerIsExpired(&(trp->timer)))
			{
				/* read the rest of the buffer using a callback to supply the rest of the data */
				if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1)
					goto exit;
				if (frc == 0)
					return 0;
				trp->rem_len -= frc;
				trp->len += frc;
				if (trp->rem_len)
					return 0;
			}
			else
			{
				/* Timer expires when waiting for data reset the state */
				/* Callback to restore the received data structure */
				trp->ResetReceivedDataTimeout( );
				trp->state = 0;
				return 0;
			}

		}
		header.byte = buf[0];
		rc = header.bits.type;
		break;
	}

	exit:
	trp->state = 0;
	return rc;
}
```

Once the function MQTTPacket_readnb() returns a value different from 0, the function cycle is called.
NOTE: The function cycle has been modified to fully support the asynchronous functionality, which means the keep-alive handling
is commented out there and is called independently of this function.
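The calling pattern can be sketched as a single non-blocking poll step. This is an illustration only: `poll_once`, `ReadFn`, `CycleFn` and the `Demo` stubs are hypothetical, with `ReadFn` standing in for a wrapper around MQTTPacket_readnb() and `CycleFn` for the modified cycle function. Treating -1 as an error that skips the cycle call is an assumption of this sketch.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in for a wrapper around MQTTPacket_readnb():
 * -1 on error, 0 when no complete packet is ready, else the packet type. */
typedef int (*ReadFn)(void *ctx);
/* Stand-in for the modified cycle function, called with the packet type. */
typedef void (*CycleFn)(void *ctx, int packet_type);

typedef enum { POLL_IDLE, POLL_HANDLED, POLL_ERROR } PollResult;

/* One pass of the receive side. It returns immediately in every case,
 * so main() can run application work between calls. */
static PollResult poll_once(ReadFn read_packet, CycleFn cycle, void *ctx)
{
    int rc;

    assert(read_packet != NULL && cycle != NULL);
    rc = read_packet(ctx);
    if (rc == 0)
        return POLL_IDLE;  /* frame not complete yet: nothing to do */
    if (rc < 0)
        return POLL_ERROR; /* assumption: errors are handled by the caller */
    cycle(ctx, rc);        /* complete packet: process it */
    return POLL_HANDLED;
}

/* Demo stubs: a scripted reader and a cycle function that records the type. */
typedef struct {
    int script[4];
    int next;
    int last_type;
} Demo;

static int demo_read(void *ctx)
{
    Demo *d = (Demo *)ctx;
    return d->script[d->next++];
}

static void demo_cycle(void *ctx, int packet_type)
{
    Demo *d = (Demo *)ctx;
    d->last_type = packet_type;
}
```

In the real application the main loop would call `poll_once()` once per iteration, next to the separate keep-alive check and the application's own tasks.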
