feat(examples):Add baltakatei's all-in-one-enviro-mini py script
authorSteven Baltakatei Sandoval <baltakatei@gmail.com>
Sun, 18 Apr 2021 23:13:40 +0000 (23:13 +0000)
committerSteven Baltakatei Sandoval <baltakatei@gmail.com>
Sun, 18 Apr 2021 23:13:40 +0000 (23:13 +0000)
- Flips screen vertically
- Displays up to last 48 hours of data
- Indicates highest and lowest values

examples/all-in-one-enviro-mini-bk.py [new file with mode: 0755]

diff --git a/examples/all-in-one-enviro-mini-bk.py b/examples/all-in-one-enviro-mini-bk.py
new file mode 100755 (executable)
index 0000000..5a3dc82
--- /dev/null
@@ -0,0 +1,461 @@
+#!/usr/bin/env python3
+
+import time
+import colorsys
+import os
+import sys
+import ST7735
+import schedule # https://pypi.org/project/schedule/ via https://stackoverflow.com/a/16786600
+import datetime # http://stackoverflow.com/questions/2150739/ddg#28147286
+try:
+    # Transitional fix for breaking change in LTR559
+    from ltr559 import LTR559
+    ltr559 = LTR559()
+except ImportError:
+    import ltr559
+
+from bme280 import BME280
+from enviroplus import gas
+from subprocess import PIPE, Popen
+from PIL import Image
+from PIL import ImageDraw
+from PIL import ImageFont
+from fonts.ttf import RobotoMedium as UserFont
+import logging
+import numpy as np
+
+logging.basicConfig(
+    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
+    level=logging.INFO,
+    datefmt='%Y-%m-%d %H:%M:%S')
+
+logging.info("""all-in-one.py - Displays readings from all of Enviro plus' sensors
+Press Ctrl+C to exit!
+""")
+
+# Script Constants
+varLenBufferTTL = int((2*24*60*60)*10**9) # time-to-live in nanoseconds
+
+# BME280 temperature/pressure/humidity sensor
+bme280 = BME280()
+
+# Create ST7735 LCD display class
+st7735 = ST7735.ST7735(
+    port=0,
+    cs=1,
+    dc=9,
+    backlight=12,
+    #rotation=270,
+    rotation=90,
+    spi_speed_hz=10000000
+)
+
+# Initialize display
+st7735.begin()
+
+WIDTH = st7735.width
+HEIGHT = st7735.height
+
+# Set up canvas and font
+img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0))
+draw = ImageDraw.Draw(img)
+path = os.path.dirname(os.path.realpath(__file__))
+font_size = 20
+font = ImageFont.truetype(UserFont, font_size)
+font2_size = 15
+font2 = ImageFont.truetype(UserFont, font2_size)
+
+message = ""
+
+# The position of the top bar
+top_pos = 25
+
+
+# Displays data and text on the 0.96" LCD
+def display_text(variable, data, unit):
+    # Maintain length of list
+    values[variable] = values[variable][1:] + [data]
+    # Scale the values for the variable between 0 and 1
+    vmin = min(values[variable])
+    vmax = max(values[variable])
+    colours = [(v - vmin + 1) / (vmax - vmin + 1) for v in values[variable]]
+    # Format the variable name and value
+    message = "{}: {:.1f} {}".format(variable[:4], data, unit)
+    logging.info(message)
+    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
+    for i in range(len(colours)):
+        # Convert the values to colours from red to blue
+        colour = (1.0 - colours[i]) * 0.6
+        r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, 1.0, 1.0)]
+        # Draw a 1-pixel wide rectangle of colour
+        draw.rectangle((i, top_pos, i + 1, HEIGHT), (r, g, b))
+        # Draw a line graph in black
+        line_y = HEIGHT - (top_pos + (colours[i] * (HEIGHT - top_pos))) + top_pos
+        draw.rectangle((i, line_y, i + 1, line_y + 1), (0, 0, 0))
+    # Write the text at the top in black
+    draw.text((0, 0), message, font=font, fill=(0, 0, 0))
+    st7735.display(img)
+
+# Displays data and text on the 0.96" LCD
+def display_text2(variable, data, unit, values):
+    # Scale the values for the variable between 0 and 1
+    print('DEBUG:len(values[' + str(variable) + ']):' + str(len(values[variable])))
+    #print('DEBUG:values[' + str(variable) + ']:' + str(values[variable]))
+    vmin = min(values[variable])
+    vmax = max(values[variable])
+    colours = [(v - vmin + 1) / (vmax - vmin + 1) for v in values[variable]]
+    # Format the variable name and value
+    message = "{}: {:.1f} {}".format(variable[:4], data, unit)
+    #message = "{}: {:.1f} {}".format(variable[:4], values[variable][-1], unit)
+    logging.info(message)
+    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
+    for i in range(len(colours)):
+        # Convert the values to colours from red to blue
+        colour = (1.0 - colours[i]) * 0.6
+        r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, 1.0, 1.0)]
+        # Draw a 1-pixel wide rectangle of colour
+        draw.rectangle((i, top_pos, i + 1, HEIGHT), (r, g, b))
+        # Draw a line graph in black
+        line_y = HEIGHT - (top_pos + (colours[i] * (HEIGHT - top_pos))) + top_pos
+        draw.rectangle((i, line_y, i + 1, line_y + 1), (0, 0, 0))
+    # Write the text at the top in black
+    draw.text((0, 0), message, font=font, fill=(0, 0, 0))
+    # Write text (test)
+    maxMsg = "MAX:{:.1f}".format(vmax)
+    durMsg = "HR:{:.1f}".format(span_time_h)
+    minMsg = "MIN:{:.1f}".format(vmin)
+    maxMsg_y = int( ((HEIGHT - top_pos)/(HEIGHT))*(HEIGHT*(1/4)) + top_pos - (font2_size/2) )
+    durMsg_y = int( ((HEIGHT - top_pos)/(HEIGHT))*(HEIGHT*(2/4)) + top_pos - (font2_size/2) )
+    minMsg_y = int( ((HEIGHT - top_pos)/(HEIGHT))*(HEIGHT*(3/4)) + top_pos - (font2_size/2) )
+    maxMsg_x = int( WIDTH*(3/100) )
+    durMsg_x = int( WIDTH*(3/100) )
+    minMsg_x = int( WIDTH*(3/100) )
+    draw.text((maxMsg_x, maxMsg_y), maxMsg, font=font2, fill=(0, 0, 0))
+    draw.text((durMsg_x, durMsg_y), durMsg, font=font2, fill=(0, 0, 0))
+    draw.text((minMsg_x, minMsg_y), minMsg, font=font2, fill=(0, 0, 0))
+    st7735.display(img)
+
+
+# Get the temperature of the CPU for compensation
+def get_cpu_temperature():
+    process = Popen(['vcgencmd', 'measure_temp'], stdout=PIPE, universal_newlines=True)
+    output, _error = process.communicate()
+    return float(output[output.index('=') + 1:output.rindex("'")])
+
+
+# Tuning factor for compensation. Decrease this number to adjust the
+# temperature down, and increase to adjust up
+factor = 2.25
+
+cpu_temps = [get_cpu_temperature()] * 5
+
+delay = 0.5  # Debounce the proximity tap
+mode = 0  # The starting mode
+last_page = 0
+light = 1
+
+# Create a values dict to store the data
+variables = ["temperature",
+             "pressure",
+             "humidity",
+             "light"]
+values = {} # Initialize values dictionary
+for v in variables:
+    values[v] = [1] * WIDTH # Init a WIDTH-length list as value for each string in variables
+
+# Create a varLenBuffer dict to store recent data
+varLenBuffer = {}
+for v in variables:
+    varLenBuffer[v] = [] # Init an empty list for each string in variables
+
+# Create a varLenBufferFlt dict to store recent data as floats only
+varLenBufferFlt = {}
+for v in variables:
+    varLenBufferFlt[v] = [] # Init an empty list for each string in variables
+    
+# Create a fixLenBuffer dict to store data for displaying
+fixLenBuffer = {}
+for v in variables:
+    fixLenBuffer[v] = [] # Init an empty list for each string in variables
+    
+pollDelay = 5.0
+
+def pollSensors():
+    # Desc: Update variables containing latest sensor tuples
+    # Output: (time [ns], unit, float)
+    #         now_temp_tuple (°C)
+    #         now_pressure_tuple (hPa)
+    #         now_humidity_tuple (%)
+    #         now_illuminance_tuple (lux)
+    # Depends: time, bme280, ltr559, get_cpu_temperature()
+
+    # Tell function to modify these global variables
+    global now_temp_tuple
+    global now_pressure_tuple
+    global now_humidity_tuple
+    global now_illuminance_tuple
+    # Initialize
+    cpu_temps = []
+    poll_time_ns_start = time.time_ns() # Get time reading (unix spoech, nanoseconds)
+    # Get temperature reading
+    cpu_temp = get_cpu_temperature() # get °C from CPU
+    # Smooth out with some averaging to decrease jitter
+    cpu_temps = cpu_temps[1:] + [cpu_temp]
+    avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
+    raw_temp = bme280.get_temperature() # get °C from BME280 sensor
+    now_temp = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
+    now_temp_tuple = (time.time_ns(), '°C', now_temp)
+    # Get pressure reading
+    now_time_ns = time.time_ns() # Get time reading (unix epoch, nanoseconds)
+    now_pressure = bme280.get_pressure() # get hPa from BME280 sensor
+    now_pressure_tuple = (time.time_ns(), 'hPa', now_pressure)
+    # Get humidity reading
+    now_humidity = bme280.get_humidity() # get % humidity from BME280 sensor
+    now_humidity_tuple = (time.time_ns(), '%', now_humidity)
+    # Get light reading
+    proximity = ltr559.get_proximity() # get proximity reading
+    if proximity < 10:
+        now_illuminance = ltr559.get_lux() # get lux reading from LTR-559 sensor if nothing is nearby
+        now_illuminance_tuple = (time.time_ns(), 'lux', now_illuminance)
+    poll_time_ns_end = time.time_ns()
+    #print('DEBUG:poll time (s):' + str((poll_time_ns_end - poll_time_ns_start)/1000000000))
+
+
+def dateIso8601Str():
+    nowUTC = datetime.datetime.utcnow().replace(tzinfo=datetime.timezone.utc).isoformat()
+    return str(nowUTC)
+
+def medianSubset(listIn: list = [], listOutLen: int = 0) -> list:
+    # Input: int:  listOutLen: quantity of elements in output list
+    #        list: listIn: input list consisting of integers or floats
+    # Output: list: ints/floats of specified size
+    # Ref/Attrib: PEP 3107 typing https://stackoverflow.com/a/21384492
+    # Version: 0.1.0
+    #print('DEBUG:listOutLen:' + str(listOutLen))
+    #print('DEBUG:listIn:' + str(listIn))
+
+    # Exit for invalid input
+    if not isinstance(listOutLen, int):
+        raise ValueError('ERROR:Not a valid int:' + str(listOutLen))
+    else:
+        if not listOutLen > 0:
+            raise ValueError('ERROR:Invalid value:' + str(listOutLen))
+    if not isinstance(listIn, list):
+        raise ValueError('ERROR:Not a valid list:' + str(listOutLen))
+    if not all([( (isinstance(x,int)) or (isinstance(x,float)) ) for x in listIn]):
+        raise ValueError('ERROR:Input list contains something besides integers or floating point numbers.')
+    
+    # Initialize listOut
+    listOut = [None] * listOutLen
+    #print('DEBUG:listOut:' + str(listOut))
+    
+    # Calc listIn length
+    listInLen = len(listIn)
+    #print('DEBUG:listInLen:' + str(listInLen))
+
+    # Calc subset length float
+    subsetLenFloat = ( (max([listInLen,listOutLen]) - 1) /min([listInLen,listOutLen]))
+    subsetIndRatio = ( (listInLen)/(listOutLen) )
+    #print('DEBUG:subsetLenFloat: %.5f' % subsetLenFloat)
+    #print('DEBUG:subsetLenFloat2: %.5f' % subsetIndRatio)
+    
+    # Iterate for each element in listOut
+    for i_out in range(listOutLen):
+        #print('DEBUG:i_out:' + str(i_out))
+        ## Decide to expand or reduce listIn to produce listOut
+        if listInLen > listOutLen:
+            ### reduce listIn to listOut
+            #print('DEBUG:listOutLen:' + str(listOutLen))
+            #print('DEBUG:listInLen:' + str(listInLen))            
+            if i_out == 0:
+                #### Initialize subsetIndLo in first loop
+                subsetIndLo = int(0)
+                #print('DEBUG:subsetIndLo:' + str(subsetIndLo))
+            #print('DEBUG:i_out:' + str(i_out))
+            #### Calc indices of i_out'th subset of listIn
+            subsetIndHi = (listInLen - 1) * (i_out + 1) // listOutLen
+            subsetLen = subsetIndHi - subsetIndLo + 1
+            #print('DEBUG:subsetIndLo:' + str(subsetIndLo))
+            #print('DEBUG:subsetIndHi:' + str(subsetIndHi))
+            #print('DEBUG:subsetLen:' + str(subsetLen))
+            #### Extract subset from listIn using indices inclusively
+            subset = listIn[ int(subsetIndLo) : int(subsetIndHi)+1 ]
+            #print('DEBUG:subset:' + str(subset))
+            #### Calculate median for subset
+            subsetMedian = np.median(subset)
+            #print('DEBUG:subset median:' + str(subsetMedian))
+            #### Set listOut element
+            listOut[i_out] = subsetMedian
+            #### Housekeeping
+            ##### Update subsetIndLo for next loop
+            subsetIndLo = subsetIndHi + 1
+            #print('DEBUG:Updated subsetIndLo:' + str(subsetIndLo))
+        elif listOutLen > listInLen:
+            ### Expand listIn to listOut
+            #print('DEBUG:listOutLen:' + str(listOutLen))
+            #print('DEBUG:listInLen:' + str(listInLen))
+            #### Identify index list of lists mapping listIn to ListOut
+            expandIndex = int(i_out / subsetLenFloat)
+            expandIndex = min([expandIndex,(listInLen - 1)])
+            #print('DEBUG:expandIndex:' + str(expandIndex))
+            listOut[i_out] = listIn[expandIndex]
+            #print('DEBUG:listOut[i_out]:' + str(listOut[i_out]))
+        elif listOutLen == listInLen:
+            listOut = listIn
+            #print('DEBUG:end for loop===========')
+    return listOut
+
+def updateBuffer():
+    global now_temp_tuple
+    global now_pressure_tuple
+    global now_humidity_tuple
+    global now_illuminance_tuple
+    global varLenBuffer
+    global fixLenBuffer
+    global fixLenBufferFlt
+    global span_time_h
+    #print('DEBUG:This is the updateBuffer() function.')
+    #print('DEBUG:===========================================================')
+    #print('DEBUG:===========================================================')
+    # Capture new sensor tuples
+    pollSensors()
+    #print('DEBUG:now_temp_tuple:' + str(now_temp_tuple))
+    #print('DEBUG:now_pressure_tuple:' + str(now_pressure_tuple))
+    #print('DEBUG:now_humidity_tuple:' + str(now_humidity_tuple))
+    #print('DEBUG:now_illuminance_tuple:' + str(now_illuminance_tuple))    
+
+    # Append new sensor tuples to varying-length buffer
+    ## Temperature
+    varLenBuffer[variables[0]].append(now_temp_tuple)
+    ## Pressure
+    varLenBuffer[variables[1]].append(now_pressure_tuple)
+    ## Humidity
+    varLenBuffer[variables[2]].append(now_humidity_tuple)
+    ## Illuminance
+    varLenBuffer[variables[3]].append(now_illuminance_tuple)
+    #print('DEBUG:varLenBuffer:' + str(varLenBuffer))
+
+    # Trim outdated sensor tuples from varying-length buffer
+    ## iterate through each tuple list and remove old tuples
+    varLenBufferTemp = []
+    for v in variables:
+        #varLenBufferTemp = varLenBuffer[v].copy()
+        now_time_ns = time.time_ns() # get ns timestamp of now
+        thn_time_ns = varLenBuffer[v][0][0] # get ns timestamp of earliest tuple
+        dif_time_ns = now_time_ns - thn_time_ns # calc nanosecond difference
+        #print('DEBUG:varLenBufferTTL:' + str(varLenBufferTTL))
+        #print('DEBUG:now:' + str(now_time_ns))
+        #print('DEBUG:thn:' + str(thn_time_ns))
+        #print('DEBUG:dif:' + str(dif_time_ns))
+        #print('DEBUG:dif(s):' + str(dif_time_ns / 1000000000))
+        if dif_time_ns > varLenBufferTTL:
+            varLenBuffer[v].pop(0) # discard earliest tuple if age > varLenBufferTTL
+        print('DEBUG:Len of varLenBuffer[' + str(v) + ']:' + str(len(varLenBuffer[v])))
+        #print('DEBUG:*******************************************')
+        #print('DEBUG:varLenBuffer[variables[' + str(v) + ']]:' + str(varLenBuffer[v]))
+        #print('DEBUG:*******************************************')
+
+    # Calculate buffer time span in hours
+    ## Get earliest timestamp (use temperature tuples)
+    first_time_ns = varLenBuffer[variables[0]][0][0]
+    last_time_ns = varLenBuffer[variables[0]][-1][0]
+    span_time_ns = int(last_time_ns - first_time_ns)
+    span_time_h = float(span_time_ns / (10**9*60*60)) # nanoseconds to hours
+        
+    # Convert tuple buffer into float buffer
+    for v in variables:
+        varLenBufferFlt[v].clear() # clear old float list
+        #print('DEBUG:v:' + str(v))
+        for t in varLenBuffer[v]:
+            #print('DEBUG:t:' + str(t))
+            #print('DEBUG:t[2]:' + str(t[2]))
+            #print('DEBUG:------------------------------------------')
+            #print('DEBUG:varLenBufferFlt[' + str(v) + ']:' + str(varLenBufferFlt[v]))
+            #print('DEBUG:------------------------------------------')
+            if isinstance(t[2], float):
+                varLenBufferFlt[v].append(float(t[2])) # build new float list
+            else:
+                varLenBufferFlt[v].append(float(-273)) # add obvious zero otherwise
+                #print('DEBUG:varLenBufferFlt[' + str(v) + ']:' + str(varLenBufferFlt[v]))
+
+    # Compress/expand buffer to fixed-length buffer
+    for v in variables:
+        #print('DEBUG:varLenBufferFlt[0]:' + str(varLenBufferFlt[variables[0]]))
+        fixLenBuffer[v] = medianSubset(varLenBufferFlt[v], WIDTH)
+        print('DEBUG:Len of fixLenBuffer[' + str(v) + ']:' + str(len(fixLenBuffer[v])))
+        #print('DEBUG:fixLenBuffer[' + str(v) + ']:' + str(fixLenBuffer[v]))
+
+        
+# The main loop
+try:
+    # Schedule tasks
+    # schedule.every(1).second.do(updateBuffer)
+    schedule.every().minute.at(":00").do(updateBuffer)
+    # schedule.every().minute.at(":05").do(updateBuffer)
+    # schedule.every().minute.at(":10").do(updateBuffer)
+    # schedule.every().minute.at(":15").do(updateBuffer)
+    # schedule.every().minute.at(":20").do(updateBuffer)
+    # schedule.every().minute.at(":25").do(updateBuffer)
+    # schedule.every().minute.at(":30").do(updateBuffer)
+    # schedule.every().minute.at(":35").do(updateBuffer)
+    # schedule.every().minute.at(":40").do(updateBuffer)
+    # schedule.every().minute.at(":45").do(updateBuffer)
+    # schedule.every().minute.at(":50").do(updateBuffer)
+    # schedule.every().minute.at(":55").do(updateBuffer)
+    updateBuffer() # initial run
+    pollSensors() # initial run
+    while True:
+        proximity = ltr559.get_proximity()
+
+        # If the proximity crosses the threshold, toggle the display mode
+        if proximity > 1500 and time.time() - last_page > delay:
+            mode += 1
+            mode %= len(variables)
+            last_page = time.time()
+
+        # Run scheduled tasks
+        schedule.run_pending()
+        
+        # One display mode for each variable
+        if mode == 0:
+            # variable = "temperature"
+            ## run function to display latest temp values in variables[mode] list
+            unit = "°C"
+            cpu_temp = get_cpu_temperature()
+            # Smooth out with some averaging to decrease jitter
+            cpu_temps = cpu_temps[1:] + [cpu_temp]
+            avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
+            raw_temp = bme280.get_temperature()
+            data = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
+            #display_text(variables[mode], data, unit)
+            display_text2(variables[mode],data,unit,fixLenBuffer)
+
+        if mode == 1:
+            # variable = "pressure"
+            unit = "hPa"
+            data = bme280.get_pressure()
+            #display_text(variables[mode], data, unit)
+            display_text2(variables[mode],data,unit,fixLenBuffer)
+            
+        if mode == 2:
+            # variable = "humidity"
+            unit = "%"
+            data = bme280.get_humidity()
+            #display_text(variables[mode], data, unit)
+            display_text2(variables[mode],data,unit,fixLenBuffer)
+            
+        if mode == 3:
+            # variable = "light"
+            unit = "Lux"
+            if proximity < 10:
+                data = ltr559.get_lux()
+            else:
+                data = 1
+            #display_text(variables[mode], data, unit)
+            display_text2(variables[mode],data,unit,fixLenBuffer)
+      
+        
+
+# Exit cleanly
+except KeyboardInterrupt:
+    sys.exit(0)