
Listen to the endpoint and generate CSVs

In this tutorial, we will connect to the Tracking Stream API, either for a fixed duration or indefinitely, and write the results to CSV files in 30-minute time buckets.

Source code

You can find and download the source code for this example here.

Getting started

Let's start by installing the requests package, which this example depends on.

pip install requests

We can create our main.py file, with the listen_to_stream function in it:

main.py
import json
import logging
import sys
import time
from datetime import datetime, timedelta

import requests
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
from urllib3.util.retry import Retry

# Custom exceptions, defined in a local exceptions.py module
from exceptions import MaxRetries, ConnectionLost

log = logging.getLogger(__name__)
target_updates = []


def listen_to_stream(timeout=None):
    # Turn a duration in seconds into an absolute deadline
    if timeout is not None:
        timeout = datetime.now() + timedelta(seconds=timeout)
    retry_strategy = Retry(
        # 10 retries before throwing exception
        total=10,
        backoff_factor=3,
        status_forcelist=[429, 500, 502, 503, 504, 422],
        allowed_methods=["HEAD", "GET", "OPTIONS"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    try:
        response = http.get(
            "https://api.airsafe.spire.com/v2/targets/stream?compression=none",
            headers={"Authorization": "Bearer <your_token>"},
            stream=True,
        )
    except RetryError as e:
        log.warning(e)
        raise MaxRetries() from e
    if response.status_code == 401:
        print("Unauthorized, token might be invalid")
        sys.exit()
    try:
        for line in response.iter_lines(decode_unicode=True):
            # Stop once the deadline has passed (checked each time a line arrives)
            if timeout is not None and datetime.now() >= timeout:
                response.close()
                sys.exit()
            # Keep only the lines that carry a target update
            if line and '"target":{' in line:
                target = json.loads(line)["target"]
                target_updates.append(target)
    except Exception as e:
        log.warning(e)
        raise ConnectionLost() from e

Let's break down what was done here:

  • We have added a retry strategy (up to 10 retries with backoff), in case the connection fails the first few times. We will use that later to ensure that we stay connected to the stream at all times.
  • We have set up a timeout parameter, given in seconds, which allows us to listen to the Tracking Stream for a specific duration if we want to.
  • We have called the Tracking Stream endpoint using http.get after creating a Session.
  • We are looping through the response lines, parsing the JSON object in each line that contains a target, and adding it to the global list target_updates declared at the top.
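
The listing selects target lines with a substring check, which only matches when the JSON has no space after the colon. As a minimal sketch of an alternative, the line can be parsed first and the key checked afterwards. The helper name extract_target and the sample lines are made up for illustration, and the field names inside "target" are not taken from the API's schema.

```python
import json


def extract_target(line):
    """Return the "target" object from one stream line, or None if absent."""
    if not line:
        return None  # empty line, nothing to parse
    try:
        message = json.loads(line)
    except json.JSONDecodeError:
        return None  # not valid JSON, skip it
    if not isinstance(message, dict):
        return None
    target = message.get("target")
    return target if isinstance(target, dict) else None


# Made-up sample lines; the field names inside "target" are illustrative only
sample_lines = [
    '{"target": {"icao_address": "ABC123", "latitude": 1.5}}',
    '{"other": 1}',
    "",
]
targets = [t for t in map(extract_target, sample_lines) if t is not None]
print(targets)
```

Parsing every line costs a little more than the substring check, so the trade-off depends on how much of the stream consists of target lines.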

Background scheduler

If we want our Tracking Stream connection to stay open and generate CSV files as data arrives, we need a scheduler that triggers the CSV export at regular intervals. For that, we will use the apscheduler package and its BackgroundScheduler class.

pip install apscheduler

We can now create our scheduler and its associated callback:

main.py
import copy
import csv
import json
import logging
import sys
import time
from datetime import datetime, timedelta

import requests
from apscheduler.schedulers.background import BackgroundScheduler
from requests.adapters import HTTPAdapter
from requests.exceptions import RetryError
from urllib3.util.retry import Retry

# Custom exceptions, defined in a local exceptions.py module
from exceptions import MaxRetries, ConnectionLost

log = logging.getLogger(__name__)
target_updates = []
time_from = None


def reset_bucket():
    global target_updates
    target_updates = []


def export_to_csv_job():
    global time_from
    global target_updates
    # Do the CSV export here
    # Snapshot the current bucket, then start a new one
    to_process = copy.deepcopy(target_updates)
    old_time_from = time_from
    time_from = datetime.now()
    reset_bucket()
    if len(to_process) > 0:
        print(to_process[0])
        # File name carries the start and end time of the bucket
        filename = (
            f"data_{old_time_from.strftime('%m_%d_%Y_%H_%M_%S')}"
            f"_{time_from.strftime('%m_%d_%Y_%H_%M_%S')}.csv"
        )
        with open(filename, "w", newline="") as data_file:
            # create the csv writer object
            csv_writer = csv.writer(data_file)
            # Use the record with the most keys as the header row
            most_keys = max(to_process, key=lambda item: len(item.keys()))
            csv_writer.writerow(most_keys.keys())
            for elem in to_process:
                csv_writer.writerow([elem.get(key, "") for key in most_keys.keys()])


def listen_to_stream(timeout=None):
    global time_from
    reset_bucket()
    # Turn a duration in seconds into an absolute deadline
    if timeout is not None:
        timeout = datetime.now() + timedelta(seconds=timeout)
    scheduler = BackgroundScheduler()
    retry_strategy = Retry(
        # 10 retries before throwing exception
        total=10,
        backoff_factor=3,
        status_forcelist=[429, 500, 502, 503, 504, 422],
        allowed_methods=["HEAD", "GET", "OPTIONS"],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    http = requests.Session()
    http.mount("https://", adapter)
    http.mount("http://", adapter)
    try:
        response = http.get(
            "https://api.airsafe.spire.com/v2/targets/stream?compression=none",
            headers={"Authorization": "Bearer <your_token>"},
            stream=True,
        )
    except RetryError as e:
        log.warning(e)
        raise MaxRetries() from e
    if response.status_code == 401:
        print("Unauthorized, token might be invalid")
        sys.exit()
    try:
        # Run the export at minutes 0 and 30 of every hour
        scheduler.add_job(
            export_to_csv_job,
            "cron",
            minute="*/30",
            id="airsafe_stream_csv",
        )
        time_from = datetime.now()
        scheduler.start()
    except Exception as e:
        log.warning(e)
        print("failed to start scheduler")
        raise ConnectionLost() from e
    try:
        for line in response.iter_lines(decode_unicode=True):
            if timeout is not None and datetime.now() >= timeout:
                # Deadline reached: stop the scheduler and flush the last bucket
                scheduler.remove_job("airsafe_stream_csv")
                scheduler.shutdown()
                export_to_csv_job()
                response.close()
                sys.exit()
            if line and '"target":{' in line:
                target = json.loads(line)["target"]
                target_updates.append(target)
    except Exception as e:
        log.warning(e)
        # Connection dropped: stop the scheduler and flush what we have
        scheduler.remove_job("airsafe_stream_csv")
        scheduler.shutdown()
        export_to_csv_job()
        raise ConnectionLost() from e

Let's go through what we have added here:

  • We have created a reset_bucket() function that empties the data bucket from which each CSV file is generated. It is called every time the Background Scheduler triggers a new job.
  • We have created the job function export_to_csv_job(), which takes our bucket data, processes it, and writes it to a CSV file.
  • We are launching our scheduled job with the cron expression */30, which fires at minutes 0 and 30 of every hour. The first file may therefore cover less than 30 minutes.
  • We are handling error cases by removing our job, and exporting the data collected so far, when an error happens or when the timeout is reached.
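
The export above takes its header from the record with the most keys, so a key that appears only in other records would be left out of the file. A minimal sketch of one alternative, assuming the records are flat dictionaries, is to build the header from every key seen and let csv.DictWriter fill the gaps. The function name write_targets_csv and the sample records are made up for illustration.

```python
import csv
import io


def write_targets_csv(targets, out_file):
    """Write a list of dicts as CSV, using every key seen as a column."""
    # Collect column names in first-seen order across all records
    fieldnames = []
    for target in targets:
        for key in target:
            if key not in fieldnames:
                fieldnames.append(key)
    # restval is written for any column a record does not have
    writer = csv.DictWriter(out_file, fieldnames=fieldnames, restval="")
    writer.writeheader()
    writer.writerows(targets)
    return fieldnames


# Made-up records with different sets of keys
records = [
    {"id": "a", "lat": 1.0},
    {"id": "b", "lon": 2.0},
]
buffer = io.StringIO()
columns = write_targets_csv(records, buffer)
print(buffer.getvalue())
```

To write to disk instead of an in-memory buffer, pass a file opened with open(filename, "w", newline="").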

Connection manager

We can now add a connection manager, which will handle possible disconnections from the endpoint, and try reconnecting after a while.

The connection manager looks like this:

main.py
def connection_manager():
    # Loop instead of recursing, so a long-running session cannot exhaust the call stack
    while True:
        try:
            # If you wish to listen for a specific time:
            # listen_to_stream(70) will listen for 70 seconds
            listen_to_stream()
            return
        except MaxRetries:
            print("Stream failed to connect multiple times, will retry in 30 min")
            # 30 minutes sleep (60 seconds x 30)
            time.sleep(60 * 30)
        except ConnectionLost:
            print("Connection was lost, retrying now ...")

The connection manager calls our listen_to_stream function and handles the exceptions it may raise:

  • If the exception is MaxRetries, the endpoint was called multiple times to no avail, so the connection manager waits 30 minutes before trying to connect again.
  • If the exception is ConnectionLost, the connection manager tries to reconnect right away.
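
The exceptions module that main.py imports from is not shown in this tutorial. The sketch below assumes it contains two plain Exception subclasses, and runs the handling logic as a loop with the listener and the sleep function passed in, so it can be tried without a network connection. The names run_with_reconnect and fake_listen are made up for illustration.

```python
class MaxRetries(Exception):
    """Assumed definition: the connection kept failing after all retries."""


class ConnectionLost(Exception):
    """Assumed definition: an established stream connection dropped."""


def run_with_reconnect(listen, sleep, wait_seconds=60 * 30):
    """Call listen() until it returns normally; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            listen()
            return attempts
        except MaxRetries:
            # Wait before trying to connect again
            sleep(wait_seconds)
        except ConnectionLost:
            # Reconnect right away
            continue


# Stub listener: fails twice, then completes normally
failures = [ConnectionLost(), MaxRetries()]
slept = []


def fake_listen():
    if failures:
        raise failures.pop(0)


attempts = run_with_reconnect(fake_listen, slept.append)
print(attempts, slept)
```

In real use, listen would be listen_to_stream and sleep would be time.sleep.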

Calling our function

We can now simply call our connection_manager() function anywhere and start listening and exporting CSV files from the Tracking Stream endpoint!