import ntptime import asyncio from lib.mqtt_as import MQTTClient from lib.topic_message import TopicMessage, TMQueue async def mqtt_up( mqtt_client: MQTTClient, inbound_topic, todo): """ Perform asynchronous "todo" when MQTT client is connected. """ while True: await mqtt_client.up.wait() # type: ignore mqtt_client.up.clear() await mqtt_client.subscribe(inbound_topic) await todo() async def mqtt_down( mqtt_client: MQTTClient, todo): """ Perform asynchronous "todo" when MQTT client is disconnected. """ while True: await mqtt_client.down.wait() # type: ignore mqtt_client.down.clear() await todo() async def wireless_sync_clock( mqtt_client: MQTTClient, interval_s): """Sync clock via NTP server""" while True: ntptime.settime() await asyncio.sleep(interval_s) async def mqtt_publish_tmsgs( mqtt_client: MQTTClient, outbound_queue: TMQueue, post_interval: float | int): """Publishes all TopicMessage items in queue over MQTT""" while True: while not outbound_queue.is_empty(): item = outbound_queue.get() await mqtt_client.publish(item.topic, item.message, qos=1) await asyncio.sleep(post_interval) async def mqtt_message_parser( mqtt_client: MQTTClient, inbound_queue: TMQueue): """Parse messages recieved by MQTT client and add them to the inbound queue""" async for topic, message, retained in mqtt_client.queue: # type: ignore topic = topic.decode() message = message.decode() inbound_queue.add(TopicMessage(topic, message))