After following this tutorial you will have a working target update stream, consuming the Tracking Stream with Python, in a Docker container, without timeouts, and with the ability to reconnect to the stream shortly before a disconnection event.
In this example, we log the arriving target updates of a filtered query that only shows target updates over Atlanta airport. At every position_token we receive, we write the token to the local file system. The last token is written and then renamed to make the update operation atomic. This avoids corrupt or half-written position_tokens. In case of a disconnect, the stream reconnects at the last available position_token, to avoid missing target updates.
This example is best used as a base for a persistent connection that is not expected to interrupt regularly, e.g. a stream on EC2 or Google Compute Engine. Here we do not timeout the connection, but keep it alive indefinitely, until either the server disconnects or the consumer fails. Then the consumer can reconnect to the last known position_token to avoid missing target updates, but it will likely receive some duplicate target updates.
You can find and download the source code for this example here.
This overview outlines the purpose of each file in this example, and which role it plays.
client_test.pycontain production-ready sample client code that wraps the v2/targets/stream API; It exposes target updates via callbacks and handles graceful disconnection to avoid duplicate target update delivery.
client.pyand manages loading and storing position_tokens, that encode the position in the stream that the client has progressed to. This is also where the TargetProcessor class is located: This class processes target updates as they come in, and exposes a callback function to do so.
Pipfile.lockdefines all Python package dependencies for the client, like the requests package to easily call network resources, and pytest for testing and development.
Dockerfilecontains the Docker image definition and specifies the command that will be called when starting a container.
The next section lists all prerequisites, with links and notes on how to install them.
To execute this tutorial you need the following accounts and tools on your system:
- Docker to build the Docker image and run the Docker container;
- Git to download the source code; and
- pyenv to install and load the correct Python version, in case you want to modify it.
Having these prerequisites in place you can walk through the next section to set up the example on your account.
The first part of this section contains and describes the necessary commands to build the Docker image and run the example on your system. The second part shows how to set up the development environment, and adapt the code to your needs.
Build and tag the Docker image
To build the Docker image fetch the source code from
our Github repository. In the terminal, navigate into the
tutorial-resources/docker folder. In that folder, execute the docker build command.
In the next step we pass the Tracking Stream token to the image and mount a folder where the streaming process can persist position_tokens, to allow it to reconnect after a potential failure.
Start the container locally, correctly setting the required environment variable and position_token mount point
To start the container prepare as follows.
- First create a directory on the local file system to write position_tokens to, e.g. a folder
- Secondly export the Tracking Stream token you received from Spire as an environment variable, e.g. by executing
After these preparatory steps, execute the docker run command.
After a few seconds, you should start seeing a stream of logged target updates In the terminal where you started the
Docker container. In a real system, you would adapt the
TargetProcessor class to do more than just log the incoming
In the folder
position_tokens you should see a rolling list of timestamped
position_tokens. The alphabetically last
file contains the last
position_token that has been sent by the stream. Looking into
main.py you can see that the
PositionTokenProcessor first writes incoming
position_tokens timestamped with a 0 value and then renames them,
which is an atomic operation on most systems. In case the write operation fails, the corrupt file will not interfere
with the restart, since the restart reads the last available
position_token, and ignores the previous ones.
The stream will not disconnect, other than if the server disconnects, or something fails on the client-side.
This set-up can be called from a Linux service file to enable auto-reconnection after a disconnect or failure. The image can be built locally or in CI / CD, and Ansible can deploy the service file to load the image and start the container.
Note: The first start of the container will report that it can't find a valid
position_token and starts without one
at the latest position in the stream. Following invocations will re-start from their respective previous
The core functionality for further processing, filtering or forwarding of target updates is located
main.py in the class
TargetProcessor. This section shows how to modify and re-deploy that code.
For this example, the target processor only logs target updates. For a real use-case, the callback might add the target update to a list, to allow batch-processing after the time-out; it might forward the target update to a stream processor, or to a PubSub topic.
To modify the Python code of the GCP Cloud Function (
main.py) or the API wrapper (
first install pyenv, then
cd into the
pyenv initand follow the instructions, to get pyenv ready to load the correct python version.
pyenv install $(cat .python-version)to install the required Python version on your system.
pyenv shell $(cat .python-version)to load this python version into the active shell.
pip install pipenvto install pipenv into the active pyenv virtual environment.
pipenv --python $(cat .python-version)to create a virtual environment for this project.
pipenv shellto load it into the active shell.
pipenv sync --devto install development and production requirements.
pipenv --venvshows the virtual environment location, to correctly set the environment in your IDE.
To stop the active container, press Ctrl + C.
To remove the created Docker image, execute
docker image rm airsafe-2-stream-example