Metro Sign Build: Getting real-time Metro data from WMATA
Build Log 4: Wifi Connection + API Calls
Alright, we have our panels assembled, our code running, and we’re able to get text to show up on our panels. Let’s introduce another element: accessing the Washington Metropolitan Area Transit Authority (WMATA) API to get information about metro train times. If you’ve worked with APIs before, good news! The WMATA API is structured and documented very well, so it’ll be easy to get this example up and running. If you haven’t worked with APIs before, good news! This is a great API to start out with, and we’ll learn a lot along the way that will help us down the line if we choose to add more APIs 😉 If you’re enjoying the Build Log so far, consider subscribing so you don’t miss the next one!
Let’s break this task down because there are a few elements at play, and if we want to be thorough we should include a lot of error handling as we start to increase the complexity of our code:
We need to establish a wifi connection so that we can access API information over the internet
We need to handle not connecting to the network
We need to craft a request for the WMATA API that will return the right data
We need to handle not getting a response (or getting a malformed response) back
We need to store that data in a way that we can use it to display on the sign
We need to handle having no data to display
We need to run that request periodically so we have fresh data to display
How frequently should this run?
Choosing our Wifi Management
Let’s start with getting our internet connection running; the first thing we’ll want to do is create a new, separate secrets.py file to hold everything we don’t want the rest of the world to see. This keeps us from hardcoding things like API keys, wifi passwords, and other sensitive information directly into our code, and it makes it much easier to share it with someone if you need help or want to share your project online. The secrets.py file can just be a dict, and you’ll import the file so you can grab fields as needed.
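As a rough sketch, a secrets.py for this project might look something like the one below. The ssid and password keys are the ones the Adafruit WiFiManager reads; the 'wmata api key' and 'station_code' keys match the lookups in the code later in this post. Every value here is a placeholder, so swap in your own.

```python
# secrets.py: keep this file out of anything you share publicly
secrets = {
    "ssid": "YOUR_WIFI_NAME",               # wifi network name (placeholder)
    "password": "YOUR_WIFI_PASSWORD",       # wifi password (placeholder)
    "wmata api key": "YOUR_WMATA_API_KEY",  # key name matches the lookup used later
    "station_code": "A01",                  # placeholder; use your own station's code
}
```

In your main code, `from secrets import secrets` gives you the dict, and you can grab fields like `secrets["station_code"]` as needed.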
Once you’ve loaded in some of your wifi details (SSID + password), we can talk connectivity. There are a lot of options in the CircuitPython library bundle; they run the spectrum from close to the metal to super abstract. Let’s start with our requirements and see what fits our needs:
We need something compatible with the ESP32 WiFi processor in our MatrixPortal M4.
We need something that can maintain an open connection for a long time, as we want to run requests several times an hour.
We want to leverage a requests helper but we need to customize it, specifically in the headers or response code handling, so we need something that either allows us to use the adafruit_requests library or bundles another solution.
I chose to go with something that would handle a lot of the connection maintenance for us: the WiFiManager from the adafruit_esp32spi library1. It can also reset the ESP32 if something goes wrong. I’ll be honest, it’s still a bit finicky, especially with short timeouts, and I’m actively troubleshooting some scenarios, but it’s the best option I’ve found for this use case so far.
Wifi Implementation
There are two basic components to our wifi implementation: getting our sensitive info from our secrets.py file and actually setting up the wifi manager on our hardware. The first is easy enough, we just have to import secrets (make sure your secrets file is in the same folder) and pull them into the code like so:
try:
    from secrets import secrets
except ImportError:
    print("Wifi + constants are kept in secrets.py, please add them there!")
    raise
Next, we can start initializing board components for our wifi manager. Since we have pre-defined ESP32 pins on the MatrixPortal M4, we can directly grab them from the board, then create our spi and esp objects. Then we grab our neopixel to use as a status indicator and we can build our wifimanager object! I’ve mostly borrowed this code from the aforementioned Adafruit wifimanager tutorial, so that’s a great place to follow along.
# Initialize ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)
# Initialize wifi components
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)
# Initialize neopixel status light
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)
# Initialize wifi object
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light)
wifi.connect()
We should be good to go! If you’d like to test out the connection at this point, feel free to grab the example code from the Adafruit demo; you’ll need to set up an Adafruit IO account and add a few more fields to your secrets file, but it’ll confirm that your wifi is working so you can better troubleshoot in the next step if something goes wrong. Alternatively, if you’re more comfortable with APIs, you can take a look at the status codes we get back from the WMATA API in our next step and diagnose issues that way!
WMATA API Overview
Now that we can connect to the internet, we can review the WMATA API so we know how to get train data. We’re going to query the API by building and sending a GET request2. Let’s start by checking out their API documentation3; we’re specifically interested in real-time rail predictions, but there are plenty of interesting endpoints here to check out that might be a good fit for your project!
Another thing we’ll need is an API key, which we can get by signing up for a free account. Most sites that offer API access, especially free sites, will limit your calls per hour, day, or month. On the Default tier (the free one), the WMATA API rate limits you to 10 calls per second or 50,000 calls per day. This project will not even scratch the surface of that limit, so we don’t have to worry about limiting our calls for this API.
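To put that daily limit in perspective, here’s a quick back-of-the-envelope check. The polling intervals are just assumptions for illustration (we haven’t picked one yet), and calls_per_day is a hypothetical helper, not part of any library.

```python
SECONDS_PER_DAY = 24 * 60 * 60
DAILY_LIMIT = 50000  # Default tier daily limit quoted above


def calls_per_day(poll_interval_s):
    """Requests made in one day when polling once every poll_interval_s seconds."""
    return SECONDS_PER_DAY // poll_interval_s


# Candidate polling intervals in seconds (assumed values, not from WMATA)
for interval in (10, 30, 60):
    calls = calls_per_day(interval)
    print(interval, "s ->", calls, "calls/day, under limit:", calls <= DAILY_LIMIT)
```

Even polling every 10 seconds works out to 8,640 calls a day, which is comfortably under 50,000.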
NOTE: If you’re working with an API that does have more relevant limits, especially one that will start charging you after a certain number of calls, see if you can limit your API key through the service so your key will get denied before you start getting charged. Especially when you’re still experimenting with the API deployment, you can tend to make way more calls more frequently than your actual use case, so keep that in mind if you’re considering other APIs.
Next, go to the real-time rail prediction page and click “Try it”; the page will preload with a StationCode parameter and your API key info so you can test out calls and evaluate the response right in the browser. This is a good way to ensure our calls match the request URL structure and our responses match what we know we should be seeing.
The WMATA API is a bit different than many other APIs in that it passes your API key through a custom header rather than as a parameter. That means we’ll need to add two main components to our GET request:
Our header with our API key
Our URL with the appended StationCode parameter
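Those two components can be sketched as a tiny helper that builds the pieces without touching the network. The function name build_prediction_request is hypothetical; the base URL and the api_key header name are the ones used in the request code later in this post, and the station code and key below are placeholders.

```python
BASE_URL = "https://api.wmata.com/StationPrediction.svc/json/GetPrediction/"


def build_prediction_request(station_code, api_key):
    """Return the (url, headers) pair for a real-time prediction request."""
    url = BASE_URL + station_code    # StationCode is appended to the URL path
    headers = {"api_key": api_key}   # the key travels in a header, not a parameter
    return url, headers


url, headers = build_prediction_request("A01", "YOUR_WMATA_API_KEY")
print(url)
print(headers)
```

Printing the URL before sending anything makes it easy to compare against the request URL shown on the WMATA developer site.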
We already have our API key, so we just need our station code. This will depend on what station you’d like to track, and since that can be a bit personal we’ll include this in our secrets file as well. I found the best way to actually find your StationCode was to use the Station List endpoint4 from the Rail Station Information API to get a list of all the stations on a given line, which returns the StationCode for each station. You can do this directly from the page in their API demo so thankfully you don’t need to write any additional code!
NOTE: you’ll see that there are JSON and XML endpoints; this indicates the format of your response. JSON is very human-readable and super easy to use in Python, so we’re going to use the JSON endpoint!
WMATA Request Implementation
Finally, we can bring it all together to get our train data! Let’s start by creating our GET request; while you can technically do this all in the get function, I think it’s more readable (and best practice) to split things out into variables and then load them into the request:
try:
    # query WMATA API with input StationCode
    URL = 'https://api.wmata.com/StationPrediction.svc/json/GetPrediction/'
    payload = {'api_key': secrets['wmata api key']}
    response = wifi.get(URL + StationCode, headers=payload)
    json_data = response.json()
except Exception as e:
    print("Failed to get data, retrying\n", e)
    wifi.reset()
Note that we’re handling the Exception by hard resetting the wifi object in an attempt to re-establish the connection. Unfortunately, this doesn’t always work, so we’ll need to plan for a case in which we don’t get data back from the WMATA API. We’ll worry about that later!
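For a sense of what planning for the no-data case could look like, here’s a minimal sketch that retries a few times and returns None if nothing comes back. This is one possible shape, not necessarily where this build ends up: fetch_trains and the attempts count are my own assumptions, and FakeResponse and FlakyWifi are stand-ins so the sketch can be tried on a desktop without the hardware. On the board, you’d pass in the real wifi object instead.

```python
def fetch_trains(wifi, url, headers, attempts=3):
    """Return the list under "Trains", or None if every attempt fails."""
    for attempt in range(attempts):
        try:
            response = wifi.get(url, headers=headers)
            json_data = response.json()
            response.close()  # release the socket once the body is parsed
            return json_data["Trains"]
        except Exception as e:
            print("Attempt", attempt + 1, "failed:", e)
            wifi.reset()  # same recovery step as the snippet above
    return None


class FakeResponse:
    """Stand-in for a requests-style response (desktop testing only)."""

    def __init__(self, data):
        self._data = data

    def json(self):
        return self._data

    def close(self):
        pass


class FlakyWifi:
    """Stand-in for the wifi manager: fails once, then succeeds."""

    def __init__(self):
        self.calls = 0
        self.resets = 0

    def get(self, url, headers=None):
        self.calls += 1
        if self.calls == 1:
            raise RuntimeError("simulated timeout")
        return FakeResponse({"Trains": []})

    def reset(self):
        self.resets += 1


fake = FlakyWifi()
result = fetch_trains(fake, "https://example.invalid/", {"api_key": "x"})
print(result, fake.resets)
```

Returning None gives the calling code a clear signal that it has no fresh data, so it can decide what to show on the sign instead.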
Now we have to think about our response, which will come back as JSON. Calling json() on the response object parses the body into a Python dict that we can work with. For this example, let’s just have it print each item in the "Trains" list of that dict to make sure our code is working:
try:
    for item in json_data["Trains"]:
        print(item)
except Exception as e:
    print(e)
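For reference, each item that loop prints is itself a dict describing one upcoming train. The sketch below parses a sample shaped like the prediction response and pulls out a few fields. The field names follow my reading of the WMATA documentation, and the values are made up for illustration rather than captured from the API, so check them against what your own serial console shows.

```python
import json

# Illustrative sample only: shaped like a prediction response, values made up.
sample_body = """
{"Trains": [
  {"Car": "8", "Destination": "Glenmont", "DestinationCode": "B11",
   "DestinationName": "Glenmont", "Group": "1", "Line": "RD",
   "LocationCode": "A01", "LocationName": "Metro Center", "Min": "3"},
  {"Car": "6", "Destination": "Shady Gr", "DestinationCode": "A15",
   "DestinationName": "Shady Grove", "Group": "2", "Line": "RD",
   "LocationCode": "A01", "LocationName": "Metro Center", "Min": "ARR"}
]}
"""

json_data = json.loads(sample_body)

summary = []
for item in json_data["Trains"]:
    # .get() returns None instead of raising if a field is missing
    summary.append((item.get("Line"), item.get("Destination"), item.get("Min")))

for line, destination, minutes in summary:
    print(line, destination, minutes)
```

Note that "Min" comes back as a string and isn’t always a number, so it’s worth treating it as text until we decide how to display it.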
If everything is going smoothly, we should now have some train data printed in our serial console! A couple things to check if you’re not getting the data back:
Is your secrets file importing successfully?
Is your wifi connection established successfully? You should be seeing errors in the serial console if something is wrong. Like I said, this part can sometimes just be finicky, so definitely try a few soft or hard resets before full troubleshooting.
Is your request formed properly? You can try out your specific request directly on the WMATA developer site to make sure everything works, but sometimes translating to your own code can introduce issues. Make sure that your API key is being passed as a header, and double-check all the names and labels for everything.
If you’re still having issues, let me know in the comments and we can troubleshoot together!
Next up, we need to get this data more structured so we can start doing things with it. We also want to begin setting up the main loop of our code, so we can start doing things on a recurring cadence. Make sure to subscribe if you haven’t already so you get the next update!
Code Examples
from adafruit_display_text import label
from adafruit_bitmap_font import bitmap_font
import board
import gc
import time
import busio
from digitalio import DigitalInOut, Pull
import neopixel
import json
import adafruit_apds9960.apds9960
from adafruit_apds9960 import colorutility
from adafruit_matrixportal.matrix import Matrix
import adafruit_requests as requests
from adafruit_esp32spi import adafruit_esp32spi
from adafruit_esp32spi import adafruit_esp32spi_wifimanager
import display_manager

# --- CONSTANTS SETUP ----
try:
    from secrets import secrets
except ImportError:
    print("Wifi + constants are kept in secrets.py, please add them there!")
    raise

# local Metro station
station_code = secrets["station_code"]

# width of total displays in pixels
# NOTE this width is set for 2 64x32 RGB LED Matrix panels
# (https://www.adafruit.com/product/2278)
width = 128

# --- INITIALIZE DISPLAY -----------------------------------------------
# MATRIX DISPLAY MANAGER
matrix = Matrix(width=128, height=32, bit_depth=2, tile_rows=1)
display_manager = display_manager.display_manager(matrix.display)
print("display manager loaded")

# --- WIFI SETUP -------------
# Initialize ESP32 Pins:
esp32_cs = DigitalInOut(board.ESP_CS)
esp32_ready = DigitalInOut(board.ESP_BUSY)
esp32_reset = DigitalInOut(board.ESP_RESET)

# Initialize wifi components
spi = busio.SPI(board.SCK, board.MOSI, board.MISO)
esp = adafruit_esp32spi.ESP_SPIcontrol(spi, esp32_cs, esp32_ready, esp32_reset)

# Initialize neopixel status light
status_light = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.2)

# Initialize wifi object
wifi = adafruit_esp32spi_wifimanager.ESPSPI_WiFiManager(esp, secrets, status_light, attempts=5)
print("WiFi loaded")
gc.collect()

# --- FUNCTIONS ---
# queries WMATA API to return a dict with all unique train destinations, sorted by min
# input is StationCode from WMATA API
def get_trains(StationCode):
    try:
        # query WMATA API with input StationCode
        URL = 'https://api.wmata.com/StationPrediction.svc/json/GetPrediction/'
        payload = {'api_key': secrets['wmata api key']}
        response = wifi.get(URL + StationCode, headers=payload)
        json_data = response.json()
    except Exception as e:
        print("Failed to get data, retrying\n", e)
        wifi.reset()
    try:
        for item in json_data["Trains"]:
            print(item)
    except Exception as e:
        print(e)

get_trains(station_code)