59 lines
1.7 KiB
Python
59 lines
1.7 KiB
Python
|
import ntptime
|
||
|
import asyncio
|
||
|
|
||
|
|
||
|
from lib.mqtt_as import MQTTClient
|
||
|
from lib.topic_message import TopicMessage, TMQueue
|
||
|
|
||
|
|
||
|
async def mqtt_up(
        mqtt_client: MQTTClient,
        inbound_topic,
        todo):
    """
    Run the "todo" coroutine every time the MQTT client comes up.

    Loops forever. On each pass it waits for the client's ``up`` event,
    clears that event so the next connection can be detected, subscribes
    to ``inbound_topic`` and then awaits ``todo()``.

    mqtt_client   -- client exposing the ``up`` event and ``subscribe()``
    inbound_topic -- topic passed to ``subscribe()`` on every (re)connect
    todo          -- zero-argument callable returning an awaitable
    """
    while True:
        connected = mqtt_client.up
        await connected.wait()  # type: ignore
        # Re-arm the event before doing any work for this connection.
        connected.clear()
        await mqtt_client.subscribe(inbound_topic)
        await todo()
|
||
|
|
||
|
async def mqtt_down(
        mqtt_client: MQTTClient,
        todo):
    """
    Run the "todo" coroutine every time the MQTT client goes down.

    Loops forever. On each pass it waits for the client's ``down`` event,
    clears that event so the next outage can be detected, then awaits
    ``todo()``.

    mqtt_client -- client exposing the ``down`` event
    todo        -- zero-argument callable returning an awaitable
    """
    while True:
        disconnected = mqtt_client.down
        await disconnected.wait()  # type: ignore
        # Re-arm the event before handling this outage.
        disconnected.clear()
        await todo()
|
||
|
|
||
|
async def wireless_sync_clock(
        mqtt_client: MQTTClient,
        interval_s):
    """
    Periodically set the clock from an NTP server.

    Loops forever: calls ``ntptime.settime()`` and then sleeps for
    ``interval_s`` seconds. A failed sync (network errors reach us as
    ``OSError``, e.g. a socket timeout) is reported and retried on the
    next interval instead of terminating the task.

    mqtt_client -- not used by this coroutine; kept so existing callers
                   that pass it keep working
    interval_s  -- seconds to sleep between sync attempts

    NOTE(review): ``settime()`` is called synchronously, so other tasks
    do not run until it returns.
    """
    while True:
        try:
            ntptime.settime()
        except OSError as exc:
            # Without this guard one transient network failure would end
            # the task and the clock would never be synced again.
            print("NTP sync failed:", exc)
        await asyncio.sleep(interval_s)
|
||
|
|
||
|
async def mqtt_publish_tmsgs(
        mqtt_client: MQTTClient,
        outbound_queue: TMQueue,
        post_interval: float | int):
    """
    Publish every TopicMessage found in the outbound queue over MQTT.

    Loops forever. While the queue has items, each one is taken off the
    queue and published with ``qos=1`` using its ``topic`` and
    ``message`` attributes. Once the queue is empty the coroutine sleeps
    for ``post_interval`` seconds before checking again.

    mqtt_client    -- client whose ``publish()`` is awaited per item
    outbound_queue -- queue providing ``is_empty()`` and ``get()``
    post_interval  -- seconds to sleep whenever the queue is empty

    NOTE(review): an item is removed from the queue before ``publish()``
    is awaited, so it is not re-queued if ``publish()`` raises.
    """
    while True:
        if outbound_queue.is_empty():
            # Nothing pending: back off, then poll the queue again.
            await asyncio.sleep(post_interval)
            continue
        pending = outbound_queue.get()
        await mqtt_client.publish(pending.topic, pending.message, qos=1)
|
||
|
|
||
|
async def mqtt_message_parser(
        mqtt_client: MQTTClient,
        inbound_queue: TMQueue):
    """
    Parse messages received by the MQTT client into the inbound queue.

    Asynchronously iterates ``mqtt_client.queue``, which yields
    ``(topic, message, retained)`` tuples. The topic and message are
    decoded with ``.decode()`` and wrapped in a ``TopicMessage`` that is
    added to ``inbound_queue``. The retained flag is not used.

    mqtt_client   -- client whose ``queue`` is an async iterable
    inbound_queue -- queue providing ``add()``
    """
    async for raw_topic, raw_message, _retained in mqtt_client.queue:  # type: ignore
        # Decode the topic first, then the payload, before queueing.
        inbound_queue.add(
            TopicMessage(raw_topic.decode(), raw_message.decode())
        )
|