Adding retained messages to Adafruit's MQTT library

Recently, when building a weather station based on Adafruit’s Huzzah ESP8266 breakout board, I needed the ability to send messages that should be retained on the MQTT server.

I am using mosquitto on my home server, acting as a message broker.  My weather stations send messages to it and these are then forwarded to the subscribers who have signed up for them.  At the moment, these subscribers are a weather logging system, a current weather display panel on the different computers at home, and, occasionally, a command-line client used for troubleshooting.

As the weather stations are battery powered, they do not stay switched on and connected all the time.  Instead, they wake up every few minutes, check the weather, and transmit the results to the MQTT server before going back to sleep.

Subscribers receive these messages as soon as they are sent.  However, when a subscriber starts up and connects to the MQTT server, it will not receive anything until the relevant weather station next sends a message.  By default, the server has no concept of the most recent value.  This is inconvenient for the weather panel application on the computers, as it will not show any values until the next time the weather station wakes up.  Depending on the battery levels and configuration, that could be quite a while…

To solve this, MQTT supports retained messages.  These are messages with a special flag set, which tells the server to keep the most recent message on that topic.  A subscriber will then get the most recent retained message as soon as it subscribes, and then receive updates whenever the weather station sends them.
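
To make the retained-message behaviour concrete, here is a minimal sketch of what a broker conceptually does with the flag.  It is a toy model in plain C++, not how mosquitto is implemented: the ToyBroker class, its method names and the topic name are all made up for illustration, topics are matched exactly (no wildcards), and delivery to live subscribers is left out.

```cpp
#include <cassert>
#include <map>
#include <string>

// Toy model of a broker's retained-message store (hypothetical class,
// not mosquitto's implementation). Exact topic matches only.
class ToyBroker {
public:
    // A retained publish replaces the stored message for the topic.
    // Per the MQTT spec, a retained publish with an empty payload clears it.
    void publish(const std::string &topic, const std::string &payload, bool retain) {
        if (!retain) {
            return;  // live delivery to current subscribers is omitted here
        }
        if (payload.empty()) {
            retained_.erase(topic);
        } else {
            retained_[topic] = payload;
        }
    }

    // Called when a client subscribes: returns true and fills `out`
    // if a retained message exists for the topic.
    bool subscribe(const std::string &topic, std::string &out) const {
        std::map<std::string, std::string>::const_iterator it = retained_.find(topic);
        if (it == retained_.end()) {
            return false;
        }
        out = it->second;
        return true;
    }

private:
    std::map<std::string, std::string> retained_;
};

// Usage sketch: a late subscriber only sees a value if it was retained.
inline void demoRetainedDelivery() {
    ToyBroker broker;
    std::string value;
    broker.publish("weather/temp", "21.5", false);    // not retained
    assert(!broker.subscribe("weather/temp", value));
    broker.publish("weather/temp", "22.0", true);     // retained
    assert(broker.subscribe("weather/temp", value) && value == "22.0");
}
```

Only the most recent retained message per topic is kept, which is exactly what a display panel that has just started needs.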

Adafruit’s MQTT library makes the MQTT communication incredibly easy, but unfortunately it does not support retained messages out of the box at the time of writing.  There are a number of pull requests for this feature on their GitHub page, so this situation may well change.  Because those requests already exist, I have not submitted a pull request for my own changes.

I decided to add this feature to a forked version of their library, mainly as an exercise and also to learn the inner workings of the library better.  MQTT is an open standard and the documentation for the protocol can be found on the OASIS server.

My forked version can be found on github, at https://github.com/erikbakke/Adafruit_MQTT_Library.

In Adafruit_MQTT.h

Let’s start with the class declarations in the header file Adafruit_MQTT.h.  In the Adafruit_MQTT class there are two main publish methods, one which takes a string payload, and one which takes a pointer to binary data as well as the length of the binary data buffer.  I added a new uint8_t parameter to control whether the published message should be retained:

bool publish(const char *topic, const char *payload, uint8_t qos = 0, uint8_t retain = 0);
bool publish(const char *topic, uint8_t *payload, uint16_t bLen, uint8_t qos = 0, uint8_t retain = 0);

These methods build the actual MQTT packet data and then pass this to publishPacket(), which is where the control header fields get added and the packet sent to the server.  As the retain flag is in one of these header fields, I also needed to add the flag as a parameter here.

uint16_t publishPacket(uint8_t *packet, const char *topic, uint8_t *payload, uint16_t bLen, uint8_t qos, uint8_t retain);

In the Adafruit_MQTT_Publish class, the retain parameter is added to the constructor, so the behaviour can be controlled per feed.

Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver, const char *feed, uint8_t qos = 0, uint8_t retain = 0);

This needs to be backed by a member variable, which I added to the bottom of the class declaration.

uint8_t retain;

That’s it for the class declarations so let’s move on to the definitions, which are found in the Adafruit_MQTT.cpp file.

In Adafruit_MQTT.cpp

In Adafruit_MQTT, the const char* based publish() method is modified to take the retain parameter and pass it on to the binary buffer based method.

bool Adafruit_MQTT::publish(const char *topic, const char *data, uint8_t qos, uint8_t retain) {

    return publish(topic, (uint8_t*)(data), strlen(data), qos, retain);
}

That binary buffer based method is also modified to take the additional parameter and pass it to the publishPacket() method.

bool Adafruit_MQTT::publish(const char *topic, uint8_t *data, uint16_t bLen, uint8_t qos, uint8_t retain) {
    uint16_t len = publishPacket(buffer, topic, data, bLen, qos, retain);

    if (!sendPacket(buffer, len))
        return false;
...

publishPacket(), in turn, takes that parameter…

uint16_t Adafruit_MQTT::publishPacket(uint8_t *packet, const char *topic,
                                      uint8_t *data, uint16_t bLen, uint8_t qos, uint8_t retain) {

…and puts it into the header field.

// Now you can start generating the packet!
p[0] = MQTT_CTRL_PUBLISH << 4 | qos << 1 | retain;
p++;
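
For reference, the first byte of a PUBLISH packet is laid out as follows in MQTT 3.1.1: the packet type (3 for PUBLISH) sits in the upper four bits, followed by the DUP flag in bit 3, the QoS level in bits 2 and 1, and the RETAIN flag in bit 0.  The sketch below rebuilds that byte in plain C++ so it can be checked off-device.  The function and constant names are my own and are not part of the library, and the masking of the inputs is an extra precaution that the library line above does not perform.

```cpp
#include <cassert>
#include <cstdint>

// PUBLISH control packet type as defined by MQTT 3.1.1
// (hypothetical constant name, mirroring the library's MQTT_CTRL_PUBLISH).
const uint8_t kCtrlPublish = 0x3;

// Build the first byte of a PUBLISH fixed header.
// Bit layout: [7..4] packet type, [3] DUP, [2..1] QoS, [0] RETAIN.
inline uint8_t publishHeaderByte(uint8_t qos, bool retain, bool dup = false) {
    return static_cast<uint8_t>((kCtrlPublish << 4) |
                                ((dup ? 1 : 0) << 3) |
                                ((qos & 0x03) << 1) |   // keep QoS within its two bits
                                (retain ? 1 : 0));      // RETAIN is the lowest bit
}

// Usage sketch: a few header bytes worked out by hand.
inline void checkPublishHeaderBytes() {
    assert(publishHeaderByte(0, false) == 0x30);  // QoS 0, not retained
    assert(publishHeaderByte(0, true) == 0x31);   // QoS 0, retained
    assert(publishHeaderByte(1, true) == 0x33);   // QoS 1, retained
}
```

With QoS 0, the only difference between a normal and a retained publish is therefore 0x30 versus 0x31 in the first byte, which is handy to know when inspecting traffic while troubleshooting.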

In the Adafruit_MQTT_Publish class, the constructor has been modified to take the new retain parameter and stuff it into a member variable.

Adafruit_MQTT_Publish::Adafruit_MQTT_Publish(Adafruit_MQTT *mqttserver,
                                             const char *feed, uint8_t q, uint8_t r) {
    mqtt = mqttserver;
    topic = feed;
    qos = q;
    retain = r;
}

Next, each of the publish() methods is modified to pass the retain flag to mqtt->publish().

return mqtt->publish(topic, payload, qos, retain);

Here is a more detailed example of using the flag, adapted from one of Adafruit’s MQTT library examples:

void MQTT_uploadToServer( float temp, float humidity, float voltage )
{
  WiFiClient client;
  int8_t ret;

  Adafruit_MQTT_Client mqtt( &client, MQTT_SERVER, MQTT_SERVERPORT, MQTT_USERNAME, MQTT_PASSWD );
  Adafruit_MQTT_Publish publish_temp = Adafruit_MQTT_Publish( &mqtt, MQTT_USERNAME "/feeds/" MQTT_NAME "/temp", MQTT_QOS_0, 1 );
  Adafruit_MQTT_Publish publish_humidity = Adafruit_MQTT_Publish( &mqtt, MQTT_USERNAME "/feeds/" MQTT_NAME "/humidity", MQTT_QOS_0, 1 );
  Adafruit_MQTT_Publish publish_voltage = Adafruit_MQTT_Publish( &mqtt, MQTT_USERNAME "/feeds/" MQTT_NAME "/voltage", MQTT_QOS_0, 1 );

  Serial.print("Connecting to MQTT... ");

  uint8_t retries = 3;
  while ((ret = mqtt.connect()) != 0) { // connect will return 0 for connected
    Serial.println(mqtt.connectErrorString(ret));
    Serial.println("Retrying MQTT connection in 5 seconds...");
    mqtt.disconnect();
    delay(5000); // wait 5 seconds
    retries--;
    if (retries == 0) {
      // basically die and wait for WDT to reset me
      while (1);
    }
  }
  Serial.println("MQTT Connected!");

  if( !isnan( temp ) )
  {
    publish_temp.publish( temp );
  }
  if( !isnan( humidity ) )
  {
    publish_humidity.publish( humidity );
  }
  if( !isnan( voltage ) )
  {
    publish_voltage.publish( voltage );
  }

  mqtt.disconnect();
}

If the example code doesn’t compile, make sure your copy of the MQTT library supports the additional retain flag.

In closing

The library can be used in the same way as the official Adafruit library, and because the new parameter defaults to 0, it is source compatible with existing code.  Unless you explicitly pass a retain value of 1 when creating the Adafruit_MQTT_Publish feed object, the library will behave exactly as the standard one.  Pass a value of 1, however, and the MQTT server will retain the most recent message on that topic, eliminating the startup delay for new subscribers.
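
The compatibility comes from the trailing default arguments.  As a rough illustration, the stand-in below mirrors only the shape of the modified constructor.  FeedSketch and the topic name are hypothetical and it does not talk to any server; it just shows that call sites written before the change still compile and leave the flag off.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for a publish feed. It mirrors only the shape of the
// modified constructor so the default-argument behaviour can be checked.
struct FeedSketch {
    const char *topic;
    uint8_t qos;
    uint8_t retain;

    // Trailing defaults: callers written before the change still compile.
    FeedSketch(const char *feed, uint8_t q = 0, uint8_t r = 0)
        : topic(feed), qos(q), retain(r) {}
};

// Old-style call sites (one or two arguments) leave retain at 0.
inline bool legacyCallsStayUnretained() {
    FeedSketch oldStyle("weather/temp");
    FeedSketch withQos("weather/temp", 1);
    return oldStyle.retain == 0 && withQos.retain == 0 && withQos.qos == 1;
}

// Usage sketch: only an explicit third argument turns the flag on.
inline void demoDefaults() {
    assert(legacyCallsStayUnretained());
    FeedSketch retained("weather/temp", 0, 1);
    assert(retained.retain == 1);
}
```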

My changes can be found on GitHub, and a diff of my changes relative to the official version is here.

 

4 thoughts on “Adding retained messages to Adafruit's MQTT library”

  1. Is there any way you can show an example of the retain flag in action? Since I am a n00b, I am still trying to figure out how to implement it.

    1. I have edited the post to add a more detailed example.
      The additional flag has been added to the end of the Adafruit_MQTT_Publish constructors.

  2. Great, thanks for sharing. My MQTT clients are now a little bit more user-friendly :).

    But do you also need to adapt the subscribe methods in Adafruit’s MQTT library? In my first tests I do not receive the retained message on subscription.

    1. Well, it seems that my MQTT broker (mosquitto) really does send the PUBLISH packet before sending the SUBACK … So that’s the “TODO” part in Adafruit_MQTT.cpp:185 :(. Do you have an idea how to handle this situation?

      Nevertheless, your changes are good and I don’t understand why they haven’t been merged already.
