Skip to main content
Explorer
April 25, 2024
Solved

STM32 MQTT

  • April 25, 2024
  • 4 replies
  • 3936 views

Hi, 

I'm using a STM32F407VGTx, trying to use MQTT on this microcontroller.


I first made a test program on a NUCLEO dev kit (also a F4 controller) and it worked.
Now I get an error and it doesn't want to connect to my PC which is the broker. I have tried the following things:

- I have made sure my firewall doesn't block port 1883 en 8883.

- I can ping the controller.

- I made an extra init flag to make sure that the init code runs first before trying to subscribe or connect to anything.

 

I got the following error first: ERR_RTE = -4 which is a routing problem.

But his has changed and now I get:

Assertion "mqtt_sub_unsub: client != NULL" failed at line 1183 in ../Middlewares/Third_Party/LwIP/src/apps/mqtt/mqtt.c

(so I'm not sure if the previous error is fixed)

 

I'm not immediately sure what the problem can be. For more context i'm using freertos and have given the task a stack size of 256 words. I have included the code from the task. Anyone have any ideas?

 

Kind regards, 

William

    This topic has been closed for replies.
    Best answer by WilliamVR

    Well, I'm not sure, but I think that the MQTT software that comes with lwip in cubeIDE is not thread save and should not be used in combination with freertos. My time for working on this project is up, but what I think a solution can be is: switch to coreMQTT and the necessary dependencies. 

     

    I don't know if anyone from ST can confirm this.

    4 replies

    ST Employee
    April 29, 2024

    Hello

    In lwip initialisation, after adding a network interface with netif_add(), do you set it as default interface ?

    netif_set_default(&netif);

     

    WilliamVRAuthor
    Explorer
    April 29, 2024

    Hi I use the default generated code the netif_set_default is present.

    void MX_LWIP_Init(void)
    {
     /* IP addresses initialization */
     IP_ADDRESS[0] = 192;
     IP_ADDRESS[1] = 168;
     IP_ADDRESS[2] = 0;
     IP_ADDRESS[3] = 2;
     NETMASK_ADDRESS[0] = 255;
     NETMASK_ADDRESS[1] = 255;
     NETMASK_ADDRESS[2] = 255;
     NETMASK_ADDRESS[3] = 0;
     GATEWAY_ADDRESS[0] = 192;
     GATEWAY_ADDRESS[1] = 168;
     GATEWAY_ADDRESS[2] = 0;
     GATEWAY_ADDRESS[3] = 1;
    
    /* USER CODE BEGIN IP_ADDRESSES */
    /* USER CODE END IP_ADDRESSES */
    
     /* Initialize the LwIP stack with RTOS */
     tcpip_init( NULL, NULL );
    
     /* IP addresses initialization without DHCP (IPv4) */
     IP4_ADDR(&ipaddr, IP_ADDRESS[0], IP_ADDRESS[1], IP_ADDRESS[2], IP_ADDRESS[3]);
     IP4_ADDR(&netmask, NETMASK_ADDRESS[0], NETMASK_ADDRESS[1] , NETMASK_ADDRESS[2], NETMASK_ADDRESS[3]);
     IP4_ADDR(&gw, GATEWAY_ADDRESS[0], GATEWAY_ADDRESS[1], GATEWAY_ADDRESS[2], GATEWAY_ADDRESS[3]);
    
     /* add the network interface (IPv4/IPv6) with RTOS */
     netif_add(&gnetif, &ipaddr, &netmask, &gw, NULL, &ethernetif_init, &tcpip_input);
    
     /* Registers the default network interface */
     netif_set_default(&gnetif); //<- here
    
     /* We must always bring the network interface up connection or not... */
     netif_set_up(&gnetif);
    
     /* Set the link callback function, this function is called on change of link status*/
     netif_set_link_callback(&gnetif, ethernet_link_status_updated);
    
     /* Create the Ethernet link handler thread */
    /* USER CODE BEGIN H7_OS_THREAD_DEF_CREATE_CMSIS_RTOS_V1 */
     osThreadDef(EthLink, ethernet_link_thread, osPriorityBelowNormal, 0, configMINIMAL_STACK_SIZE *2);
     osThreadCreate (osThread(EthLink), &gnetif);
    /* USER CODE END H7_OS_THREAD_DEF_CREATE_CMSIS_RTOS_V1 */
    
    /* USER CODE BEGIN 3 */
    
    /* USER CODE END 3 */
    }

     

    WilliamVRAuthor
    Explorer
    May 3, 2024

    I havent fixed to problem but what I suspect is that the freertos task doesn't get enougf memory allocated. I have seen it subscribe without any errors when I commented a large piece of code from another task. I will investigate it further and leave my findings here.

    WilliamVRAuthorAnswer
    Explorer
    June 11, 2024

    Well, I'm not sure, but I think that the MQTT software that comes with lwip in cubeIDE is not thread save and should not be used in combination with freertos. My time for working on this project is up, but what I think a solution can be is: switch to coreMQTT and the necessary dependencies. 

     

    I don't know if anyone from ST can confirm this.

    WilliamVRAuthor
    Explorer
    May 16, 2024

    oké,

    I have gotten some things to work. The problem was that my task stack size wasn't big enough. I couldn't make this bigger because I put my TOTAL_HEAP_SIZE to big. The current problem I still face is that I can publish data but not receive data.

    If anyone sees this this is my code atm:

     

    /*
     * MQTT_Task.c
     *
     * Created on: Mar 21, 2024
     * Author: William Van Raemdonck
     */
    #include "main.h"
    #include "./TasksInc/MQTT_Task.h"
    #include "./TasksInc/IO_Task.h"
    #include "./configIRIS.h"
    #include "lwip/apps/mqtt.h"
    #include "lwip.h"
    #include "lwip/ip.h"
    #include "lwip/ip_addr.h"
    #include "string.h"
    #include "stm32f4xx_hal.h"
    
    
    char Topic[] = "/IRIS";
    
    // Digital Inputs
    const char* digitalInputs[] = {
    		"/digital/input/digitalInput_0",
    		"/digital/input/digitalInput_1",
    		"/digital/input/digitalInput_2",
    		"/digital/input/digitalInput_3",
    		"/digital/input/digitalInput_4",
    		"/digital/input/digitalInput_5",
    		"/digital/input/digitalInput_6",
    		"/digital/input/digitalInput_7",
    		"/digital/input/digitalInput_8",
    		"/digital/input/digitalInput_9",
    		"/digital/input/digitalInput_10",
    		"/digital/input/digitalInput_11",
    		"/digital/input/digitalInput_12",
    		"/digital/input/digitalInput_13",
    		"/digital/input/digitalInput_14",
    		"/digital/input/digitalInput_15",
    		"/digital/input/digitalInput_16",
    		"/digital/input/digitalInput_17",
    		"/digital/input/digitalInput_18",
    		"/digital/input/digitalInput_19",
    		"/digital/input/digitalInput_20",
    		"/digital/input/digitalInput_21",
    		"/digital/input/digitalInput_22",
    		"/digital/input/digitalInput_23"
    };
    
    // Analog Inputs
    const char* analogInputs[] = {
    		"/analogue/input/adc_0-50V_0",
    		"/analogue/input/adc_0-50V_1",
    		"/analogue/input/adc_0-50V_2",
    		"/analogue/input/adc_0-50V_3",
    		"/analogue/input/adc_0-50V_4",
    		"/analogue/input/adc_0-50V_5",
    		"/analogue/input/adc_0-50V_6",
    		"/analogue/input/adc_0-50V_7"
    };
    
    // Digital Outputs
    const char* digitalOutputsHighSide[] = {
    		"/digital/output/highside_0",
    		"/digital/output/highside_1",
    		"/digital/output/highside_2",
    		"/digital/output/highside_3",
    		"/digital/output/highside_4",
    		"/digital/output/highside_5",
    		"/digital/output/highside_6",
    		"/digital/output/highside_7"
    };
    
    const char* digitalOutputsOpenCollector[] = {
    		"/digital/output/oc_0",
    		"/digital/output/oc_1",
    		"/digital/output/oc_2",
    		"/digital/output/oc_3",
    		"/digital/output/oc_4",
    		"/digital/output/oc_5",
    		"/digital/output/oc_6",
    		"/digital/output/oc_7"
    };
    
    const char* digitalOutputsRelais[] = {
    		"/digital/output/relais_0",
    		"/digital/output/relais_1",
    		"/digital/output/relais_2",
    		"/digital/output/relais_3",
    		"/digital/output/relais_4",
    		"/digital/output/relais_5",
    		"/digital/output/relais_6",
    		"/digital/output/relais_7"
    };
    
    mqtt_client_t *client;
    
    extern osMutexId UART_MutexHandle;
    
    ip4_addr_t ipAddrBroker;
    uint8_t IP_ADDRESS_BROKER[4];
    char MQTTusername[64];
    char MQTTpassword[64];
    char MQTTid[64];
    
    char debugMQTT[128];
    
    char* receivedMqttCommand;
    char receivedDA[20];
    char receivedIO[20];
    char receivedIndex[20];
    
    extern osMessageQId IO_OUTPUT_PIN_QUEUEHandle;
    extern osMessageQId IO_OUTPUT_DATA_QUEUEHandle;
    
    //CONFIG struct
    extern IRISConfigstr configIRIS;
    
    int initFlagMQTT = 0;
    
    void MQTT_Init(void){
    	//TODO
    	IP_ADDRESS_BROKER[0] = 192;	//configIRIS.MQTTbroker[0]
    	IP_ADDRESS_BROKER[1] = 168;	//configIRIS.MQTTbroker[1]
    	IP_ADDRESS_BROKER[2] = 0;	//configIRIS.MQTTbroker[2]
    	IP_ADDRESS_BROKER[3] = 1;	//configIRIS.MQTTbroker[3]
    
    	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
    	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);
    
    	client = mqtt_client_new();
    
    	osPrintf("MQTT", "new cient made");
    
    	if(client != NULL) {
    		example_do_connect(client);
    	}
    
    	initFlagMQTT = 1;
    }
    
    void MQTT_Subscribe(void){
    	char fullTopic[50] = "";
    
    	osPrintf("MQTT", "subscribed to topics:");
    
    	//	SUBSCRIBE to all available outputs
    	//	OC
    	for (int i = 0; i < 8; i++) {
    		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsOpenCollector[i]);
    		mqtt_subscribe(client, fullTopic, 0,0, NULL);
    		osPrintf("MQTT-subscribe", digitalOutputsOpenCollector[i]);
    	}
    
    	//	HSD
    	for (int i = 0; i < 8; i++) {
    		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsHighSide[i]);
    		mqtt_subscribe(client, fullTopic, 0,0, NULL);
    		osPrintf("MQTT-subscribe", digitalOutputsHighSide[i]);
    
    	}
    
    	//	REL
    	for (int i = 0; i < 8; i++) {
    		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsRelais[i]);
    		mqtt_subscribe(client, fullTopic, 0,0, NULL);
    		osPrintf("MQTT-subscribe", digitalOutputsRelais[i]);
    
    	}
    }
    
    void example_do_connect(mqtt_client_t *client)
    {
    	struct mqtt_connect_client_info_t ci;
    	err_t err;
    
    	/* Setup an empty client info structure */
    	memset(&ci, 0, sizeof(ci));
    
    	/* Minimal amount of information required is client identifier, so set it here */
    	ci.client_id = "IRIS";
    	ci.client_user = "username";
    	ci.client_pass = "password";
    
    	strcpy(MQTTusername, ci.client_user);
    	strcpy(MQTTpassword, ci.client_pass);
    	strcpy(MQTTid, ci.client_id);
    
    	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
    	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);
    
    	/* Initiate client and connect to server, if this fails immediately an error code is returned
     otherwise mqtt_connection_cb will be called with connection result after attempting
     to establish a connection with the server.
     For now MQTT version 3.1.1 is always used */
    
    	err = mqtt_client_connect(client, &ipAddrBroker, 1883, mqtt_connection_cb, 0, &ci);
    
    	/* For now just print the result code if something goes wrong */
    	if(err != ERR_OK) {
    		osPrintf("MQTT", "mqtt_connect return: ");
    		itoa(err, debugMQTT, 10);
    		osPrintf("MQTT", debugMQTT);
    	}
    }
    
    static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
    {
    	err_t err;
    	if(status == MQTT_CONNECT_ACCEPTED) {
    		osPrintf("MQTT", "mqtt_connection_cb: Successfully connected\r\n");
    
    		/* Setup callback for incoming publish requests */
    		mqtt_set_inpub_callback(client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, arg);
    
    		/* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */
    		err = mqtt_subscribe(client, "subtopic", 1, mqtt_sub_request_cb, arg);
    
    		//subscribe to own topics
    		MQTT_Subscribe();
    
    		if(err != ERR_OK) {
    			snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_subscribe return: %d", err);
    			osPrintf("MQTT", debugMQTT);
    		}
    	}
    	else
    	{
    		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_connection_cb: Disconnected, reason: %d", status);
    		osPrintf("MQTT", debugMQTT);
    
    		/* Its more nice to be connected, so try to reconnect */
    		example_do_connect(client);
    	}
    }
    
    static void mqtt_sub_request_cb(void *arg, err_t result)
    {
    	/* Just print the result code here for simplicity,
     normal behaviour would be to take some action if subscribe fails like
     notifying user, retry subscribe or disconnect from server */
    
    	snprintf(debugMQTT, sizeof(debugMQTT), "Subscribe result: %d (0 = ERR_OK)", result);
    	osPrintf("MQTT", debugMQTT);
    }
    
    
    //-----------------------------------------------------------------------------------
    //	INCOMIMING
    
    
    static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
    {
    	char inpub_type[20];
    	int inpub_pin = 0;
    	int index = 0;
    	char *parts[4]; // Array to store extracted parts
    
    	//DEBUG
    	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish at topic: %s", topic);
    	osPrintf("MQTT", debugMQTT);
    
    	//extract pin number and type
    	receivedMqttCommand = strtok(topic, "/");
    	while (receivedMqttCommand != NULL && index < 4) {
    		parts[index++] = receivedMqttCommand;
    		receivedMqttCommand = strtok(NULL, "/");
    	}
    	sscanf(parts[3], "%[^_]_%d", inpub_type, &inpub_pin);
    
    
    	if(strcmp(parts[0], &Topic[1]) != 0){
    		osPrintf("debug", "received MQTT packet not for this device or wrong topic");
    	}
    	else{
    		snprintf(debugMQTT, sizeof(debugMQTT), "type: %s, pin: %d", inpub_type, inpub_pin);
    		osPrintf("MQTT", debugMQTT);
    
    		uint8_t offset = 0;
    		if(strcmp(inpub_type, "highside") == 0){
    			offset = HIGHSIDE_0;
    		}
    		else if(strcmp(inpub_type, "oc") == 0){
    			offset = OPEN_COLLECTOR_0;
    		}
    		else if(strcmp(inpub_type, "relay") == 0){
    			offset = RELAY_0;
    		}
    		else{
    			osPrintf("ERROR", "MQTT - Not a valid topic");
    		}
    
    		osMessagePut(IO_OUTPUT_PIN_QUEUEHandle, offset + inpub_pin, HAL_MAX_DELAY);
    
    		//osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, inpub_type, HAL_MAX_DELAY);
    	}
    }
    
    static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
    {
    	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish payload with length: %d with flags: %d", len, (unsigned int)flags);
    	osPrintf("MQTT", debugMQTT);
    
    	if(flags & MQTT_DATA_FLAG_LAST) {
    		/* Last fragment of payload received (or whole part if payload fits receive buffer
     See MQTT_VAR_HEADER_BUFFER_LEN) */
    		uint8_t dataNew = 0;
    
    
    		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_incoming_data_cb: %s", (char*)data);
    		osPrintf("MQTT", debugMQTT);
    
    		dataNew = atoi((char*)data);
    		osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, dataNew, HAL_MAX_DELAY);
    
    	} else {
    		/* Handle fragmented payload, store in buffer, write to file or whatever */
    	}
    }
    
    void example_publish(mqtt_client_t *client, void *arg, const char* subTopic)
    {
    	const char *pub_payload= "1";	//tmp
    	err_t err;
    	u8_t qos = 2; /* 0 1 or 2, see MQTT specification */
    	u8_t retain = 0; /* No don't retain such crappy payload... */
    
    	char fullTopic[50] = "";
    	strcat(fullTopic, Topic);
    	strcat(fullTopic, subTopic);
    
    	snprintf(fullTopic, 50, "%s%s", Topic, subTopic);
    
    
    	err = mqtt_publish(client, fullTopic, pub_payload, strlen(pub_payload), qos, retain, mqtt_pub_request_cb, arg);
    
    	//strcat(fullTopic, "\r\n");
    	//osPrintf("MQTT", "TOPIC:" );
    	//osPrintf("MQTT", fullTopic);
    
    	if(err != ERR_OK) {
    		if(err != ERR_MEM){
    			snprintf(debugMQTT, sizeof(debugMQTT), "Publish err: %d", err);
    			osPrintf("MQTT", debugMQTT);
    		}
    	}
    }
    
    /* Called when publish is complete either with sucess or failure */
    static void mqtt_pub_request_cb(void *arg, err_t result)
    {
    	if(result != ERR_OK) {
    		snprintf(debugMQTT, sizeof(debugMQTT), "Publish result: %d", result);
    		osPrintf("MQTT",debugMQTT);
    	}
    	else{
    		osPrintf("MQTT", "Publish result: Success!");
    	}
    }
    
    
    void MQTT_Task(void){
    	example_publish(client,	NULL, digitalInputs[0]);
    }
    
    
    
    
    
    
    
    
    
    
    
    
    
    

     

    WilliamVRAuthor
    Explorer
    May 31, 2024

    So the receiving was fixed! 
    It turn out my RAM allocation wasn't just right.

    But now there is a diffrent problem.
    After a while of publishen data it stops sending and gets a ERR_MEM.
    This is because the tcp send buffer for lwip fills up and overflows.
    I don't know if there is a way to detect this.

    WilliamVR_0-1717140375877.png

    This is what wireshark sees. After this retransmission it stops working. It handles other retransmissions just fine.
    This is my code atm.

    All help is welcome!


    /*
     * MQTT_Task.c
     *
     * Created on: Mar 21, 2024
     * Author: William Van Raemdonck
     */
    #include "main.h"
    #include "./TasksInc/MQTT_Task.h"
    #include "./TasksInc/IO_Task.h"
    #include "./configIRIS.h"
    #include "lwip/apps/mqtt.h"
    #include "lwip.h"
    #include "lwip/ip.h"
    #include "lwip/ip_addr.h"
    #include "string.h"
    #include "stm32f4xx_hal.h"
    
    
    char Topic[] = "/IRIS";
    
    // Digital Inputs
    const char* digitalInputs[] = {
    		"/digital/input/digitalInput_0",
    		"/digital/input/digitalInput_1",
    		"/digital/input/digitalInput_2",
    		"/digital/input/digitalInput_3",
    		"/digital/input/digitalInput_4",
    		"/digital/input/digitalInput_5",
    		"/digital/input/digitalInput_6",
    		"/digital/input/digitalInput_7",
    		"/digital/input/digitalInput_8",
    		"/digital/input/digitalInput_9",
    		"/digital/input/digitalInput_10",
    		"/digital/input/digitalInput_11",
    		"/digital/input/digitalInput_12",
    		"/digital/input/digitalInput_13",
    		"/digital/input/digitalInput_14",
    		"/digital/input/digitalInput_15",
    		"/digital/input/digitalInput_16",
    		"/digital/input/digitalInput_17",
    		"/digital/input/digitalInput_18",
    		"/digital/input/digitalInput_19",
    		"/digital/input/digitalInput_20",
    		"/digital/input/digitalInput_21",
    		"/digital/input/digitalInput_22",
    		"/digital/input/digitalInput_23"
    };
    
    // Analog Inputs
    const char* analogInputs[] = {
    		"/analogue/input/adc_0-50V_0",
    		"/analogue/input/adc_0-50V_1",
    		"/analogue/input/adc_0-50V_2",
    		"/analogue/input/adc_0-50V_3",
    		"/analogue/input/adc_0-50V_4",
    		"/analogue/input/adc_0-50V_5",
    		"/analogue/input/adc_0-50V_6",
    		"/analogue/input/adc_0-50V_7"
    };
    
    // Digital Outputs
    const char* digitalOutputsHighSide[] = {
    		"/digital/output/highside_0",
    		"/digital/output/highside_1",
    		"/digital/output/highside_2",
    		"/digital/output/highside_3",
    		"/digital/output/highside_4",
    		"/digital/output/highside_5",
    		"/digital/output/highside_6",
    		"/digital/output/highside_7"
    };
    
    const char* digitalOutputsOpenCollector[] = {
    		"/digital/output/oc_0",
    		"/digital/output/oc_1",
    		"/digital/output/oc_2",
    		"/digital/output/oc_3",
    		"/digital/output/oc_4",
    		"/digital/output/oc_5",
    		"/digital/output/oc_6",
    		"/digital/output/oc_7"
    };
    
    const char* digitalOutputsRelais[] = {
    		"/digital/output/relais_0",
    		"/digital/output/relais_1",
    		"/digital/output/relais_2",
    		"/digital/output/relais_3",
    		"/digital/output/relais_4",
    		"/digital/output/relais_5",
    		"/digital/output/relais_6",
    		"/digital/output/relais_7"
    };
    
    mqtt_client_t *client;
    
    extern osMutexId UART_MutexHandle;
    
    ip4_addr_t ipAddrBroker;
    uint8_t IP_ADDRESS_BROKER[4];
    char MQTTusername[64];
    char MQTTpassword[64];
    char MQTTid[64];
    
    char debugMQTT[128];
    
    char* receivedMqttCommand;
    char receivedDA[20];
    char receivedIO[20];
    char receivedIndex[20];
    
    extern osMessageQId IO_OUTPUT_PIN_QUEUEHandle;
    extern osMessageQId IO_OUTPUT_DATA_QUEUEHandle;
    
    //CONFIG struct
    extern IRISConfigstr configIRIS;
    
    //IO
    extern uint8_t digitalInputsFlash[24];
    extern uint8_t openCollectorFlash[8];
    extern uint8_t highsideDriverFlash[8];
    extern uint8_t relaisFlash[8];
    extern float ADCInputFlash[8];
    
    int initFlagMQTT = 0;
    
    void MQTT_Init(void){
    	//TODO
    	IP_ADDRESS_BROKER[0] = 192;	//configIRIS.MQTTbroker[0]
    	IP_ADDRESS_BROKER[1] = 168;	//configIRIS.MQTTbroker[1]
    	IP_ADDRESS_BROKER[2] = 0;	//configIRIS.MQTTbroker[2]
    	IP_ADDRESS_BROKER[3] = 1;	//configIRIS.MQTTbroker[3]
    
    	configIRIS.MQTTbroker[0] = 192;
    	configIRIS.MQTTbroker[1] = 168;
    	configIRIS.MQTTbroker[2] = 0;
    	configIRIS.MQTTbroker[3] = 1;
    
    	configIRIS.MQTTport = 1883;
    
    	strcpy(configIRIS.MQTTtopic, "/IRIS");
    
    
    	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
    	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);
    
    	client = mqtt_client_new();
    
    	osPrintf("MQTT", "new cient made");
    
    	if(client != NULL) {
    		example_do_connect(client);
    	}
    
    	initFlagMQTT = 1;
    }
    
    void MQTT_Subscribe(void){
    	char fullTopic[50] = "";
    
    	osPrintf("MQTT", "subscribed to topics:");
    
    	//	SUBSCRIBE to all available outputs
    	//	OC
    	for (int i = 0; i < 8; i++) {
    		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsOpenCollector[i]);
    		mqtt_subscribe(client, fullTopic, 0,0, NULL);
    		osPrintf("MQTT-subscribe", digitalOutputsOpenCollector[i]);
    	}
    
    	//	HSD
    	for (int i = 0; i < 8; i++) {
    		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsHighSide[i]);
    		mqtt_subscribe(client, fullTopic, 0,0, NULL);
    		osPrintf("MQTT-subscribe", digitalOutputsHighSide[i]);
    
    	}
    
    	//	REL
    	for (int i = 0; i < 8; i++) {
    		snprintf(fullTopic, 50, "%s%s", Topic, digitalOutputsRelais[i]);
    		mqtt_subscribe(client, fullTopic, 0,0, NULL);
    		osPrintf("MQTT-subscribe", digitalOutputsRelais[i]);
    
    	}
    }
    
    void example_do_connect(mqtt_client_t *client)
    {
    	struct mqtt_connect_client_info_t ci;
    	err_t err;
    
    	/* Setup an empty client info structure */
    	memset(&ci, 0, sizeof(ci));
    
    	/* Minimal amount of information required is client identifier, so set it here */
    	ci.client_id = "IRIS";
    	ci.client_user = "username";
    	ci.client_pass = "password";
    
    	strcpy(MQTTusername, ci.client_user);
    	strcpy(MQTTpassword, ci.client_pass);
    	strcpy(MQTTid, ci.client_id);
    
    	IP4_ADDR(&ipAddrBroker, IP_ADDRESS_BROKER[0], IP_ADDRESS_BROKER[1], IP_ADDRESS_BROKER[2], IP_ADDRESS_BROKER[3]);
    	IP_SET_TYPE_VAL(*(ipAddrBroker), IPADDR_TYPE_V4);
    
    	/* Initiate client and connect to server, if this fails immediately an error code is returned
     otherwise mqtt_connection_cb will be called with connection result after attempting
     to establish a connection with the server.
     For now MQTT version 3.1.1 is always used */
    
    	err = mqtt_client_connect(client, &ipAddrBroker, 1883, mqtt_connection_cb, 0, &ci);
    
    	/* For now just print the result code if something goes wrong */
    	if(err != ERR_OK) {
    		osPrintf("MQTT", "mqtt_connect return: ");
    		itoa(err, debugMQTT, 10);
    		osPrintf("MQTT", debugMQTT);
    	}
    }
    
    static void mqtt_connection_cb(mqtt_client_t *client, void *arg, mqtt_connection_status_t status)
    {
    	err_t err;
    	if(status == MQTT_CONNECT_ACCEPTED) {
    		osPrintf("MQTT", "mqtt_connection_cb: Successfully connected\r\n");
    
    		/* Setup callback for incoming publish requests */
    		mqtt_set_inpub_callback(client, mqtt_incoming_publish_cb, mqtt_incoming_data_cb, arg);
    
    		/* Subscribe to a topic named "subtopic" with QoS level 1, call mqtt_sub_request_cb with result */
    		err = mqtt_subscribe(client, "subtopic", 1, mqtt_sub_request_cb, arg);
    
    		//subscribe to own topics
    		MQTT_Subscribe();
    
    		if(err != ERR_OK) {
    			snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_subscribe return: %d", err);
    			osPrintf("MQTT", debugMQTT);
    		}
    	}
    	else
    	{
    		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_connection_cb: Disconnected, reason: %d", status);
    		osPrintf("MQTT", debugMQTT);
    
    		/* Its more nice to be connected, so try to reconnect */
    		example_do_connect(client);
    	}
    }
    
    static void mqtt_sub_request_cb(void *arg, err_t result)
    {
    	/* Just print the result code here for simplicity,
     normal behaviour would be to take some action if subscribe fails like
     notifying user, retry subscribe or disconnect from server */
    
    	snprintf(debugMQTT, sizeof(debugMQTT), "Subscribe result: %d (0 = ERR_OK)", result);
    	osPrintf("MQTT", debugMQTT);
    }
    
    
    //-----------------------------------------------------------------------------------
    //	INCOMIMING
    
    
    static void mqtt_incoming_publish_cb(void *arg, const char *topic, u32_t tot_len)
    {
    	char inpub_type[20];
    	int inpub_pin = 0;
    	int index = 0;
    	char *parts[4]; // Array to store extracted parts
    
    	//DEBUG
    	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish at topic: %s", topic);
    	osPrintf("MQTT", debugMQTT);
    
    	//extract pin number and type
    	receivedMqttCommand = strtok(topic, "/");
    	while (receivedMqttCommand != NULL && index < 4) {
    		parts[index++] = receivedMqttCommand;
    		receivedMqttCommand = strtok(NULL, "/");
    	}
    	sscanf(parts[3], "%[^_]_%d", inpub_type, &inpub_pin);
    
    
    	if(strcmp(parts[0], &Topic[1]) != 0){
    		osPrintf("debug", "received MQTT packet not for this device or wrong topic");
    	}
    	else{
    		snprintf(debugMQTT, sizeof(debugMQTT), "type: %s, pin: %d", inpub_type, inpub_pin);
    		osPrintf("MQTT", debugMQTT);
    
    		uint8_t offset = 0;
    		if(strcmp(inpub_type, "highside") == 0){
    			offset = HIGHSIDE_0;
    		}
    		else if(strcmp(inpub_type, "oc") == 0){
    			offset = OPEN_COLLECTOR_0;
    		}
    		else if(strcmp(inpub_type, "relais") == 0){
    			offset = RELAY_0;
    		}
    		else{
    			osPrintf("ERROR", "MQTT - Not a valid topic");
    		}
    
    		osMessagePut(IO_OUTPUT_PIN_QUEUEHandle, offset + inpub_pin, HAL_MAX_DELAY);
    
    		//osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, inpub_type, HAL_MAX_DELAY);
    	}
    }
    
    static void mqtt_incoming_data_cb(void *arg, const u8_t *data, u16_t len, u8_t flags)
    {
    	snprintf(debugMQTT, sizeof(debugMQTT), "Incoming publish payload with length: %d with flags: %d", len, (unsigned int)flags);
    	osPrintf("MQTT", debugMQTT);
    
    	if(flags & MQTT_DATA_FLAG_LAST) {
    		/* Last fragment of payload received (or whole part if payload fits receive buffer
     See MQTT_VAR_HEADER_BUFFER_LEN) */
    		uint8_t dataNew = 0;
    
    
    		snprintf(debugMQTT, sizeof(debugMQTT), "mqtt_incoming_data_cb: %s", (char*)data);
    		osPrintf("MQTT", debugMQTT);
    
    		dataNew = atoi((char*)data);
    		osMessagePut(IO_OUTPUT_DATA_QUEUEHandle, dataNew, HAL_MAX_DELAY);
    
    	} else {
    		/* Handle fragmented payload, store in buffer, write to file or whatever */
    	}
    }
    
    void example_publish(mqtt_client_t *client, void *arg, const char* subTopic)
    {
    	const char *pub_payload= "1";	//tmp
    	err_t err;
    	u8_t qos = 2; /* 0 1 or 2, see MQTT specification */
    	u8_t retain = 0; /* No don't retain such crappy payload... */
    
    	char fullTopic[50] = "";
    	strcat(fullTopic, Topic);
    	strcat(fullTopic, subTopic);
    
    	snprintf(fullTopic, 50, "%s%s", Topic, subTopic);
    
    	snprintf(debugMQTT, sizeof(debugMQTT), "Publish at %s", fullTopic);
    	osPrintf("MQTT",debugMQTT);
    	err = mqtt_publish(client, fullTopic, pub_payload, strlen(pub_payload), qos, retain, mqtt_pub_request_cb, arg);
    
    	if(err != ERR_OK) {
    		snprintf(debugMQTT, sizeof(debugMQTT), "Publish err: %d", err);
    		osPrintf("MQTT", debugMQTT);
    
    		if(err == ERR_MEM){
    			osPrintf("MQTT", "oeps, buffer vol");
    		}
    	}
    }
    
    /* Called when publish is complete either with sucess or failure */
    static void mqtt_pub_request_cb(void *arg, err_t result)
    {
    	if(result != ERR_OK) {
    		snprintf(debugMQTT, sizeof(debugMQTT), "Publish result: %d", result);
    		osPrintf("MQTT",debugMQTT);
    	}
    	else{
    		osPrintf("MQTT", "Publish result: Success!");
    	}
    }
    
    
    void MQTT_Task(void){
    	//MQTT_REQ_MAX_IN_FLIGHT
    	if(initFlagMQTT){
    		//digital inputs
    		for (int i = 0; i < 24; i++) {
    			example_publish(client,	NULL, digitalInputs[i]);
    			HAL_Delay(500);
    		}
    
    		//analog inputs
    		for (int i = 0; i < 8; i++) {
    			example_publish(client,	NULL, analogInputs[i]);
    			HAL_Delay(500);
    		}
    	}
    
    }