CCS811 Gas Sensor ESP8266 MQTT Publisher

Introduction

Hardware

Software

Results

Introduction

I have been focusing my efforts on building and testing interesting and useful sensors whose data is sent to the cloud by either ESP8266 or ESP32 devices (in one case I used a Raspberry Pi Zero W). I solder the device SOM to my own PCB design, which plugs directly into any 5 volt USB charger. The PCB provides 3.3 volts, a reset button, UART pins, and a few commonly used GPIO pins. Furthermore, I use the Espressif SDK directly, or a compatible third-party derivative, built on one of my Ubuntu hosts, and so avoid the need for any Arduino hardware. Connection to the device is via a USB UART. In this particular case I am running Ubuntu 14.04 in one of my Oracle virtual machines and using the esp-open-rtos SDK found on GitHub here. I found this SDK very useful in that it contains drivers and examples for many popular sensors; in particular, its I2C driver supports clock stretching, which the CCS811 requires. For the cloud side I am having good success with MQTT. Besides using a limited “free” broker provided by services such as AWS, I find it quite easy to set up one’s own broker by installing “mosquitto” on any machine you choose and using port forwarding to make the broker available to subscribers in the cloud.

Hardware

The CCS811 sensor measures total volatile organic compounds (TVOC) and has an I2C interface. Its algorithms process the measurements and provide a TVOC value as well as a value for equivalent CO2 concentration (eCO2).

The CCS811 sensor comes mounted on a small PCB supplied by Adafruit. The sensor accepts temperature and humidity inputs, which its algorithm uses to adjust its readings. I therefore paired it with a BMP280-family sensor that measures pressure, temperature, and humidity, which is also available from Adafruit as a small PCB with an I2C interface. (Strictly, only the BME280 variant reports humidity; the code below detects which chip is present.)
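To make the temperature and humidity compensation concrete: per the CCS811 datasheet, these inputs are written to the sensor's ENV_DATA register as two big-endian 16-bit values in units of 1/512, with the temperature offset by 25 °C. In the program, the driver call ccs811_set_environmental_data presumably takes care of this. The sketch below only illustrates the encoding; the helper name ccs811_encode_env is hypothetical and is not part of the driver.

```c
#include <stdint.h>

/* Hypothetical helper (not a driver function): encode relative humidity (%)
 * and temperature (deg C) into the 4-byte ENV_DATA layout described in the
 * CCS811 datasheet. Each value is a big-endian 16-bit number in units of
 * 1/512; temperature is stored with a +25 deg C offset. */
static void ccs811_encode_env(float temperature_c, float humidity_pct,
                              uint8_t out[4])
{
    /* Clamp to ranges the unsigned 16-bit encoding can represent. */
    if (humidity_pct < 0.0f)    humidity_pct = 0.0f;
    if (humidity_pct > 100.0f)  humidity_pct = 100.0f;
    if (temperature_c < -25.0f) temperature_c = -25.0f;
    if (temperature_c > 100.0f) temperature_c = 100.0f;

    /* Scale to 1/512 units, rounding to the nearest step. */
    uint16_t hum  = (uint16_t)(humidity_pct * 512.0f + 0.5f);
    uint16_t temp = (uint16_t)((temperature_c + 25.0f) * 512.0f + 0.5f);

    out[0] = (uint8_t)(hum >> 8);    /* humidity, high byte    */
    out[1] = (uint8_t)(hum & 0xFF);  /* humidity, low byte     */
    out[2] = (uint8_t)(temp >> 8);   /* temperature, high byte */
    out[3] = (uint8_t)(temp & 0xFF); /* temperature, low byte  */
}
```

For example, 48.5 % relative humidity and 23.5 °C both encode to the byte pair 0x61 0x00, which matches the example given in the datasheet.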

The complete assembly and its elements are best shown in the following figures.

Figure 1. Complete assembly connected to host.
Figure 2. ESP8266 board and charger.
Figure 3. Sensors mounted on daughter board.
Figure 4. Daughter board assembly mounted on ESP8266 board.

Software

In the esp-open-rtos SDK there are three standalone examples whose code was merged into a single source file for compilation. In the examples directory the code is found in the directories MQTTClient, ccs811, and bmp280. Since the objective is to make measurements and send them over WiFi using the MQTT protocol, the MQTTClient example was used as the base, because this is exactly what that code does. It is structured as three separate tasks: a WiFi task, a measurement and message-preparation task, and an MQTT publish task. A queue is created and used by the measurement task to send messages to the MQTT publish task, which in turn sends them out over the connection maintained by the WiFi task.
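The hand-off between the measurement task and the publish task depends on two properties of the queue: items are copied by value at a fixed size, and a send with a zero timeout fails immediately when the queue is full. The following is a plain-C stand-in for that behaviour, not FreeRTOS code. The names msg_queue_t, msg_queue_send, and msg_queue_receive are hypothetical, and the depth and item size are simply taken from the program below.

```c
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define QUEUE_DEPTH 3    /* same depth as xQueueCreate(3, PUB_MSG_LEN) */
#define ITEM_SIZE   130  /* same value as PUB_MSG_LEN */

/* Fixed-capacity ring buffer of fixed-size items (illustration only). */
typedef struct {
    char   items[QUEUE_DEPTH][ITEM_SIZE];
    size_t head;   /* index of the oldest stored item */
    size_t count;  /* number of items currently stored */
} msg_queue_t;

static void msg_queue_init(msg_queue_t *q)
{
    q->head = 0;
    q->count = 0;
}

/* Copy ITEM_SIZE bytes from item into the queue.
 * Returns false at once if the queue is full (like a zero-timeout send). */
static bool msg_queue_send(msg_queue_t *q, const char *item)
{
    if (q->count == QUEUE_DEPTH)
        return false;
    size_t tail = (q->head + q->count) % QUEUE_DEPTH;
    memcpy(q->items[tail], item, ITEM_SIZE);
    q->count++;
    return true;
}

/* Copy the oldest item into out, which must hold ITEM_SIZE bytes.
 * Returns false if the queue is empty. */
static bool msg_queue_receive(msg_queue_t *q, char *out)
{
    if (q->count == 0)
        return false;
    memcpy(out, q->items[q->head], ITEM_SIZE);
    q->head = (q->head + 1) % QUEUE_DEPTH;
    q->count--;
    return true;
}
```

Because every transfer copies a full item, both the sending and the receiving buffer have to be at least ITEM_SIZE bytes, whatever the length of the string they contain.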

The main program is presented below:

/*mqttclient.c*/
#include "espressif/esp_common.h"
#include "esp/uart.h"
#include <string.h>
#include <FreeRTOS.h>
#include <task.h>
#include <ssid_config.h>
#include <espressif/esp_sta.h>
#include <espressif/esp_wifi.h>
#include <paho_mqtt_c/MQTTESP8266.h>
#include <paho_mqtt_c/MQTTClient.h>
#include <semphr.h>
/* -- includes ----------------------------------------------------- */
#include "bmp280/bmp280.h"
#include "ccs811.h"
#include "sntp.h"
#include <lwip/sockets.h>
#include <time.h>

/* -- platform dependent definitions ------------------------------- */

#ifdef ESP_PLATFORM  // ESP32 (ESP-IDF)
// user task stack depth for ESP32
#define TASK_STACK_DEPTH 2048
#else  // ESP8266 (esp-open-rtos)
// user task stack depth for ESP8266
#define TASK_STACK_DEPTH 512  //this was doubled from 256
#endif  // ESP_PLATFORM

// I2C interface definitions for ESP32 and ESP8266
#define I2C_BUS       0
#define I2C_SCL_PIN   14
#define I2C_SDA_PIN   13
#define I2C_FREQ      I2C_FREQ_100K
#define SNTP_SERVERS 	"0.pool.ntp.org", "1.pool.ntp.org", \
						"2.pool.ntp.org", "3.pool.ntp.org"
#define vTaskDelayMs(ms)	vTaskDelay((ms)/portTICK_PERIOD_MS)
static ccs811_sensor_t* sensor;
#define MQTT_HOST ("xxx.xxx.x.xxx")
#define MQTT_PORT 8883
#define MQTT_USER "xxxxx" //NULL
#define MQTT_PASS "yyyyyyyy" //NULL
SemaphoreHandle_t wifi_alive;
QueueHandle_t publish_queue;
#define PUB_MSG_LEN 130  //originally was 16
size_t nbytes = 0;

void user_task_periodic(void *pvParameters)
{
    uint16_t tvoc = 0;  // initialised so a failed first read never publishes garbage
    uint16_t eco2 = 0;
    TickType_t xLastWakeTime = xTaskGetTickCount();
    char msg[PUB_MSG_LEN];
    bmp280_params_t  params;
    float pressure, temperature, humidity;
    bmp280_init_default_params(&params);
    bmp280_t bmp280_dev;
    bmp280_dev.i2c_dev.bus = I2C_BUS;
    bmp280_dev.i2c_dev.addr = BMP280_I2C_ADDRESS_0;
    while (!bmp280_init(&bmp280_dev, &params)) {
        printf("BMP280 initialization failed\n");
        vTaskDelay(1000 / portTICK_PERIOD_MS);
    }
    bool bme280p = bmp280_dev.id == BME280_CHIP_ID;
    printf("BMP280: found %s\n", bme280p ? "BME280" : "BMP280");
    while (1)
    {
        vTaskDelayUntil(&xLastWakeTime, 10000 / portTICK_PERIOD_MS);
        time_t ts = time(NULL);
        // get the results from CCS811 and do something with them
        if (ccs811_get_results (sensor, &tvoc, &eco2, 0, 0))
            printf("CCS811 Sensor periodic: TVOC %d ppb, eCO2 %d ppm %s",
                    tvoc, eco2, ctime(&ts));
        // get environmental data from another sensor and set them
        if (!bmp280_read_float(&bmp280_dev, &temperature, &pressure, &humidity)) {
            printf("Temperature/pressure reading failed\n");
            continue;  // a FreeRTOS task must never return, so skip this cycle instead
        }
        ccs811_set_environmental_data (sensor, temperature, humidity);
        // snprintf is bounded by the size of msg (PUB_MSG_LEN) and returns the
        // length the full message needs, even when the output was truncated
        int len = snprintf(msg, sizeof(msg), "CCS811 Sensor periodic: TVOC %d ppb, eCO2 %d ppm temp %.2f °C, hum %.2f %% %s",
                   tvoc, eco2, temperature, humidity, ctime(&ts));
        if (len < 0)
            continue;  // formatting error, nothing to send
        // bytes stored in msg including the terminating NUL (clamped if truncated)
        nbytes = ((size_t)len < sizeof(msg)) ? (size_t)len + 1 : sizeof(msg);
        printf("send result size = %u\r\n", (unsigned)nbytes);
        if (xQueueSend(publish_queue, (void *)msg, 0) == pdFALSE) {
            printf("Publish queue overflow.\r\n");
        }
    }
}

void startSNTP(){
	const char *servers[] = {SNTP_SERVERS};

	/* Wait until we have joined AP and are assigned an IP */
	while (sdk_wifi_station_get_connect_status() != STATION_GOT_IP) {
		vTaskDelayMs(100);
	}
	/* Start SNTP */
	printf("Starting SNTP... ");
	/* SNTP will request an update each 5 minutes */
	sntp_set_update_delay(5*60000);
	/* Set GMT-8 zone, daylight savings on */
	const struct timezone tz = {-8*60, 1};
	/* SNTP initialization */
	sntp_initialize(&tz);
	/* Servers must be configured right after initialization */
	sntp_set_servers(servers, sizeof(servers) / sizeof(char*));
	printf("DONE!\n");
}

//since this client is also a subscriber this function prints received messages
static void  topic_received(mqtt_message_data_t *md)
{
    int i;
    mqtt_message_t *message = md->message;
    printf("Received: ");
    for( i = 0; i < md->topic->lenstring.len; ++i)
        printf("%c", md->topic->lenstring.data[ i ]);
    printf(" = ");
    for( i = 0; i < (int)message->payloadlen; ++i)
        printf("%c", ((char *)(message->payload))[i]);
    printf("\r\n");
}

static const char *  get_my_id(void)
{
    // Use MAC address for Station as unique ID
    static char my_id[13];
    static bool my_id_done = false;
    int8_t i;
    uint8_t x;
    if (my_id_done)
        return my_id;
    if (!sdk_wifi_get_macaddr(STATION_IF, (uint8_t *)my_id))
        return NULL;
    for (i = 5; i >= 0; --i)
    {
        x = my_id[i] & 0x0F;
        if (x > 9) x += 7;
        my_id[i * 2 + 1] = x + '0';
        x = my_id[i] >> 4;
        if (x > 9) x += 7;
        my_id[i * 2] = x + '0';
    }
    my_id[12] = '\0';
    my_id_done = true;
    return my_id;
}

static void  mqtt_task(void *pvParameters)
{
    int ret         = 0;
    struct mqtt_network network;//see MQTTESP8266.h
    mqtt_client_t client   = mqtt_client_default;//see MQTTClient.h
    char mqtt_client_id[20];//see MQTTClient.h
    uint8_t mqtt_buf[150];  //must enlarge the buffers from 100
    uint8_t mqtt_readbuf[150];
    mqtt_packet_connect_data_t data =   mqtt_packet_connect_data_initializer;//see MQTTConnect.h for both
    mqtt_network_new( &network );//see MQTTESP8266.h
    memset(mqtt_client_id, 0, sizeof(mqtt_client_id));
    strcpy(mqtt_client_id, "ESP-");
    strcat(mqtt_client_id, get_my_id());
    startSNTP();
    while(1) {
        xSemaphoreTake(wifi_alive, portMAX_DELAY);
        printf("%s: started\n\r", __func__);
        printf("%s: (Re)connecting to MQTT server %s ... ",__func__,
               MQTT_HOST);
        ret = mqtt_network_connect(&network, MQTT_HOST, MQTT_PORT);//see MQTTESP8266
        if( ret ){
            printf("error: %d\n\r", ret);
            taskYIELD();//see free rtos (tells kernel it can switch to another task)
            continue;
        }
        printf("done\n\r");
        mqtt_client_new(&client, &network, 5000, mqtt_buf, 150,//enlarge these buffers from 100
                      mqtt_readbuf, 150);//see MQTTClient.h
        //data variables found in MQTTConnect.h
        data.willFlag       = 0;
        data.MQTTVersion    = 3;
        data.clientID.cstring   = mqtt_client_id;
        data.username.cstring   = MQTT_USER;
        data.password.cstring   = MQTT_PASS;
        data.keepAliveInterval  = 10;
        data.cleansession   = 0;
        printf("Send MQTT connect ... ");
        ret = mqtt_connect(&client, &data);//see MQTTClient.h
        if(ret){
            printf("error: %d\n\r", ret);
            mqtt_network_disconnect(&network);//see MQTTClient.h
            taskYIELD();//see free rtos
            continue;
        }
        printf("done\r\n");
        ret = mqtt_subscribe(&client, "esp8266/co2", MQTT_QOS1, topic_received);//see MQTTClient.h
        printf("return subscribe = %d\r\n", ret);
        xQueueReset(publish_queue);
        while(1){
            char msg[PUB_MSG_LEN] = "";  // must hold a full queue item (PUB_MSG_LEN bytes)
            while(xQueueReceive(publish_queue, (void *)msg, 0) ==
                  pdTRUE){
                printf("got message to publish\r\n");
                printf("msg is: %s", msg);
                mqtt_message_t message;//see MQTTClient.h
                message.payload = msg;
                message.payloadlen = strlen(msg);  // length of this message, not the shared nbytes
                message.dup = 0;
                message.qos = MQTT_QOS1;
                message.retained = 0;
                ret = mqtt_publish(&client, "esp8266/co2", &message);//see MQTTClient.h
                if (ret != MQTT_SUCCESS ){
                    printf("error while publishing message: %d\n", ret );
                    break;
                }
            }
            // mqtt_yield (see MQTTClient.h) uses the timer type defined in MQTTESP8266.h
            // and calls cycle(), which reads the socket and handles any incoming
            // packets, repeating until the given timeout (in ms) expires
            ret = mqtt_yield(&client, 1000);  //originally 1000 //see MQTTClient.h
            if (ret == MQTT_DISCONNECTED)
                break;
        }
        printf("Connection dropped, request restart\n\r");
        mqtt_network_disconnect(&network);//see MQTTESP8266.h
        taskYIELD();//see free rtos
    }
}

static void  wifi_task(void *pvParameters)
{
    uint8_t status  = 0;
    uint8_t retries = 30;
    struct sdk_station_config config = {
        .ssid = WIFI_SSID,
        .password = WIFI_PASS,
    };
    printf("WiFi: connecting to WiFi\n\r");
    sdk_wifi_set_opmode(STATION_MODE);
    sdk_wifi_station_set_config(&config);
    while(1)
    {
        while ((status != STATION_GOT_IP) && (retries)){
            status = sdk_wifi_station_get_connect_status();
            printf("%s: status = %d\n\r", __func__, status );
            if( status == STATION_WRONG_PASSWORD ){
                printf("WiFi: wrong password\n\r");
                break;
            } else if( status == STATION_NO_AP_FOUND ) {
                printf("WiFi: AP not found\n\r");
                break;
            } else if( status == STATION_CONNECT_FAIL ) {
                printf("WiFi: connection failed\r\n");
                break;
            }
            vTaskDelay( 1000 / portTICK_PERIOD_MS );
            --retries;
        }
        if (status == STATION_GOT_IP) {
            printf("WiFi: Connected\n\r");
            xSemaphoreGive( wifi_alive );
            taskYIELD();
        }
        while ((status = sdk_wifi_station_get_connect_status()) == STATION_GOT_IP) {
            xSemaphoreGive( wifi_alive );
            taskYIELD();
        }
        printf("WiFi: disconnected\n\r");
        sdk_wifi_station_disconnect();
        vTaskDelay( 1000 / portTICK_PERIOD_MS );
    }
}

void user_init(void)
{
    uart_set_baud(0, 115200);
    printf("SDK version:%s\n", sdk_system_get_sdk_version());
    vTaskDelay(1);
    vSemaphoreCreateBinary(wifi_alive);
    publish_queue = xQueueCreate(3, PUB_MSG_LEN);
    xTaskCreate(&wifi_task, "wifi_task",  512, NULL, 2, NULL);

    /** -- MANDATORY PART -- */

    // init all I2C bus interfaces at which CCS811 sensors are connected
    i2c_init (I2C_BUS, I2C_SCL_PIN, I2C_SDA_PIN, I2C_FREQ);
    
    // longer clock stretching is required for CCS811
    i2c_set_clock_stretch (I2C_BUS, CCS811_I2C_CLOCK_STRETCH);

    // init the sensor with slave address CCS811_I2C_ADDRESS_1 connected I2C_BUS.
    sensor = ccs811_init_sensor (I2C_BUS, CCS811_I2C_ADDRESS_1);

    if (sensor)
    {

        // create a periodic task that uses the sensor
        xTaskCreate(user_task_periodic, "user_task_periodic", TASK_STACK_DEPTH, NULL, 3, NULL);

        // start periodic measurement with one measurement per second
        ccs811_set_mode (sensor, ccs811_mode_1s);
    }
    else
        printf("Could not initialize the CCS811 sensor\n");

    xTaskCreate(&mqtt_task, "mqtt_task", 1024, NULL, 4, NULL);  //was 4
}

Here is the Makefile. Importantly, it tells the build where to find the needed drivers:

PROGRAM=mqtt_client
EXTRA_COMPONENTS = extras/paho_mqtt_c extras/i2c extras/ccs811 extras/sntp extras/bmp280
include ../../common.mk

Here are important points regarding this program:

  1. Time is obtained from an SNTP server and used to time/date stamp each message.
  2. The MQTT connection is plain TCP; SSL/TLS is not used. It appears that the MQTT driver found in the directory extras/paho_mqtt_c has no such provision. Note that port 8883, used here, is the port conventionally assigned to MQTT over TLS, so the broker has to be set up to accept plain TCP on it.
  3. In the main code of mqtt_client.c certain constants had to be increased from their original values in the example. The published message is quite long, so PUB_MSG_LEN is now 130 and TASK_STACK_DEPTH is now 512.
  4. The buffers passed to mqtt_client_new must be enlarged as shown.
  5. The mqtt_task is primarily a publisher, but it also subscribes to the same topic.
  6. The local SSID and password must be provided in ssid_config.h.
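
The need for larger MQTT buffers follows from the packet layout. In MQTT 3.1.1 a PUBLISH packet carries a fixed header, a length-prefixed topic, a packet identifier when QoS is above 0, and then the payload. The sketch below estimates the total size, assuming the client serialises the whole packet into its send buffer; the function names are hypothetical and are not part of the paho library.

```c
#include <stddef.h>
#include <string.h>

/* Bytes needed for the MQTT "remaining length" field, a variable-length
 * integer that carries 7 bits of the value per byte. */
static size_t mqtt_remaining_length_bytes(size_t remaining)
{
    size_t n = 1;
    while (remaining > 127) {
        remaining >>= 7;
        n++;
    }
    return n;
}

/* Hypothetical helper: total size in bytes of an MQTT 3.1.1 PUBLISH packet. */
static size_t mqtt_publish_packet_size(const char *topic, size_t payload_len,
                                       int qos)
{
    size_t remaining = 2 + strlen(topic)  /* topic length prefix + topic     */
                     + (qos > 0 ? 2 : 0)  /* packet identifier, QoS 1 and 2  */
                     + payload_len;       /* application message             */

    /* One byte of packet type and flags, then the length field itself. */
    return 1 + mqtt_remaining_length_bytes(remaining) + remaining;
}
```

With the topic "esp8266/co2" at QoS 1, a 110-byte payload gives a 127-byte packet, and a payload of the full 130 bytes gives 148 bytes. Under the stated assumption, that is why a 100-byte buffer no longer suffices while 150 bytes does.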

Results

The message received by a subscriber is as follows:

CCS811 Sensor periodic: TVOC 1443 ppb, eCO2 2342 ppm temp 28.55 °C, hum 30.37 % Thu May 27 17:57:24 2021

The high TVOC and eCO2 values suggest that the VOC presence is quite high. The measured temperature is about 5 °C above room temperature, a result of heating by the ESP8266 PCB assembly.

My experience with this now-working sensor is not yet sufficient to report on its performance in various environments, nor to fully understand its underlying principle of operation and its limitations. A good, simple introduction to how this sensor works and what the measurements mean is found at this site. The most significant point to learn is that this sensor measures only VOCs and does not measure CO2 at all. The eCO2 value is purely calculated: it is the CO2 concentration that would indicate air quality as degraded as the measured TVOC value does. Many posters on the web confuse the eCO2 value with the actual presence of CO2 in that amount.

As more data and information are obtained I will report them by adding to this post.
