--- /dev/null
+#!/usr/bin/env python3
+
+import time
+import colorsys
+import os
+import sys
+import ST7735
+import schedule # https://pypi.org/project/schedule/ via https://stackoverflow.com/a/16786600
+import datetime # http://stackoverflow.com/questions/2150739/ddg#28147286
+try:
+ # Transitional fix for breaking change in LTR559
+ from ltr559 import LTR559
+ ltr559 = LTR559()
+except ImportError:
+ import ltr559
+
+from bme280 import BME280
+from enviroplus import gas
+from subprocess import PIPE, Popen
+from PIL import Image
+from PIL import ImageDraw
+from PIL import ImageFont
+from fonts.ttf import RobotoMedium as UserFont
+import logging
+import numpy as np
+
+# Configure the root logger: timestamp with milliseconds, padded level name, message.
+logging.basicConfig(
+ format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
+ level=logging.INFO,
+ datefmt='%Y-%m-%d %H:%M:%S')
+
+# Startup banner, logged once at INFO level.
+logging.info("""all-in-one.py - Displays readings from all of Enviro plus' sensors
+Press Ctrl+C to exit!
+""")
+
+# Script Constants
+# Maximum age of a buffered reading: 2 days (2*24*60*60 seconds) expressed in
+# nanoseconds, so it can be compared with differences of time.time_ns() values.
+varLenBufferTTL = int((2*24*60*60)*10**9) # time-to-live in nanoseconds
+
+# BME280 temperature/pressure/humidity sensor
+bme280 = BME280()
+
+# Create ST7735 LCD display class
+st7735 = ST7735.ST7735(
+ port=0,
+ cs=1,
+ dc=9,
+ backlight=12,
+ # Alternative orientation kept for reference; rotation=90 below is the active one.
+ #rotation=270,
+ rotation=90,
+ spi_speed_hz=10000000
+)
+
+# Initialize display
+st7735.begin()
+
+# Display dimensions as reported by the driver; WIDTH is also used as the
+# number of samples in the fixed-length display buffers.
+WIDTH = st7735.width
+HEIGHT = st7735.height
+
+# Set up canvas and font
+# `img` is the off-screen RGB image; `draw` paints onto it and the display
+# functions push it to the LCD with st7735.display(img).
+img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0))
+draw = ImageDraw.Draw(img)
+# NOTE(review): `path` is not referenced anywhere else in the visible code.
+path = os.path.dirname(os.path.realpath(__file__))
+# `font` is used for the caption line, `font2` for the smaller MAX/HR/MIN labels.
+font_size = 20
+font = ImageFont.truetype(UserFont, font_size)
+font2_size = 15
+font2 = ImageFont.truetype(UserFont, font2_size)
+
+# NOTE(review): the display functions assign their own local `message`, so this
+# module-level value does not appear to be read in the visible code.
+message = ""
+
+# The position of the top bar
+# (y coordinate, in pixels, where the graph area below the caption starts)
+top_pos = 25
+
+
+# Displays data and text on the 0.96" LCD
+def display_text(variable, data, unit):
+    """Push a new reading into the rolling history and redraw the LCD.
+
+    Args:
+        variable: Key into the module-level ``values`` dict; its first four
+            characters are shown in the caption.
+        data: Newest reading; appended to the history and shown in the caption.
+        unit: Unit string appended to the caption.
+
+    The oldest sample of ``values[variable]`` is discarded so the list keeps
+    its length. Each sample is drawn as a 1-pixel-wide coloured column plus a
+    black graph point, with the caption on top.
+    """
+    # Slide the window: drop the oldest sample, append the newest
+    history = values[variable][1:] + [data]
+    values[variable] = history
+    # Scale every sample relative to the window's minimum and maximum
+    lowest = min(history)
+    highest = max(history)
+    scaled = [(sample - lowest + 1) / (highest - lowest + 1) for sample in history]
+    # Caption: shortened variable name, value with one decimal, unit
+    caption = "{}: {:.1f} {}".format(variable[:4], data, unit)
+    logging.info(caption)
+    # Start from a white background
+    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
+    for column, level in enumerate(scaled):
+        # Map the scaled value onto a hue (high values red, low values blue)
+        hue = (1.0 - level) * 0.6
+        r, g, b = (int(channel * 255.0) for channel in colorsys.hsv_to_rgb(hue, 1.0, 1.0))
+        # One-pixel-wide coloured column below the top bar
+        draw.rectangle((column, top_pos, column + 1, HEIGHT), (r, g, b))
+        # Black point of the line graph for this column
+        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
+        draw.rectangle((column, line_y, column + 1, line_y + 1), (0, 0, 0))
+    # Caption in black on the top bar, then push the frame to the LCD
+    draw.text((0, 0), caption, font=font, fill=(0, 0, 0))
+    st7735.display(img)
+
+# Displays data and text on the 0.96" LCD
+def display_text2(variable, data, unit, values):
+    """Draw a pre-built history for one variable plus MAX/HR/MIN labels.
+
+    Args:
+        variable: Key into ``values``; its first four characters are shown in
+            the caption.
+        data: Latest reading, shown in the caption with one decimal.
+        unit: Unit string appended to the caption.
+        values: Mapping of variable name to a list of numbers; element ``i``
+            is drawn in pixel column ``i``. The list is not modified.
+
+    Reads the module-level ``span_time_h`` (assigned by updateBuffer()) for
+    the "HR" label. If the history for ``variable`` is empty, a warning is
+    logged and the display is left untouched.
+    """
+    series = values[variable]
+    # Diagnostic only: goes through logging so it is silent at the INFO level
+    # instead of writing to stdout on every frame.
+    logging.debug("len(values[%s]): %d", variable, len(series))
+    if not series:
+        # min()/max() raise ValueError on an empty sequence; skip this frame
+        logging.warning("No buffered data for %s; display not updated", variable)
+        return
+    # Scale the values for the variable between 0 and 1
+    vmin = min(series)
+    vmax = max(series)
+    colours = [(v - vmin + 1) / (vmax - vmin + 1) for v in series]
+    # Format the variable name and value
+    message = "{}: {:.1f} {}".format(variable[:4], data, unit)
+    logging.info(message)
+    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
+    for i, level in enumerate(colours):
+        # Convert the values to colours from red to blue
+        colour = (1.0 - level) * 0.6
+        r, g, b = [int(x * 255.0) for x in colorsys.hsv_to_rgb(colour, 1.0, 1.0)]
+        # Draw a 1-pixel wide rectangle of colour
+        draw.rectangle((i, top_pos, i + 1, HEIGHT), (r, g, b))
+        # Draw a line graph in black
+        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
+        draw.rectangle((i, line_y, i + 1, line_y + 1), (0, 0, 0))
+    # Write the text at the top in black
+    draw.text((0, 0), message, font=font, fill=(0, 0, 0))
+    # Summary labels share one x position (3 % of the width) and are centred
+    # vertically at 1/4, 2/4 and 3/4 of the graph area.
+    label_x = int(WIDTH * (3 / 100))
+    labels = (
+        (1 / 4, "MAX:{:.1f}".format(vmax)),
+        (2 / 4, "HR:{:.1f}".format(span_time_h)),
+        (3 / 4, "MIN:{:.1f}".format(vmin)),
+    )
+    for fraction, text in labels:
+        label_y = int(((HEIGHT - top_pos) / (HEIGHT)) * (HEIGHT * fraction) + top_pos - (font2_size / 2))
+        draw.text((label_x, label_y), text, font=font2, fill=(0, 0, 0))
+    st7735.display(img)
+
+
+# Get the temperature of the CPU for compensation
+def get_cpu_temperature():
+    """Return the value printed by ``vcgencmd measure_temp`` as a float.
+
+    The number is the text between the first '=' and the last "'" of the
+    command's standard output; ValueError is raised if either marker is
+    missing or the text between them is not a number.
+    """
+    command = ['vcgencmd', 'measure_temp']
+    stdout_text, _stderr = Popen(command, stdout=PIPE, universal_newlines=True).communicate()
+    value_start = stdout_text.index('=') + 1
+    value_end = stdout_text.rindex("'")
+    return float(stdout_text[value_start:value_end])
+
+
+# Tuning factor for compensation. Decrease this number to adjust the
+# temperature down, and increase to adjust up
+factor = 2.25
+
+# Rolling window of CPU temperatures, pre-filled with five copies of one reading
+cpu_temps = 5 * [get_cpu_temperature()]
+
+delay = 0.5 # Debounce the proximity tap
+mode = 0 # The starting mode
+last_page = 0
+light = 1
+
+# Names of the displayed variables; their order defines the display modes
+variables = ["temperature", "pressure", "humidity", "light"]
+
+# Per-variable storage, each keyed by the strings in `variables`:
+#   values          - WIDTH-length list used by display_text()
+#   varLenBuffer    - recent sensor tuples
+#   varLenBufferFlt - recent data as floats only
+#   fixLenBuffer    - data prepared for displaying
+values = {}
+varLenBuffer = {}
+varLenBufferFlt = {}
+fixLenBuffer = {}
+for v in variables:
+    values[v] = [1] * WIDTH # start with a WIDTH-length list of ones
+    varLenBuffer[v] = [] # start empty
+    varLenBufferFlt[v] = [] # start empty
+    fixLenBuffer[v] = [] # start empty
+
+pollDelay = 5.0
+
+def pollSensors():
+    """Read every sensor once and publish the results as module-level tuples.
+
+    Each published tuple is ``(timestamp_ns, unit, value)`` with the timestamp
+    taken from ``time.time_ns()`` when the tuple is built:
+
+      now_temp_tuple         '°C'   BME280 temperature compensated with the
+                                    averaged CPU temperature and ``factor``
+      now_pressure_tuple     'hPa'  bme280.get_pressure()
+      now_humidity_tuple     '%'    bme280.get_humidity()
+      now_illuminance_tuple  'lux'  ltr559.get_lux(), or 1.0 when the
+                                    proximity reading is 10 or more
+
+    Also updates the module-level ``cpu_temps`` rolling window.
+
+    Depends on: time, bme280, ltr559, get_cpu_temperature(), factor, cpu_temps.
+    """
+    global now_temp_tuple
+    global now_pressure_tuple
+    global now_humidity_tuple
+    global now_illuminance_tuple
+    # Use the shared rolling window. Re-creating an empty local list here
+    # would reduce the "average" below to the single newest sample.
+    global cpu_temps
+
+    # Temperature: smooth the CPU temperature to decrease jitter, then
+    # compensate the BME280 reading for heat coming from the CPU
+    cpu_temps = cpu_temps[1:] + [get_cpu_temperature()]
+    avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
+    raw_temp = bme280.get_temperature()
+    now_temp = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
+    now_temp_tuple = (time.time_ns(), '°C', now_temp)
+
+    # Pressure
+    now_pressure_tuple = (time.time_ns(), 'hPa', bme280.get_pressure())
+
+    # Humidity
+    now_humidity_tuple = (time.time_ns(), '%', bme280.get_humidity())
+
+    # Light: only trust the lux reading when nothing is near the sensor.
+    # Otherwise fall back to the same placeholder value the main loop shows
+    # (as a float), so the tuple is always defined.
+    if ltr559.get_proximity() < 10:
+        now_illuminance = ltr559.get_lux()
+    else:
+        now_illuminance = 1.0
+    now_illuminance_tuple = (time.time_ns(), 'lux', now_illuminance)
+
+
+def dateIso8601Str():
+    """Return the current UTC time as an ISO 8601 string ending in '+00:00'."""
+    # datetime.now(timezone.utc) yields an aware datetime directly, replacing
+    # the deprecated utcnow() followed by replace(tzinfo=...).
+    return datetime.datetime.now(datetime.timezone.utc).isoformat()
+
+def medianSubset(listIn: list = None, listOutLen: int = 0) -> list:
+    """Resample a list of numbers to exactly ``listOutLen`` elements.
+
+    Args:
+        listIn: Non-empty list of ints/floats. It is never modified.
+        listOutLen: Number of elements in the result; must be an int > 0.
+
+    Returns:
+        A new list of length ``listOutLen``:
+          * ``len(listIn) > listOutLen``: ``listIn`` is cut into ``listOutLen``
+            consecutive bins and each output element is the ``numpy.median``
+            of one bin. Bin ``i`` ends (inclusive) at index
+            ``(len(listIn) - 1) * (i + 1) // listOutLen``.
+          * ``len(listIn) < listOutLen``: input elements are repeated so the
+            result is stretched to the requested length.
+          * equal lengths: a shallow copy of ``listIn``.
+
+    Raises:
+        ValueError: ``listOutLen`` is not a positive int, ``listIn`` is not a
+            list, is empty, or contains something other than ints/floats.
+
+    Ref/Attrib: PEP 3107 typing https://stackoverflow.com/a/21384492
+    """
+    # Validate the requested output length
+    if not isinstance(listOutLen, int):
+        raise ValueError('ERROR:Not a valid int:' + str(listOutLen))
+    if not listOutLen > 0:
+        raise ValueError('ERROR:Invalid value:' + str(listOutLen))
+
+    # Validate the input list (None stands in for the former ``[]`` default)
+    if listIn is None:
+        listIn = []
+    if not isinstance(listIn, list):
+        raise ValueError('ERROR:Not a valid list:' + str(listIn))
+    if not all(isinstance(x, (int, float)) for x in listIn):
+        raise ValueError('ERROR:Input list contains something besides integers or floating point numbers.')
+    listInLen = len(listIn)
+    if listInLen == 0:
+        # Without this guard the stretch ratio below would divide by zero
+        raise ValueError('ERROR:Input list is empty.')
+
+    if listInLen == listOutLen:
+        # Return a copy so the caller's list is not aliased by the result
+        return list(listIn)
+
+    if listInLen > listOutLen:
+        # Reduce: median of consecutive, non-overlapping bins
+        listOut = []
+        subsetIndLo = 0
+        for i_out in range(listOutLen):
+            subsetIndHi = (listInLen - 1) * (i_out + 1) // listOutLen
+            listOut.append(np.median(listIn[subsetIndLo:subsetIndHi + 1]))
+            subsetIndLo = subsetIndHi + 1
+        return listOut
+
+    # Expand: map each output index back onto an input index. listOutLen is
+    # at least 2 here, so the ratio is strictly positive.
+    subsetLenFloat = (listOutLen - 1) / listInLen
+    return [
+        listIn[min(int(i_out / subsetLenFloat), listInLen - 1)]
+        for i_out in range(listOutLen)
+    ]
+
+def updateBuffer():
+    """Poll the sensors and refresh the history buffers used by the display.
+
+    Steps:
+      1. Call pollSensors() and append the four new tuples to ``varLenBuffer``
+         (temperature, pressure, humidity, light - the order of ``variables``).
+      2. Drop tuples older than ``varLenBufferTTL`` nanoseconds, always
+         keeping the newest one.
+      3. Store the time span covered by the temperature buffer, in hours, in
+         the module-level ``span_time_h``.
+      4. Rebuild ``varLenBufferFlt`` from the third field of every tuple;
+         values that are not floats are replaced by -273.0.
+      5. Resample each float list to ``WIDTH`` entries with medianSubset()
+         and store the result in ``fixLenBuffer``.
+    """
+    # Only span_time_h is rebound here; the buffer dicts are mutated in place.
+    global span_time_h
+
+    # Capture new sensor tuples
+    pollSensors()
+    latest = (now_temp_tuple, now_pressure_tuple, now_humidity_tuple, now_illuminance_tuple)
+    for name, reading in zip(variables, latest):
+        varLenBuffer[name].append(reading)
+
+    # Trim outdated sensor tuples from the varying-length buffer
+    now_time_ns = time.time_ns()
+    for name in variables:
+        history = varLenBuffer[name]
+        expired = 0
+        # Count every leading tuple older than the TTL (not just the first
+        # one), but never the newest tuple so the buffer cannot become empty.
+        while expired < len(history) - 1 and now_time_ns - history[expired][0] > varLenBufferTTL:
+            expired += 1
+        del history[:expired]
+        logging.debug("Len of varLenBuffer[%s]: %d", name, len(history))
+
+    # Calculate buffer time span in hours (use temperature tuples)
+    first_time_ns = varLenBuffer[variables[0]][0][0]
+    last_time_ns = varLenBuffer[variables[0]][-1][0]
+    span_time_ns = int(last_time_ns - first_time_ns)
+    span_time_h = float(span_time_ns / (10**9*60*60)) # nanoseconds to hours
+
+    # Convert tuple buffer into float buffer. A fresh list is bound each time
+    # so a list handed out earlier is never emptied behind a holder's back.
+    for name in variables:
+        varLenBufferFlt[name] = [
+            float(t[2]) if isinstance(t[2], float) else float(-273) # obvious placeholder otherwise
+            for t in varLenBuffer[name]
+        ]
+
+    # Compress/expand buffer to fixed-length buffer
+    for name in variables:
+        fixLenBuffer[name] = medianSubset(varLenBufferFlt[name], WIDTH)
+        logging.debug("Len of fixLenBuffer[%s]: %d", name, len(fixLenBuffer[name]))
+
+
+# The main loop
+# Schedules updateBuffer() once per minute, fills the buffers once up front,
+# then loops forever: check the proximity sensor for a "tap" to switch display
+# mode, run any due scheduled job, and redraw the page for the current mode.
+try:
+ # Schedule tasks
+ # schedule.every(1).second.do(updateBuffer)
+ # Active schedule: run updateBuffer() at second :00 of every minute.
+ schedule.every().minute.at(":00").do(updateBuffer)
+ # Disabled alternatives that would sample every five seconds:
+ # schedule.every().minute.at(":05").do(updateBuffer)
+ # schedule.every().minute.at(":10").do(updateBuffer)
+ # schedule.every().minute.at(":15").do(updateBuffer)
+ # schedule.every().minute.at(":20").do(updateBuffer)
+ # schedule.every().minute.at(":25").do(updateBuffer)
+ # schedule.every().minute.at(":30").do(updateBuffer)
+ # schedule.every().minute.at(":35").do(updateBuffer)
+ # schedule.every().minute.at(":40").do(updateBuffer)
+ # schedule.every().minute.at(":45").do(updateBuffer)
+ # schedule.every().minute.at(":50").do(updateBuffer)
+ # schedule.every().minute.at(":55").do(updateBuffer)
+ # Fill fixLenBuffer and span_time_h before the first frame is drawn.
+ updateBuffer() # initial run
+ # NOTE(review): updateBuffer() already calls pollSensors(), so this second
+ # call looks redundant - confirm before removing.
+ pollSensors() # initial run
+ # NOTE(review): there is no sleep in this loop; it runs as fast as the
+ # sensor reads and display updates allow.
+ while True:
+ proximity = ltr559.get_proximity()
+
+ # If the proximity crosses the threshold, toggle the display mode
+ # (`delay` seconds must have passed since the last switch, as a debounce)
+ if proximity > 1500 and time.time() - last_page > delay:
+ mode += 1
+ mode %= len(variables)
+ last_page = time.time()
+
+ # Run scheduled tasks
+ schedule.run_pending()
+
+ # One display mode for each variable
+ # `data` is a live reading used only for the caption; the graph itself is
+ # drawn from fixLenBuffer, which updateBuffer() maintains.
+ if mode == 0:
+ # variable = "temperature"
+ ## run function to display latest temp values in variables[mode] list
+ unit = "°C"
+ cpu_temp = get_cpu_temperature()
+ # Smooth out with some averaging to decrease jitter
+ cpu_temps = cpu_temps[1:] + [cpu_temp]
+ avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
+ raw_temp = bme280.get_temperature()
+ # Compensate the sensor reading for heat from the CPU using `factor`
+ data = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
+ #display_text(variables[mode], data, unit)
+ display_text2(variables[mode],data,unit,fixLenBuffer)
+
+ if mode == 1:
+ # variable = "pressure"
+ unit = "hPa"
+ data = bme280.get_pressure()
+ #display_text(variables[mode], data, unit)
+ display_text2(variables[mode],data,unit,fixLenBuffer)
+
+ if mode == 2:
+ # variable = "humidity"
+ unit = "%"
+ data = bme280.get_humidity()
+ #display_text(variables[mode], data, unit)
+ display_text2(variables[mode],data,unit,fixLenBuffer)
+
+ if mode == 3:
+ # variable = "light"
+ unit = "Lux"
+ # Only read lux when nothing is close to the sensor; otherwise show 1
+ if proximity < 10:
+ data = ltr559.get_lux()
+ else:
+ data = 1
+ #display_text(variables[mode], data, unit)
+ display_text2(variables[mode],data,unit,fixLenBuffer)
+
+
+
+# Exit cleanly
+# Ctrl+C ends the program with exit status 0.
+except KeyboardInterrupt:
+ sys.exit(0)