Source code for dhtioc.reader
#!/usr/bin/env python3
"""Read the sensor and cache the values.

.. autosummary::

    ~DHT_sensor
    ~main
"""

# The docstring above must be the first statement in the module; placed after
# an assignment it would be a discarded expression and ``__doc__`` would be None.
__all__ = ["DHT_sensor", "PIN", "READ_PERIOD"]
import adafruit_dht
import atexit
import board
import logging
import time
from .utils import run_in_thread
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Seconds the background polling loop sleeps on each pass; also the base for
# the wait used when the background thread is asked to stop.
LOOP_SLEEP = 0.02
# Default interval (seconds) between sensor reads; main() passes it as ``period``.
READ_PERIOD = 2.0
# Default signal pin (board.D4); main() passes it as ``pin``.
PIN = board.D4
class DHT_sensor:
    """
    Get readings from DHT22 sensor.

    Creating an instance connects to the sensor, registers an ``atexit``
    handler that tells the polling loop to stop, and starts that loop by
    calling :meth:`read_in_background_thread`.  The values from the most
    recent successful read are cached in ``temperature`` and ``humidity``;
    both are ``None`` until a read has succeeded.

    .. autosummary::

        ~read
        ~read_in_background_thread
        ~ready
        ~terminate_background_thread
    """

    def __init__(self, pin, period):
        """
        Connect with DHT22 sensor and read values

        PARAMETERS

        pin :
            *object*
            RPi pin to which DHT22 signal is connected,
            instance of ``board.Pin``.
        period :
            *float*
            Try to read the sensor every ``period`` seconds.
        """
        self.sensor = adafruit_dht.DHT22(pin)
        self.period = period
        self.temperature = None
        self.humidity = None
        # Reference time for the elapsed-seconds prefix of the log messages.
        self.t0 = time.time()
        # The polling loop runs for as long as this flag stays true.
        self.run_permitted = True
        # Ask the polling loop to stop when the interpreter exits.
        atexit.register(self.terminate_background_thread)
        self.read_in_background_thread()

    def __str__(self):
        """Return the cached reading as text, or ``"no signal yet"``.

        The temperature is shown after applying the Celsius-to-Fahrenheit
        conversion (``*9/5+32``).
        """
        if not self.ready:
            return "no signal yet"
        return f"RH={self.humidity:.1f}% T={self.temperature*9/5+32:.1f}F"

    def read(self):
        """Read signals from the DHT22 sensor.

        Both values are fetched before either cached attribute is changed,
        so a failure while fetching the second value cannot leave
        ``temperature`` and ``humidity`` holding values from different
        samples.  Any exception raised by the sensor is logged at DEBUG
        level and the previously cached values are kept.
        """
        try:
            temperature = self.sensor.temperature
            humidity = self.sensor.humidity
        except Exception as exc:  # be prepared, it happens too much
            logger.debug("%.2f %s", time.time() - self.t0, exc)
            return
        # Commit the pair only after both reads succeeded.
        self.temperature = temperature
        self.humidity = humidity
        logger.info("%.2f %s", time.time() - self.t0, self)

    # NOTE(review): ``run_in_thread`` comes from ``.utils`` and is not visible
    # here; presumably it runs this method in a separate thread so that the
    # call made from ``__init__`` returns immediately -- confirm.
    @run_in_thread
    def read_in_background_thread(self):
        """Monitor the sensor for new values.

        Calls :meth:`read` every ``self.period`` seconds until
        ``run_permitted`` becomes false, sleeping ``LOOP_SLEEP`` seconds
        between checks of the schedule.
        """
        # Schedule on the monotonic clock: a step of the wall clock (for
        # example a time sync after boot) must neither trigger a burst of
        # reads nor stall them.
        time_to_read = time.monotonic()
        while self.run_permitted:
            now = time.monotonic()
            if now >= time_to_read:
                time_to_read += self.period
                if time_to_read <= now:
                    # Fell behind (slow read): skip the missed slots rather
                    # than reading back-to-back to catch up.
                    time_to_read = now + self.period
                self.read()
            time.sleep(LOOP_SLEEP)

    @property
    def ready(self):
        """Has a value been read for both humidity and temperature?"""
        return None not in (self.humidity, self.temperature)

    def terminate_background_thread(self, *args, **kwargs):
        """Signal the background thread to stop.

        Clears ``run_permitted`` and then waits four loop intervals so the
        polling loop has a chance to notice the flag.  Extra positional and
        keyword arguments are accepted and ignored.
        """
        logger.debug("terminate background thread")
        self.run_permitted = False
        time.sleep(LOOP_SLEEP*4)
def main():
    """Development use only.

    Create a sensor on ``PIN`` polled every ``READ_PERIOD`` seconds, then
    print the elapsed time and the sensor's string form every two seconds,
    forever.
    """
    dht = DHT_sensor(PIN, READ_PERIOD)
    started = time.time()
    while True:
        elapsed = time.time() - started
        print(f"{elapsed:.2f} {dht}")
        time.sleep(2)


# Run the development loop only when this file is executed as a script.
if __name__ == "__main__":
    main()