A repository for MicroPython code that communicates with and relays information from the Texas Instruments TMP117/119 temperature sensor.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

46 lines
1.6 KiB

2 months ago
  1. import select
  2. import sys
  3. import machine
  4. import time
  5. import asyncio
  6. from lib.topic_message import TopicMessage, TMQueue, string_to_topicmessage
  7. async def usb_sync_clock(
  8. interval_s: float | int):
  9. """Synchronise clock by posting a request and readin in the output immediately"""
  10. while True:
  11. poll_obj = select.poll()
  12. poll_obj.register(sys.stdin, select.POLLIN)
  13. sys.stdout.write("REQUEST_TIME \n")
  14. poll_results = poll_obj.poll(10)
  15. if poll_results:
  16. timestamp_ms = float(sys.stdin.readline())
  17. t = time.gmtime(int(timestamp_ms/1000))
  18. machine.RTC().datetime((t[0], t[1], t[2], t[6], t[3], t[4], t[5], 0))
  19. else:
  20. await asyncio.sleep(interval_s)
  21. async def usb_in_message_parser(
  22. inbound_queue: TMQueue,
  23. interval_s: float | int):
  24. """Parse regular TopicMessage messages sent in through USB"""
  25. poll_obj = select.poll()
  26. poll_obj.register(sys.stdin, select.POLLIN)
  27. while True:
  28. poll_results = poll_obj.poll(10)
  29. if poll_results:
  30. line = sys.stdin.readline()
  31. inbound_queue.add(string_to_topicmessage(line))
  32. else:
  33. await asyncio.sleep(interval_s)
  34. async def usb_send_data(
  35. outbound_queue: TMQueue,
  36. interval_s: float | int):
  37. """Send all messages in outbound queue via USB"""
  38. while True:
  39. while not outbound_queue.is_empty():
  40. item: TopicMessage = outbound_queue.get()
  41. sys.stdout.write(f"{item.topic}: {item.message}\n")
  42. await asyncio.sleep(interval_s)