From 1a3d99c60e13aa16aa502ee4e00746ed8c64e5c3 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 14 Aug 2024 12:54:42 +0200 Subject: [PATCH] Initial commit --- async_tasks/general_tasks.py | 69 +++ async_tasks/wired_tasks.py | 46 ++ async_tasks/wireless_tasks.py | 59 +++ event_loops.py | 22 + example_global_config.json | 37 ++ lib/TMP117.py | 84 ++++ lib/mqtt_as.py | 803 ++++++++++++++++++++++++++++++++++ lib/topic_message.py | 34 ++ main.py | 125 ++++++ readme.md | 3 + 10 files changed, 1282 insertions(+) create mode 100644 async_tasks/general_tasks.py create mode 100644 async_tasks/wired_tasks.py create mode 100644 async_tasks/wireless_tasks.py create mode 100644 event_loops.py create mode 100644 example_global_config.json create mode 100644 lib/TMP117.py create mode 100644 lib/mqtt_as.py create mode 100644 lib/topic_message.py create mode 100644 main.py create mode 100644 readme.md diff --git a/async_tasks/general_tasks.py b/async_tasks/general_tasks.py new file mode 100644 index 0000000..2ad7eba --- /dev/null +++ b/async_tasks/general_tasks.py @@ -0,0 +1,69 @@ +import time +import asyncio +import json + +from lib.topic_message import TopicMessage, TMQueue +from lib.TMP117 import TMP117 + +async def measure_sensors( + sensors: list[TMP117], + interval_s: float | int, + microcontroller_id: int, + message_queue: TMQueue, + temperature_topic: str): + """ + Readout configuration of all sensors, then timestamp, temperature of all sensors. Adds data to message_queue. Performs every interval_s, correcting for execution time. + """ + next_time = time.ticks_us() + while True: + # Initialise + temperatures = [] + timestamps = [] + configs = [] + + + # Readout configuration of sensors pre-temperature measurement + for sensor in sensors: + try: + configs.append(sensor.read_configuration_integer()) + except OSError as E: + print(f"ERROR: Sensor {sensor.name}, {E}") + configs.append(None) + + # Read time and temperature + for sensor in sensors: + try: + temperatures.append(sensor.read_temperature()) + except OSError as E: + print(f"ERROR: Sensor {sensor.name}, {E}") + temperatures.append(None) + timestamps.append(time.time_ns()) + + # Package into message + for i in range(len(sensors)): + message = json.dumps({ + "micro": microcontroller_id, + "sensor": sensors[i].name, + "temperature": temperatures[i], + "time": timestamps[i], + "config": configs[i] + }) + x = TopicMessage(temperature_topic, message) + message_queue.add(x) + + # Calculate time to sleep + next_time = time.ticks_add(next_time, interval_s*1000000) + sleep_time = time.ticks_diff(next_time, time.ticks_us())/1e6 + await asyncio.sleep(sleep_time) + +async def switch_inbound_message_queue(inbound_queue: TMQueue, interval_s): + """Checks inbound queue and decides what to do with the messages""" + while True: + # Iterate through all inbound messages and execute relevant task. + while not inbound_queue.is_empty(): + tmsg = inbound_queue.get() + topic = tmsg.topic + message = tmsg.message + pass + await asyncio.sleep(interval_s) + diff --git a/async_tasks/wired_tasks.py b/async_tasks/wired_tasks.py new file mode 100644 index 0000000..8533225 --- /dev/null +++ b/async_tasks/wired_tasks.py @@ -0,0 +1,46 @@ +import select +import sys +import machine +import time +import asyncio + +from lib.topic_message import TopicMessage, TMQueue, string_to_topicmessage + +async def usb_sync_clock( + interval_s: float | int): + """Synchronise clock by posting a request and readin in the output immediately""" + while True: + poll_obj = select.poll() + poll_obj.register(sys.stdin, select.POLLIN) + sys.stdout.write("REQUEST_TIME \n") + poll_results = poll_obj.poll(10) + if poll_results: + timestamp_ms = float(sys.stdin.readline()) + t = time.gmtime(int(timestamp_ms/1000)) + machine.RTC().datetime((t[0], t[1], t[2], t[6], t[3], t[4], t[5], 0)) + else: + await asyncio.sleep(interval_s) + +async def usb_in_message_parser( + inbound_queue: TMQueue, + interval_s: float | int): + """Parse regular TopicMessage messages sent in through USB""" + poll_obj = select.poll() + poll_obj.register(sys.stdin, select.POLLIN) + while True: + poll_results = poll_obj.poll(10) + if poll_results: + line = sys.stdin.readline() + inbound_queue.add(string_to_topicmessage(line)) + else: + await asyncio.sleep(interval_s) + +async def usb_send_data( + outbound_queue: TMQueue, + interval_s: float | int): + """Send all messages in outbound queue via USB""" + while True: + while not outbound_queue.is_empty(): + item: TopicMessage = outbound_queue.get() + sys.stdout.write(f"{item.topic}: {item.message}\n") + await asyncio.sleep(interval_s) diff --git a/async_tasks/wireless_tasks.py b/async_tasks/wireless_tasks.py new file mode 100644 index 0000000..c54dfc7 --- /dev/null +++ b/async_tasks/wireless_tasks.py @@ -0,0 +1,59 @@ +import ntptime +import asyncio + + +from lib.mqtt_as import MQTTClient +from lib.topic_message import TopicMessage, TMQueue + + +async def mqtt_up( + mqtt_client: MQTTClient, + inbound_topic, + todo): + """ + Perform asynchronous "todo" when MQTT client is connected. + """ + while True: + await mqtt_client.up.wait() # type: ignore + mqtt_client.up.clear() + await mqtt_client.subscribe(inbound_topic) + await todo() + +async def mqtt_down( + mqtt_client: MQTTClient, + todo): + """ + Perform asynchronous "todo" when MQTT client is disconnected. + """ + while True: + await mqtt_client.down.wait() # type: ignore + mqtt_client.down.clear() + await todo() + +async def wireless_sync_clock( + mqtt_client: MQTTClient, + interval_s): + """Sync clock via NTP server""" + while True: + ntptime.settime() + await asyncio.sleep(interval_s) + +async def mqtt_publish_tmsgs( + mqtt_client: MQTTClient, + outbound_queue: TMQueue, + post_interval: float | int): + """Publishes all TopicMessage items in queue over MQTT""" + while True: + while not outbound_queue.is_empty(): + item = outbound_queue.get() + await mqtt_client.publish(item.topic, item.message, qos=1) + await asyncio.sleep(post_interval) + +async def mqtt_message_parser( + mqtt_client: MQTTClient, + inbound_queue: TMQueue): + """Parse messages recieved by MQTT client and add them to the inbound queue""" + async for topic, message, retained in mqtt_client.queue: # type: ignore + topic = topic.decode() + message = message.decode() + inbound_queue.add(TopicMessage(topic, message)) \ No newline at end of file diff --git a/event_loops.py b/event_loops.py new file mode 100644 index 0000000..75f8512 --- /dev/null +++ b/event_loops.py @@ -0,0 +1,22 @@ +import asyncio + +from lib.topic_message import TMQueue +from async_tasks.general_tasks import measure_sensors + +async def main_wifi(uuid, sensors, mqtt_client, temperature_topic, inbound_topic): + from async_tasks.wireless_tasks import mqtt_publish_tmsgs + outbound_queue = TMQueue() + asyncio.create_task(measure_sensors(sensors, 1, uuid, outbound_queue, temperature_topic)) + asyncio.create_task(mqtt_publish_tmsgs(mqtt_client, outbound_queue, 0.5)) + + while True: + await asyncio.sleep(5) + +async def main_wired(uuid, sensors, temperature_topic, inbound_topic): + from async_tasks.wired_tasks import usb_send_data + outbound_queue = TMQueue() + asyncio.create_task(measure_sensors(sensors, 1, uuid, outbound_queue, temperature_topic)) + asyncio.create_task(usb_send_data(outbound_queue, 0.5)) + + while True: + await asyncio.sleep(5) \ No newline at end of file diff --git a/example_global_config.json b/example_global_config.json new file mode 100644 index 0000000..dd0e301 --- /dev/null +++ b/example_global_config.json @@ -0,0 +1,37 @@ +[ + { + "micro_id": 1000000000000000, + "wifi_or_usb": "usb", + + "temperature_topic": "TEMPERATURE TOPIC", + "inbound_topic": "INBOUND TOPIC", + + "network_settings":{ + "ssid": "SSID", + "wifi_pw": "PASSWORD", + + "mqtt_server": "IP ADDRESS", + "mqtt_port": 1883, + + "mqtt_queue_length": 4 + }, + "i2c_buses":[ + { + "name": "EG I2C", + "scl": 1, + "sda": 0, + "freq": 400000, + "hard_i2c": true, + "hard_i2c_bus": 0 + } + ], + "sensors":[ + { + "name": "EG SENSOR", + "i2c_name": "EG I2C", + "i2c_addr": 72, + "config": 544 + } + ] + } +] \ No newline at end of file diff --git a/lib/TMP117.py b/lib/TMP117.py new file mode 100644 index 0000000..8fe6e19 --- /dev/null +++ b/lib/TMP117.py @@ -0,0 +1,84 @@ +import machine + +class TMP117: + RESOLUTION = 0.0078125 + + TEMP_ADDR = 0x00 + CONFIG_ADDR = 0x01 + NIST_ADDR = 0x05 + + TEMP_SIZE = 16 + + CONFIG_REGISTER_MAP = { + "HIGH_Alert": 15, + "LOW_Alert": 14, + "Data_Ready": 13, + "EEPROM_Busy": 12, + "MOD1": 11, + "MOD0": 10, + "CONV2": 9, + "CONV1": 8, + "CONV0": 7, + "AVG1": 6, + "AVG0": 5, + "T/nA": 4, + "POL": 3, + "DR/Alert": 2, + "Soft_Reset": 1, + "--": 0 + } + DEFAULT_CONFIG_INT = 554 + + def __init__(self, name: str, i2c: machine.I2C|machine.SoftI2C, i2c_addr, config_int): + self.name = name + self.i2c_addr = i2c_addr + self.i2c = i2c + + self.NIST_ID = int.from_bytes(self.i2c.readfrom_mem(self.i2c_addr, self.NIST_ADDR, 2), "big") + # Set configuration + self.write_configuration_integer(config_int) + + def bytes_to_temperature(self, binary:bytes): + """Converts a 2's complement 16 bit value to the appropriate temperature value.""" + val = int.from_bytes(binary, "big") + if (val & (1 << (self.TEMP_SIZE - 1))) != 0: + val = val - (1 << self.TEMP_SIZE) + + return val * TMP117.RESOLUTION + + def int_to_config_dict(self, integer:int): + """Converts a configuration """ + assert integer < 2**(16) + bits = list(f"{integer:016b}") + bits.reverse() + + result = {} + for key in self.CONFIG_REGISTER_MAP.keys(): + result[key] = bits[self.CONFIG_REGISTER_MAP[key]] + + return result + + def config_dict_to_int(self, config_dict): + array = [None for _ in range(16)] + + for option in config_dict.keys(): + array[self.CONFIG_REGISTER_MAP[option]] = config_dict[option] + + array.reverse() + bit_string = "" + for bit in array: + bit_string += str(bit) + write_byte = int(bit_string, 2) + + def write_configuration_integer(self, config_int:int): + write_byte = config_int.to_bytes(2, "big") + self.i2c.writeto_mem(self.i2c_addr, self.CONFIG_ADDR, write_byte) + + def read_configuration_integer(self): + binary = self.i2c.readfrom_mem(self.i2c_addr, self.CONFIG_ADDR, 2) + integer = int.from_bytes(binary, "big") + return integer + + def read_temperature(self): + binary = self.i2c.readfrom_mem(self.i2c_addr, self.TEMP_ADDR, 2) + return self.bytes_to_temperature(binary) diff --git a/lib/mqtt_as.py b/lib/mqtt_as.py new file mode 100644 index 0000000..83b254d --- /dev/null +++ b/lib/mqtt_as.py @@ -0,0 +1,803 @@ +# mqtt_as.py Asynchronous version of umqtt.robust +# (C) Copyright Peter Hinch 2017-2023. +# Released under the MIT licence. + +# Pyboard D support added also RP2/default +# Various improvements contributed by Kevin Köck. + +import gc +import usocket as socket +import ustruct as struct + +gc.collect() +from ubinascii import hexlify +import uasyncio as asyncio + +gc.collect() +from utime import ticks_ms, ticks_diff +from uerrno import EINPROGRESS, ETIMEDOUT + +gc.collect() +from micropython import const +from machine import unique_id +import network + +gc.collect() +from sys import platform + +VERSION = (0, 7, 2) + +# Default short delay for good SynCom throughput (avoid sleep(0) with SynCom). +_DEFAULT_MS = const(20) +_SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency + +# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection(). +ESP32 = platform == "esp32" +RP2 = platform == "rp2" +if ESP32: + # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942 + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors +elif RP2: + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110] +else: + BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] + +ESP8266 = platform == "esp8266" +PYBOARD = platform == "pyboard" + +# Default "do little" coro for optional user replacement +async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program + await asyncio.sleep_ms(_DEFAULT_MS) + + +class MsgQueue: + def __init__(self, size): + self._q = [0 for _ in range(max(size, 4))] + self._size = size + self._wi = 0 + self._ri = 0 + self._evt = asyncio.Event() + self.discards = 0 + + def put(self, *v): + self._q[self._wi] = v + self._evt.set() + self._wi = (self._wi + 1) % self._size + if self._wi == self._ri: # Would indicate empty + self._ri = (self._ri + 1) % self._size # Discard a message + self.discards += 1 + + def __aiter__(self): + return self + + async def __anext__(self): + if self._ri == self._wi: # Empty + self._evt.clear() + await self._evt.wait() + r = self._q[self._ri] + self._ri = (self._ri + 1) % self._size + return r + + +config = { + "client_id": hexlify(unique_id()), + "server": None, + "port": 0, + "user": "", + "password": "", + "keepalive": 60, + "ping_interval": 0, + "ssl": False, + "ssl_params": {}, + "response_time": 10, + "clean_init": True, + "clean": True, + "max_repubs": 4, + "will": None, + "subs_cb": lambda *_: None, + "wifi_coro": eliza, + "connect_coro": eliza, + "ssid": None, + "wifi_pw": None, + "queue_len": 0, + "gateway": False, +} + + +class MQTTException(Exception): + pass + + +def pid_gen(): + pid = 0 + while True: + pid = pid + 1 if pid < 65535 else 1 + yield pid + + +def qos_check(qos): + if not (qos == 0 or qos == 1): + raise ValueError("Only qos 0 and 1 are supported.") + + +# MQTT_base class. Handles MQTT protocol on the basis of a good connection. +# Exceptions from connectivity failures are handled by MQTTClient subclass. +class MQTT_base: + REPUB_COUNT = 0 # TEST + DEBUG = False + + def __init__(self, config): + self._events = config["queue_len"] > 0 + # MQTT config + self._client_id = config["client_id"] + self._user = config["user"] + self._pswd = config["password"] + self._keepalive = config["keepalive"] + if self._keepalive >= 65536: + raise ValueError("invalid keepalive time") + self._response_time = config["response_time"] * 1000 # Repub if no PUBACK received (ms). + self._max_repubs = config["max_repubs"] + self._clean_init = config["clean_init"] # clean_session state on first connection + self._clean = config["clean"] # clean_session state on reconnect + will = config["will"] + if will is None: + self._lw_topic = False + else: + self._set_last_will(*will) + # WiFi config + self._ssid = config["ssid"] # Required for ESP32 / Pyboard D. Optional ESP8266 + self._wifi_pw = config["wifi_pw"] + self._ssl = config["ssl"] + self._ssl_params = config["ssl_params"] + # Callbacks and coros + if self._events: + self.up = asyncio.Event() + self.down = asyncio.Event() + self.queue = MsgQueue(config["queue_len"]) + else: # Callbacks + self._cb = config["subs_cb"] + self._wifi_handler = config["wifi_coro"] + self._connect_handler = config["connect_coro"] + # Network + self.port = config["port"] + if self.port == 0: + self.port = 8883 if self._ssl else 1883 + self.server = config["server"] + if self.server is None: + raise ValueError("no server specified.") + self._sock = None + self._sta_if = network.WLAN(network.STA_IF) + self._sta_if.active(True) + if config["gateway"]: # Called from gateway (hence ESP32). + import aioespnow # Set up ESPNOW + + while not (sta := self._sta_if).active(): + time.sleep(0.1) + sta.config(pm=sta.PM_NONE) # No power management + sta.active(True) + self._espnow = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support + self._espnow.active(True) + + self.newpid = pid_gen() + self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response + self.last_rx = ticks_ms() # Time of last communication from broker + self.lock = asyncio.Lock() + + def _set_last_will(self, topic, msg, retain=False, qos=0): + qos_check(qos) + if not topic: + raise ValueError("Empty topic.") + self._lw_topic = topic + self._lw_msg = msg + self._lw_qos = qos + self._lw_retain = retain + + def dprint(self, msg, *args): + if self.DEBUG: + print(msg % args) + + def _timeout(self, t): + return ticks_diff(ticks_ms(), t) > self._response_time + + async def _as_read(self, n, sock=None): # OSError caught by superclass + if sock is None: + sock = self._sock + # Declare a byte array of size n. That space is needed anyway, better + # to just 'allocate' it in one go instead of appending to an + # existing object, this prevents reallocation and fragmentation. + data = bytearray(n) + buffer = memoryview(data) + size = 0 + t = ticks_ms() + while size < n: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1, "Timeout on socket read") + try: + msg_size = sock.readinto(buffer[size:], n - size) + except OSError as e: # ESP32 issues weird 119 errors here + msg_size = None + if e.args[0] not in BUSY_ERRORS: + raise + if msg_size == 0: # Connection closed by host + raise OSError(-1, "Connection closed by host") + if msg_size is not None: # data received + size += msg_size + t = ticks_ms() + self.last_rx = ticks_ms() + await asyncio.sleep_ms(_SOCKET_POLL_DELAY) + return data + + async def _as_write(self, bytes_wr, length=0, sock=None): + if sock is None: + sock = self._sock + + # Wrap bytes in memoryview to avoid copying during slicing + bytes_wr = memoryview(bytes_wr) + if length: + bytes_wr = bytes_wr[:length] + t = ticks_ms() + while bytes_wr: + if self._timeout(t) or not self.isconnected(): + raise OSError(-1, "Timeout on socket write") + try: + n = sock.write(bytes_wr) + except OSError as e: # ESP32 issues weird 119 errors here + n = 0 + if e.args[0] not in BUSY_ERRORS: + raise + if n: + t = ticks_ms() + bytes_wr = bytes_wr[n:] + await asyncio.sleep_ms(_SOCKET_POLL_DELAY) + + async def _send_str(self, s): + await self._as_write(struct.pack("!H", len(s))) + await self._as_write(s) + + async def _recv_len(self): + n = 0 + sh = 0 + while 1: + res = await self._as_read(1) + b = res[0] + n |= (b & 0x7F) << sh + if not b & 0x80: + return n + sh += 7 + + async def _connect(self, clean): + self._sock = socket.socket() + self._sock.setblocking(False) + try: + self._sock.connect(self._addr) + except OSError as e: + if e.args[0] not in BUSY_ERRORS: + raise + await asyncio.sleep_ms(_DEFAULT_MS) + self.dprint("Connecting to broker.") + if self._ssl: + try: + import ssl + except ImportError: + import ussl as ssl + + self._sock = ssl.wrap_socket(self._sock, **self._ssl_params) + premsg = bytearray(b"\x10\0\0\0\0\0") + msg = bytearray(b"\x04MQTT\x04\0\0\0") # Protocol 3.1.1 + + sz = 10 + 2 + len(self._client_id) + msg[6] = clean << 1 + if self._user: + sz += 2 + len(self._user) + 2 + len(self._pswd) + msg[6] |= 0xC0 + if self._keepalive: + msg[7] |= self._keepalive >> 8 + msg[8] |= self._keepalive & 0x00FF + if self._lw_topic: + sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) + msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 + msg[6] |= self._lw_retain << 5 + + i = 1 + while sz > 0x7F: + premsg[i] = (sz & 0x7F) | 0x80 + sz >>= 7 + i += 1 + premsg[i] = sz + await self._as_write(premsg, i + 2) + await self._as_write(msg) + await self._send_str(self._client_id) + if self._lw_topic: + await self._send_str(self._lw_topic) + await self._send_str(self._lw_msg) + if self._user: + await self._send_str(self._user) + await self._send_str(self._pswd) + # Await CONNACK + # read causes ECONNABORTED if broker is out; triggers a reconnect. + resp = await self._as_read(4) + self.dprint("Connected to broker.") # Got CONNACK + if resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02: + # Bad CONNACK e.g. authentication fail. + raise OSError( + -1, f"Connect fail: 0x{(resp[0] << 8) + resp[1]:04x} {resp[3]} (README 7)" + ) + + async def _ping(self): + async with self.lock: + await self._as_write(b"\xc0\0") + + # Check internet connectivity by sending DNS lookup to Google's 8.8.8.8 + async def wan_ok( + self, + packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01", + ): + if not self.isconnected(): # WiFi is down + return False + length = 32 # DNS query and response packet size + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.setblocking(False) + s.connect(("8.8.8.8", 53)) + await asyncio.sleep(1) + try: + await self._as_write(packet, sock=s) + await asyncio.sleep(2) + res = await self._as_read(length, s) + if len(res) == length: + return True # DNS response size OK + except OSError: # Timeout on read: no connectivity. + return False + finally: + s.close() + return False + + async def broker_up(self): # Test broker connectivity + if not self.isconnected(): + return False + tlast = self.last_rx + if ticks_diff(ticks_ms(), tlast) < 1000: + return True + try: + await self._ping() + except OSError: + return False + t = ticks_ms() + while not self._timeout(t): + await asyncio.sleep_ms(100) + if ticks_diff(self.last_rx, tlast) > 0: # Response received + return True + return False + + async def disconnect(self): + if self._sock is not None: + await self._kill_tasks(False) # Keep socket open + try: + async with self.lock: + self._sock.write(b"\xe0\0") # Close broker connection + await asyncio.sleep_ms(100) + except OSError: + pass + self._close() + self._has_connected = False + + def _close(self): + if self._sock is not None: + self._sock.close() + + def close(self): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60 + self._close() + try: + self._sta_if.disconnect() # Disconnect Wi-Fi to avoid errors + except OSError: + self.dprint("Wi-Fi not started, unable to disconnect interface") + self._sta_if.active(False) + + async def _await_pid(self, pid): + t = ticks_ms() + while pid in self.rcv_pids: # local copy + if self._timeout(t) or not self.isconnected(): + break # Must repub or bail out + await asyncio.sleep_ms(100) + else: + return True # PID received. All done. + return False + + # qos == 1: coro blocks until wait_msg gets correct PID. + # If WiFi fails completely subclass re-publishes with new PID. + async def publish(self, topic, msg, retain, qos): + pid = next(self.newpid) + if qos: + self.rcv_pids.add(pid) + async with self.lock: + await self._publish(topic, msg, retain, qos, 0, pid) + if qos == 0: + return + + count = 0 + while 1: # Await PUBACK, republish on timeout + if await self._await_pid(pid): + return + # No match + if count >= self._max_repubs or not self.isconnected(): + raise OSError(-1) # Subclass to re-publish with new PID + async with self.lock: + await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid + count += 1 + self.REPUB_COUNT += 1 + + async def _publish(self, topic, msg, retain, qos, dup, pid): + pkt = bytearray(b"\x30\0\0\0") + pkt[0] |= qos << 1 | retain | dup << 3 + sz = 2 + len(topic) + len(msg) + if qos > 0: + sz += 2 + if sz >= 2097152: + raise MQTTException("Strings too long.") + i = 1 + while sz > 0x7F: + pkt[i] = (sz & 0x7F) | 0x80 + sz >>= 7 + i += 1 + pkt[i] = sz + await self._as_write(pkt, i + 1) + await self._send_str(topic) + if qos > 0: + struct.pack_into("!H", pkt, 0, pid) + await self._as_write(pkt, 2) + await self._as_write(msg) + + # Can raise OSError if WiFi fails. Subclass traps. + async def subscribe(self, topic, qos): + pkt = bytearray(b"\x82\0\0\0") + pid = next(self.newpid) + self.rcv_pids.add(pid) + struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid) + async with self.lock: + await self._as_write(pkt) + await self._send_str(topic) + await self._as_write(qos.to_bytes(1, "little")) + + if not await self._await_pid(pid): + raise OSError(-1) + + # Can raise OSError if WiFi fails. Subclass traps. + async def unsubscribe(self, topic): + pkt = bytearray(b"\xa2\0\0\0") + pid = next(self.newpid) + self.rcv_pids.add(pid) + struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid) + async with self.lock: + await self._as_write(pkt) + await self._send_str(topic) + + if not await self._await_pid(pid): + raise OSError(-1) + + # Wait for a single incoming MQTT message and process it. + # Subscribed messages are delivered to a callback previously + # set by .setup() method. Other (internal) MQTT + # messages processed internally. + # Immediate return if no data available. Called from ._handle_msg(). + async def wait_msg(self): + try: + res = self._sock.read(1) # Throws OSError on WiFi fail + except OSError as e: + if e.args[0] in BUSY_ERRORS: # Needed by RP2 + await asyncio.sleep_ms(0) + return + raise + if res is None: + return + if res == b"": + raise OSError(-1, "Empty response") + + if res == b"\xd0": # PINGRESP + await self._as_read(1) # Update .last_rx time + return + op = res[0] + + if op == 0x40: # PUBACK: save pid + sz = await self._as_read(1) + if sz != b"\x02": + raise OSError(-1, "Invalid PUBACK packet") + rcv_pid = await self._as_read(2) + pid = rcv_pid[0] << 8 | rcv_pid[1] + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1, "Invalid pid in PUBACK packet") + + if op == 0x90: # SUBACK + resp = await self._as_read(4) + if resp[3] == 0x80: + raise OSError(-1, "Invalid SUBACK packet") + pid = resp[2] | (resp[1] << 8) + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1, "Invalid pid in SUBACK packet") + + if op == 0xB0: # UNSUBACK + resp = await self._as_read(3) + pid = resp[2] | (resp[1] << 8) + if pid in self.rcv_pids: + self.rcv_pids.discard(pid) + else: + raise OSError(-1) + + if op & 0xF0 != 0x30: + return + sz = await self._recv_len() + topic_len = await self._as_read(2) + topic_len = (topic_len[0] << 8) | topic_len[1] + topic = await self._as_read(topic_len) + sz -= topic_len + 2 + if op & 6: + pid = await self._as_read(2) + pid = pid[0] << 8 | pid[1] + sz -= 2 + msg = await self._as_read(sz) + retained = op & 0x01 + if self._events: + self.queue.put(topic, msg, bool(retained)) + else: + self._cb(topic, msg, bool(retained)) + if op & 6 == 2: # qos 1 + pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK + struct.pack_into("!H", pkt, 2, pid) + await self._as_write(pkt) + elif op & 6 == 4: # qos 2 not supported + raise OSError(-1, "QoS 2 not supported") + + +# MQTTClient class. Handles issues relating to connectivity. + + +class MQTTClient(MQTT_base): + def __init__(self, config): + super().__init__(config) + self._isconnected = False # Current connection state + keepalive = 1000 * self._keepalive # ms + self._ping_interval = keepalive // 4 if keepalive else 20000 + p_i = config["ping_interval"] * 1000 # Can specify shorter e.g. for subscribe-only + if p_i and p_i < self._ping_interval: + self._ping_interval = p_i + self._in_connect = False + self._has_connected = False # Define 'Clean Session' value to use. + self._tasks = [] + if ESP8266: + import esp + + esp.sleep_type(0) # Improve connection integrity at cost of power consumption. + + async def wifi_connect(self, quick=False): + s = self._sta_if + if ESP8266: + if s.isconnected(): # 1st attempt, already connected. + return + s.active(True) + s.connect() # ESP8266 remembers connection. + for _ in range(60): + if ( + s.status() != network.STAT_CONNECTING + ): # Break out on fail or success. Check once per sec. + break + await asyncio.sleep(1) + if ( + s.status() == network.STAT_CONNECTING + ): # might hang forever awaiting dhcp lease renewal or something else + s.disconnect() + await asyncio.sleep(1) + if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None: + s.connect(self._ssid, self._wifi_pw) + while ( + s.status() == network.STAT_CONNECTING + ): # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + else: + s.active(True) + if RP2: # Disable auto-sleep. + # https://datasheets.raspberrypi.com/picow/connecting-to-the-internet-with-pico-w.pdf + # para 3.6.3 + s.config(pm=0xA11140) + s.connect(self._ssid, self._wifi_pw) + for _ in range(60): # Break out on fail or success. Check once per sec. + await asyncio.sleep(1) + # Loop while connecting or no IP + if s.isconnected(): + break + if ESP32: + # Status values >= STAT_IDLE can occur during connect: + # STAT_IDLE 1000, STAT_CONNECTING 1001, STAT_GOT_IP 1010 + if s.status() < network.STAT_IDLE: # Error statuses + break # are in range 200..204 + elif PYBOARD: # No symbolic constants in network + if not 1 <= s.status() <= 2: + break + elif RP2: # 1 is STAT_CONNECTING. 2 reported by user (No IP?) + if not 1 <= s.status() <= 2: + break + else: # Timeout: still in connecting state + s.disconnect() + await asyncio.sleep(1) + + if not s.isconnected(): # Timed out + raise OSError("Wi-Fi connect timed out") + if not quick: # Skip on first connection only if power saving + # Ensure connection stays up for a few secs. + self.dprint("Checking WiFi integrity.") + for _ in range(5): + if not s.isconnected(): + raise OSError("Connection Unstable") # in 1st 5 secs + await asyncio.sleep(1) + self.dprint("Got reliable connection") + + async def connect(self, *, quick=False): # Quick initial connect option for battery apps + if not self._has_connected: + await self.wifi_connect(quick) # On 1st call, caller handles error + # Note this blocks if DNS lookup occurs. Do it once to prevent + # blocking during later internet outage: + self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] + self._in_connect = True # Disable low level ._isconnected check + try: + if not self._has_connected and self._clean_init and not self._clean: + # Power up. Clear previous session data but subsequently save it. + # Issue #40 + await self._connect(True) # Connect with clean session + try: + async with self.lock: + self._sock.write(b"\xe0\0") # Force disconnect but keep socket open + except OSError: + pass + self.dprint("Waiting for disconnect") + await asyncio.sleep(2) # Wait for broker to disconnect + self.dprint("About to reconnect with unclean session.") + await self._connect(self._clean) + except Exception: + self._close() + self._in_connect = False # Caller may run .isconnected() + raise + self.rcv_pids.clear() + # If we get here without error broker/LAN must be up. + self._isconnected = True + self._in_connect = False # Low level code can now check connectivity. + if not self._events: + asyncio.create_task(self._wifi_handler(True)) # User handler. + if not self._has_connected: + self._has_connected = True # Use normal clean flag on reconnect. + asyncio.create_task(self._keep_connected()) + # Runs forever unless user issues .disconnect() + + asyncio.create_task(self._handle_msg()) # Task quits on connection fail. + self._tasks.append(asyncio.create_task(self._keep_alive())) + if self.DEBUG: + self._tasks.append(asyncio.create_task(self._memory())) + if self._events: + self.up.set() # Connectivity is up + else: + asyncio.create_task(self._connect_handler(self)) # User handler. + + # Launched by .connect(). Runs until connectivity fails. Checks for and + # handles incoming messages. + async def _handle_msg(self): + try: + while self.isconnected(): + async with self.lock: + await self.wait_msg() # Immediate return if no message + await asyncio.sleep_ms(_DEFAULT_MS) # Let other tasks get lock + + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + # Keep broker alive MQTT spec 3.1.2.10 Keep Alive. + # Runs until ping failure or no response in keepalive period. + async def _keep_alive(self): + while self.isconnected(): + pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval + if pings_due >= 4: + self.dprint("Reconnect: broker fail.") + break + await asyncio.sleep_ms(self._ping_interval) + try: + await self._ping() + except OSError: + break + self._reconnect() # Broker or WiFi fail. + + async def _kill_tasks(self, kill_skt): # Cancel running tasks + for task in self._tasks: + task.cancel() + self._tasks.clear() + await asyncio.sleep_ms(0) # Ensure cancellation complete + if kill_skt: # Close socket + self._close() + + # DEBUG: show RAM messages. + async def _memory(self): + while True: + await asyncio.sleep(20) + gc.collect() + self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc()) + + def isconnected(self): + if self._in_connect: # Disable low-level check during .connect() + return True + if self._isconnected and not self._sta_if.isconnected(): # It's going down. + self._reconnect() + return self._isconnected + + def _reconnect(self): # Schedule a reconnection if not underway. + if self._isconnected: + self._isconnected = False + asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket + if self._events: # Signal an outage + self.down.set() + else: + asyncio.create_task(self._wifi_handler(False)) # User handler. + + # Await broker connection. + async def _connection(self): + while not self._isconnected: + await asyncio.sleep(1) + + # Scheduled on 1st successful connection. Runs forever maintaining wifi and + # broker connection. Must handle conditions at edge of WiFi range. + async def _keep_connected(self): + while self._has_connected: + if self.isconnected(): # Pause for 1 second + await asyncio.sleep(1) + gc.collect() + else: # Link is down, socket is closed, tasks are killed + try: + self._sta_if.disconnect() + except OSError: + self.dprint("Wi-Fi not started, unable to disconnect interface") + await asyncio.sleep(1) + try: + await self.wifi_connect() + except OSError: + continue + if not self._has_connected: # User has issued the terminal .disconnect() + self.dprint("Disconnected, exiting _keep_connected") + break + try: + await self.connect() + # Now has set ._isconnected and scheduled _connect_handler(). + self.dprint("Reconnect OK!") + except OSError as e: + self.dprint("Error in reconnect. %s", e) + # Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received. + self._close() # Disconnect and try again. + self._in_connect = False + self._isconnected = False + self.dprint("Disconnected, exited _keep_connected") + + async def subscribe(self, topic, qos=0): + qos_check(qos) + while 1: + await self._connection() + try: + return await super().subscribe(topic, qos) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + async def unsubscribe(self, topic): + while 1: + await self._connection() + try: + return await super().unsubscribe(topic) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. + + async def publish(self, topic, msg, retain=False, qos=0): + qos_check(qos) + while 1: + await self._connection() + try: + return await super().publish(topic, msg, retain, qos) + except OSError: + pass + self._reconnect() # Broker or WiFi fail. diff --git a/lib/topic_message.py b/lib/topic_message.py new file mode 100644 index 0000000..ab1a2d6 --- /dev/null +++ b/lib/topic_message.py @@ -0,0 +1,34 @@ + +class TopicMessage(): + "Object containing both a topic and message string attribute. Can convert itself to a string representation" + def __init__(self, topic:str = "", message:str = ""): + self.topic = topic + self.message = message + def to_string(self) -> str: + return f"{self.topic}: {self.message}" + +class TMQueue(): + """Simple queue (FIFO) of TopicMessage objects""" + def __init__(self): + self.data: list[TopicMessage] = list() + + def get(self): + if not self.is_empty(): + item = self.data.pop(0) + return item + else: + raise IndexError + + def add(self, item): + self.data.append(item) + + def is_empty(self): + if len(self.data) == 0: + return True + else: + return False + +def string_to_topicmessage(s: str): + """Convert string to a TopicMessage object""" + topic, message = str.split(":", maxsplit = 1) + return TopicMessage(topic, message) diff --git a/main.py b/main.py new file mode 100644 index 0000000..930524b --- /dev/null +++ b/main.py @@ -0,0 +1,125 @@ +import json +import machine +import asyncio +import ntptime +import select +import time +import sys + +import lib.TMP117 +import lib.mqtt_as + +import event_loops + +LED = machine.Pin("LED", machine.Pin.OUT) +LED.on() + +def search_dict_list(dictionary, attribute, value): + """Returns first instance where attribute = value in list of dictionaries""" + return [item for item in dictionary if item[attribute] == value][0] + +# First, load correct config +with open("global_config.json") as f: + jdata = json.load(f) + +uuid = int.from_bytes(machine.unique_id(), "big") + +config = search_dict_list(jdata, "micro_id", uuid) + +# Setup i2c buses +i2c_buses = [] +for bus_config in config["i2c_buses"]: + bus = { + "name": bus_config["name"] + } + if bus_config["hard_i2c"]: + bus["obj"] = machine.I2C( + bus_config["hard_i2c_bus"], + scl = machine.Pin(bus_config["scl"]), + sda = machine.Pin(bus_config["sda"]), + freq = bus_config["freq"]) + else: + bus["obj"] = machine.SoftI2C( + scl = machine.Pin(bus_config["scl"]), + sda = machine.Pin(bus_config["sda"]), + freq = bus_config["freq"]) + + i2c_buses.append(bus) + +# Setup sensors +sensors = [] +for sensor_config in config["sensors"]: + + i2c_conn = search_dict_list(i2c_buses, "name", sensor_config["i2c_name"])["obj"] + sensor = lib.TMP117.TMP117( + name = sensor_config["name"], + i2c = i2c_conn, + i2c_addr = sensor_config["i2c_addr"], + config_int = sensor_config["config"] + ) + sensors.append(sensor) + +print(uuid) +print(i2c_buses) +print(sensors) + + + +temperature_topic = config["temperature_topic"] +inbound_topic = config["inbound_topic"] +# Setup communication +if config["wifi_or_usb"] == "wifi": + # Setup MQTT Client + mqtt_config = lib.mqtt_as.config + mqtt_config["ssid"] = config["network_settings"]["ssid"] + mqtt_config["wifi_pw"] = config["network_settings"]["wifi_pw"] + mqtt_config["server"] = config["network_settings"]["mqtt_server"] + mqtt_config["port"] = config["network_settings"]["mqtt_port"] + mqtt_config["queue_len"] = config["network_settings"]["mqtt_queue_length"] + + mqtt_client = lib.mqtt_as.MQTTClient(mqtt_config) + + # Connect to MQTT broker and sync clock via NTP server + while not mqtt_client.isconnected(): + try: + asyncio.run(mqtt_client.connect()) + ntptime.settime() + except OSError as E: + print(f"Connection failed. ERROR: {E}. Retrying.") + + LED.off() + try: + asyncio.run(event_loops.main_wifi(uuid, sensors, mqtt_client, temperature_topic, inbound_topic)) + finally: + mqtt_client.close() + LED.on() + asyncio.new_event_loop() + + +elif config["wifi_or_usb"] == "usb": + while True: + poll_obj = select.poll() + poll_obj.register(sys.stdin, select.POLLIN) + + t1 = time.ticks_ms() + sys.stdout.write("REQUEST_TIME\n") + + poll_results = poll_obj.poll(100) + if poll_results: + timestamp_ms = int(sys.stdin.readline()) + print(f"Recieved {timestamp_ms}") + t2 = time.ticks_ms() + t = time.gmtime(timestamp_ms//1000) + machine.RTC().datetime((t[0], t[1], t[2], t[6], t[3], t[4], t[5], 0)) + print(f"Time: {time.time(), time.gmtime()}, took {time.ticks_diff(t2, t1)} ms latency") + break + else: + time.sleep(5) + + LED.off() + try: + asyncio.run(event_loops.main_wired(uuid, sensors, temperature_topic, inbound_topic)) + finally: + LED.on() + asyncio.new_event_loop() + diff --git a/readme.md b/readme.md new file mode 100644 index 0000000..4539fc3 --- /dev/null +++ b/readme.md @@ -0,0 +1,3 @@ +# Code for TMP117 + Pico W Temperature Measurement System + +A "global_config.json" file is required for the code to run. Here, all the settings are defined. An example is given in "example_global_config.json". \ No newline at end of file