Initial commit
This commit is contained in:
commit
1a3d99c60e
69
async_tasks/general_tasks.py
Normal file
69
async_tasks/general_tasks.py
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
|
||||||
|
from lib.topic_message import TopicMessage, TMQueue
|
||||||
|
from lib.TMP117 import TMP117
|
||||||
|
|
||||||
|
async def measure_sensors(
|
||||||
|
sensors: list[TMP117],
|
||||||
|
interval_s: float | int,
|
||||||
|
microcontroller_id: int,
|
||||||
|
message_queue: TMQueue,
|
||||||
|
temperature_topic: str):
|
||||||
|
"""
|
||||||
|
Readout configuration of all sensors, then timestamp, temperature of all sensors. Adds data to message_queue. Performs every interval_s, correcting for execution time.
|
||||||
|
"""
|
||||||
|
next_time = time.ticks_us()
|
||||||
|
while True:
|
||||||
|
# Initialise
|
||||||
|
temperatures = []
|
||||||
|
timestamps = []
|
||||||
|
configs = []
|
||||||
|
|
||||||
|
|
||||||
|
# Readout configuration of sensors pre-temperature measurement
|
||||||
|
for sensor in sensors:
|
||||||
|
try:
|
||||||
|
configs.append(sensor.read_configuration_integer())
|
||||||
|
except OSError as E:
|
||||||
|
print(f"ERROR: Sensor {sensor.name}, {E}")
|
||||||
|
configs.append(None)
|
||||||
|
|
||||||
|
# Read time and temperature
|
||||||
|
for sensor in sensors:
|
||||||
|
try:
|
||||||
|
temperatures.append(sensor.read_temperature())
|
||||||
|
except OSError as E:
|
||||||
|
print(f"ERROR: Sensor {sensor.name}, {E}")
|
||||||
|
temperatures.append(None)
|
||||||
|
timestamps.append(time.time_ns())
|
||||||
|
|
||||||
|
# Package into message
|
||||||
|
for i in range(len(sensors)):
|
||||||
|
message = json.dumps({
|
||||||
|
"micro": microcontroller_id,
|
||||||
|
"sensor": sensors[i].name,
|
||||||
|
"temperature": temperatures[i],
|
||||||
|
"time": timestamps[i],
|
||||||
|
"config": configs[i]
|
||||||
|
})
|
||||||
|
x = TopicMessage(temperature_topic, message)
|
||||||
|
message_queue.add(x)
|
||||||
|
|
||||||
|
# Calculate time to sleep
|
||||||
|
next_time = time.ticks_add(next_time, interval_s*1000000)
|
||||||
|
sleep_time = time.ticks_diff(next_time, time.ticks_us())/1e6
|
||||||
|
await asyncio.sleep(sleep_time)
|
||||||
|
|
||||||
|
async def switch_inbound_message_queue(inbound_queue: TMQueue, interval_s):
|
||||||
|
"""Checks inbound queue and decides what to do with the messages"""
|
||||||
|
while True:
|
||||||
|
# Iterate through all inbound messages and execute relevant task.
|
||||||
|
while not inbound_queue.is_empty():
|
||||||
|
tmsg = inbound_queue.get()
|
||||||
|
topic = tmsg.topic
|
||||||
|
message = tmsg.message
|
||||||
|
pass
|
||||||
|
await asyncio.sleep(interval_s)
|
||||||
|
|
46
async_tasks/wired_tasks.py
Normal file
46
async_tasks/wired_tasks.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
import select
|
||||||
|
import sys
|
||||||
|
import machine
|
||||||
|
import time
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
from lib.topic_message import TopicMessage, TMQueue, string_to_topicmessage
|
||||||
|
|
||||||
|
async def usb_sync_clock(
|
||||||
|
interval_s: float | int):
|
||||||
|
"""Synchronise clock by posting a request and readin in the output immediately"""
|
||||||
|
while True:
|
||||||
|
poll_obj = select.poll()
|
||||||
|
poll_obj.register(sys.stdin, select.POLLIN)
|
||||||
|
sys.stdout.write("REQUEST_TIME \n")
|
||||||
|
poll_results = poll_obj.poll(10)
|
||||||
|
if poll_results:
|
||||||
|
timestamp_ms = float(sys.stdin.readline())
|
||||||
|
t = time.gmtime(int(timestamp_ms/1000))
|
||||||
|
machine.RTC().datetime((t[0], t[1], t[2], t[6], t[3], t[4], t[5], 0))
|
||||||
|
else:
|
||||||
|
await asyncio.sleep(interval_s)
|
||||||
|
|
||||||
|
async def usb_in_message_parser(
|
||||||
|
inbound_queue: TMQueue,
|
||||||
|
interval_s: float | int):
|
||||||
|
"""Parse regular TopicMessage messages sent in through USB"""
|
||||||
|
poll_obj = select.poll()
|
||||||
|
poll_obj.register(sys.stdin, select.POLLIN)
|
||||||
|
while True:
|
||||||
|
poll_results = poll_obj.poll(10)
|
||||||
|
if poll_results:
|
||||||
|
line = sys.stdin.readline()
|
||||||
|
inbound_queue.add(string_to_topicmessage(line))
|
||||||
|
else:
|
||||||
|
await asyncio.sleep(interval_s)
|
||||||
|
|
||||||
|
async def usb_send_data(
|
||||||
|
outbound_queue: TMQueue,
|
||||||
|
interval_s: float | int):
|
||||||
|
"""Send all messages in outbound queue via USB"""
|
||||||
|
while True:
|
||||||
|
while not outbound_queue.is_empty():
|
||||||
|
item: TopicMessage = outbound_queue.get()
|
||||||
|
sys.stdout.write(f"{item.topic}: {item.message}\n")
|
||||||
|
await asyncio.sleep(interval_s)
|
59
async_tasks/wireless_tasks.py
Normal file
59
async_tasks/wireless_tasks.py
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
import ntptime
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
|
||||||
|
from lib.mqtt_as import MQTTClient
|
||||||
|
from lib.topic_message import TopicMessage, TMQueue
|
||||||
|
|
||||||
|
|
||||||
|
async def mqtt_up(
|
||||||
|
mqtt_client: MQTTClient,
|
||||||
|
inbound_topic,
|
||||||
|
todo):
|
||||||
|
"""
|
||||||
|
Perform asynchronous "todo" when MQTT client is connected.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
await mqtt_client.up.wait() # type: ignore
|
||||||
|
mqtt_client.up.clear()
|
||||||
|
await mqtt_client.subscribe(inbound_topic)
|
||||||
|
await todo()
|
||||||
|
|
||||||
|
async def mqtt_down(
|
||||||
|
mqtt_client: MQTTClient,
|
||||||
|
todo):
|
||||||
|
"""
|
||||||
|
Perform asynchronous "todo" when MQTT client is disconnected.
|
||||||
|
"""
|
||||||
|
while True:
|
||||||
|
await mqtt_client.down.wait() # type: ignore
|
||||||
|
mqtt_client.down.clear()
|
||||||
|
await todo()
|
||||||
|
|
||||||
|
async def wireless_sync_clock(
|
||||||
|
mqtt_client: MQTTClient,
|
||||||
|
interval_s):
|
||||||
|
"""Sync clock via NTP server"""
|
||||||
|
while True:
|
||||||
|
ntptime.settime()
|
||||||
|
await asyncio.sleep(interval_s)
|
||||||
|
|
||||||
|
async def mqtt_publish_tmsgs(
|
||||||
|
mqtt_client: MQTTClient,
|
||||||
|
outbound_queue: TMQueue,
|
||||||
|
post_interval: float | int):
|
||||||
|
"""Publishes all TopicMessage items in queue over MQTT"""
|
||||||
|
while True:
|
||||||
|
while not outbound_queue.is_empty():
|
||||||
|
item = outbound_queue.get()
|
||||||
|
await mqtt_client.publish(item.topic, item.message, qos=1)
|
||||||
|
await asyncio.sleep(post_interval)
|
||||||
|
|
||||||
|
async def mqtt_message_parser(
|
||||||
|
mqtt_client: MQTTClient,
|
||||||
|
inbound_queue: TMQueue):
|
||||||
|
"""Parse messages recieved by MQTT client and add them to the inbound queue"""
|
||||||
|
async for topic, message, retained in mqtt_client.queue: # type: ignore
|
||||||
|
topic = topic.decode()
|
||||||
|
message = message.decode()
|
||||||
|
inbound_queue.add(TopicMessage(topic, message))
|
22
event_loops.py
Normal file
22
event_loops.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
import asyncio
|
||||||
|
|
||||||
|
from lib.topic_message import TMQueue
|
||||||
|
from async_tasks.general_tasks import measure_sensors
|
||||||
|
|
||||||
|
async def main_wifi(uuid, sensors, mqtt_client, temperature_topic, inbound_topic):
|
||||||
|
from async_tasks.wireless_tasks import mqtt_publish_tmsgs
|
||||||
|
outbound_queue = TMQueue()
|
||||||
|
asyncio.create_task(measure_sensors(sensors, 1, uuid, outbound_queue, temperature_topic))
|
||||||
|
asyncio.create_task(mqtt_publish_tmsgs(mqtt_client, outbound_queue, 0.5))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
|
async def main_wired(uuid, sensors, temperature_topic, inbound_topic):
|
||||||
|
from async_tasks.wired_tasks import usb_send_data
|
||||||
|
outbound_queue = TMQueue()
|
||||||
|
asyncio.create_task(measure_sensors(sensors, 1, uuid, outbound_queue, temperature_topic))
|
||||||
|
asyncio.create_task(usb_send_data(outbound_queue, 0.5))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(5)
|
37
example_global_config.json
Normal file
37
example_global_config.json
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
[
|
||||||
|
{
|
||||||
|
"micro_id": 1000000000000000,
|
||||||
|
"wifi_or_usb": "usb",
|
||||||
|
|
||||||
|
"temperature_topic": "TEMPERATURE TOPIC",
|
||||||
|
"inbound_topic": "INBOUND TOPIC",
|
||||||
|
|
||||||
|
"network_settings":{
|
||||||
|
"ssid": "SSID",
|
||||||
|
"wifi_pw": "PASSWORD",
|
||||||
|
|
||||||
|
"mqtt_server": "IP ADDRESS",
|
||||||
|
"mqtt_port": 1883,
|
||||||
|
|
||||||
|
"mqtt_queue_length": 4
|
||||||
|
},
|
||||||
|
"i2c_buses":[
|
||||||
|
{
|
||||||
|
"name": "EG I2C",
|
||||||
|
"scl": 1,
|
||||||
|
"sda": 0,
|
||||||
|
"freq": 400000,
|
||||||
|
"hard_i2c": true,
|
||||||
|
"hard_i2c_bus": 0
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"sensors":[
|
||||||
|
{
|
||||||
|
"name": "EG SENSOR",
|
||||||
|
"i2c_name": "EG I2C",
|
||||||
|
"i2c_addr": 72,
|
||||||
|
"config": 544
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
84
lib/TMP117.py
Normal file
84
lib/TMP117.py
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
import machine
|
||||||
|
|
||||||
|
class TMP117:
|
||||||
|
RESOLUTION = 0.0078125
|
||||||
|
|
||||||
|
TEMP_ADDR = 0x00
|
||||||
|
CONFIG_ADDR = 0x01
|
||||||
|
NIST_ADDR = 0x05
|
||||||
|
|
||||||
|
TEMP_SIZE = 16
|
||||||
|
|
||||||
|
CONFIG_REGISTER_MAP = {
|
||||||
|
"HIGH_Alert": 15,
|
||||||
|
"LOW_Alert": 14,
|
||||||
|
"Data_Ready": 13,
|
||||||
|
"EEPROM_Busy": 12,
|
||||||
|
"MOD1": 11,
|
||||||
|
"MOD0": 10,
|
||||||
|
"CONV2": 9,
|
||||||
|
"CONV1": 8,
|
||||||
|
"CONV0": 7,
|
||||||
|
"AVG1": 6,
|
||||||
|
"AVG0": 5,
|
||||||
|
"T/nA": 4,
|
||||||
|
"POL": 3,
|
||||||
|
"DR/Alert": 2,
|
||||||
|
"Soft_Reset": 1,
|
||||||
|
"--": 0
|
||||||
|
}
|
||||||
|
DEFAULT_CONFIG_INT = 554
|
||||||
|
|
||||||
|
def __init__(self, name: str, i2c: machine.I2C|machine.SoftI2C, i2c_addr, config_int):
|
||||||
|
self.name = name
|
||||||
|
self.i2c_addr = i2c_addr
|
||||||
|
self.i2c = i2c
|
||||||
|
|
||||||
|
self.NIST_ID = int.from_bytes(self.i2c.readfrom_mem(self.i2c_addr, self.NIST_ADDR, 2), "big")
|
||||||
|
# Set configuration
|
||||||
|
self.write_configuration_integer(config_int)
|
||||||
|
|
||||||
|
def bytes_to_temperature(self, binary:bytes):
|
||||||
|
"""Converts a 2's complement 16 bit value to the appropriate temperature value."""
|
||||||
|
val = int.from_bytes(binary, "big")
|
||||||
|
if (val & (1 << (self.TEMP_SIZE - 1))) != 0:
|
||||||
|
val = val - (1 << self.TEMP_SIZE)
|
||||||
|
|
||||||
|
return val * TMP117.RESOLUTION
|
||||||
|
|
||||||
|
def int_to_config_dict(self, integer:int):
|
||||||
|
"""Converts a configuration """
|
||||||
|
assert integer < 2**(16)
|
||||||
|
bits = list(f"{integer:016b}")
|
||||||
|
bits.reverse()
|
||||||
|
|
||||||
|
result = {}
|
||||||
|
for key in self.CONFIG_REGISTER_MAP.keys():
|
||||||
|
result[key] = bits[self.CONFIG_REGISTER_MAP[key]]
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def config_dict_to_int(self, config_dict):
|
||||||
|
array = [None for _ in range(16)]
|
||||||
|
|
||||||
|
for option in config_dict.keys():
|
||||||
|
array[self.CONFIG_REGISTER_MAP[option]] = config_dict[option]
|
||||||
|
|
||||||
|
array.reverse()
|
||||||
|
bit_string = ""
|
||||||
|
for bit in array:
|
||||||
|
bit_string += str(bit)
|
||||||
|
write_byte = int(bit_string, 2)
|
||||||
|
|
||||||
|
def write_configuration_integer(self, config_int:int):
|
||||||
|
write_byte = config_int.to_bytes(2, "big")
|
||||||
|
self.i2c.writeto_mem(self.i2c_addr, self.CONFIG_ADDR, write_byte)
|
||||||
|
|
||||||
|
def read_configuration_integer(self):
|
||||||
|
binary = self.i2c.readfrom_mem(self.i2c_addr, self.CONFIG_ADDR, 2)
|
||||||
|
integer = int.from_bytes(binary, "big")
|
||||||
|
return integer
|
||||||
|
|
||||||
|
def read_temperature(self):
|
||||||
|
binary = self.i2c.readfrom_mem(self.i2c_addr, self.TEMP_ADDR, 2)
|
||||||
|
return self.bytes_to_temperature(binary)
|
803
lib/mqtt_as.py
Normal file
803
lib/mqtt_as.py
Normal file
@ -0,0 +1,803 @@
|
|||||||
|
# mqtt_as.py Asynchronous version of umqtt.robust
|
||||||
|
# (C) Copyright Peter Hinch 2017-2023.
|
||||||
|
# Released under the MIT licence.
|
||||||
|
|
||||||
|
# Pyboard D support added also RP2/default
|
||||||
|
# Various improvements contributed by Kevin Köck.
|
||||||
|
|
||||||
|
import gc
|
||||||
|
import usocket as socket
|
||||||
|
import ustruct as struct
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
from ubinascii import hexlify
|
||||||
|
import uasyncio as asyncio
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
from utime import ticks_ms, ticks_diff
|
||||||
|
from uerrno import EINPROGRESS, ETIMEDOUT
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
from micropython import const
|
||||||
|
from machine import unique_id
|
||||||
|
import network
|
||||||
|
|
||||||
|
gc.collect()
|
||||||
|
from sys import platform
|
||||||
|
|
||||||
|
VERSION = (0, 7, 2)
|
||||||
|
|
||||||
|
# Default short delay for good SynCom throughput (avoid sleep(0) with SynCom).
|
||||||
|
_DEFAULT_MS = const(20)
|
||||||
|
_SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency
|
||||||
|
|
||||||
|
# Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection().
|
||||||
|
ESP32 = platform == "esp32"
|
||||||
|
RP2 = platform == "rp2"
|
||||||
|
if ESP32:
|
||||||
|
# https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942
|
||||||
|
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors
|
||||||
|
elif RP2:
|
||||||
|
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110]
|
||||||
|
else:
|
||||||
|
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT]
|
||||||
|
|
||||||
|
ESP8266 = platform == "esp8266"
|
||||||
|
PYBOARD = platform == "pyboard"
|
||||||
|
|
||||||
|
# Default "do little" coro for optional user replacement
|
||||||
|
async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program
|
||||||
|
await asyncio.sleep_ms(_DEFAULT_MS)
|
||||||
|
|
||||||
|
|
||||||
|
class MsgQueue:
|
||||||
|
def __init__(self, size):
|
||||||
|
self._q = [0 for _ in range(max(size, 4))]
|
||||||
|
self._size = size
|
||||||
|
self._wi = 0
|
||||||
|
self._ri = 0
|
||||||
|
self._evt = asyncio.Event()
|
||||||
|
self.discards = 0
|
||||||
|
|
||||||
|
def put(self, *v):
|
||||||
|
self._q[self._wi] = v
|
||||||
|
self._evt.set()
|
||||||
|
self._wi = (self._wi + 1) % self._size
|
||||||
|
if self._wi == self._ri: # Would indicate empty
|
||||||
|
self._ri = (self._ri + 1) % self._size # Discard a message
|
||||||
|
self.discards += 1
|
||||||
|
|
||||||
|
def __aiter__(self):
|
||||||
|
return self
|
||||||
|
|
||||||
|
async def __anext__(self):
|
||||||
|
if self._ri == self._wi: # Empty
|
||||||
|
self._evt.clear()
|
||||||
|
await self._evt.wait()
|
||||||
|
r = self._q[self._ri]
|
||||||
|
self._ri = (self._ri + 1) % self._size
|
||||||
|
return r
|
||||||
|
|
||||||
|
|
||||||
|
config = {
|
||||||
|
"client_id": hexlify(unique_id()),
|
||||||
|
"server": None,
|
||||||
|
"port": 0,
|
||||||
|
"user": "",
|
||||||
|
"password": "",
|
||||||
|
"keepalive": 60,
|
||||||
|
"ping_interval": 0,
|
||||||
|
"ssl": False,
|
||||||
|
"ssl_params": {},
|
||||||
|
"response_time": 10,
|
||||||
|
"clean_init": True,
|
||||||
|
"clean": True,
|
||||||
|
"max_repubs": 4,
|
||||||
|
"will": None,
|
||||||
|
"subs_cb": lambda *_: None,
|
||||||
|
"wifi_coro": eliza,
|
||||||
|
"connect_coro": eliza,
|
||||||
|
"ssid": None,
|
||||||
|
"wifi_pw": None,
|
||||||
|
"queue_len": 0,
|
||||||
|
"gateway": False,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class MQTTException(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def pid_gen():
|
||||||
|
pid = 0
|
||||||
|
while True:
|
||||||
|
pid = pid + 1 if pid < 65535 else 1
|
||||||
|
yield pid
|
||||||
|
|
||||||
|
|
||||||
|
def qos_check(qos):
|
||||||
|
if not (qos == 0 or qos == 1):
|
||||||
|
raise ValueError("Only qos 0 and 1 are supported.")
|
||||||
|
|
||||||
|
|
||||||
|
# MQTT_base class. Handles MQTT protocol on the basis of a good connection.
|
||||||
|
# Exceptions from connectivity failures are handled by MQTTClient subclass.
|
||||||
|
class MQTT_base:
|
||||||
|
REPUB_COUNT = 0 # TEST
|
||||||
|
DEBUG = False
|
||||||
|
|
||||||
|
def __init__(self, config):
|
||||||
|
self._events = config["queue_len"] > 0
|
||||||
|
# MQTT config
|
||||||
|
self._client_id = config["client_id"]
|
||||||
|
self._user = config["user"]
|
||||||
|
self._pswd = config["password"]
|
||||||
|
self._keepalive = config["keepalive"]
|
||||||
|
if self._keepalive >= 65536:
|
||||||
|
raise ValueError("invalid keepalive time")
|
||||||
|
self._response_time = config["response_time"] * 1000 # Repub if no PUBACK received (ms).
|
||||||
|
self._max_repubs = config["max_repubs"]
|
||||||
|
self._clean_init = config["clean_init"] # clean_session state on first connection
|
||||||
|
self._clean = config["clean"] # clean_session state on reconnect
|
||||||
|
will = config["will"]
|
||||||
|
if will is None:
|
||||||
|
self._lw_topic = False
|
||||||
|
else:
|
||||||
|
self._set_last_will(*will)
|
||||||
|
# WiFi config
|
||||||
|
self._ssid = config["ssid"] # Required for ESP32 / Pyboard D. Optional ESP8266
|
||||||
|
self._wifi_pw = config["wifi_pw"]
|
||||||
|
self._ssl = config["ssl"]
|
||||||
|
self._ssl_params = config["ssl_params"]
|
||||||
|
# Callbacks and coros
|
||||||
|
if self._events:
|
||||||
|
self.up = asyncio.Event()
|
||||||
|
self.down = asyncio.Event()
|
||||||
|
self.queue = MsgQueue(config["queue_len"])
|
||||||
|
else: # Callbacks
|
||||||
|
self._cb = config["subs_cb"]
|
||||||
|
self._wifi_handler = config["wifi_coro"]
|
||||||
|
self._connect_handler = config["connect_coro"]
|
||||||
|
# Network
|
||||||
|
self.port = config["port"]
|
||||||
|
if self.port == 0:
|
||||||
|
self.port = 8883 if self._ssl else 1883
|
||||||
|
self.server = config["server"]
|
||||||
|
if self.server is None:
|
||||||
|
raise ValueError("no server specified.")
|
||||||
|
self._sock = None
|
||||||
|
self._sta_if = network.WLAN(network.STA_IF)
|
||||||
|
self._sta_if.active(True)
|
||||||
|
if config["gateway"]: # Called from gateway (hence ESP32).
|
||||||
|
import aioespnow # Set up ESPNOW
|
||||||
|
|
||||||
|
while not (sta := self._sta_if).active():
|
||||||
|
time.sleep(0.1)
|
||||||
|
sta.config(pm=sta.PM_NONE) # No power management
|
||||||
|
sta.active(True)
|
||||||
|
self._espnow = aioespnow.AIOESPNow() # Returns AIOESPNow enhanced with async support
|
||||||
|
self._espnow.active(True)
|
||||||
|
|
||||||
|
self.newpid = pid_gen()
|
||||||
|
self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response
|
||||||
|
self.last_rx = ticks_ms() # Time of last communication from broker
|
||||||
|
self.lock = asyncio.Lock()
|
||||||
|
|
||||||
|
def _set_last_will(self, topic, msg, retain=False, qos=0):
|
||||||
|
qos_check(qos)
|
||||||
|
if not topic:
|
||||||
|
raise ValueError("Empty topic.")
|
||||||
|
self._lw_topic = topic
|
||||||
|
self._lw_msg = msg
|
||||||
|
self._lw_qos = qos
|
||||||
|
self._lw_retain = retain
|
||||||
|
|
||||||
|
def dprint(self, msg, *args):
|
||||||
|
if self.DEBUG:
|
||||||
|
print(msg % args)
|
||||||
|
|
||||||
|
def _timeout(self, t):
|
||||||
|
return ticks_diff(ticks_ms(), t) > self._response_time
|
||||||
|
|
||||||
|
async def _as_read(self, n, sock=None): # OSError caught by superclass
|
||||||
|
if sock is None:
|
||||||
|
sock = self._sock
|
||||||
|
# Declare a byte array of size n. That space is needed anyway, better
|
||||||
|
# to just 'allocate' it in one go instead of appending to an
|
||||||
|
# existing object, this prevents reallocation and fragmentation.
|
||||||
|
data = bytearray(n)
|
||||||
|
buffer = memoryview(data)
|
||||||
|
size = 0
|
||||||
|
t = ticks_ms()
|
||||||
|
while size < n:
|
||||||
|
if self._timeout(t) or not self.isconnected():
|
||||||
|
raise OSError(-1, "Timeout on socket read")
|
||||||
|
try:
|
||||||
|
msg_size = sock.readinto(buffer[size:], n - size)
|
||||||
|
except OSError as e: # ESP32 issues weird 119 errors here
|
||||||
|
msg_size = None
|
||||||
|
if e.args[0] not in BUSY_ERRORS:
|
||||||
|
raise
|
||||||
|
if msg_size == 0: # Connection closed by host
|
||||||
|
raise OSError(-1, "Connection closed by host")
|
||||||
|
if msg_size is not None: # data received
|
||||||
|
size += msg_size
|
||||||
|
t = ticks_ms()
|
||||||
|
self.last_rx = ticks_ms()
|
||||||
|
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def _as_write(self, bytes_wr, length=0, sock=None):
|
||||||
|
if sock is None:
|
||||||
|
sock = self._sock
|
||||||
|
|
||||||
|
# Wrap bytes in memoryview to avoid copying during slicing
|
||||||
|
bytes_wr = memoryview(bytes_wr)
|
||||||
|
if length:
|
||||||
|
bytes_wr = bytes_wr[:length]
|
||||||
|
t = ticks_ms()
|
||||||
|
while bytes_wr:
|
||||||
|
if self._timeout(t) or not self.isconnected():
|
||||||
|
raise OSError(-1, "Timeout on socket write")
|
||||||
|
try:
|
||||||
|
n = sock.write(bytes_wr)
|
||||||
|
except OSError as e: # ESP32 issues weird 119 errors here
|
||||||
|
n = 0
|
||||||
|
if e.args[0] not in BUSY_ERRORS:
|
||||||
|
raise
|
||||||
|
if n:
|
||||||
|
t = ticks_ms()
|
||||||
|
bytes_wr = bytes_wr[n:]
|
||||||
|
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
|
||||||
|
|
||||||
|
async def _send_str(self, s):
|
||||||
|
await self._as_write(struct.pack("!H", len(s)))
|
||||||
|
await self._as_write(s)
|
||||||
|
|
||||||
|
async def _recv_len(self):
|
||||||
|
n = 0
|
||||||
|
sh = 0
|
||||||
|
while 1:
|
||||||
|
res = await self._as_read(1)
|
||||||
|
b = res[0]
|
||||||
|
n |= (b & 0x7F) << sh
|
||||||
|
if not b & 0x80:
|
||||||
|
return n
|
||||||
|
sh += 7
|
||||||
|
|
||||||
|
async def _connect(self, clean):
|
||||||
|
self._sock = socket.socket()
|
||||||
|
self._sock.setblocking(False)
|
||||||
|
try:
|
||||||
|
self._sock.connect(self._addr)
|
||||||
|
except OSError as e:
|
||||||
|
if e.args[0] not in BUSY_ERRORS:
|
||||||
|
raise
|
||||||
|
await asyncio.sleep_ms(_DEFAULT_MS)
|
||||||
|
self.dprint("Connecting to broker.")
|
||||||
|
if self._ssl:
|
||||||
|
try:
|
||||||
|
import ssl
|
||||||
|
except ImportError:
|
||||||
|
import ussl as ssl
|
||||||
|
|
||||||
|
self._sock = ssl.wrap_socket(self._sock, **self._ssl_params)
|
||||||
|
premsg = bytearray(b"\x10\0\0\0\0\0")
|
||||||
|
msg = bytearray(b"\x04MQTT\x04\0\0\0") # Protocol 3.1.1
|
||||||
|
|
||||||
|
sz = 10 + 2 + len(self._client_id)
|
||||||
|
msg[6] = clean << 1
|
||||||
|
if self._user:
|
||||||
|
sz += 2 + len(self._user) + 2 + len(self._pswd)
|
||||||
|
msg[6] |= 0xC0
|
||||||
|
if self._keepalive:
|
||||||
|
msg[7] |= self._keepalive >> 8
|
||||||
|
msg[8] |= self._keepalive & 0x00FF
|
||||||
|
if self._lw_topic:
|
||||||
|
sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg)
|
||||||
|
msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3
|
||||||
|
msg[6] |= self._lw_retain << 5
|
||||||
|
|
||||||
|
i = 1
|
||||||
|
while sz > 0x7F:
|
||||||
|
premsg[i] = (sz & 0x7F) | 0x80
|
||||||
|
sz >>= 7
|
||||||
|
i += 1
|
||||||
|
premsg[i] = sz
|
||||||
|
await self._as_write(premsg, i + 2)
|
||||||
|
await self._as_write(msg)
|
||||||
|
await self._send_str(self._client_id)
|
||||||
|
if self._lw_topic:
|
||||||
|
await self._send_str(self._lw_topic)
|
||||||
|
await self._send_str(self._lw_msg)
|
||||||
|
if self._user:
|
||||||
|
await self._send_str(self._user)
|
||||||
|
await self._send_str(self._pswd)
|
||||||
|
# Await CONNACK
|
||||||
|
# read causes ECONNABORTED if broker is out; triggers a reconnect.
|
||||||
|
resp = await self._as_read(4)
|
||||||
|
self.dprint("Connected to broker.") # Got CONNACK
|
||||||
|
if resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02:
|
||||||
|
# Bad CONNACK e.g. authentication fail.
|
||||||
|
raise OSError(
|
||||||
|
-1, f"Connect fail: 0x{(resp[0] << 8) + resp[1]:04x} {resp[3]} (README 7)"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def _ping(self):
|
||||||
|
async with self.lock:
|
||||||
|
await self._as_write(b"\xc0\0")
|
||||||
|
|
||||||
|
# Check internet connectivity by sending DNS lookup to Google's 8.8.8.8
|
||||||
|
async def wan_ok(
|
||||||
|
self,
|
||||||
|
packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01",
|
||||||
|
):
|
||||||
|
if not self.isconnected(): # WiFi is down
|
||||||
|
return False
|
||||||
|
length = 32 # DNS query and response packet size
|
||||||
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||||||
|
s.setblocking(False)
|
||||||
|
s.connect(("8.8.8.8", 53))
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
try:
|
||||||
|
await self._as_write(packet, sock=s)
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
res = await self._as_read(length, s)
|
||||||
|
if len(res) == length:
|
||||||
|
return True # DNS response size OK
|
||||||
|
except OSError: # Timeout on read: no connectivity.
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
s.close()
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def broker_up(self): # Test broker connectivity
|
||||||
|
if not self.isconnected():
|
||||||
|
return False
|
||||||
|
tlast = self.last_rx
|
||||||
|
if ticks_diff(ticks_ms(), tlast) < 1000:
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
await self._ping()
|
||||||
|
except OSError:
|
||||||
|
return False
|
||||||
|
t = ticks_ms()
|
||||||
|
while not self._timeout(t):
|
||||||
|
await asyncio.sleep_ms(100)
|
||||||
|
if ticks_diff(self.last_rx, tlast) > 0: # Response received
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
async def disconnect(self):
|
||||||
|
if self._sock is not None:
|
||||||
|
await self._kill_tasks(False) # Keep socket open
|
||||||
|
try:
|
||||||
|
async with self.lock:
|
||||||
|
self._sock.write(b"\xe0\0") # Close broker connection
|
||||||
|
await asyncio.sleep_ms(100)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self._close()
|
||||||
|
self._has_connected = False
|
||||||
|
|
||||||
|
def _close(self):
|
||||||
|
if self._sock is not None:
|
||||||
|
self._sock.close()
|
||||||
|
|
||||||
|
def close(self): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60
|
||||||
|
self._close()
|
||||||
|
try:
|
||||||
|
self._sta_if.disconnect() # Disconnect Wi-Fi to avoid errors
|
||||||
|
except OSError:
|
||||||
|
self.dprint("Wi-Fi not started, unable to disconnect interface")
|
||||||
|
self._sta_if.active(False)
|
||||||
|
|
||||||
|
async def _await_pid(self, pid):
|
||||||
|
t = ticks_ms()
|
||||||
|
while pid in self.rcv_pids: # local copy
|
||||||
|
if self._timeout(t) or not self.isconnected():
|
||||||
|
break # Must repub or bail out
|
||||||
|
await asyncio.sleep_ms(100)
|
||||||
|
else:
|
||||||
|
return True # PID received. All done.
|
||||||
|
return False
|
||||||
|
|
||||||
|
# qos == 1: coro blocks until wait_msg gets correct PID.
|
||||||
|
# If WiFi fails completely subclass re-publishes with new PID.
|
||||||
|
async def publish(self, topic, msg, retain, qos):
|
||||||
|
pid = next(self.newpid)
|
||||||
|
if qos:
|
||||||
|
self.rcv_pids.add(pid)
|
||||||
|
async with self.lock:
|
||||||
|
await self._publish(topic, msg, retain, qos, 0, pid)
|
||||||
|
if qos == 0:
|
||||||
|
return
|
||||||
|
|
||||||
|
count = 0
|
||||||
|
while 1: # Await PUBACK, republish on timeout
|
||||||
|
if await self._await_pid(pid):
|
||||||
|
return
|
||||||
|
# No match
|
||||||
|
if count >= self._max_repubs or not self.isconnected():
|
||||||
|
raise OSError(-1) # Subclass to re-publish with new PID
|
||||||
|
async with self.lock:
|
||||||
|
await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid
|
||||||
|
count += 1
|
||||||
|
self.REPUB_COUNT += 1
|
||||||
|
|
||||||
|
async def _publish(self, topic, msg, retain, qos, dup, pid):
|
||||||
|
pkt = bytearray(b"\x30\0\0\0")
|
||||||
|
pkt[0] |= qos << 1 | retain | dup << 3
|
||||||
|
sz = 2 + len(topic) + len(msg)
|
||||||
|
if qos > 0:
|
||||||
|
sz += 2
|
||||||
|
if sz >= 2097152:
|
||||||
|
raise MQTTException("Strings too long.")
|
||||||
|
i = 1
|
||||||
|
while sz > 0x7F:
|
||||||
|
pkt[i] = (sz & 0x7F) | 0x80
|
||||||
|
sz >>= 7
|
||||||
|
i += 1
|
||||||
|
pkt[i] = sz
|
||||||
|
await self._as_write(pkt, i + 1)
|
||||||
|
await self._send_str(topic)
|
||||||
|
if qos > 0:
|
||||||
|
struct.pack_into("!H", pkt, 0, pid)
|
||||||
|
await self._as_write(pkt, 2)
|
||||||
|
await self._as_write(msg)
|
||||||
|
|
||||||
|
# Can raise OSError if WiFi fails. Subclass traps.
|
||||||
|
async def subscribe(self, topic, qos):
|
||||||
|
pkt = bytearray(b"\x82\0\0\0")
|
||||||
|
pid = next(self.newpid)
|
||||||
|
self.rcv_pids.add(pid)
|
||||||
|
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid)
|
||||||
|
async with self.lock:
|
||||||
|
await self._as_write(pkt)
|
||||||
|
await self._send_str(topic)
|
||||||
|
await self._as_write(qos.to_bytes(1, "little"))
|
||||||
|
|
||||||
|
if not await self._await_pid(pid):
|
||||||
|
raise OSError(-1)
|
||||||
|
|
||||||
|
# Can raise OSError if WiFi fails. Subclass traps.
|
||||||
|
async def unsubscribe(self, topic):
|
||||||
|
pkt = bytearray(b"\xa2\0\0\0")
|
||||||
|
pid = next(self.newpid)
|
||||||
|
self.rcv_pids.add(pid)
|
||||||
|
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid)
|
||||||
|
async with self.lock:
|
||||||
|
await self._as_write(pkt)
|
||||||
|
await self._send_str(topic)
|
||||||
|
|
||||||
|
if not await self._await_pid(pid):
|
||||||
|
raise OSError(-1)
|
||||||
|
|
||||||
|
# Wait for a single incoming MQTT message and process it.
|
||||||
|
# Subscribed messages are delivered to a callback previously
|
||||||
|
# set by .setup() method. Other (internal) MQTT
|
||||||
|
# messages processed internally.
|
||||||
|
# Immediate return if no data available. Called from ._handle_msg().
|
||||||
|
async def wait_msg(self):
|
||||||
|
try:
|
||||||
|
res = self._sock.read(1) # Throws OSError on WiFi fail
|
||||||
|
except OSError as e:
|
||||||
|
if e.args[0] in BUSY_ERRORS: # Needed by RP2
|
||||||
|
await asyncio.sleep_ms(0)
|
||||||
|
return
|
||||||
|
raise
|
||||||
|
if res is None:
|
||||||
|
return
|
||||||
|
if res == b"":
|
||||||
|
raise OSError(-1, "Empty response")
|
||||||
|
|
||||||
|
if res == b"\xd0": # PINGRESP
|
||||||
|
await self._as_read(1) # Update .last_rx time
|
||||||
|
return
|
||||||
|
op = res[0]
|
||||||
|
|
||||||
|
if op == 0x40: # PUBACK: save pid
|
||||||
|
sz = await self._as_read(1)
|
||||||
|
if sz != b"\x02":
|
||||||
|
raise OSError(-1, "Invalid PUBACK packet")
|
||||||
|
rcv_pid = await self._as_read(2)
|
||||||
|
pid = rcv_pid[0] << 8 | rcv_pid[1]
|
||||||
|
if pid in self.rcv_pids:
|
||||||
|
self.rcv_pids.discard(pid)
|
||||||
|
else:
|
||||||
|
raise OSError(-1, "Invalid pid in PUBACK packet")
|
||||||
|
|
||||||
|
if op == 0x90: # SUBACK
|
||||||
|
resp = await self._as_read(4)
|
||||||
|
if resp[3] == 0x80:
|
||||||
|
raise OSError(-1, "Invalid SUBACK packet")
|
||||||
|
pid = resp[2] | (resp[1] << 8)
|
||||||
|
if pid in self.rcv_pids:
|
||||||
|
self.rcv_pids.discard(pid)
|
||||||
|
else:
|
||||||
|
raise OSError(-1, "Invalid pid in SUBACK packet")
|
||||||
|
|
||||||
|
if op == 0xB0: # UNSUBACK
|
||||||
|
resp = await self._as_read(3)
|
||||||
|
pid = resp[2] | (resp[1] << 8)
|
||||||
|
if pid in self.rcv_pids:
|
||||||
|
self.rcv_pids.discard(pid)
|
||||||
|
else:
|
||||||
|
raise OSError(-1)
|
||||||
|
|
||||||
|
if op & 0xF0 != 0x30:
|
||||||
|
return
|
||||||
|
sz = await self._recv_len()
|
||||||
|
topic_len = await self._as_read(2)
|
||||||
|
topic_len = (topic_len[0] << 8) | topic_len[1]
|
||||||
|
topic = await self._as_read(topic_len)
|
||||||
|
sz -= topic_len + 2
|
||||||
|
if op & 6:
|
||||||
|
pid = await self._as_read(2)
|
||||||
|
pid = pid[0] << 8 | pid[1]
|
||||||
|
sz -= 2
|
||||||
|
msg = await self._as_read(sz)
|
||||||
|
retained = op & 0x01
|
||||||
|
if self._events:
|
||||||
|
self.queue.put(topic, msg, bool(retained))
|
||||||
|
else:
|
||||||
|
self._cb(topic, msg, bool(retained))
|
||||||
|
if op & 6 == 2: # qos 1
|
||||||
|
pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK
|
||||||
|
struct.pack_into("!H", pkt, 2, pid)
|
||||||
|
await self._as_write(pkt)
|
||||||
|
elif op & 6 == 4: # qos 2 not supported
|
||||||
|
raise OSError(-1, "QoS 2 not supported")
|
||||||
|
|
||||||
|
|
||||||
|
# MQTTClient class. Handles issues relating to connectivity.
|
||||||
|
|
||||||
|
|
||||||
|
class MQTTClient(MQTT_base):
|
||||||
|
def __init__(self, config):
|
||||||
|
super().__init__(config)
|
||||||
|
self._isconnected = False # Current connection state
|
||||||
|
keepalive = 1000 * self._keepalive # ms
|
||||||
|
self._ping_interval = keepalive // 4 if keepalive else 20000
|
||||||
|
p_i = config["ping_interval"] * 1000 # Can specify shorter e.g. for subscribe-only
|
||||||
|
if p_i and p_i < self._ping_interval:
|
||||||
|
self._ping_interval = p_i
|
||||||
|
self._in_connect = False
|
||||||
|
self._has_connected = False # Define 'Clean Session' value to use.
|
||||||
|
self._tasks = []
|
||||||
|
if ESP8266:
|
||||||
|
import esp
|
||||||
|
|
||||||
|
esp.sleep_type(0) # Improve connection integrity at cost of power consumption.
|
||||||
|
|
||||||
|
async def wifi_connect(self, quick=False):
|
||||||
|
s = self._sta_if
|
||||||
|
if ESP8266:
|
||||||
|
if s.isconnected(): # 1st attempt, already connected.
|
||||||
|
return
|
||||||
|
s.active(True)
|
||||||
|
s.connect() # ESP8266 remembers connection.
|
||||||
|
for _ in range(60):
|
||||||
|
if (
|
||||||
|
s.status() != network.STAT_CONNECTING
|
||||||
|
): # Break out on fail or success. Check once per sec.
|
||||||
|
break
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
if (
|
||||||
|
s.status() == network.STAT_CONNECTING
|
||||||
|
): # might hang forever awaiting dhcp lease renewal or something else
|
||||||
|
s.disconnect()
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
if not s.isconnected() and self._ssid is not None and self._wifi_pw is not None:
|
||||||
|
s.connect(self._ssid, self._wifi_pw)
|
||||||
|
while (
|
||||||
|
s.status() == network.STAT_CONNECTING
|
||||||
|
): # Break out on fail or success. Check once per sec.
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
else:
|
||||||
|
s.active(True)
|
||||||
|
if RP2: # Disable auto-sleep.
|
||||||
|
# https://datasheets.raspberrypi.com/picow/connecting-to-the-internet-with-pico-w.pdf
|
||||||
|
# para 3.6.3
|
||||||
|
s.config(pm=0xA11140)
|
||||||
|
s.connect(self._ssid, self._wifi_pw)
|
||||||
|
for _ in range(60): # Break out on fail or success. Check once per sec.
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
# Loop while connecting or no IP
|
||||||
|
if s.isconnected():
|
||||||
|
break
|
||||||
|
if ESP32:
|
||||||
|
# Status values >= STAT_IDLE can occur during connect:
|
||||||
|
# STAT_IDLE 1000, STAT_CONNECTING 1001, STAT_GOT_IP 1010
|
||||||
|
if s.status() < network.STAT_IDLE: # Error statuses
|
||||||
|
break # are in range 200..204
|
||||||
|
elif PYBOARD: # No symbolic constants in network
|
||||||
|
if not 1 <= s.status() <= 2:
|
||||||
|
break
|
||||||
|
elif RP2: # 1 is STAT_CONNECTING. 2 reported by user (No IP?)
|
||||||
|
if not 1 <= s.status() <= 2:
|
||||||
|
break
|
||||||
|
else: # Timeout: still in connecting state
|
||||||
|
s.disconnect()
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
if not s.isconnected(): # Timed out
|
||||||
|
raise OSError("Wi-Fi connect timed out")
|
||||||
|
if not quick: # Skip on first connection only if power saving
|
||||||
|
# Ensure connection stays up for a few secs.
|
||||||
|
self.dprint("Checking WiFi integrity.")
|
||||||
|
for _ in range(5):
|
||||||
|
if not s.isconnected():
|
||||||
|
raise OSError("Connection Unstable") # in 1st 5 secs
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
self.dprint("Got reliable connection")
|
||||||
|
|
||||||
|
async def connect(self, *, quick=False): # Quick initial connect option for battery apps
|
||||||
|
if not self._has_connected:
|
||||||
|
await self.wifi_connect(quick) # On 1st call, caller handles error
|
||||||
|
# Note this blocks if DNS lookup occurs. Do it once to prevent
|
||||||
|
# blocking during later internet outage:
|
||||||
|
self._addr = socket.getaddrinfo(self.server, self.port)[0][-1]
|
||||||
|
self._in_connect = True # Disable low level ._isconnected check
|
||||||
|
try:
|
||||||
|
if not self._has_connected and self._clean_init and not self._clean:
|
||||||
|
# Power up. Clear previous session data but subsequently save it.
|
||||||
|
# Issue #40
|
||||||
|
await self._connect(True) # Connect with clean session
|
||||||
|
try:
|
||||||
|
async with self.lock:
|
||||||
|
self._sock.write(b"\xe0\0") # Force disconnect but keep socket open
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self.dprint("Waiting for disconnect")
|
||||||
|
await asyncio.sleep(2) # Wait for broker to disconnect
|
||||||
|
self.dprint("About to reconnect with unclean session.")
|
||||||
|
await self._connect(self._clean)
|
||||||
|
except Exception:
|
||||||
|
self._close()
|
||||||
|
self._in_connect = False # Caller may run .isconnected()
|
||||||
|
raise
|
||||||
|
self.rcv_pids.clear()
|
||||||
|
# If we get here without error broker/LAN must be up.
|
||||||
|
self._isconnected = True
|
||||||
|
self._in_connect = False # Low level code can now check connectivity.
|
||||||
|
if not self._events:
|
||||||
|
asyncio.create_task(self._wifi_handler(True)) # User handler.
|
||||||
|
if not self._has_connected:
|
||||||
|
self._has_connected = True # Use normal clean flag on reconnect.
|
||||||
|
asyncio.create_task(self._keep_connected())
|
||||||
|
# Runs forever unless user issues .disconnect()
|
||||||
|
|
||||||
|
asyncio.create_task(self._handle_msg()) # Task quits on connection fail.
|
||||||
|
self._tasks.append(asyncio.create_task(self._keep_alive()))
|
||||||
|
if self.DEBUG:
|
||||||
|
self._tasks.append(asyncio.create_task(self._memory()))
|
||||||
|
if self._events:
|
||||||
|
self.up.set() # Connectivity is up
|
||||||
|
else:
|
||||||
|
asyncio.create_task(self._connect_handler(self)) # User handler.
|
||||||
|
|
||||||
|
# Launched by .connect(). Runs until connectivity fails. Checks for and
|
||||||
|
# handles incoming messages.
|
||||||
|
async def _handle_msg(self):
|
||||||
|
try:
|
||||||
|
while self.isconnected():
|
||||||
|
async with self.lock:
|
||||||
|
await self.wait_msg() # Immediate return if no message
|
||||||
|
await asyncio.sleep_ms(_DEFAULT_MS) # Let other tasks get lock
|
||||||
|
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self._reconnect() # Broker or WiFi fail.
|
||||||
|
|
||||||
|
# Keep broker alive MQTT spec 3.1.2.10 Keep Alive.
|
||||||
|
# Runs until ping failure or no response in keepalive period.
|
||||||
|
async def _keep_alive(self):
|
||||||
|
while self.isconnected():
|
||||||
|
pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
|
||||||
|
if pings_due >= 4:
|
||||||
|
self.dprint("Reconnect: broker fail.")
|
||||||
|
break
|
||||||
|
await asyncio.sleep_ms(self._ping_interval)
|
||||||
|
try:
|
||||||
|
await self._ping()
|
||||||
|
except OSError:
|
||||||
|
break
|
||||||
|
self._reconnect() # Broker or WiFi fail.
|
||||||
|
|
||||||
|
async def _kill_tasks(self, kill_skt): # Cancel running tasks
|
||||||
|
for task in self._tasks:
|
||||||
|
task.cancel()
|
||||||
|
self._tasks.clear()
|
||||||
|
await asyncio.sleep_ms(0) # Ensure cancellation complete
|
||||||
|
if kill_skt: # Close socket
|
||||||
|
self._close()
|
||||||
|
|
||||||
|
# DEBUG: show RAM messages.
|
||||||
|
async def _memory(self):
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(20)
|
||||||
|
gc.collect()
|
||||||
|
self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc())
|
||||||
|
|
||||||
|
def isconnected(self):
|
||||||
|
if self._in_connect: # Disable low-level check during .connect()
|
||||||
|
return True
|
||||||
|
if self._isconnected and not self._sta_if.isconnected(): # It's going down.
|
||||||
|
self._reconnect()
|
||||||
|
return self._isconnected
|
||||||
|
|
||||||
|
def _reconnect(self): # Schedule a reconnection if not underway.
|
||||||
|
if self._isconnected:
|
||||||
|
self._isconnected = False
|
||||||
|
asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket
|
||||||
|
if self._events: # Signal an outage
|
||||||
|
self.down.set()
|
||||||
|
else:
|
||||||
|
asyncio.create_task(self._wifi_handler(False)) # User handler.
|
||||||
|
|
||||||
|
# Await broker connection.
|
||||||
|
async def _connection(self):
|
||||||
|
while not self._isconnected:
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
# Scheduled on 1st successful connection. Runs forever maintaining wifi and
|
||||||
|
# broker connection. Must handle conditions at edge of WiFi range.
|
||||||
|
async def _keep_connected(self):
|
||||||
|
while self._has_connected:
|
||||||
|
if self.isconnected(): # Pause for 1 second
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
gc.collect()
|
||||||
|
else: # Link is down, socket is closed, tasks are killed
|
||||||
|
try:
|
||||||
|
self._sta_if.disconnect()
|
||||||
|
except OSError:
|
||||||
|
self.dprint("Wi-Fi not started, unable to disconnect interface")
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
try:
|
||||||
|
await self.wifi_connect()
|
||||||
|
except OSError:
|
||||||
|
continue
|
||||||
|
if not self._has_connected: # User has issued the terminal .disconnect()
|
||||||
|
self.dprint("Disconnected, exiting _keep_connected")
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
await self.connect()
|
||||||
|
# Now has set ._isconnected and scheduled _connect_handler().
|
||||||
|
self.dprint("Reconnect OK!")
|
||||||
|
except OSError as e:
|
||||||
|
self.dprint("Error in reconnect. %s", e)
|
||||||
|
# Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received.
|
||||||
|
self._close() # Disconnect and try again.
|
||||||
|
self._in_connect = False
|
||||||
|
self._isconnected = False
|
||||||
|
self.dprint("Disconnected, exited _keep_connected")
|
||||||
|
|
||||||
|
async def subscribe(self, topic, qos=0):
|
||||||
|
qos_check(qos)
|
||||||
|
while 1:
|
||||||
|
await self._connection()
|
||||||
|
try:
|
||||||
|
return await super().subscribe(topic, qos)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self._reconnect() # Broker or WiFi fail.
|
||||||
|
|
||||||
|
async def unsubscribe(self, topic):
|
||||||
|
while 1:
|
||||||
|
await self._connection()
|
||||||
|
try:
|
||||||
|
return await super().unsubscribe(topic)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self._reconnect() # Broker or WiFi fail.
|
||||||
|
|
||||||
|
async def publish(self, topic, msg, retain=False, qos=0):
|
||||||
|
qos_check(qos)
|
||||||
|
while 1:
|
||||||
|
await self._connection()
|
||||||
|
try:
|
||||||
|
return await super().publish(topic, msg, retain, qos)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self._reconnect() # Broker or WiFi fail.
|
34
lib/topic_message.py
Normal file
34
lib/topic_message.py
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
|
||||||
|
class TopicMessage():
|
||||||
|
"Object containing both a topic and message string attribute. Can convert itself to a string representation"
|
||||||
|
def __init__(self, topic:str = "", message:str = ""):
|
||||||
|
self.topic = topic
|
||||||
|
self.message = message
|
||||||
|
def to_string(self) -> str:
|
||||||
|
return f"{self.topic}: {self.message}"
|
||||||
|
|
||||||
|
class TMQueue():
|
||||||
|
"""Simple queue (FIFO) of TopicMessage objects"""
|
||||||
|
def __init__(self):
|
||||||
|
self.data: list[TopicMessage] = list()
|
||||||
|
|
||||||
|
def get(self):
|
||||||
|
if not self.is_empty():
|
||||||
|
item = self.data.pop(0)
|
||||||
|
return item
|
||||||
|
else:
|
||||||
|
raise IndexError
|
||||||
|
|
||||||
|
def add(self, item):
|
||||||
|
self.data.append(item)
|
||||||
|
|
||||||
|
def is_empty(self):
|
||||||
|
if len(self.data) == 0:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
|
def string_to_topicmessage(s: str):
|
||||||
|
"""Convert string to a TopicMessage object"""
|
||||||
|
topic, message = str.split(":", maxsplit = 1)
|
||||||
|
return TopicMessage(topic, message)
|
125
main.py
Normal file
125
main.py
Normal file
@ -0,0 +1,125 @@
|
|||||||
|
import json
|
||||||
|
import machine
|
||||||
|
import asyncio
|
||||||
|
import ntptime
|
||||||
|
import select
|
||||||
|
import time
|
||||||
|
import sys
|
||||||
|
|
||||||
|
import lib.TMP117
|
||||||
|
import lib.mqtt_as
|
||||||
|
|
||||||
|
import event_loops
|
||||||
|
|
||||||
|
LED = machine.Pin("LED", machine.Pin.OUT)
|
||||||
|
LED.on()
|
||||||
|
|
||||||
|
def search_dict_list(dictionary, attribute, value):
|
||||||
|
"""Returns first instance where attribute = value in list of dictionaries"""
|
||||||
|
return [item for item in dictionary if item[attribute] == value][0]
|
||||||
|
|
||||||
|
# First, load correct config
|
||||||
|
with open("global_config.json") as f:
|
||||||
|
jdata = json.load(f)
|
||||||
|
|
||||||
|
uuid = int.from_bytes(machine.unique_id(), "big")
|
||||||
|
|
||||||
|
config = search_dict_list(jdata, "micro_id", uuid)
|
||||||
|
|
||||||
|
# Setup i2c buses
|
||||||
|
i2c_buses = []
|
||||||
|
for bus_config in config["i2c_buses"]:
|
||||||
|
bus = {
|
||||||
|
"name": bus_config["name"]
|
||||||
|
}
|
||||||
|
if bus_config["hard_i2c"]:
|
||||||
|
bus["obj"] = machine.I2C(
|
||||||
|
bus_config["hard_i2c_bus"],
|
||||||
|
scl = machine.Pin(bus_config["scl"]),
|
||||||
|
sda = machine.Pin(bus_config["sda"]),
|
||||||
|
freq = bus_config["freq"])
|
||||||
|
else:
|
||||||
|
bus["obj"] = machine.SoftI2C(
|
||||||
|
scl = machine.Pin(bus_config["scl"]),
|
||||||
|
sda = machine.Pin(bus_config["sda"]),
|
||||||
|
freq = bus_config["freq"])
|
||||||
|
|
||||||
|
i2c_buses.append(bus)
|
||||||
|
|
||||||
|
# Setup sensors
|
||||||
|
sensors = []
|
||||||
|
for sensor_config in config["sensors"]:
|
||||||
|
|
||||||
|
i2c_conn = search_dict_list(i2c_buses, "name", sensor_config["i2c_name"])["obj"]
|
||||||
|
sensor = lib.TMP117.TMP117(
|
||||||
|
name = sensor_config["name"],
|
||||||
|
i2c = i2c_conn,
|
||||||
|
i2c_addr = sensor_config["i2c_addr"],
|
||||||
|
config_int = sensor_config["config"]
|
||||||
|
)
|
||||||
|
sensors.append(sensor)
|
||||||
|
|
||||||
|
print(uuid)
|
||||||
|
print(i2c_buses)
|
||||||
|
print(sensors)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
temperature_topic = config["temperature_topic"]
|
||||||
|
inbound_topic = config["inbound_topic"]
|
||||||
|
# Setup communication
|
||||||
|
if config["wifi_or_usb"] == "wifi":
|
||||||
|
# Setup MQTT Client
|
||||||
|
mqtt_config = lib.mqtt_as.config
|
||||||
|
mqtt_config["ssid"] = config["network_settings"]["ssid"]
|
||||||
|
mqtt_config["wifi_pw"] = config["network_settings"]["wifi_pw"]
|
||||||
|
mqtt_config["server"] = config["network_settings"]["mqtt_server"]
|
||||||
|
mqtt_config["port"] = config["network_settings"]["mqtt_port"]
|
||||||
|
mqtt_config["queue_len"] = config["network_settings"]["mqtt_queue_length"]
|
||||||
|
|
||||||
|
mqtt_client = lib.mqtt_as.MQTTClient(mqtt_config)
|
||||||
|
|
||||||
|
# Connect to MQTT broker and sync clock via NTP server
|
||||||
|
while not mqtt_client.isconnected():
|
||||||
|
try:
|
||||||
|
asyncio.run(mqtt_client.connect())
|
||||||
|
ntptime.settime()
|
||||||
|
except OSError as E:
|
||||||
|
print(f"Connection failed. ERROR: {E}. Retrying.")
|
||||||
|
|
||||||
|
LED.off()
|
||||||
|
try:
|
||||||
|
asyncio.run(event_loops.main_wifi(uuid, sensors, mqtt_client, temperature_topic, inbound_topic))
|
||||||
|
finally:
|
||||||
|
mqtt_client.close()
|
||||||
|
LED.on()
|
||||||
|
asyncio.new_event_loop()
|
||||||
|
|
||||||
|
|
||||||
|
elif config["wifi_or_usb"] == "usb":
|
||||||
|
while True:
|
||||||
|
poll_obj = select.poll()
|
||||||
|
poll_obj.register(sys.stdin, select.POLLIN)
|
||||||
|
|
||||||
|
t1 = time.ticks_ms()
|
||||||
|
sys.stdout.write("REQUEST_TIME\n")
|
||||||
|
|
||||||
|
poll_results = poll_obj.poll(100)
|
||||||
|
if poll_results:
|
||||||
|
timestamp_ms = int(sys.stdin.readline())
|
||||||
|
print(f"Recieved {timestamp_ms}")
|
||||||
|
t2 = time.ticks_ms()
|
||||||
|
t = time.gmtime(timestamp_ms//1000)
|
||||||
|
machine.RTC().datetime((t[0], t[1], t[2], t[6], t[3], t[4], t[5], 0))
|
||||||
|
print(f"Time: {time.time(), time.gmtime()}, took {time.ticks_diff(t2, t1)} ms latency")
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(5)
|
||||||
|
|
||||||
|
LED.off()
|
||||||
|
try:
|
||||||
|
asyncio.run(event_loops.main_wired(uuid, sensors, temperature_topic, inbound_topic))
|
||||||
|
finally:
|
||||||
|
LED.on()
|
||||||
|
asyncio.new_event_loop()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user