~bsprague/air-quality

5b414d888f272d8a32cee0a4ac227e416af0e87e — Brandon Sprague 10 months ago 8899cb4
Add WiFi + data sending support, refactor code
2 files changed, 155 insertions(+), 59 deletions(-)

M .gitignore
M circuitpython/code.py
M .gitignore => .gitignore +1 -0
@@ 1,1 1,2 @@
/circuitpython/.venv
/circuitpython/secrets.py

M circuitpython/code.py => circuitpython/code.py +154 -59
@@ 8,88 8,183 @@ Sketch to connect BME280, PM25, and SGP30 sensors together, based on the example
sketch to connect to PM2.5 sensor with either I2C or UART.
"""

# Networking
import wifi
import socketpool
import adafruit_requests
import os
import ipaddress
import ssl

# pylint: disable=unused-import
import time
import board
from digitalio import DigitalInOut, Direction, Pull

# Sensors
from adafruit_pm25.i2c import PM25_I2C
import adafruit_sgp30
from adafruit_bme280 import basic as adafruit_bme280

try:
    from secrets import secrets
except ImportError:
    print("WiFi credentials and endpoint info are kept in secrets.py, please add them there!")
    raise

def connect_to_wifi():
    try:
        wifi.radio.connect(secrets["ssid"], secrets["password"])
        print("Connected to {}!".format(secrets["ssid"]))
        print("IP:", wifi.radio.ipv4_address)

led = digitalio.DigitalInOut(board.LED)
led.direction = digitalio.Direction.OUTPUT
        pool = socketpool.SocketPool(wifi.radio)
        return adafruit_requests.Session(pool, ssl.create_default_context())

    # Wi-Fi connectivity fails with error messages, not specific errors, so this except is broad.
    except Exception as e:  # pylint: disable=broad-except
        print("failed to connect to wifi")
        print(e)
        return None

reset_pin = None
led = DigitalInOut(board.LED)
led.direction = Direction.OUTPUT

# TODO: Wire the pin labelled 'RST' to the D13 pin (it goes BAT, EN, USB, then
# D13) on the Feather ESP32-S2.Relevant diagrams and info:
# https://learn.adafruit.com/adafruit-esp32-s2-feather/pinouts

reset_pin = DigitalInOut(board.D13)
reset_pin.direction = Direction.OUTPUT
reset_pin.value = False
# reset_pin = DigitalInOut(board.D13)
# reset_pin.direction = Direction.OUTPUT
# reset_pin.value = False

# For using the built-in STEMMA QT connector on a microcontroller
i2c = board.STEMMA_I2C()

# Connect to a PM2.5 sensor over I2C
pm25 = PM25_I2C(i2c, reset_pin)
print("Found PM2.5 sensor, reading data...")
# A debugging tool, board for some reason isn't seeing both PM25 and SGP, only
# ever one, depending on what's plugged in.
def scan_i2c():
    while not i2c.try_lock():
        pass

# See https://docs.circuitpython.org/projects/sgp30/en/latest/
sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)
print("Found SGP30 sensor, reading data...")

bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)
print("Found BME280 sensor, reading data...")
    try:
        while True:
            print(
                "I2C addresses found:",
                [hex(device_address) for device_address in i2c.scan()],
            )
            time.sleep(2)

# It seems like this needs to be set but I have **no clue** if I should change
# this for a given location
bme280.sea_level_pressure = 1013.25
    finally:  # unlock the i2c bus when ctrl-c'ing out of the loop
        i2c.unlock()

def connect_pm25(reset_pin=None):
    try:
        pm25 = PM25_I2C(i2c, None) # reset_pin)
        print("Found PM2.5 sensor")
        return pm25
    except:
        return None

led_stat = True
while True:
    time.sleep(2)
    led_stat = not led_stat
    led.value = led_stat
def connect_bme280():
    try:
        bme280 = adafruit_bme280.Adafruit_BME280_I2C(i2c)
        print("Found BME280 sensor")
        # It seems like this needs to be set but I have **no clue** if I should change
        # this for a given location
        bme280.sea_level_pressure = 1013.25
        return bme8280
    except:
        return None

# See https://docs.circuitpython.org/projects/sgp30/en/latest/
def connect_sgp30():
    try:
        aqdata = pm25.read()
        # print(aqdata)
    except RuntimeError:
        print("Unable to read from sensor, retrying...")
        continue
        sgp30 = adafruit_sgp30.Adafruit_SGP30(i2c)
        print("Found SGP30 sensor")
        return sgp30
    except:
        return None

    print()
    print("Concentration Units (standard)")
    print("---------------------------------------")
    print(
        "PM 1.0: %d\tPM2.5: %d\tPM10: %d"
        % (aqdata["pm10 standard"], aqdata["pm25 standard"], aqdata["pm100 standard"])
    )
    print("Concentration Units (environmental)")
    print("---------------------------------------")
    print(
        "PM 1.0: %d\tPM2.5: %d\tPM10: %d"
        % (aqdata["pm10 env"], aqdata["pm25 env"], aqdata["pm100 env"])
    )
    print("---------------------------------------")
    print("Particles > 0.3um / 0.1L air:", aqdata["particles 03um"])
    print("Particles > 0.5um / 0.1L air:", aqdata["particles 05um"])
    print("Particles > 1.0um / 0.1L air:", aqdata["particles 10um"])
    print("Particles > 2.5um / 0.1L air:", aqdata["particles 25um"])
    print("Particles > 5.0um / 0.1L air:", aqdata["particles 50um"])
    print("Particles > 10 um / 0.1L air:", aqdata["particles 100um"])
    print("---------------------------------------")

    eCO2, TVOC = sgp30.iaq_measure()
    print("eCO2 = %d ppm \t TVOC = %d ppb" % (eCO2, TVOC))
    print("---------------------------------------")

    print("\nTemperature: %0.1f C" % bme280.temperature)
    print("Humidity: %0.1f %%" % bme280.relative_humidity)
    print("Pressure: %0.1f hPa" % bme280.pressure)
    print("Altitude = %0.2f meters" % bme280.altitude)
    print("---------------------------------------")
# Initialize our I2C-connected sensors
pm25 = connect_pm25()
bme280 = connect_bme280()
sgp30 = connect_sgp30()

requests = connect_to_wifi()

while True:
    if requests is None:
        print("Attempting to connect, as first attempt apparently failed")
        requests = connect_to_wifi()

    led.value = False
    time.sleep(10)
    # LED on while scanning
    led.value = True

    if pm25 is None:
        pm25_data = None
    else:
        try:
            aqdata = pm25.read()
            # Datasheet: https://cdn-shop.adafruit.com/product-files/4632/4505_PMSA003I_series_data_manual_English_V2.6.pdf
            # Explanation of 'standard' vs 'environmental': https://forums.adafruit.com/viewtopic.php?p=767725#p767725
            pm25_data = {
                # μg/m^3
                "standard": {
                    "pm10": aqdata["pm10 standard"],
                    "pm25": aqdata["pm25 standard"],
                    "pm100": aqdata["pm100 standard"],
                },
                # μg/m^3
                "environmental": {
                    "pm10": aqdata["pm10 env"],
                    "pm25": aqdata["pm25 env"],
                    "pm100": aqdata["pm100 env"],
                },
                # Number of particles in 0.1 L of air
                "particles": {
                    "03um": aqdata["particles 03um"],
                    "05um": aqdata["particles 05um"],
                    "10um": aqdata["particles 10um"],
                    "25um": aqdata["particles 25um"],
                    "50um": aqdata["particles 50um"],
                    "100um": aqdata["particles 100um"],
                },
            }
        except RuntimeError:
            pm25_data = None
            print("Unable to read from PM2.5 sensor")

    if sgp30 is None:
        sgp30_data = None
    else:
        eCO2, tvoc = sgp30.iaq_measure()
        sgp30_data = {
            "eCO2": eCO2, # ppm
            "tvoc": tvoc, # ppb
        }

    if bme280 is None:
        bme280_data = None
    else:
        bme280_data = {
            "temperature": bme280.temperature, # degrees celsius
            "relative_humidity": bme280.relative_humidity, # percent
            "pressure": bme280.pressure, # hPa
            "altitude": bme280.altitude, # meters
        }

    json_data = {
        "pm25": pm25_data,
        "sgp30": sgp30_data,
        "bme280": bme280_data,
    }

    if requests is None:
        print("recorded data, but not sending")
        print(json_data)
        continue

    requests.post(secrets["endpoint"], json=json_data)