#!/usr/bin/env python3
# Recovered from a gitweb "commitdiff_plain" page:
#   From:      Steven Baltakatei Sandoval
#   Date:      Sun, 18 Apr 2021 23:13:40 +0000 (+0000)
#   Subject:   feat(examples):Add baltakatei's all-in-one-enviro-mini py script
#   X-Git-Url: https://zdv2.bktei.com/gitweb/EVA-2020-02-2.git/commitdiff_plain/f0613e570ac423997b56da25d336132926ecc2ee?ds=sidebyside
#   File:      examples/all-in-one-enviro-mini-bk.py (new file, mode 100755)
#
# Commit notes:
#   - Flips screen vertically
#   - Displays up to last 48 hours of data
#   - Indicates highest and lowest values

import time
import colorsys
import os
import sys
import ST7735
import schedule  # https://pypi.org/project/schedule/ via https://stackoverflow.com/a/16786600
import datetime  # http://stackoverflow.com/questions/2150739/ddg#28147286
try:
    # Transitional fix for breaking change in LTR559
    from ltr559 import LTR559
    ltr559 = LTR559()
except ImportError:
    import ltr559

from bme280 import BME280
from enviroplus import gas
from subprocess import PIPE, Popen
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
from fonts.ttf import RobotoMedium as UserFont
import logging
import numpy as np

logging.basicConfig(
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

logging.info("""all-in-one.py - Displays readings from all of Enviro plus' sensors
Press Ctrl+C to exit!
""")

# Script Constants
varLenBufferTTL = int((2*24*60*60)*10**9)  # time-to-live in nanoseconds (48 hours)

# BME280 temperature/pressure/humidity sensor
bme280 = BME280()

# Create ST7735 LCD display class
st7735 = ST7735.ST7735(
    port=0,
    cs=1,
    dc=9,
    backlight=12,
    # The author replaced rotation=270 with 90; per the commit notes this
    # flips the screen vertically.
    rotation=90,
    spi_speed_hz=10000000
)

# Initialize display
st7735.begin()

WIDTH = st7735.width
HEIGHT = st7735.height

# Set up canvas and font
img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0))
draw = ImageDraw.Draw(img)
path = os.path.dirname(os.path.realpath(__file__))
font_size = 20
font = ImageFont.truetype(UserFont, font_size)
font2_size = 15
font2 = ImageFont.truetype(UserFont, font2_size)

message = ""

# The position of the top bar
top_pos = 25

# Latest sensor tuples, each (time [ns], unit, value); filled by pollSensors()
now_temp_tuple = None
now_pressure_tuple = None
now_humidity_tuple = None
now_illuminance_tuple = None

# Time span (hours) covered by the history buffer; updated by updateBuffer()
span_time_h = 0.0


def _render_graph(message, series):
    """Draw the colour bars, line graph and headline text onto the canvas.

    Every value in ``series`` is scaled into (0, 1] relative to the series'
    own minimum and maximum; each value becomes a one-pixel-wide column.
    The canvas is NOT pushed to the LCD here; callers do that.

    Returns:
        (vmin, vmax): the minimum and maximum of ``series``.
    """
    vmin = min(series)
    vmax = max(series)
    # The +1 terms keep the denominator non-zero when all values are equal
    colours = [(v - vmin + 1) / (vmax - vmin + 1) for v in series]
    logging.info(message)
    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
    for i, level in enumerate(colours):
        # Convert the values to colours from red to blue
        colour = (1.0 - level) * 0.6
        r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, 1.0, 1.0)]
        # Draw a 1-pixel wide rectangle of colour
        draw.rectangle((i, top_pos, i + 1, HEIGHT), (r, g, b))
        # Draw a line graph in black
        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
        draw.rectangle((i, line_y, i + 1, line_y + 1), (0, 0, 0))
    # Write the text at the top in black
    draw.text((0, 0), message, font=font, fill=(0, 0, 0))
    return vmin, vmax


# Displays data and text on the 0.96" LCD
def display_text(variable, data, unit):
    """Append ``data`` to the rolling ``values[variable]`` list and draw it.

    The list keeps a constant length: the oldest entry is dropped each call.
    """
    # Maintain length of list
    values[variable] = values[variable][1:] + [data]
    # Format the variable name and value
    message = "{}: {:.1f} {}".format(variable[:4], data, unit)
    _render_graph(message, values[variable])
    st7735.display(img)


# Displays data and text on the 0.96" LCD
def display_text2(variable, data, unit, values):
    """Draw the history in ``values[variable]`` with MAX / HR / MIN labels.

    Args:
        variable: key into ``values``; its first four characters head the text.
        data: latest reading, shown in the headline.
        unit: unit string appended to the headline.
        values: dict mapping variable name to the list of values to graph.
    """
    series = values[variable]
    print('DEBUG:len(values[' + str(variable) + ']):' + str(len(series)))
    if not series:
        # No history yet: graph the live reading so min()/max() cannot fail
        series = [data]
    # Format the variable name and value
    message = "{}: {:.1f} {}".format(variable[:4], data, unit)
    vmin, vmax = _render_graph(message, series)
    # Stack MAX / HR / MIN labels at 1/4, 2/4 and 3/4 of the graph area
    labels = (
        "MAX:{:.1f}".format(vmax),
        "HR:{:.1f}".format(span_time_h),
        "MIN:{:.1f}".format(vmin),
    )
    label_x = int(WIDTH*(3/100))
    for quarter, label in enumerate(labels, start=1):
        label_y = int(((HEIGHT - top_pos)/(HEIGHT))*(HEIGHT*(quarter/4)) + top_pos - (font2_size/2))
        draw.text((label_x, label_y), label, font=font2, fill=(0, 0, 0))
    st7735.display(img)


# Get the temperature of the CPU for compensation
def get_cpu_temperature():
    """Return the CPU temperature parsed from ``vcgencmd measure_temp``.

    The number is taken from between the '=' and the last "'" of the output.
    """
    with Popen(['vcgencmd', 'measure_temp'], stdout=PIPE, universal_newlines=True) as process:
        output, _error = process.communicate()
    return float(output[output.index('=') + 1:output.rindex("'")])


# Tuning factor for compensation. Decrease this number to adjust the
# temperature down, and increase to adjust up
factor = 2.25

cpu_temps = [get_cpu_temperature()] * 5

delay = 0.5  # Debounce the proximity tap
mode = 0  # The starting mode
last_page = 0
light = 1

# Create a values dict to store the data
variables = ["temperature",
             "pressure",
             "humidity",
             "light"]
values = {}  # Initialize values dictionary
for v in variables:
    values[v] = [1] * WIDTH  # Init a WIDTH-length list as value for each string in variables

# Create a varLenBuffer dict to store recent data
varLenBuffer = {}
for v in variables:
    varLenBuffer[v] = []  # Init an empty list for each string in variables

# Create a varLenBufferFlt dict to store recent data as floats only
varLenBufferFlt = {}
for v in variables:
    varLenBufferFlt[v] = []  # Init an empty list for each string in variables

# Create a fixLenBuffer dict to store data for displaying
fixLenBuffer = {}
for v in variables:
    fixLenBuffer[v] = []  # Init an empty list for each string in variables

pollDelay = 5.0


def _compensated_temperature():
    """Return the BME280 temperature corrected for CPU heat.

    Pushes a fresh CPU temperature into the shared rolling ``cpu_temps``
    window and averages the window to decrease jitter.
    """
    global cpu_temps
    cpu_temps = cpu_temps[1:] + [get_cpu_temperature()]
    avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
    raw_temp = bme280.get_temperature()  # get °C from BME280 sensor
    return raw_temp - ((avg_cpu_temp - raw_temp) / factor)


def pollSensors():
    """Update the globals holding the latest sensor tuples.

    Output: (time [ns], unit, float)
      now_temp_tuple        (°C)
      now_pressure_tuple    (hPa)
      now_humidity_tuple    (%)
      now_illuminance_tuple (lux)
    Depends: time, bme280, ltr559, get_cpu_temperature()
    """
    global now_temp_tuple
    global now_pressure_tuple
    global now_humidity_tuple
    global now_illuminance_tuple
    # Temperature (uses the shared smoothing window, not a fresh one)
    now_temp_tuple = (time.time_ns(), '°C', _compensated_temperature())
    # Pressure
    now_pressure_tuple = (time.time_ns(), 'hPa', bme280.get_pressure())
    # Humidity
    now_humidity_tuple = (time.time_ns(), '%', bme280.get_humidity())
    # Illuminance: only trust the lux reading if nothing is nearby
    proximity = ltr559.get_proximity()
    if proximity < 10:
        now_illuminance = ltr559.get_lux()
    elif now_illuminance_tuple is not None:
        # Sensor is covered: carry the previous value forward with a new timestamp
        now_illuminance = now_illuminance_tuple[2]
    else:
        # No previous reading; same placeholder the main loop shows when covered
        now_illuminance = 1.0
    now_illuminance_tuple = (time.time_ns(), 'lux', now_illuminance)


def dateIso8601Str():
    """Return the current UTC time as an ISO 8601 string with a +00:00 offset."""
    return str(datetime.datetime.now(datetime.timezone.utc).isoformat())


def medianSubset(listIn: list = None, listOutLen: int = 0) -> list:
    """Resample ``listIn`` to exactly ``listOutLen`` elements.

    Input:  list: listIn: input list consisting of integers or floats
            int: listOutLen: quantity of elements in output list
    Output: list: ints/floats of specified size. When reducing, each output
            element is the median of a contiguous slice of ``listIn``; when
            expanding, input elements are repeated; when the lengths match,
            a copy of ``listIn`` is returned.
    Raises: ValueError for a non-int or non-positive ``listOutLen``, a
            non-list or empty ``listIn``, or non-numeric elements.
    Ref/Attrib: PEP 3107 typing https://stackoverflow.com/a/21384492
    """
    if listIn is None:
        listIn = []

    # Exit for invalid input
    if not isinstance(listOutLen, int):
        raise ValueError('ERROR:Not a valid int:' + str(listOutLen))
    if not listOutLen > 0:
        raise ValueError('ERROR:Invalid value:' + str(listOutLen))
    if not isinstance(listIn, list):
        raise ValueError('ERROR:Not a valid list:' + str(listIn))
    if not all(isinstance(x, (int, float)) for x in listIn):
        raise ValueError('ERROR:Input list contains something besides integers or floating point numbers.')

    listInLen = len(listIn)
    if listInLen == 0:
        raise ValueError('ERROR:Input list is empty.')

    if listInLen == listOutLen:
        # Copy so the caller's buffer is never aliased by the result
        return list(listIn)

    if listInLen > listOutLen:
        # Reduce: split listIn into listOutLen contiguous, non-empty slices
        listOut = []
        subsetIndLo = 0
        for i_out in range(listOutLen):
            # Inclusive upper index of the i_out'th slice
            subsetIndHi = (listInLen - 1) * (i_out + 1) // listOutLen
            listOut.append(np.median(listIn[subsetIndLo:subsetIndHi + 1]))
            subsetIndLo = subsetIndHi + 1
        return listOut

    # Expand: repeat input elements to fill the longer output
    subsetLenFloat = (listOutLen - 1) / listInLen
    return [
        listIn[min(int(i_out / subsetLenFloat), listInLen - 1)]
        for i_out in range(listOutLen)
    ]


def updateBuffer():
    """Poll the sensors and refresh the history buffers used for display.

    Appends the newest tuples to ``varLenBuffer``, drops tuples older than
    ``varLenBufferTTL``, recomputes ``span_time_h``, rebuilds the float-only
    ``varLenBufferFlt`` and resamples it to WIDTH points in ``fixLenBuffer``.
    """
    global span_time_h
    # Capture new sensor tuples
    pollSensors()

    # Append new sensor tuples to varying-length buffer
    latest = (now_temp_tuple, now_pressure_tuple,
              now_humidity_tuple, now_illuminance_tuple)
    for v, reading in zip(variables, latest):
        varLenBuffer[v].append(reading)

    # Trim outdated sensor tuples from varying-length buffer
    for v in variables:
        now_time_ns = time.time_ns()  # get ns timestamp of now
        # Discard every tuple whose age exceeds varLenBufferTTL
        while varLenBuffer[v] and now_time_ns - varLenBuffer[v][0][0] > varLenBufferTTL:
            varLenBuffer[v].pop(0)
        print('DEBUG:Len of varLenBuffer[' + str(v) + ']:' + str(len(varLenBuffer[v])))

    # Calculate buffer time span in hours (use temperature tuples)
    first_time_ns = varLenBuffer[variables[0]][0][0]
    last_time_ns = varLenBuffer[variables[0]][-1][0]
    span_time_ns = int(last_time_ns - first_time_ns)
    span_time_h = float(span_time_ns / (10**9*60*60))  # nanoseconds to hours

    # Convert tuple buffer into float buffer
    for v in variables:
        varLenBufferFlt[v].clear()  # clear old float list
        for t in varLenBuffer[v]:
            if isinstance(t[2], (int, float)):
                varLenBufferFlt[v].append(float(t[2]))  # build new float list
            else:
                varLenBufferFlt[v].append(float(-273))  # obvious sentinel otherwise

    # Compress/expand buffer to fixed-length buffer
    for v in variables:
        fixLenBuffer[v] = medianSubset(varLenBufferFlt[v], WIDTH)
        print('DEBUG:Len of fixLenBuffer[' + str(v) + ']:' + str(len(fixLenBuffer[v])))


def _read_mode(page, proximity):
    """Return (data, unit) for the live reading shown on display page ``page``.

    Pages follow the order of ``variables``: 0 temperature, 1 pressure,
    2 humidity, 3 light.
    """
    if page == 0:
        return _compensated_temperature(), "°C"
    if page == 1:
        return bme280.get_pressure(), "hPa"
    if page == 2:
        return bme280.get_humidity(), "%"
    # Light: show a placeholder while something covers the sensor
    if proximity < 10:
        return ltr559.get_lux(), "Lux"
    return 1, "Lux"


# The main loop
try:
    # Schedule tasks: record one history sample at the top of every minute
    schedule.every().minute.at(":00").do(updateBuffer)
    updateBuffer()  # initial run so the display has data immediately
    while True:
        proximity = ltr559.get_proximity()

        # If the proximity crosses the threshold, toggle the display mode
        if proximity > 1500 and time.time() - last_page > delay:
            mode += 1
            mode %= len(variables)
            last_page = time.time()

        # Run scheduled tasks
        schedule.run_pending()

        # One display mode for each variable
        data, unit = _read_mode(mode, proximity)
        display_text2(variables[mode], data, unit, fixLenBuffer)

# Exit cleanly
except KeyboardInterrupt:
    sys.exit(0)