Zigbee (CC2531) server in Debian

Working from How to Transform a RaspberryPi Into a Universal Zigbee and Z-Wave Bridge since RaspberryPi is Debian based. (I am running Debian 10 on a Mac Mini 3,1)

Bought a cheapo CC2531 USB ZigBee thing pre-flashed and plugged it in and it showed up.

# tail /var/log/syslog
Jan 25 12:58:40 mini31 kernel: [4555605.979593] usb 3-3: new full-speed USB device number 5 using ohci-pci
Jan 25 12:58:40 mini31 kernel: [4555606.228490] usb 3-3: New USB device found, idVendor=0451, idProduct=16a8, bcdDevice= 0.09
Jan 25 12:58:40 mini31 kernel: [4555606.228494] usb 3-3: New USB device strings: Mfr=1, Product=2, SerialNumber=3
Jan 25 12:58:40 mini31 kernel: [4555606.228497] usb 3-3: Product: TI CC2531 USB CDC
Jan 25 12:58:40 mini31 kernel: [4555606.228499] usb 3-3: Manufacturer: Texas Instruments
Jan 25 12:58:40 mini31 kernel: [4555606.228501] usb 3-3: SerialNumber: __0X00124B0009EBD991
Jan 25 12:58:40 mini31 kernel: [4555606.249872] cdc_acm 3-3:1.0: ttyACM0: USB ACM device
Jan 25 12:58:40 mini31 kernel: [4555606.252384] usbcore: registered new interface driver cdc_acm
Jan 25 12:58:40 mini31 kernel: [4555606.252386] cdc_acm: USB Abstract Control Model driver for USB modems and ISDN adapters

# lsusb | grep Texas
Bus 003 Device 005: ID 0451:16a8 Texas Instruments, Inc.

# 

Note that /dev/ttyACM0 has appeared.

Mosquitto MQTT server

Then copied and pasted as root, as you do...

# apt-get install mosquitto
# systemctl start mosquitto.service
# systemctl enable mosquitto.service

Is anything happening?

# screen -S mosquitto_sub mosquitto_sub -h localhost -p 1883 -t zigbee2mqtt/Home/# -F "%I %t %p"

At this point I could write a simple C# programme to see MQTT working.

using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Extensions.ManagedClient;
using MQTTnet.Formatter;
using MQTTnet.Protocol;

namespace ConsoleAppMqtt
{
    /// <summary>
    /// Console smoke test for an MQTT broker. It starts four managed clients
    /// (one publisher and three subscribers), publishes thirteen numbered
    /// messages alternately to <c>numbers/odd</c> and <c>numbers/even</c>,
    /// logs everything that is received, and then stops all clients.
    /// </summary>
    public static class Program
    {
        private static readonly MqttFactory _F = new MqttFactory();

        // UseTls is false, so the connection is plain TCP. The remaining flags
        // presumably only matter if TLS is switched on later.
        private static readonly MqttClientTlsOptions MqttClientTlsOptions = new MqttClientTlsOptions
        {
            UseTls = false,
            IgnoreCertificateChainErrors = true,
            IgnoreCertificateRevocationErrors = true,
            AllowUntrustedCertificates = true
        };

        /// <summary>
        /// Builds the connection options shared by every client in this program.
        /// </summary>
        /// <param name="clientName">Client id presented to the broker.</param>
        /// <returns>Options for an MQTT 3.1.1 connection to 192.168.1.31:1883.</returns>
        private static MqttClientOptions GetOptions(string clientName)
        {
            return new MqttClientOptions
            {
                ClientId = clientName,
                ProtocolVersion = MqttProtocolVersion.V311,
                ChannelOptions = new MqttClientTcpOptions
                {
                    Server = "192.168.1.31",
                    Port = 1883,
                    TlsOptions = MqttClientTlsOptions
                },
                CleanSession = true, // Don't store up messages while we're offline.
                KeepAlivePeriod = TimeSpan.FromSeconds(5)
            };
        }

        /// <summary>
        /// Program entry point. Runs the whole demo and waits for Enter before exiting.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        public static void Main(string[] args)
        {
            // The entry-point signature stays synchronous; this is the only
            // place where the program blocks on asynchronous work.
            RunAsync().GetAwaiter().GetResult();
        }

        /// <summary>
        /// Starts the clients, publishes the test messages, waits for the
        /// outgoing queues to drain and stops the clients again.
        /// </summary>
        private static async Task RunAsync()
        {
            // Publisher. It never subscribes to anything, so its received-message
            // handler is not expected to fire.
            IManagedMqttClient publisher = CreateClient(HandleReceivedApplicationMessage);
            publisher.ApplicationMessageProcessedHandler = new ApplicationMessageProcessedHandlerDelegate(MessageProcessedHandler);
            await StartClientAsync(publisher, "ClientPublisher");

            // Odd subscriber.
            IManagedMqttClient oddSubscriber = CreateClient(SubscriberMessageReceivedHandlerO);
            await StartClientAsync(oddSubscriber, "ClientOdd");
            await oddSubscriber.SubscribeAsync(new MqttTopicFilter { Topic = "numbers/odd" });

            // Even subscriber.
            IManagedMqttClient evenSubscriber = CreateClient(SubscriberMessageReceivedHandlerE);
            await StartClientAsync(evenSubscriber, "ClientEven");
            await evenSubscriber.SubscribeAsync(new MqttTopicFilter { Topic = "numbers/even" });

            // SYS subscriber. The subscription itself is left disabled:
            //await sysSubscriber.SubscribeAsync(new MqttTopicFilter { Topic = "$SYS/#" });
            IManagedMqttClient sysSubscriber = CreateClient(SubscriberMessageReceivedHandlerS);
            await StartClientAsync(sysSubscriber, "ClientSYS");

            // StartAsync returns before the connection is established, so poll until
            // every client reports that it is connected.
            while (!(publisher.IsConnected && oddSubscriber.IsConnected && evenSubscriber.IsConnected && sysSubscriber.IsConnected))
            {
                Console.WriteLine("Waiting for connections");
                await Task.Delay(TimeSpan.FromSeconds(1));
            }

            // Send some messages.
            for (int i = 1; i <= 13; i++)
            {
                MqttApplicationMessage message = new MqttApplicationMessageBuilder()
                    .WithTopic("numbers/" + (i % 2 == 0 ? "even" : "odd"))
                    .WithPayload($"Message {i}.")
                    .WithQualityOfServiceLevel(MqttQualityOfServiceLevel.ExactlyOnce)
                    .WithRetainFlag() // This means that the last message (12, 13) will be replayed immediately the next time we subscribe.
                    .Build();

                Console.WriteLine("-------- " + i + " --------");
                await publisher.PublishAsync(message);
                Console.WriteLine("published");
                await Task.Delay(TimeSpan.FromSeconds(5));
            }

            // Give the clients a moment, then wait until nothing is queued any more.
            await Task.Delay(TimeSpan.FromSeconds(1));
            while (publisher.PendingApplicationMessagesCount > 0 || oddSubscriber.PendingApplicationMessagesCount > 0 || evenSubscriber.PendingApplicationMessagesCount > 0)
            {
                Console.WriteLine("Waiting for PendingApplicationMessagesCount");
                await Task.Delay(TimeSpan.FromSeconds(1));
            }

            // Stop.
            await StopAsync(publisher, oddSubscriber, evenSubscriber, sysSubscriber);

            Console.WriteLine("Done. Press <Enter> to quit.");
            Console.ReadLine();
        }

        /// <summary>
        /// Creates a managed client with the shared connect/disconnect handlers and
        /// exactly one received-message handler.
        /// </summary>
        /// <param name="onMessage">Callback invoked for every message the client receives.</param>
        /// <returns>A configured client that has not been started yet.</returns>
        private static IManagedMqttClient CreateClient(Action<MqttApplicationMessageReceivedEventArgs> onMessage)
        {
            IManagedMqttClient client = _F.CreateManagedMqttClient();

            // A client has a single ApplicationMessageReceivedHandler slot, so it is
            // assigned once here. Registering a second handler afterwards would
            // replace this one rather than add to it.
            client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(onMessage);
            client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnConnected);
            client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnDisconnected);
            return client;
        }

        /// <summary>
        /// Starts a managed client under the given client id.
        /// </summary>
        /// <param name="client">Client to start.</param>
        /// <param name="clientId">Client id passed to <see cref="GetOptions"/>.</param>
        private static Task StartClientAsync(IManagedMqttClient client, string clientId)
        {
            return client.StartAsync(new ManagedMqttClientOptions { ClientOptions = GetOptions(clientId) });
        }

        /// <summary>
        /// Stops the given clients one after another, in the order supplied.
        /// </summary>
        /// <param name="clients">Clients to stop.</param>
        private static async Task StopAsync(params IManagedMqttClient[] clients)
        {
            foreach (IManagedMqttClient client in clients)
            {
                await client.StopAsync();
            }
        }

        /// <summary>
        /// Logs the payload of a message once the publisher has finished processing it.
        /// </summary>
        private static void MessageProcessedHandler(ApplicationMessageProcessedEventArgs args)
        {
            // This appears to be the callback for when the publisher has received successfully.
            Console.WriteLine("MessageProcessedHandler (P): " + args.ApplicationMessage.ApplicationMessage.ConvertPayloadToString());
        }

        /// <summary>
        /// Received-message handler of the publisher. The publisher has no
        /// subscriptions, so this is not expected to be called.
        /// </summary>
        private static void HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs args)
        {
            LogReceived("HandleReceivedApplicationMessage", args);
        }

        /// <summary>Logs messages received by the odd-number subscriber.</summary>
        private static void SubscriberMessageReceivedHandlerO(MqttApplicationMessageReceivedEventArgs args)
        {
            LogReceived("SubscriberMessageReceivedHandlerO", args);
        }

        /// <summary>Logs messages received by the even-number subscriber.</summary>
        private static void SubscriberMessageReceivedHandlerE(MqttApplicationMessageReceivedEventArgs args)
        {
            LogReceived("SubscriberMessageReceivedHandlerE", args);
        }

        /// <summary>Logs messages received by the SYS subscriber.</summary>
        private static void SubscriberMessageReceivedHandlerS(MqttApplicationMessageReceivedEventArgs args)
        {
            LogReceived("SubscriberMessageReceivedHandlerS", args);
        }

        /// <summary>
        /// Writes one received message to the console, prefixed with the name of
        /// the handler that received it.
        /// </summary>
        /// <param name="source">Label identifying the handler.</param>
        /// <param name="args">Event data carrying the received message.</param>
        private static void LogReceived(string source, MqttApplicationMessageReceivedEventArgs args)
        {
            string item = $"Timestamp: {DateTime.Now:HH:mm:ss} | Topic: {args.ApplicationMessage.Topic} | Payload: {args.ApplicationMessage.ConvertPayloadToString()} | QoS: {args.ApplicationMessage.QualityOfServiceLevel}";
            Console.WriteLine(source + ": " + item);
        }

        /// <summary>Logs that a client has connected.</summary>
        private static void OnConnected(MqttClientConnectedEventArgs args)
        {
            Console.WriteLine("Connected");
        }

        /// <summary>Logs that a client has disconnected.</summary>
        private static void OnDisconnected(MqttClientDisconnectedEventArgs args)
        {
            Console.WriteLine("Disconnected");
        }
    }
}

Node

Continuing... (Note the addition of npm.)

# apt-get install nodejs npm git make g++ gcc

Not so fast! The Debian package for node is version 10 which is defunct, and npm install will complain about this. Therefore it’s necessary to install a newer version — 16 — from NodeSource.

I actually got it to run under node 10 but it was a processor hog and had my CPU running 20 degrees above what is normal. Upgrading to node 16 seems to have fixed this.

# cd
# curl -fsSL https://deb.nodesource.com/setup_lts.x | bash -
# apt-get install -y nodejs
# node --version
v16.13.2

zigbee2mqtt

# cd
# git clone https://github.com/Koenkk/zigbee2mqtt
# cd zigbee2mqtt/
# npm install

For /etc/systemd/system/zigbee2mqtt.service:

# systemd unit that runs zigbee2mqtt by calling `npm start` in its checkout.
[Unit]
Description=zigbee2mqtt
# Order this unit after network.target.
After=network.target

[Service]
# npm is started from WorkingDirectory, so the checkout must be at /opt/zigbee2mqtt
# (the text following this unit moves it there from /root).
ExecStart=/usr/bin/npm start
WorkingDirectory=/opt/zigbee2mqtt
StandardOutput=inherit
StandardError=inherit
# Restart the service whenever it exits.
Restart=always

[Install]
# `systemctl enable` hooks the unit into multi-user.target.
WantedBy=multi-user.target

...and then...

The /opt directory felt like a good place to dump it; mssql is already there.

# mv /root/zigbee2mqtt /opt/zigbee2mqtt
# systemctl daemon-reload
# systemctl start zigbee2mqtt.service
# systemctl enable zigbee2mqtt.service

If it complains that the port is in use then try removing and re-inserting the USB adapter; I seemed to have got an instance of zigbee2mqtt running already somehow and removing the hardware killed it.

Platypush

Don’t bother; zigbee2mqtt-frontend seems to do the same and is already installed with zigbee2mqtt

# apt-get install redis-server
# systemctl start redis-server
# systemctl enable redis-server
# apt install python3-pip
# cd
# pip3 install 'platypush[zigbee,http,mqtt]'
# touch /etc/platypush/config.yaml
# touch /etc/systemd/system/platypush.service
# systemctl daemon-reload
# systemctl start platypush.service
# systemctl enable platypush.service

And now I have something running at http://192.168.1.31:8008.

zigbee2mqtt frontend

Github page for zigbee2mqtt-frontend. Unfortunately there is basically no documentation; they assume that you’re already a developer of it — very frustrating.

# git clone https://github.com/nurikk/zigbee2mqtt-frontend.git
# cd zigbee2mqtt-frontend

Dashboards

Everyone wants dashboards; the solution appears to be influxDB and Grafana. Platypush appears to be useless.

Install influxdb and telegraf (copied from Influxdb download page).

# wget -qO- https://repos.influxdata.com/influxdb.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdb.gpg > /dev/null
# export DISTRIB_ID=$(lsb_release -si); export DISTRIB_CODENAME=$(lsb_release -sc)
# echo "deb [signed-by=/etc/apt/trusted.gpg.d/influxdb.gpg] https://repos.influxdata.com/${DISTRIB_ID,,} ${DISTRIB_CODENAME} stable" | sudo tee /etc/apt/sources.list.d/influxdb.list > /dev/null
# apt-get update
# apt-get install influxdb2
# systemctl unmask influxdb
# systemctl start influxdb

Note that package influx-client is for v1 and is incompatible with influxdb2. Furthermore the influx cli for v2 has changed completely and is now useless. Did it work?

# influx ping
OK

The influx command appears not to be able to do anything more.

A website has appeared at http://localhost:8086! It appears to have dashboards so maybe Grafana won’t be needed.

Now set up telegraf to get data from mqtt into influxdb

# wget https://dl.influxdata.com/telegraf/releases/telegraf_1.21.4-1_amd64.deb
# dpkg -i telegraf_1.21.4-1_amd64.deb
# telegraf --sample-config > telegraf.conf
# systemctl start telegraf

The default telegraf.conf is set up to collect statistics (disc, cpu,...) about the machine it’s running on, so this needs commenting out.

Next, append to /etc/telegraf/telegraf.conf the following:

# Consumer for the temperature sensors.
# NOTE(review): this consumer also subscribes to ".../Radiator/+" but has no
# topic_parsing entry for it and only extracts "temperature" - confirm that the
# second topic is really wanted here.
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = [
    "zigbee2mqtt/Home/+/Temp",
    "zigbee2mqtt/Home/+/Radiator/+"
  ]
  data_format = "json_v2"
  # topic_parsing tables belong to the consumer, not to a json_v2 section.
  # The third topic segment becomes the "room" tag.
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "zigbee2mqtt/Home/+/Temp"
    tags = "_/_/room/_"
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "temperature"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "temperature"
      type = "float"

# Consumer for the TRVs (which have a different JSON format).
[[inputs.mqtt_consumer]]
  servers = ["tcp://127.0.0.1:1883"]
  topics = [
    "zigbee2mqtt/Home/+/Radiator",
  ]
  data_format = "json_v2"
  # Each topic pattern is declared once; it applies to both measurements below.
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "zigbee2mqtt/Home/+/Radiator"
    tags = "_/_/room/_"
  [[inputs.mqtt_consumer.topic_parsing]]
    topic = "zigbee2mqtt/Home/+/Radiator/+"
    tags = "_/_/room/_/position"
  # The TRV's "local_temperature" is stored as temperature.temperature.
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "temperature"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "local_temperature"
      rename = "temperature"
      type = "float"
  # The TRV's "position" is stored as valve.valve.
  [[inputs.mqtt_consumer.json_v2]]
    measurement_name = "valve"
    [[inputs.mqtt_consumer.json_v2.field]]
      path = "position"
      rename = "valve"
      type = "float"

# NOTE(review): the Flux task and the influx delete command elsewhere in this
# document use bucket "zigbee" - confirm which bucket name is the real one.
[[outputs.influxdb_v2]]
  urls = ["http://127.0.0.1:8086"]
  token = "token" # Create in the InfluxDB UI. Will be needed for command line too.
  organization = "mini31"
  bucket = "mini31"

The radiator TRV is a Moes BRT-100-TRV and whenever I set the day and time it reverts to an apparently random value. It is therefore useless. This turned out to be a bug in zigbee2mqtt that appears to have been fixed by version 1.27.

Here is a simple dashboard in the influxdb website. The gauges are set to aggregation latest so they’re effectively independent of the time period

Deleting old data

Just set the retention policy on the bucket.

# influx config create --config-name mini31 --host-url http://127.0.0.1:8086 --org mini31 --token MyTopSecretToken --active
# influx config list

This creates a file ~/.influxdbv2/configs which contains the token.

 # du -sh /var/lib/influxdb/
290M    /var/lib/influxdb/
# influx delete --org mini31 --bucket zigbee --start 2020-01-01T00:00:00.000Z --stop $(date --date '6 months ago' +"%Y-%m-%dT%H:%M:%SZ")
# du -sh /var/lib/influxdb/
290M    /var/lib/influxdb/

Oh.

Notifications

I want to get a text message telling me to open or close the windows whenever the outside temperature crosses the inside temperature.

The first task is to create a task to compute a new measure to flag this

// Task: every 13 minutes, compare the outside temperature with the inside
// temperature over the last six hours (in 30-minute means) and write two
// derived series back to the bucket:
//   window_indicator  1 = open the windows now, -1 = close them now, 0 = nothing to do
//   window_status     "open" / "closed" = the state the windows should be in
option task = {
    name: "window_indicator_task",
    every: 13m,
    offset: 3m,
}

// Get outside data.
o = from(bucket: "zigbee")
    |> range(start: -6h, stop: now())
    // Wider range, kept for back-filling by hand:
    //|> range(start: -6mo, stop: now())
    |> filter(fn: (r) => r["room"] == "Outside")
    |> filter(fn: (r) => r["_measurement"] == "temperature")
    |> aggregateWindow(every: 30m, fn: mean, createEmpty: false)

// Outside data one window earlier. Yes, plus: shifting the timestamps forward
// by 30m lines the earlier reading up with the current _time.
os = o |> timeShift(duration: 30m)

// Get inside data (mean of the two bedroom sensors).
i = from(bucket: "zigbee")
    |> range(start: -6h, stop: now())
    |> filter(fn: (r) => r["topic"] == "zigbee2mqtt/Home/Bed3/Temp" or r["topic"] == "zigbee2mqtt/Home/Bed1/Temp")
    |> filter(fn: (r) => r["_measurement"] == "temperature")
    |> drop(columns: ["room"])
    |> group()
    |> aggregateWindow(every: 30m, fn: mean, createEmpty: false)

// Join outside-now with outside-earlier.
jo = join(tables: {o: o, os: os}, on: ["_time"])
    |> map(
        fn: (r) => ({
            _time: r._time,
            _value_o: r._value_o,
            _value_os: r._value_os,
        }),
    )

// Join with the inside data.
j = join(tables: {o: jo, i: i}, on: ["_time"])
    |> map(
        fn: (r) => ({
            _time: r._time,
            o: r._value_o,
            os: r._value_os,
            // No "_i" suffix here: after the map above, jo no longer has a
            // column called _value, so there is no name clash for join to resolve.
            i: r._value,
        }),
    )
    |> map(
        fn: (r) => ({r with
            d: if float(v: r.o) > 19 and float(v: r.o) < float(v: r.i) and float(v: r.os) > float(v: r.i) then
                // It's hot outside and outside is cooler but earlier it was hotter: open the windows.
                1
            else if float(v: r.o) > 19 and float(v: r.o) > float(v: r.i) and float(v: r.os) < float(v: r.i) then
                // It's hot outside, and outside is hotter than inside but earlier it was cooler: close the windows.
                -1
            else
                0,
            s: if float(v: r.i) > 19 and float(v: r.o) < float(v: r.i) then
                // If it's hot inside and outside is cooler then the windows should be open.
                "open"
            else
                "closed",
        }),
    )

// Output for task
j
    |> map(
        fn: (r) => ({
            _time: r._time,
            _measurement: "window_indicator",
            _field: "window_indicator",
            _value: r.d,
        }),
    )
    |> to(bucket: "zigbee", org: "mini31")

j
    |> map(
        fn: (r) => ({
            _time: r._time,
            _measurement: "window_status",
            _field: "window_status",
            _value: r.s,
        }),
    )
    |> to(bucket: "zigbee", org: "mini31")

Note that the query language is Flux, and not TICKscript — which looks similar.

The next task is to use the new computed measure to create a check on the alerts page. When you use the UI to create a check it creates a task behind the scenes which doesn’t show on the tasks page. This is particularly irritating because this means that you can’t click to see the history and logs of this alert task — you have to click to the history of a real task then paste the ID of the alert task into the address bar.

My window_indicator is ±1 when something needs to be done else zero.

The alert notification endpoints in InfluxDB are complete shit; the workaround is to have nc receive the HTTP POSTs and then process them.

This script logs POST bodies to file, and scrapes the content to do the real message sending.

#!/bin/sh
# Reads one HTTP request from stdin, appends it to a log file line by line and,
# when a line carries a "window_indicator" value, sends the matching text
# message through the HTTP SMS gateway.

file="/root/nc/in.txt"

# SMS gateway settings.
# NOTE(review): the authorization token is stored in clear text in this script.
sms_url="http://192.168.42.129:8082"
sms_to="+447950952279"
sms_auth="fd1176dc"

# send_sms MESSAGE - post one text message to the gateway, discarding the reply.
send_sms() {
        wget "$sms_url" --post-data="{\"to\":\"$sms_to\", \"message\": \"$1\"}" --header "Authorization: $sms_auth" -q -O/dev/null
}

date +"%Y-%m-%d %H:%M" >> "$file"

# The "|| [ -n ... ]" part keeps a final line that has no trailing newline.
while IFS= read -r line || [ -n "$line" ]; do
        # printf rather than echo, so backslashes in the body are logged verbatim.
        printf '%s\n' "$line" >> "$file"
        # Extract the number that follows "window_indicator": (empty if absent).
        wi=$(printf '%s\n' "$line" | sed -nE 's/.*"window_indicator":[[:space:]]*([-0-9.]+).*/\1/p')
        case "$wi" in
                1)
                        send_sms "Open the windows."
                        ;;
                -1)
                        send_sms "Close the windows."
                        ;;
                ""|0)
                        # No indicator on this line, or nothing to do.
                        ;;
                *)
                        send_sms "Unknown window indicator value: $wi."
                        ;;
        esac
done

echo "--------------------------------------------------------------------------------" >> "$file"

Run

 # while true; do cat 201.txt | nc -l -p 8087 -q 1 | /root/nc/nc.sh; done

where 201.txt is:

HTTP/1.1 201 Created
Server: netcat@mini31
Content-Type: text/plain; charset=UTF-8

Created

and configure InfluxDB to post everything to http://127.0.0.1:8087

Finally, create a notification rule — this also has the secret task with inaccessible logs problem.

Some care may be necessary in choosing timings:

The while loop can be made into a systemd service. /etc/systemd/system/influxdb-nc.service:

# systemd unit that keeps the nc listener loop (/root/nc/nc-run.sh) running.
[Service]
Type=simple
ExecStart=/root/nc/nc-run.sh
# NOTE(review): PIDFile= is normally paired with Type=forking; with Type=simple
# it is presumably unused - confirm, and note the "InfouxDB" spelling in the path.
PIDFile=/root/nc/systemd-InfouxDB-nc-service-pid
# Restart whenever the script exits, with no delay between attempts.
# NOTE(review): a zero delay may run into systemd's start rate limit if the
# script fails immediately - confirm.
Restart=always
RestartSec=0

[Unit]
Description=nc on port 8087 for InfluxDB HTTP notifications
After=network.target
Wants=network.target

[Install]
# `systemctl enable` hooks the unit into multi-user.target.
WantedBy=multi-user.target

And

# systemctl daemon-reload
# systemctl enable influxdb-nc
# systemctl start influxdb-nc

Home | More stuff | Octad of the week