-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmqtt_client_app.c
1197 lines (1078 loc) · 43.2 KB
/
mqtt_client_app.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
* Copyright (c) 2016, Texas Instruments Incorporated
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* * Neither the name of Texas Instruments Incorporated nor the names of
* its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
* THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
* CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
* EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
* PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
* OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
* OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
* EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/*****************************************************************************
Application Name - MQTT Client
Application Overview - The device is running a MQTT client which is
connected to the online broker. Three LEDs on the
device can be controlled from a web client by
publishing msg on appropriate topics. Similarly,
message can be published on pre-configured topics
by pressing the switch buttons on the device.
Application Details - Refer to 'MQTT Client' README.html
*****************************************************************************/
//*****************************************************************************
//
//! \addtogroup mqtt_server
//! @{
//
//*****************************************************************************
/* Standard includes
* */
#include <FreeRTOS.h>
#include <stdlib.h>
#include <pthread.h>
#include <mqueue.h>
#include <time.h>
#include <unistd.h>
#include <inttypes.h>
/* TI-Driver includes */
#include <ti/drivers/GPIO.h>
#include <ti/drivers/SPI.h>
#include <ti/drivers/Timer.h>
/* Simplelink includes */
#include <ti/drivers/net/wifi/simplelink.h>
/* SlNetSock includes */
#include <ti/drivers/net/wifi/slnetifwifi.h>
/* MQTT Library includes */
#include <ti/net/mqtt/mqttclient.h>
/* Common interface includes */
#include "network_if.h"
#include "uart_term.h"
#include "stdio.h"
/* Application includes */
#include "Board.h"
#include "client_cbs.h"
#include "debug.h"
#include "timer_pub.h"
#include "mqtt_queue.h"
#include "statisticsTask.h"
//*****************************************************************************
// LOCAL DEFINES
//*****************************************************************************
/* enables secured client */
//#define SECURE_CLIENT
/* enables client authentication by the server */
//#define CLNT_USR_PWD
#define CLIENT_INIT_STATE (0x01)
#define MQTT_INIT_STATE (0x04)
#define APPLICATION_VERSION "1.1.1"
#define APPLICATION_NAME "MQTT client"
#define SLNET_IF_WIFI_PRIO (5)
/* Operate Lib in MQTT 3.1 mode. */
#define MQTT_3_1_1 false
#define MQTT_3_1 true
#define WILL_TOPIC "Client"
#define WILL_MSG "Client Stopped"
#define WILL_QOS MQTT_QOS_0
#define WILL_RETAIN false
/* Defining Broker IP address and port Number */
//#define SERVER_ADDRESS "messagesight.demos.ibm.com"
#define SERVER_ADDRESS "192.168.4.1"
#define SERVER_IP_ADDRESS "192.168.4.1"
#define PORT_NUMBER 1883
#define SECURED_PORT_NUMBER 8883
#define LOOPBACK_PORT 1882
/* Clean session flag */
#define CLEAN_SESSION true
/* Retain Flag. Used in publish message. */
#define RETAIN_ENABLE 1
/* Defining Number of subscription topics */
#define SUBSCRIPTION_TOPIC_COUNT 1
//simulating sensor
/* Defining Subscription Topic Values */
#define SUBSCRIPTION_TOPIC0 "/team18/navig"
//#define SUBSCRIPTION_TOPIC2 "/team18/ahmed"
//#define SUBSCRIPTION_TOPIC2 "/team18/arm___"
////#define SUBSCRIPTION_TOPIC3 "/cc3200/ToggleLEDCmdL3"
//#define SUBSCRIPTION_TOPIC3 "/team18/sensor"
/* Defining Publish Topic Values */
#define PUBLISH_TOPIC0 "/team18/jie_"
//#define PUBLISH_TOPIC0 "/team18/rocky"
//#define PUBLISH_TOPIC0 "/team18/ahmed"
#define PUBLISH_TOPIC1 "/team18/s"
/* Spawn task priority and Task and Thread Stack Size */
#define TASKSTACKSIZE 2048
#define RXTASKSIZE 8192
#define MQTTTHREADSIZE 4096
#define SPAWN_TASK_PRIORITY 9
#define BOARD_ID "jie__"
#define Board1_msg "Number_of_missed_message_Rocky"
#define Board2_msg "Number_of_missed_message_Ahmed"
#define Board3_msg "Number_of_missed_message_Jacob"
#define TXBUFFSIZE 400
//#define BOARD_ID "ahmed"
/* secured client requires time configuration, in order to verify server */
/* certificate validity
* (date). */
/* Day of month (DD format) range 1-31 */
#define DAY 1
/* Month (MM format) in the range of 1-12 */
#define MONTH 5
/* Year (YYYY format) */
#define YEAR 2017
/* Hours in the range of 0-23 */
#define HOUR 12
/* Minutes in the range of 0-59 */
#define MINUTES 33
/* Seconds in the range of 0-59 */
#define SEC 21
/* Number of files used for secure connection */
#define CLIENT_NUM_SECURE_FILES 1
/* Expiration value for the timer that is being used to toggle the Led. */
#define TIMER_EXPIRATION_VALUE 100 * 1000000
//*****************************************************************************
// LOCAL FUNCTION PROTOTYPES
//*****************************************************************************
void pushButtonInterruptHandler2(uint_least8_t index);
void pushButtonInterruptHandler3(uint_least8_t index);
void TimerPeriodicIntHandler(sigval val);
void LedTimerConfigNStart();
void LedTimerDeinitStop();
//static void DisplayBanner(char * AppName);
void * MqttClient(void *pvParameters);
void Mqtt_ClientStop(uint8_t disconnect);
void Mqtt_ServerStop();
void Mqtt_Stop();
void Mqtt_start();
int32_t Mqtt_IF_Connect(timer_t* g_timer);
int32_t MqttServer_start();
int32_t MqttClient_start();
//int32_t MQTT_SendMsgToQueue(struct msgQueue *queueElement);
//*****************************************************************************
// GLOBAL VARIABLES
//*****************************************************************************
/* Connection state: (0) - connected, (negative) - disconnected */
//uint32_t gInitState = 0;
//uint32_t memPtrCounterfree = 0;
//bool gResetApplication = false;
static MQTTClient_Handle gMqttClient;
MQTTClient_Params MqttClientExmple_params;
unsigned short g_usTimerInts;
/* Receive task handle */
/* AP Security Parameters */
SlWlanSecParams_t SecurityParams = { 0 };
/* Client ID */
/* If ClientId isn't set, the MAC address of the device will be copied into */
/* the ClientID parameter. */
char ClientId[13] = { '\0' };
/* Client User Name and Password */
//const char *ClientUsername = "username1";
const char ClientUsername[] = "username1";
//const char *ClientPassword = "pwd1";
const char ClientPassword[] = "pwd1";
/* Subscription topics and qos values */
static const char * const topic[SUBSCRIPTION_TOPIC_COUNT] = {
SUBSCRIPTION_TOPIC0,
// SUBSCRIPTION_TOPIC1,
// SUBSCRIPTION_TOPIC2
};
const unsigned char qos[SUBSCRIPTION_TOPIC_COUNT] = { MQTT_QOS_0, MQTT_QOS_0,
MQTT_QOS_0 };
/* Publishing topics and messages */
const char *publish_topic0 = { PUBLISH_TOPIC0 };
const char *publish_topic1 = { PUBLISH_TOPIC1 };
//char *publish_data = { PUBLISH_TOPIC0_DATA };
/* Message Queue */
//mqd_t g_PBQueue;
//pthread_t appThread = (pthread_t) NULL;
/* Printing new line */
//char lineBreak[] = "\n\r";
//*****************************************************************************
// Banner VARIABLES
//*****************************************************************************
#ifdef SECURE_CLIENT
char *Mqtt_Client_secure_files[CLIENT_NUM_SECURE_FILES] =
{ "ca-cert.pem"};
/*Initialization structure to be used with sl_ExtMqtt_Init API. In order to */
/*use secured socket method, the flag MQTTCLIENT_NETCONN_SEC, cipher, */
/*n_files and secure_files must be configured. */
/*certificates also must be programmed ("ca-cert.pem"). */
/*The first parameter is a bit mask which configures server address type and */
/*security mode. */
/*Server address type: IPv4, IPv6 and URL must be declared with The */
/*corresponding flag. */
/*Security mode: The flag MQTTCLIENT_NETCONN_SEC enables the security (TLS) */
/*which includes domain name verification and certificate catalog */
/*verification, those verifications can be disabled by adding to the bit mask*/
/*MQTTCLIENT_NETCONN_SKIP_DOMAIN_NAME_VERIFICATION and */
/*MQTTCLIENT_NETCONN_SKIP_CERTIFICATE_CATALOG_VERIFICATION flags */
/*Example: MQTTCLIENT_NETCONN_IP6 | MQTTCLIENT_NETCONN_SEC | */
/*MQTTCLIENT_NETCONN_SKIP_CERTIFICATE_CATALOG_VERIFICATION */
/*For this bit mask, the IPv6 address type will be in use, the security */
/*feature will be enable and the certificate catalog verification will be */
/*skipped. */
/*Note: The domain name verification requires URL Server address type */
/* otherwise, this verification will be disabled. */
MQTTClient_ConnParams Mqtt_ClientCtx =
{
MQTTCLIENT_NETCONN_IP4 | MQTTCLIENT_NETCONN_SEC,
SERVER_IP_ADDRESS, //SERVER_ADDRESS,
SECURED_PORT_NUMBER,// PORT_NUMBER
SLNETSOCK_SEC_METHOD_SSLv3_TLSV1_2,
SLNETSOCK_SEC_CIPHER_FULL_LIST,
CLIENT_NUM_SECURE_FILES,
Mqtt_Client_secure_files
};
void setTime()
{
SlDateTime_t dateTime =
{ 0};
dateTime.tm_day = (uint32_t)DAY;
dateTime.tm_mon = (uint32_t)MONTH;
dateTime.tm_year = (uint32_t)YEAR;
dateTime.tm_hour = (uint32_t)HOUR;
dateTime.tm_min = (uint32_t)MINUTES;
dateTime.tm_sec = (uint32_t)SEC;
sl_DeviceSet(SL_DEVICE_GENERAL, SL_DEVICE_GENERAL_DATE_TIME,
sizeof(SlDateTime_t), (uint8_t *)(&dateTime));
}
#else
MQTTClient_ConnParams Mqtt_ClientCtx = {
MQTTCLIENT_NETCONN_URL,
SERVER_ADDRESS,
PORT_NUMBER,
0, 0, 0,
NULL };
#endif
/* Initialize the will_param structure to the default will parameters */
MQTTClient_Will will_param = {
WILL_TOPIC,
WILL_MSG,
WILL_QOS,
WILL_RETAIN };
//*****************************************************************************
//
//! MQTT_SendMsgToQueue - Utility function that receive msgQueue parameter and
//! tries to push it the queue with minimal time for timeout of 0.
//! If the queue isn't full the parameter will be stored and the function
//! will return 0.
//! If the queue is full and the timeout expired (because the timeout parameter
//! is 0 it will expire immediately), the parameter is thrown away and the
//! function will return -1 as an error for full queue.
//!
//! \param[in] struct msgQueue *queueElement
//!
//! \return 0 on success, -1 on error
//
//*****************************************************************************
//int32_t MQTT_SendMsgToQueue(struct msgQueue *queueElement)
//{
// dbgOutputLoc(MQTT_MSG_QUEUE_PRE_LOC);
// struct timespec abstime = {0};
//
// clock_gettime(CLOCK_REALTIME, &abstime);
//
// if(g_PBQueue)
// {
// /* send message to the queue */
// if(mq_timedsend(g_PBQueue, (char *) queueElement,
// sizeof(struct msgQueue), 0, &abstime) == 0)
// {
// return(0);
// }
// }
// dbgOutputLoc(MQTT_MSG_QUEUE_POST_LOC);
// return(-1);
//}
//*****************************************************************************
//
//! Push Button Handler1(GPIOSW2). Press push button1 (GPIOSW2) Whenever user
//! wants to publish a message. Write message into message queue signaling the
//! event publish messages
//!
//! \param none
//!
//! return none
//
//*****************************************************************************
//void pushButtonInterruptHandler2(uint_least8_t index)
//{
// struct msgQueue queueElement;
//
// /* Disable the SW2 interrupt */
// GPIO_disableInt(Board_GPIO_BUTTON0); // SW2
//
// queueElement.event = PUBLISH_PUSH_BUTTON_PRESSED;
// queueElement.msgPtr = NULL;
//
// /* write message indicating publish message */
// if(MQTT_SendMsgToQueue(&queueElement))
// {
// UART_PRINT("\n\n\rQueue is full\n\n\r");
// }
//}
//*****************************************************************************
//
//! Push Button Handler2(GPIOSW3). Press push button3 Whenever user wants to
//! disconnect from the remote broker. Write message into message queue
//! indicating disconnect from broker.
//!
//! \param none
//!
//! return none
//
//*****************************************************************************
//void pushButtonInterruptHandler3(uint_least8_t index)
//{
// struct msgQueue queueElement;
// struct msgQueue queueElemRecv;
//
// queueElement.event = DISC_PUSH_BUTTON_PRESSED;
// queueElement.msgPtr = NULL;
//
// /* write message indicating disconnect push button pressed message */
// if(MQTT_SendMsgToQueue(&queueElement))
// {
// UART_PRINT(
// "\n\n\rQueue is full, throw first msg and send the new one\n\n\r");
// mq_receive(g_PBQueue, (char*) &queueElemRecv, sizeof(struct msgQueue),
// NULL);
// MQTT_SendMsgToQueue(&queueElement);
// }
//}
//*****************************************************************************
//
//! Periodic Timer Interrupt Handler
//!
//! \param None
//!
//! \return None
//
//*****************************************************************************
void TimerPeriodicIntHandler(sigval val)
{
/* Increment our interrupt counter. */
g_usTimerInts++;
if (!(g_usTimerInts & 0x1))
{
/* Turn Led Off */
GPIO_write(Board_GPIO_LED0, Board_GPIO_LED_OFF);
}
else
{
/* Turn Led On */
GPIO_write(Board_GPIO_LED0, Board_GPIO_LED_ON);
}
}
//*****************************************************************************
//
//! Function to configure and start timer to blink the LED while device is
//! trying to connect to an AP
//!
//! \param none
//!
//! return none
//
//*****************************************************************************
void LedTimerConfigNStart(timer_t* g_timer)
{
struct itimerspec value;
sigevent sev;
/* Create Timer */
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_notify_function = &TimerPeriodicIntHandler;
timer_create(2, &sev, g_timer);
/* start timer */
value.it_interval.tv_sec = 0;
value.it_interval.tv_nsec = TIMER_EXPIRATION_VALUE;
value.it_value.tv_sec = 0;
value.it_value.tv_nsec = TIMER_EXPIRATION_VALUE;
timer_settime(*g_timer, 0, &value, NULL);
}
//*****************************************************************************
//
//! Disable the LED blinking Timer as Device is connected to AP
//!
//! \param none
//!
//! return none
//
//*****************************************************************************
void LedTimerDeinitStop(timer_t* g_timer)
{
/* Disable the LED blinking Timer as Device is connected to AP. */
timer_delete(*g_timer);
}
//*****************************************************************************
//
//! Application startup display on UART
//!
//! \param none
//!
//! \return none
//!
//*****************************************************************************
//static void DisplayBanner(char * AppName)
//{
// UART_PRINT("\n\n\n\r");
// UART_PRINT("\t\t *************************************************\n\r");
// UART_PRINT("\t\t CC32xx %s Application \n\r", AppName);
// UART_PRINT("\t\t *************************************************\n\r");
// UART_PRINT("\n\n\n\r");
//}
void * MqttClientThread(void * pvParameters)
{
// dbgOutputLoc(MQTT_CLIENT_THREAD_PRE_LOC);
msgQueue_t2 queueElement;
TaskHandle_t testHandle = xTaskGetCurrentTaskHandle();
// msgQueue queueElemRecv;
MQTTClient_run((MQTTClient_Handle) pvParameters);
queueElement.event = LOCAL_CLIENT_DISCONNECTION;
// queueElement.msgPtr = NULL;
/*write message indicating disconnect Broker message. */
// if(MQTT_SendMsgToQueue(&queueElement))
if (!sendMsgToMqtt(&queueElement))
{
// UART_PRINT(
// "\n\n\rQueue is full, throw first msg and send the new one\n\n\r");
// mq_receive(g_PBQueue, (char*) &queueElemRecv, sizeof(struct msgQueue),
// NULL);
// MQTT_SendMsgToQueue(&queueElement);
}
// dbgOutputLoc(MQTT_CLIENT_THREAD_POST_LOC);
// pthread_exit(0);
//
// return (NULL);
vTaskSuspend(NULL);
return (NULL);
}
//*****************************************************************************
//
//! Task implementing MQTT Server plus client bridge
//!
//! This function
//! 1. Initializes network driver and connects to the default AP
//! 2. Initializes the mqtt client ans server libraries and set up MQTT
//! with the remote broker.
//! 3. set up the button events and their callbacks(for publishing)
//! 4. handles the callback signals
//!
//! \param none
//!
//! \return None
//!
//*****************************************************************************
void * MqttClient(void *pvParameters)
{
TaskHandle_t testHandle = xTaskGetCurrentTaskHandle();
// dbgOutputLoc(MQTT_CLIENT_PRE_LOC);
msgQueue_t2 queueElemRecv;
long lRetVal = -1;
int msg_send_num = 0;
int pub_count = 0;
char txBuff[TXBUFFSIZE] = { 0 };
//const char *publish_data = { PUBLISH_TOPIC0_DATA };
/*Initializing Client and Subscribing to the Broker. */
// if (gApConnectionState >= 0)
// {
lRetVal = MqttClient_start();
if (lRetVal == -1)
{
//UART_PRINT("MQTT Client lib initialization failed\n\r");
// pthread_exit(0);
// return (NULL);
dbgTerminalError();
}
// }
sendMqttFinishToSensorQ();
//int count = 0;
/*handling the signals from various callbacks including the push button */
/*prompting the client to publish a msg on PUB_TOPIC OR msg received by */
/*the server on enrolled topic(for which the on-board client ha enrolled)*/
/*from a local client(will be published to the remote broker by the */
/*client) OR msg received by the client from the remote broker (need to */
/*be sent to the server to see if any local client has subscribed on the */
/*same topic). */
while (1)
{
recieveMsgFromMqtt(&queueElemRecv);
// msgQueue_t data_to_rover;
switch (queueElemRecv.event)
{
case PROCESS_MSG:
// dbgOutputLoc(MAIN_MSG_EVENT_ENTER_LOC);
if (msg_send_num >= 100)
{ //send statistics every 100 messages, and zero this variable
//dbgOutputLoc(PUBLISH_STATS_LOC);
msg_send_num = 0;
// queueElemSend.event = STATS_PUBLISH;
// sendMsgToStatQueue(&queueElemSend);
}
else
{
// dbgOutputLoc(PUBLISH_TI_LOC);
int curPos;
curPos = snprintf(txBuff, TXBUFFSIZE,
"{\"PUB_Board_ID\": \"%s\", \"Message_ID\""
": \"%"PRIu32"\", \"Object_Detected\": \"",
BOARD_ID, pub_count);
int objectNum = 0;
int i;
for (i = 0; i < 3; i++)
{
if (queueElemRecv.detection[i])
{
objectNum++;
}
}
curPos += snprintf(txBuff + curPos, TXBUFFSIZE,
"%d\", \"PixyCamState\":\"%c\"", objectNum,
queueElemRecv.state + '0');
for (i = 0; i < 3; i++)
{
if (queueElemRecv.detection[i])
{
curPos +=
snprintf(
txBuff + curPos,
TXBUFFSIZE,
", \"TrackingIndex%d\": \"%u\", \"Color%d\": \"%c\", \"Direction%d\": \"%c\", \"Angle%d\": \"%d\", \"Block_Size%d\": \"%u\"",
i, queueElemRecv.trackIndex[i], i,
queueElemRecv.color[i] + '0', i,
queueElemRecv.direction[i] + '0', i,
queueElemRecv.angle[i], i,
queueElemRecv.blockSize[i]);
}
}
curPos += snprintf(txBuff + curPos, TXBUFFSIZE, ""
", \"Dist_Warn\": \"%d\", \"IR_Distance\": \"%d\"}", queueElemRecv.shortDistWarn, queueElemRecv.irDistance);
// snprintf(
// txBuff,
// TXBUFFSIZE,
// "{\"PUB_Board_ID\": \"%s\", \"Message_ID\""
// ": \"%"PRIu32"\", \"Message_Content\": \"%c%c%c%c\", \"Target\": \"Same\", \"Angle\": \"%d\"}",
// BOARD_ID, pub_count, statePixy, isInsight, color,
// direction, queueElemRecv.angle);
// dbgOutputLoc(PUBLISH_STATS_LOC);
lRetVal = MQTTClient_publish(
gMqttClient,
(char*) publish_topic0,
strlen((char*) publish_topic0),
txBuff,
strlen((char*) txBuff),
MQTT_QOS_0
| ((RETAIN_ENABLE) ? MQTT_PUBLISH_RETAIN : 0));
pub_count++;
// dbgOutputLoc(MAIN_MSG_EVENT_OUT_LOC);
msg_send_num++;
}
break;
case DISTANCE_MSG:{
uint8_t handred = (queueElemRecv.distance >> 16);
uint8_t decade = (queueElemRecv.distance >> 8);
uint8_t digit = queueElemRecv.distance;
snprintf(txBuff, TXBUFFSIZE, "{\"Distance\": \"%c%c%c\"}", handred,
decade, digit);
// lRetVal = MQTTClient_publish(
// gMqttClient, (char*) publish_topic0,
// strlen((char*) publish_topic0), txBuff,
// strlen((char*) txBuff),
// MQTT_QOS_0 | ((RETAIN_ENABLE) ? MQTT_PUBLISH_RETAIN : 0));
pub_count++;
// dbgOutputLoc(MAIN_MSG_EVENT_OUT_LOC);
msg_send_num++;
break;
}
case STATS_MSG:
snprintf(
txBuff,
TXBUFFSIZE,
"{\"Board_ID\": \"%s\", \"Number_of_times_publish\": %"PRIu32", \"%s\": %"PRIu32", "
"\"%s\": %"PRIu32", \"%s\": %"PRIu32"}",
BOARD_ID, pub_count, Board1_msg, queueElemRecv.T1_miss_num,
Board2_msg,
queueElemRecv.T2_miss_num, Board3_msg,
queueElemRecv.T3_miss_num);
lRetVal = MQTTClient_publish(
gMqttClient, (char*) publish_topic1,
strlen((char*) publish_topic1), txBuff,
strlen((char*) txBuff),
MQTT_QOS_0 | ((RETAIN_ENABLE) ? MQTT_PUBLISH_RETAIN : 0));
break;
default:
// dbgOutputLoc(MAIN_DEFAULT_EVENT_LOC);
break;
}
//dbgOutputLoc(MQTT_CLIENT_POST_LOC);
}
}
//*****************************************************************************
//
//! This function connect the MQTT device to an AP with the SSID which was
//! configured in SSID_NAME definition which can be found in Network_if.h file,
//! if the device can't connect to to this AP a request from the user for other
//! SSID will appear.
//!
//! \param none
//!
//! \return None
//!
//*****************************************************************************
int32_t Mqtt_IF_Connect(timer_t* g_timer)
{
int32_t lRetVal;
char SSID_Remote_Name[32];
int8_t Str_Length;
memset(SSID_Remote_Name, '\0', sizeof(SSID_Remote_Name));
Str_Length = strlen(SSID_NAME);
if (Str_Length)
{
/*Copy the Default SSID to the local variable */
strncpy(SSID_Remote_Name, SSID_NAME, Str_Length);
}
/*Display Application Banner */
// DisplayBanner(APPLICATION_NAME);
GPIO_write(Board_GPIO_LED0, Board_GPIO_LED_OFF);
// GPIO_write(Board_GPIO_LED1, Board_GPIO_LED_OFF);
// GPIO_write(Board_GPIO_LED2, Board_GPIO_LED_OFF);
/*Reset The state of the machine */
Network_IF_ResetMCUStateMachine();
/*Start the driver */
lRetVal = Network_IF_InitDriver(ROLE_STA);
if (lRetVal < 0)
{
// UART_PRINT("Failed to start SimpleLink Device\n\r", lRetVal);
return (-1);
}
/*switch on Board_GPIO_LED2 to indicate Simplelink is properly up. */
// GPIO_write(Board_GPIO_LED2, Board_GPIO_LED_ON);
/*Start Timer to blink Board_GPIO_LED0 till AP connection */
//LedTimerConfigNStart(g_timer);
/*Initialize AP security params */
SecurityParams.Key = (signed char *) SECURITY_KEY;
SecurityParams.KeyLen = strlen(SECURITY_KEY);
SecurityParams.Type = SECURITY_TYPE;
/*Connect to the Access Point */
lRetVal = Network_IF_ConnectAP(SSID_Remote_Name, SecurityParams);
if (lRetVal < 0)
{
//UART_PRINT("Connection to an AP failed\n\r");
return (-1);
}
/*Disable the LED blinking Timer as Device is connected to AP. */
//LedTimerDeinitStop(g_timer);
/*Switch ON Board_GPIO_LED0 to indicate that Device acquired an IP. */
GPIO_write(Board_GPIO_LED0, Board_GPIO_LED_ON);
sleep(1);
GPIO_write(Board_GPIO_LED0, Board_GPIO_LED_OFF);
// GPIO_write(Board_GPIO_LED1, Board_GPIO_LED_OFF);
// GPIO_write(Board_GPIO_LED2, Board_GPIO_LED_OFF);
return (0);
}
//*****************************************************************************
//!
//! MQTT Start - Initialize and create all the items required to run the MQTT
//! protocol
//!
//! \param none
//!
//! \return None
//!
//*****************************************************************************
void Mqtt_start()
{
TaskHandle_t testHandle = xTaskGetCurrentTaskHandle();
// dbgOutputLoc(MQTT_START_PRE_LOC);
pthread_t mqttThread = (pthread_t) NULL;
int32_t threadArg = 100;
pthread_attr_t pAttrs;
struct sched_param priParam;
int32_t retc = 0;
// mq_attr attr;
// unsigned mode = 0;
pthread_t statisticsThread = (pthread_t) NULL;
// TimerHandle_t mqtt_timer_handle = init_timer();
// xTimerStart(mqtt_timer_handle, portMAX_DELAY);
// createMqttQ();
// createRoverQ();
// createStatQ();
/*Set priority and stack size attributes */
pthread_attr_init(&pAttrs);
priParam.sched_priority = 2;
retc = pthread_attr_setschedparam(&pAttrs, &priParam);
retc |= pthread_attr_setstacksize(&pAttrs, MQTTTHREADSIZE);
retc |= pthread_attr_setdetachstate(&pAttrs, PTHREAD_CREATE_DETACHED);
if (retc != 0)
{
//gInitState &= ~MQTT_INIT_STATE;
// UART_PRINT("MQTT thread create fail\n\r");
//return;
dbgTerminalError();
}
retc = pthread_create(&mqttThread, &pAttrs, MqttClient,
(void *) &threadArg);
if (retc != 0)
{
// gInitState &= ~MQTT_INIT_STATE;
// // UART_PRINT("MQTT thread create fail\n\r");
// return;
dbgTerminalError();
}
retc = pthread_create(&statisticsThread, &pAttrs, statisticsTask,
(void *) &threadArg);
if (retc != 0)
{
// gInitState &= ~MQTT_INIT_STATE;
// // UART_PRINT("Read from thread create fail\n\r");
// return;
dbgTerminalError();
}
// gInitState &= ~MQTT_INIT_STATE;
// dbgOutputLoc(MQTT_START_POST_LOC);
}
//*****************************************************************************
//!
//! MQTT Stop - Close the client instance and free all the items required to
//! run the MQTT protocol
//!
//! \param none
//!
//! \return None
//!
//*****************************************************************************
void Mqtt_Stop()
{
msgQueue_t2 queueElement;
// if (gApConnectionState >= 0)
// {
Mqtt_ClientStop(1);
// }
// dbgOutputLoc(MQTT_STOP_PRE_SEND_LOC);
queueElement.event = THREAD_TERMINATE_REQ;
// queueElement.msgPtr = NULL;
/*write message indicating publish message */
//if(MQTT_SendMsgToQueue(&queueElement))
if (sendMsgToMqtt(&queueElement))
{
// UART_PRINT(
// "\n\n\rQueue is full, throw first msg and send the new one\n\n\r");
// mq_receive(g_PBQueue, (char*) &queueElemRecv, sizeof(struct msgQueue),
// NULL);
//MQTT_SendMsgToQueue(&queueElement);
}
// sleep(2);
// mq_close(g_PBQueue);
// g_PBQueue = NULL;
sl_Stop(SL_STOP_TIMEOUT);
//UART_PRINT("\n\r Client Stop completed\r\n");
}
int32_t MqttClient_start()
{
// dbgOutputLoc(MQTT_CLIENT_START_PRE_LOC);
TaskHandle_t testHandle = xTaskGetCurrentTaskHandle();
int32_t lRetVal = -1;
pthread_t g_rx_task_hndl = (pthread_t) NULL;
int32_t threadArg = 100;
pthread_attr_t pAttrs;
struct sched_param priParam;
uint32_t gUiConnFlag = 0;
MqttClientExmple_params.clientId = ClientId;
MqttClientExmple_params.connParams = &Mqtt_ClientCtx;
MqttClientExmple_params.mqttMode31 = MQTT_3_1;
MqttClientExmple_params.blockingSend = true;
// gInitState |= CLIENT_INIT_STATE;
/*Initialize MQTT client lib */
gMqttClient = MQTTClient_create(MqttClientCallback,
&MqttClientExmple_params);
if (gMqttClient == NULL)
{
/*lib initialization failed */
// gInitState &= ~CLIENT_INIT_STATE;
return (-1);
// dbgTerminalError();
}
/*Open Client Receive Thread start the receive task. Set priority and */
/*stack size attributes */
pthread_attr_init(&pAttrs);
priParam.sched_priority = 2;
lRetVal = pthread_attr_setschedparam(&pAttrs, &priParam);
lRetVal |= pthread_attr_setstacksize(&pAttrs, RXTASKSIZE);
lRetVal |= pthread_attr_setdetachstate(&pAttrs, PTHREAD_CREATE_DETACHED);
lRetVal |= pthread_create(&g_rx_task_hndl, &pAttrs, MqttClientThread,
(void *) &threadArg);
if (lRetVal != 0)
{
//UART_PRINT("Client Thread Create Failed failed\n\r");
// gInitState &= ~CLIENT_INIT_STATE;
return (-1);
// dbgTerminalError();
}
#ifdef SECURE_CLIENT
setTime();
#endif
/*setting will parameters */
MQTTClient_set(gMqttClient, MQTTClient_WILL_PARAM, &will_param,
sizeof(will_param));
#ifdef CLNT_USR_PWD
/*Set user name for client connection */
MQTTClient_set(gMqttClient, MQTTClient_USER_NAME, (void *)ClientUsername,
strlen(
(char*)ClientUsername));
/*Set password */
MQTTClient_set(gMqttClient, MQTTClient_PASSWORD, (void *)ClientPassword,
strlen(
(char*)ClientPassword));
#endif
/*Initiate MQTT Connect */
// if (gApConnectionState >= 0)
// {
#if CLEAN_SESSION == false
bool clean = CLEAN_SESSION;
MQTTClient_set(gMqttClient, MQTTClient_CLEAN_CONNECT, (void *)&clean,
sizeof(bool));
#endif
/*The return code of MQTTClient_connect is the ConnACK value that
returns from the server */
lRetVal = MQTTClient_connect(gMqttClient);
/*negative lRetVal means error,
0 means connection successful without session stored by the server,
greater than 0 means successful connection with session stored by
the server */
if (0 > lRetVal)
{
/*lib initialization failed */
//UART_PRINT("Connection to broker failed, Error code: %u\n\r",
// lRetVal);
gUiConnFlag = 0;
}
else
{
gUiConnFlag = 1;
}
/*Subscribe to topics when session is not stored by the server */
if ((gUiConnFlag == 1) && (0 == lRetVal))
{
uint8_t subIndex;
MQTTClient_SubscribeParams subscriptionInfo[SUBSCRIPTION_TOPIC_COUNT];
for (subIndex = 0; subIndex < SUBSCRIPTION_TOPIC_COUNT; subIndex++)
{
subscriptionInfo[subIndex].topic = topic[subIndex];
subscriptionInfo[subIndex].qos = qos[subIndex];
}
if (MQTTClient_subscribe(gMqttClient, subscriptionInfo,
SUBSCRIPTION_TOPIC_COUNT) < 0)
{
//UART_PRINT("\n\r Subscription Error \n\r");
MQTTClient_disconnect(gMqttClient);
gUiConnFlag = 0;
}
}
// }
// gInitState &= ~CLIENT_INIT_STATE;
// dbgOutputLoc(MQTT_CLIENT_START_POST_LOC);
return (0);
}
//*****************************************************************************
//!
//! MQTT Client stop - Unsubscribe from the subscription topics and exit the
//! MQTT client lib.
//!
//! \param none
//!
//! \return None
//!
//*****************************************************************************