fix(examples): Add delay to throw away bad initial sensor reading
[EVA-2020-02-2.git] / examples / all-in-one-enviro-mini-bk.py
1 #!/usr/bin/env python3
2
3 import time
4 import colorsys
5 import os
6 import sys
7 import ST7735
8 import schedule # https://pypi.org/project/schedule/ via https://stackoverflow.com/a/16786600
9 import datetime # http://stackoverflow.com/questions/2150739/ddg#28147286
10 try:
11 # Transitional fix for breaking change in LTR559
12 from ltr559 import LTR559
13 ltr559 = LTR559()
14 except ImportError:
15 import ltr559
16
17 from bme280 import BME280
18 from enviroplus import gas
19 from subprocess import PIPE, Popen
20 from PIL import Image
21 from PIL import ImageDraw
22 from PIL import ImageFont
23 from fonts.ttf import RobotoMedium as UserFont
24 import logging
25 import numpy as np
26
# Log records are prefixed with a date, a time with millisecond resolution,
# and a left-aligned level name padded to eight characters.
logging.basicConfig(
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s.%(msecs)03d %(levelname)-8s %(message)s',
)

# Startup banner.
logging.info(
    "all-in-one.py - Displays readings from all of Enviro plus' sensors\n"
    "Press Ctrl+C to exit!\n"
)
35
# Script constants
# Time-to-live of a buffered reading: two days, expressed in nanoseconds.
varLenBufferTTL = int((2 * 24 * 60 * 60) * 10**9)

# BME280 temperature/pressure/humidity sensor
bme280 = BME280()

# ST7735 LCD display driver (rotation=270 is the alternative orientation).
st7735 = ST7735.ST7735(
    port=0,
    cs=1,
    dc=9,
    backlight=12,
    rotation=90,
    spi_speed_hz=10000000,
)

# Initialize display
st7735.begin()

# Display dimensions as reported by the driver; used for all drawing below.
WIDTH, HEIGHT = st7735.width, st7735.height
58
# Off-screen image the size of the display, plus its drawing context.
img = Image.new('RGB', (WIDTH, HEIGHT), color=(0, 0, 0))
draw = ImageDraw.Draw(img)

# Directory containing this script.
path = os.path.dirname(os.path.realpath(__file__))

# Two sizes of the same typeface: `font` for the header line,
# `font2` for the smaller MAX/HR/MIN labels.
font_size = 20
font2_size = 15
font = ImageFont.truetype(UserFont, font_size)
font2 = ImageFont.truetype(UserFont, font2_size)

message = ""

# The position of the top bar: the graph is drawn from this y coordinate
# down to the bottom of the display.
top_pos = 25
72
73
def display_text(variable, data, unit):
    """Push a reading into the rolling history and redraw the 0.96" LCD.

    The module-level ``values[variable]`` list keeps a constant length: its
    oldest entry is dropped and ``data`` is appended. The history is then
    drawn as one coloured, 1-pixel-wide column per sample with a black
    trace on top, and a header line with the current value.

    Args:
        variable: key into the module-level ``values`` dict; its first four
            characters are used as the label in the header text.
        data: the newest reading (formatted with one decimal place).
        unit: unit string appended to the header text.
    """
    # Maintain length of list: discard the oldest sample, add the newest.
    history = values[variable][1:] + [data]
    values[variable] = history

    # Scale every sample into (0, 1]; the +1 keeps the divisor non-zero
    # when all samples are equal.
    lowest = min(history)
    spread = max(history) - lowest + 1
    levels = [(sample - lowest + 1) / spread for sample in history]

    text = "{}: {:.1f} {}".format(variable[:4], data, unit)
    logging.info(text)

    # Clear the canvas to white before drawing.
    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
    for x, level in enumerate(levels):
        # Hue runs from blue-ish (0.6) for low values to red (0.0) for high.
        hue = (1.0 - level) * 0.6
        r, g, b = (int(c * 255.0) for c in colorsys.hsv_to_rgb(hue, 1.0, 1.0))
        # Column of colour for this sample.
        draw.rectangle((x, top_pos, x + 1, HEIGHT), (r, g, b))
        # Black dot of the line graph for this sample.
        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
        draw.rectangle((x, line_y, x + 1, line_y + 1), (0, 0, 0))

    # Header text in black, then push the frame to the display.
    draw.text((0, 0), text, font=font, fill=(0, 0, 0))
    st7735.display(img)
98
def display_text2(variable, data, unit, values):
    """Draw a pre-built history buffer and the current reading on the LCD.

    Unlike ``display_text`` this does not modify the history; it only reads
    ``values[variable]``. In addition to the coloured graph and the header
    line it writes three small labels: the buffer maximum, the buffer time
    span in hours (read from the module-level ``span_time_h``) and the
    buffer minimum.

    Args:
        variable: key into ``values``; its first four characters label the
            header text.
        data: current reading shown in the header (one decimal place).
        unit: unit string appended to the header text.
        values: dict mapping variable names to lists of numbers to plot.
    """
    series = values[variable]
    print(f"DEBUG:len(values[{variable!s}]):{len(series)}")

    # Scale every sample into (0, 1]; the +1 keeps the divisor non-zero
    # when all samples are equal.
    vmin = min(series)
    vmax = max(series)
    spread = vmax - vmin + 1
    levels = [(sample - vmin + 1) / spread for sample in series]

    header = "{}: {:.1f} {}".format(variable[:4], data, unit)
    logging.info(header)

    # Clear the canvas to white before drawing.
    draw.rectangle((0, 0, WIDTH, HEIGHT), (255, 255, 255))
    for x, level in enumerate(levels):
        # Hue runs from blue-ish (0.6) for low values to red (0.0) for high.
        hue = (1.0 - level) * 0.6
        r, g, b = (int(c * 255.0) for c in colorsys.hsv_to_rgb(hue, 1.0, 1.0))
        # Column of colour for this sample.
        draw.rectangle((x, top_pos, x + 1, HEIGHT), (r, g, b))
        # Black dot of the line graph for this sample.
        line_y = HEIGHT - (top_pos + (level * (HEIGHT - top_pos))) + top_pos
        draw.rectangle((x, line_y, x + 1, line_y + 1), (0, 0, 0))

    # Header text in black.
    draw.text((0, 0), header, font=font, fill=(0, 0, 0))

    def label_y(fraction):
        # Vertical position of a label placed `fraction` of the way down the
        # graph area, shifted up by half the small font size.
        return int(((HEIGHT - top_pos) / (HEIGHT)) * (HEIGHT * fraction) + top_pos - (font2_size / 2))

    # All three labels share the same left margin (3% of the width).
    label_x = int(WIDTH * (3 / 100))
    labels = (
        (1 / 4, "MAX:{:.1f}".format(vmax)),
        (2 / 4, "HR:{:.1f}".format(span_time_h)),
        (3 / 4, "MIN:{:.1f}".format(vmin)),
    )
    for fraction, text in labels:
        draw.text((label_x, label_y(fraction)), text, font=font2, fill=(0, 0, 0))

    st7735.display(img)
137
138
def get_cpu_temperature():
    """Return the CPU temperature used for compensating the BME280 reading.

    Runs ``vcgencmd measure_temp`` and converts the text found between the
    first ``=`` and the last ``'`` of its output to a float.

    Raises:
        ValueError: if the output lacks either delimiter or the text between
            them is not a number.
    """
    command = ['vcgencmd', 'measure_temp']
    output, _error = Popen(command, stdout=PIPE, universal_newlines=True).communicate()
    start = output.index('=') + 1
    end = output.rindex("'")
    return float(output[start:end])
144
145
# Tuning factor for compensation. Decrease this number to adjust the
# temperature down, and increase to adjust up
factor = 2.25

# Rolling window of CPU temperatures, pre-filled with five copies of a
# single initial reading.
cpu_temps = 5 * [get_cpu_temperature()]

delay = 0.5    # Debounce the proximity tap
mode = 0       # The starting mode
last_page = 0  # time.time() of the last display-mode change
light = 1

# Names of the displayed quantities; also the keys of every buffer below.
variables = [
    "temperature",
    "pressure",
    "humidity",
    "light",
]

# One entry per variable in each of these dicts:
#   values          - WIDTH-length history used by display_text()
#   varLenBuffer    - recent (time, unit, value) sensor tuples
#   varLenBufferFlt - the same recent data as floats only
#   fixLenBuffer    - data resampled for displaying
values = {}
varLenBuffer = {}
varLenBufferFlt = {}
fixLenBuffer = {}
for v in variables:
    values[v] = [1] * WIDTH
    varLenBuffer[v] = []
    varLenBufferFlt[v] = []
    fixLenBuffer[v] = []

pollDelay = 5.0
182
def pollSensors():
    """Refresh the module-level tuples holding the latest sensor readings.

    Each tuple has the form ``(time [ns], unit, value)``:

    * ``now_temp_tuple``        (°C, CPU-compensated BME280 temperature)
    * ``now_pressure_tuple``    (hPa)
    * ``now_humidity_tuple``    (%)
    * ``now_illuminance_tuple`` (lux)

    The module-level ``cpu_temps`` rolling window is also advanced by one
    CPU temperature sample.

    Depends: time, bme280, ltr559, get_cpu_temperature(), factor, cpu_temps
    """
    # Tell function to modify these global variables
    global now_temp_tuple
    global now_pressure_tuple
    global now_humidity_tuple
    global now_illuminance_tuple
    # Use the shared rolling window; a fresh local list here would hold a
    # single sample and make the averaging below a no-op.
    global cpu_temps

    # Get temperature reading
    cpu_temp = get_cpu_temperature()  # get °C from CPU
    # Smooth out with some averaging to decrease jitter
    cpu_temps = cpu_temps[1:] + [cpu_temp]
    avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
    raw_temp = bme280.get_temperature()  # get °C from BME280 sensor
    now_temp = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
    now_temp_tuple = (time.time_ns(), '°C', now_temp)

    # Get pressure reading
    now_pressure = bme280.get_pressure()  # get hPa from BME280 sensor
    now_pressure_tuple = (time.time_ns(), 'hPa', now_pressure)

    # Get humidity reading
    now_humidity = bme280.get_humidity()  # get % humidity from BME280 sensor
    now_humidity_tuple = (time.time_ns(), '%', now_humidity)

    # Get light reading
    proximity = ltr559.get_proximity()  # get proximity reading
    if proximity < 10:
        # Nothing is nearby: take a lux reading from the LTR-559 sensor.
        now_illuminance = ltr559.get_lux()
    else:
        # Something is close to the sensor, so no lux reading is taken.
        # Carry the previous value forward; on the very first poll there is
        # none, so fall back to 1.0 (the main loop shows 1 in this case).
        try:
            now_illuminance = now_illuminance_tuple[2]
        except NameError:
            now_illuminance = 1.0
    now_illuminance_tuple = (time.time_ns(), 'lux', now_illuminance)
222
223
def dateIso8601Str():
    """Return the current UTC time as an ISO 8601 string.

    The timestamp is timezone-aware, so the string ends in ``+00:00``.
    ``datetime.now(timezone.utc)`` is used instead of the deprecated
    ``datetime.utcnow()``; the resulting text format is the same.
    """
    return datetime.datetime.now(datetime.timezone.utc).isoformat()
227
def medianSubset(listIn: list = None, listOutLen: int = 0) -> list:
    """Resample a list of numbers to exactly ``listOutLen`` elements.

    * Longer input: the input is cut into ``listOutLen`` consecutive,
      non-empty slices and each output element is the median of one slice.
    * Shorter input: input elements are repeated to fill the output.
    * Equal length: a copy of the input is returned.

    Args:
        listIn: input list consisting of integers or floats (non-empty).
        listOutLen: quantity of elements in the output list (> 0).

    Returns:
        A new list of ``listOutLen`` numbers. The caller's list is never
        returned or modified.

    Raises:
        ValueError: if ``listOutLen`` is not a positive int, or ``listIn``
            is not a non-empty list of ints/floats.

    Ref/Attrib: PEP 3107 typing https://stackoverflow.com/a/21384492
    Version: 0.1.1
    """
    # Avoid a shared mutable default argument.
    if listIn is None:
        listIn = []

    # Exit for invalid input
    if not isinstance(listOutLen, int):
        raise ValueError('ERROR:Not a valid int:' + str(listOutLen))
    if not listOutLen > 0:
        raise ValueError('ERROR:Invalid value:' + str(listOutLen))
    if not isinstance(listIn, list):
        raise ValueError('ERROR:Not a valid list:' + str(listIn))
    if not all(isinstance(x, (int, float)) for x in listIn):
        raise ValueError('ERROR:Input list contains something besides integers or floating point numbers.')

    listInLen = len(listIn)
    if listInLen == 0:
        # Nothing to resample; fail clearly instead of dividing by zero.
        raise ValueError('ERROR:Input list is empty.')

    if listInLen == listOutLen:
        # Return a copy so later changes to the input cannot leak into the
        # result (and vice versa).
        return list(listIn)

    if listInLen > listOutLen:
        # Reduce listIn to listOut: median of each consecutive slice.
        listOut = []
        subsetIndLo = 0
        for i_out in range(listOutLen):
            # Inclusive upper index of the i_out'th slice; the last slice
            # always ends on the final input element.
            subsetIndHi = (listInLen - 1) * (i_out + 1) // listOutLen
            listOut.append(np.median(listIn[subsetIndLo:subsetIndHi + 1]))
            # The next slice starts right after this one.
            subsetIndLo = subsetIndHi + 1
        return listOut

    # Expand listIn to listOut: map each output index back onto an input
    # index, clamped to the last valid input index.
    subsetLenFloat = (listOutLen - 1) / listInLen
    return [
        listIn[min(int(i_out / subsetLenFloat), listInLen - 1)]
        for i_out in range(listOutLen)
    ]
307
def updateBuffer():
    """Poll the sensors and rebuild the buffers used for display.

    Steps:
      1. Call ``pollSensors()`` and append the four new tuples to
         ``varLenBuffer``.
      2. Drop tuples older than ``varLenBufferTTL`` from ``varLenBuffer``.
      3. Store the time span of the temperature buffer, in hours, in the
         module-level ``span_time_h``.
      4. Rebuild ``varLenBufferFlt`` with the float value of every tuple.
      5. Resample each float list to ``WIDTH`` elements with
         ``medianSubset()`` and store the result in ``fixLenBuffer``.

    The buffer dicts are modified in place; only ``span_time_h`` is rebound.
    """
    global span_time_h

    # Capture new sensor tuples
    pollSensors()

    # Append new sensor tuples to varying-length buffer; the order matches
    # `variables` (temperature, pressure, humidity, light).
    latest = (
        now_temp_tuple,
        now_pressure_tuple,
        now_humidity_tuple,
        now_illuminance_tuple,
    )
    for name, reading in zip(variables, latest):
        varLenBuffer[name].append(reading)

    # Trim outdated sensor tuples from varying-length buffer. Every expired
    # tuple is removed (not just one per call), so the buffer also recovers
    # if updates were missed for a while.
    for v in variables:
        samples = varLenBuffer[v]
        now_time_ns = time.time_ns()  # get ns timestamp of now
        while samples and now_time_ns - samples[0][0] > varLenBufferTTL:
            samples.pop(0)  # discard earliest tuple if age > varLenBufferTTL
        print('DEBUG:Len of varLenBuffer[' + str(v) + ']:' + str(len(varLenBuffer[v])))

    # Calculate buffer time span in hours (use temperature tuples)
    first_time_ns = varLenBuffer[variables[0]][0][0]
    last_time_ns = varLenBuffer[variables[0]][-1][0]
    span_time_ns = int(last_time_ns - first_time_ns)
    span_time_h = float(span_time_ns / (10**9*60*60))  # nanoseconds to hours

    # Convert tuple buffer into float buffer (rebuilt in place)
    for v in variables:
        floats = varLenBufferFlt[v]
        floats.clear()  # clear old float list
        floats.extend(
            # Non-float readings are replaced by an obvious placeholder.
            float(t[2]) if isinstance(t[2], float) else float(-273)
            for t in varLenBuffer[v]
        )

    # Compress/expand buffer to fixed-length buffer
    for v in variables:
        fixLenBuffer[v] = medianSubset(varLenBufferFlt[v], WIDTH)
        print('DEBUG:Len of fixLenBuffer[' + str(v) + ']:' + str(len(fixLenBuffer[v])))
387
388
# The main loop
try:
    # Schedule tasks: refresh the buffers at second :00 of every minute.
    schedule.every().minute.at(":00").do(updateBuffer)

    pollSensors()   # initial run to start up sensors
    time.sleep(1)   # pause to give sensors time to initialize
    updateBuffer()  # initial run

    while True:
        proximity = ltr559.get_proximity()

        # If the proximity crosses the threshold, toggle the display mode
        # (at most once per `delay` seconds).
        if proximity > 1500 and time.time() - last_page > delay:
            mode = (mode + 1) % len(variables)
            last_page = time.time()

        # Run scheduled tasks
        schedule.run_pending()

        # One display mode for each variable: pick the unit and take a live
        # reading for the header text.
        if mode == 0:
            # variable = "temperature"
            unit = "°C"
            cpu_temp = get_cpu_temperature()
            # Smooth out with some averaging to decrease jitter
            cpu_temps = cpu_temps[1:] + [cpu_temp]
            avg_cpu_temp = sum(cpu_temps) / float(len(cpu_temps))
            raw_temp = bme280.get_temperature()
            data = raw_temp - ((avg_cpu_temp - raw_temp) / factor)
        elif mode == 1:
            # variable = "pressure"
            unit = "hPa"
            data = bme280.get_pressure()
        elif mode == 2:
            # variable = "humidity"
            unit = "%"
            data = bme280.get_humidity()
        elif mode == 3:
            # variable = "light"
            unit = "Lux"
            # Only read lux when nothing is close to the sensor.
            data = ltr559.get_lux() if proximity < 10 else 1
        else:
            # No display mode matches; nothing to draw this iteration.
            continue

        # Draw the buffered history together with the live reading.
        display_text2(variables[mode], data, unit, fixLenBuffer)

# Exit cleanly
except KeyboardInterrupt:
    sys.exit(0)