A repository for MicroPython code that communicates with and relays information from the Texas Instruments TMP117/119 temperature sensor.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

46 lines
1.6 KiB

import select
import sys
import machine
import time
import asyncio
from lib.topic_message import TopicMessage, TMQueue, string_to_topicmessage
async def usb_sync_clock(
        interval_s: float | int):
    """Periodically set the RTC from a timestamp requested over USB.

    Each round writes a ``REQUEST_TIME`` line to stdout and polls stdin
    briefly for a reply. If a line arrives it is parsed as a timestamp in
    milliseconds, converted with ``time.gmtime`` and written to
    ``machine.RTC()``. The coroutine then sleeps for ``interval_s`` seconds
    before the next round, whether or not the sync succeeded, so other
    tasks always get a chance to run.

    Args:
        interval_s: Seconds to sleep between sync attempts.

    This coroutine loops forever and never returns.
    """
    # Build and register the poller once; re-creating it on every round
    # only allocates a fresh object for no benefit.
    poll_obj = select.poll()
    poll_obj.register(sys.stdin, select.POLLIN)
    while True:
        sys.stdout.write("REQUEST_TIME \n")
        # Short blocking wait for the host's reply (timeout argument of 10).
        if poll_obj.poll(10):
            try:
                timestamp_ms = float(sys.stdin.readline())
            except ValueError:
                # Garbled or unrelated line on stdin: skip this round rather
                # than letting the exception terminate the sync task.
                timestamp_ms = None
            if timestamp_ms is not None:
                # NOTE(review): assumes the host's timestamp uses the same
                # epoch as this port's time.gmtime() - confirm.
                t = time.gmtime(int(timestamp_ms / 1000))
                # Reorder gmtime fields into the 8-tuple passed to
                # RTC.datetime: t[6] moves to the fourth slot and the final
                # slot is set to 0.
                machine.RTC().datetime(
                    (t[0], t[1], t[2], t[6], t[3], t[4], t[5], 0))
        # Sleep on every round (not only on a missed reply) so a responsive
        # host cannot keep this loop spinning without yielding.
        await asyncio.sleep(interval_s)
async def usb_in_message_parser(
        inbound_queue: TMQueue,
        interval_s: float | int):
    """Read lines arriving on USB stdin and queue them as TopicMessages.

    A single poller is registered on ``sys.stdin``. While input is
    available, each line is converted with ``string_to_topicmessage`` and
    added to ``inbound_queue``; when nothing is pending the coroutine
    sleeps for ``interval_s`` seconds before polling again.

    Args:
        inbound_queue: Queue that receives every parsed message.
        interval_s: Seconds to sleep when no input is waiting.

    This coroutine loops forever and never returns.
    """
    stdin_poller = select.poll()
    stdin_poller.register(sys.stdin, select.POLLIN)
    while True:
        # Nothing ready within the poll timeout: yield to other tasks.
        if not stdin_poller.poll(10):
            await asyncio.sleep(interval_s)
            continue
        raw_line = sys.stdin.readline()
        parsed = string_to_topicmessage(raw_line)
        inbound_queue.add(parsed)
async def usb_send_data(
        outbound_queue: TMQueue,
        interval_s: float | int):
    """Drain the outbound queue to USB stdout, one line per message.

    Every queued item is written as ``"<topic>: <message>"`` followed by a
    newline. Once the queue is empty the coroutine sleeps for
    ``interval_s`` seconds and then checks the queue again.

    Args:
        outbound_queue: Queue holding the messages to send.
        interval_s: Seconds to sleep whenever the queue is empty.

    This coroutine loops forever and never returns.
    """
    while True:
        # Queue drained: pause before looking for new messages.
        if outbound_queue.is_empty():
            await asyncio.sleep(interval_s)
            continue
        pending = outbound_queue.get()
        sys.stdout.write(f"{pending.topic}: {pending.message}\n")