James Duffy

Personal AQI Service

// By Updated:
featured-image.png

As the fire season in California intensifies each year, it becomes increasingly crucial to keep a close eye on air quality. This year, with numerous other challenges at hand, monitoring the air quality around my living space has become more important than ever. To achieve this, I have developed a system that gathers data from PurpleAir sensors, visualizes it using Grafana, and sends alerts when the Air Quality Index (AQI) surpasses 50.

The foundation of this system is PurpleAir’s API, which allows me to fetch sensor data approximately every 5 minutes and store it in a MySQL database. Next, I set up Grafana to create a visually appealing dashboard, complete with a map of sensor locations and other relevant points of interest. Grafana also manages the process of sending push notifications through Pushover.net when the AQI remains above 50 for a duration of 10 minutes.

To facilitate this setup, both MySQL and Grafana are running in containers on my Synology NAS, utilizing external volume storage.

Additionally, I created a Python script that runs on my Intel NUC and is triggered by a CRON job every five minutes. The script performs the following steps:

  1. Retrieve JSON data from PurpleAir.
  2. Exclude any sensor that does not fulfill the following criteria:
    • Located “outside”
    • Reported data within the last 5 minutes
    • Positioned within a specific longitude and latitude range
    • Convert the raw PM 2.5 value into the US AQI.
  3. Insert the gathered data into the database.

By implementing this system, I can now effortlessly monitor and stay informed about the air quality around my apartment, ensuring that I can take the necessary precautions during California’s increasingly severe fire seasons.

# MySQL Table

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
CREATE TABLE `sensor_data` (
  `id` int NOT NULL AUTO_INCREMENT,
  `purpleair_id` int DEFAULT '0',
  `created_at` datetime DEFAULT CURRENT_TIMESTAMP,
  `time` int DEFAULT NULL,
  `name` varchar(100) DEFAULT NULL,
  `latitude` decimal(10,8) DEFAULT NULL,
  `longitude` decimal(11,8) DEFAULT NULL,
  `geohash` varchar(12) DEFAULT NULL,
  `pm2_5` decimal(8,2) DEFAULT NULL,
  `aqi` int DEFAULT '0',
  PRIMARY KEY (`id`),
  KEY `purpleair_id` (`purpleair_id`),
  KEY `time` (`time`),
  KEY `idx_sensor_data_aqi` (`aqi`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

# Ingestion Script

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import requests
import time

import pymysql
import pymysql.cursors

from json.decoder import JSONDecodeError

from retrying import retry

# Connect to the database
connection = pymysql.connect(
    host='',
    user='',
    password='',
    db='',
    charset='utf8')


def aqi_from_pm(pm):
    if pm > 350.5:
        return calculate_aqi(pm, 500, 401, 500, 350.5)
    elif pm > 250.5:
        return calculate_aqi(pm, 400, 301, 350.4, 250.5)
    elif pm > 150.5:
        return calculate_aqi(pm, 300, 201, 250.4, 150.5)
    elif pm > 55.5:
        return calculate_aqi(pm, 200, 151, 150.4, 55.5)
    elif pm > 35.5:
        return calculate_aqi(pm, 150, 101, 55.4, 35.5)
    elif pm > 12.1:
        return calculate_aqi(pm, 100, 51, 35.4, 12.1)
    elif pm >= 0:
        return calculate_aqi(pm, 50, 0, 12, 0)
    else:
        return False


def calculate_aqi(Cp, Ih, Il, BPh, BPl):
    a = Ih - Il
    b = BPh - BPl
    c = Cp - BPl
    return round(((a/b) * c + Il))


@retry(wait_fixed=30000, stop_max_attempt_number=3)
def fetch_results():
    purpleair = requests.get("https://www.purpleair.com/json")
    return purpleair.json()['results']

purpleair_results = fetch_results()

sql = "INSERT INTO `sensor_data` (`purpleair_id`, `time`, `name`, `latitude`, `longitude`, `pm2_5`, `aqi`) VALUES (%s, %s, %s, %s, %s, %s, %s)"

data = []
for measurement in purpleair_results:
    is_outside = 'DEVICE_LOCATIONTYPE' in measurement and measurement['DEVICE_LOCATIONTYPE'] == 'outside'
    is_recent = 'AGE' in measurement and measurement['AGE'] <= 300
    is_valid = 'ID' in measurement and 'Label' in measurement and 'Lat' in measurement and 'Lon' in measurement and 'PM2_5Value' in measurement

    if is_outside and is_recent and is_valid:
        # Include California and some of the surrounding area
        is_longitude =  -125.22216797 < measurement['Lon'] < -113.26904297
        is_latitude = 32.17561248 < measurement['Lat'] < 42.16340342

        if is_longitude and is_latitude:
            raw_pm = measurement['PM2_5Value']
            pm2_5 = float(raw_pm) if '.' in raw_pm else False

            if pm2_5:
                data.append([
                    measurement['ID'],
                    measurement['LastSeen'],
                    measurement['Label'].strip(),
                    measurement['Lat'],
                    measurement['Lon'],
                    pm2_5,
                    aqi_from_pm(pm2_5)
                ])

cursor = connection.cursor()

for sensor in data:
    print(sensor)
    cursor.execute(sql, sensor)

connection.commit()

connection.close()