Among open-source workflow management tools, Apache Airflow has established itself as the de facto standard. At the time of writing, its GitHub repo lists 1,827 contributors and 41 releases. But as is usual with software, newer implementations have arisen, and one of those getting a lot of attention is Prefect. I decided to take a look and see what’s under the hood.

The Prefect ethos

Prefect (the company) was founded in 2017. Its CEO, Jeremiah Lowin, was an Airflow core contributor.

Prefect’s ethos is centered around positive vs negative engineering: engineers spend a significant amount of time handling the unexpected (negative engineering) rather than building the thing they actually set out to build (positive engineering). We like tooling that helps here, with features like modularity, transparent implementations, built-in retries and, most importantly, good logging. The creators of Prefect understand this and have endeavoured to build a tool that supports it.

Prefect’s design goal is to be minimally invasive when things go right and maximally helpful when they go wrong. Nice.

The Hitchhiker’s Guide

There’s also a geeky aspect that I enjoy: the Hitchhiker’s Guide to the Galaxy theme. (If you haven’t read it, do; it contains one of the few famous Dents.) Ford Prefect is a character in these books, and although I couldn’t find any explicit reference to the books in the documentation, the snarky Marvin in the search bar and the fact that one of the orchestrator services is called Towel left me in no doubt as to the origin of the name.

What about Airflow?

So how does it compare to Airflow? I won’t cover that in this article, as the Prefect documentation already has a long, well-thought-out piece entitled “Why Not Airflow?”. Any challenger to Airflow has to address this question, and the Prefect folk are quite explicit about it. Perhaps in a future blog post I’ll do a feature-by-feature comparison.

The basics

Here I’ll run through some of Prefect’s core concepts. They have excellent documentation with many code samples, so my intention here is to show how simple Prefect is to use and to look a little deeper at its implementation.

Task

from prefect import task

@task
def add(x, y):
    return x + y

A Task is a discrete action in a workflow. It has optional inputs and an optional result. Creating one just requires decorating a function 🎄. Simple.
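
To make the decoration feel less magical, here’s a toy sketch of what a decorator like this can do: wrap the function in an object that carries some metadata and exposes a run method. The Task class and task decorator below are hypothetical stand-ins written in plain Python, not Prefect’s implementation (as far as I can tell, in Prefect 1.x the decorator produces a FunctionTask whose run method calls your function).

```python
from typing import Any, Callable, Optional


class Task:
    """Toy stand-in for a workflow task: a wrapped function plus a little metadata."""

    def __init__(self, fn: Callable[..., Any], name: Optional[str] = None):
        self.fn = fn
        self.name = name or fn.__name__

    def run(self, *args: Any, **kwargs: Any) -> Any:
        # Execute the wrapped function directly (no flow, no state tracking)
        return self.fn(*args, **kwargs)


def task(fn: Callable[..., Any]) -> Task:
    """Hypothetical decorator: turns a plain function into a Task object."""
    return Task(fn)


@task
def add(x, y):
    return x + y


print(type(add).__name__, add.name, add.run(1, 2))  # Task add 3
```

The point of wrapping rather than returning the function unchanged is that the object can later carry extra configuration (names, retries, tags) without changing the function itself.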

Flow

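from prefect import task, Flow
import random

@task
def random_number():
    return random.randint(0, 100)

@task
def add(x, y):
    return x + y

# Calling tasks inside the Flow context builds the graph; nothing runs yet
with Flow('My Functional Flow') as flow:
    x = random_number()
    y = random_number()
    total = add(x, y)  # avoid shadowing the built-in sum()

flow.run()  # executes the tasks in dependency order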

A Flow is a container for Tasks and describes the dependencies between them. It’s a Directed Acyclic Graph (this is not surprising 😛).
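
The key idea in the functional API is that calling a task inside the with Flow(...) block doesn’t execute it: the call and its inputs are recorded so the flow can build the graph, and flow.run() later executes everything in dependency order. Below is a toy sketch of that idea in plain Python (3.9+ for graphlib). ToyFlow, toy_task and Node are hypothetical names, and Prefect’s real Flow does far more (state tracking, retries, executors).

```python
import random
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Any, Callable, Dict, List, Optional


class Node:
    """One recorded task call: the function plus its arguments (some may be upstream Nodes)."""

    def __init__(self, fn: Callable[..., Any], args: tuple):
        self.fn = fn
        self.args = args


class ToyFlow:
    """Hypothetical flow: records task calls made inside `with` and runs them as a DAG."""

    _active: Optional["ToyFlow"] = None  # the flow currently being built, if any

    def __init__(self, name: str):
        self.name = name
        self.nodes: List[Node] = []

    def __enter__(self) -> "ToyFlow":
        ToyFlow._active = self
        return self

    def __exit__(self, *exc_info: Any) -> None:
        ToyFlow._active = None

    def run(self) -> Dict[Node, Any]:
        # An edge exists wherever a Node is passed as an argument to another Node
        graph = {n: [a for a in n.args if isinstance(a, Node)] for n in self.nodes}
        results: Dict[Node, Any] = {}
        for node in TopologicalSorter(graph).static_order():
            # Upstream results are always available here thanks to the topological order
            args = [results[a] if isinstance(a, Node) else a for a in node.args]
            results[node] = node.fn(*args)
        return results


def toy_task(fn: Callable[..., Any]) -> Callable[..., Node]:
    """Calling a decorated function inside a ToyFlow records a Node instead of running it."""

    def wrapper(*args: Any) -> Node:
        if ToyFlow._active is None:
            raise RuntimeError("toy tasks can only be called inside a ToyFlow")
        node = Node(fn, args)
        ToyFlow._active.nodes.append(node)
        return node

    return wrapper


@toy_task
def random_number() -> int:
    return random.randint(0, 100)


@toy_task
def add(x: int, y: int) -> int:
    return x + y


with ToyFlow("My Functional Flow") as flow:
    x = random_number()  # nothing has run yet: x is a Node
    y = random_number()
    total = add(x, y)

results = flow.run()
print(results[x], results[y], results[total])
```

Because the graph is only built during the with block, the same flow object can be run repeatedly, which is what makes features like parameters and schedules possible.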

Parameter

from prefect import task, Flow, Parameter

@task
def print_plus_one(x):
    print(x + 1)

with Flow('Parameterized Flow') as flow:
    x = Parameter('x', default=2)
    print_plus_one(x=x)

flow.run(parameters=dict(x=1))  # prints 2
flow.run(parameters=dict(x=100))  # prints 101
flow.run()  # prints 3

Parameters are special tasks that receive user input at run time and can have default values. This is one of the differentiators from Airflow (the Prefect documentation goes into more detail).
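
Under the hood, a parameter’s job boils down to merging run-time input with a declared default. The sketch below reproduces the three runs above in plain Python; ToyParameter and resolve_parameters are hypothetical names, and rejecting unknown or missing names is my own choice for the toy rather than a claim about Prefect’s exact behaviour.

```python
from typing import Any, Dict, List

_NO_DEFAULT = object()  # sentinel: distinguishes "no default" from default=None


class ToyParameter:
    """Hypothetical stand-in for a parameter: a named flow input with an optional default."""

    def __init__(self, name: str, default: Any = _NO_DEFAULT):
        self.name = name
        self.default = default


def resolve_parameters(params: List[ToyParameter], supplied: Dict[str, Any]) -> Dict[str, Any]:
    """Merge run-time values with defaults; reject unknown names and missing required values."""
    unknown = set(supplied) - {p.name for p in params}
    if unknown:
        raise ValueError(f"unexpected parameters: {sorted(unknown)}")
    values: Dict[str, Any] = {}
    for p in params:
        if p.name in supplied:
            values[p.name] = supplied[p.name]  # user input wins
        elif p.default is not _NO_DEFAULT:
            values[p.name] = p.default  # fall back to the default
        else:
            raise ValueError(f"missing required parameter {p.name!r}")
    return values


params = [ToyParameter("x", default=2)]
for supplied in ({"x": 1}, {"x": 100}, {}):
    x = resolve_parameters(params, supplied)["x"]
    print(x + 1)  # prints 2, then 101, then 3
```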

Schedule

A Schedule object can be attached to a flow and allows for complex scheduling configurations.

from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule

@task
def say_hello():
    print("Hello, world!")

# Run every two minutes
schedule = IntervalSchedule(interval=timedelta(minutes=2))

with Flow("Hello", schedule=schedule) as flow:
    say_hello()

# With a schedule attached, flow.run() keeps running the flow on that schedule until interrupted
flow.run()
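
An interval schedule is essentially an arithmetic sequence of timestamps: start, start + interval, start + 2 * interval, and so on. The plain-Python sketch below (a hypothetical next_runs helper, with an arbitrarily chosen start time) computes the upcoming runs after a given moment. Prefect’s schedules add things like time zones, end dates and filters on top of this basic idea.

```python
from datetime import datetime, timedelta
from typing import List


def next_runs(start: datetime, interval: timedelta, after: datetime, n: int) -> List[datetime]:
    """Return the next n run times of an interval schedule strictly after `after`.

    Runs are assumed to fall at start, start + interval, start + 2 * interval, ...
    """
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if after < start:
        first = start  # the schedule hasn't started yet
    else:
        # timedelta // timedelta gives the number of whole intervals elapsed
        first = start + ((after - start) // interval + 1) * interval
    return [first + i * interval for i in range(n)]


start = datetime(2021, 11, 21, 12, 0)
for t in next_runs(start, timedelta(minutes=2), after=datetime(2021, 11, 21, 12, 3), n=3):
    print(t.strftime("%H:%M"))  # prints 12:04, 12:06, 12:08 (one per line)
```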

State

This is the currency of Prefect. State objects represent information about running tasks or flows. At any moment, you can learn anything you need to know about a task or flow by examining its current state or the history of its states.

[Figure: Prefect state types and their subclasses]

There are three main state types: Pending, Running and Finished. Each has subclasses, as can be seen above. There is also MetaState, which enhances existing states (Submitted and Queued are examples).
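
Because the states form a class hierarchy, a question like “is this finished?” can be answered with isinstance, and every subclass inherits the right answer. The sketch below uses a simplified subset of the hierarchy (class names modelled on Prefect 1.x’s states, implementation entirely my own) to show that a Retrying state still counts as pending, while a TimedOut state counts as both failed and finished.

```python
class State:
    """Toy base state; the is_* checks use isinstance so subclasses inherit them."""

    def __init__(self, message: str = ""):
        self.message = message

    def is_pending(self) -> bool:
        return isinstance(self, Pending)

    def is_running(self) -> bool:
        return isinstance(self, Running)

    def is_finished(self) -> bool:
        return isinstance(self, Finished)

    def is_successful(self) -> bool:
        return isinstance(self, Success)

    def is_failed(self) -> bool:
        return isinstance(self, Failed)


class Pending(State):
    """Not started yet."""


class Scheduled(Pending):
    """Pending, with a time to start."""


class Retrying(Scheduled):
    """A retry is just a pending run scheduled for later."""


class Running(State):
    """Currently executing."""


class Finished(State):
    """No more work to do."""


class Success(Finished):
    """Finished without error."""


class Failed(Finished):
    """Finished with an error."""


class TimedOut(Failed):
    """A specific kind of failure."""


for state in (Retrying(), Running(), TimedOut("took too long")):
    print(type(state).__name__, state.is_pending(), state.is_finished(), state.is_failed())
# Retrying True False False
# Running False False False
# TimedOut False True True
```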

The documentation covers states really well. What interested me most is the ability to define state handlers, which are called on state changes and allow for really fine-grained monitoring and alerting.
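
In Prefect 1.x a state handler is a function that receives the task (or flow), its old state and its new state, and it is attached with something like @task(state_handlers=[...]). The toy below mimics that (obj, old_state, new_state) call shape to show how a handler can raise an alert only on the transition into Failed. ToyTask, alert_on_failure and the minimal State classes are hypothetical, plain-Python stand-ins.

```python
from typing import Any, Callable, List, Optional, Sequence


class State:
    """Minimal toy state carrying a message."""

    def __init__(self, message: str = ""):
        self.message = message

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.message!r})"


class Pending(State):
    """Not started yet."""


class Running(State):
    """Currently executing."""


class Success(State):
    """Finished without error."""


class Failed(State):
    """Finished with an error."""


class ToyTask:
    """Hypothetical task that notifies its state handlers on every state change."""

    def __init__(self, name: str, fn: Callable[[], Any],
                 state_handlers: Sequence[Callable[..., Optional[State]]] = ()):
        self.name = name
        self.fn = fn
        self.state_handlers = list(state_handlers)
        self.state: State = Pending()

    def _set_state(self, new_state: State) -> None:
        for handler in self.state_handlers:
            # Call shape (obj, old_state, new_state); in this toy a returned
            # state replaces new_state and None keeps it unchanged
            new_state = handler(self, self.state, new_state) or new_state
        self.state = new_state

    def run(self) -> State:
        self._set_state(Running())
        try:
            result = self.fn()
        except Exception as exc:
            self._set_state(Failed(str(exc)))
        else:
            self._set_state(Success(repr(result)))
        return self.state


alerts: List[str] = []


def alert_on_failure(task: ToyTask, old_state: State, new_state: State) -> State:
    """Record an alert only when a task transitions into Failed."""
    if isinstance(new_state, Failed):
        alerts.append(f"{task.name}: {type(old_state).__name__} -> Failed ({new_state.message})")
    return new_state


def divide() -> float:
    return 1 / 0


final = ToyTask("divide", divide, state_handlers=[alert_on_failure]).run()
print(final)   # Failed('division by zero')
print(alerts)  # ['divide: Running -> Failed (division by zero)']
```

Seeing both the old and the new state is what makes handlers fine-grained: the same function could, for example, alert on Running -> Failed but stay quiet on a retry.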

Runners and Executors

This goes a bit beyond the “basics”, but it helped me understand how tasks and flows are actually processed.

There are two classes, FlowRunner and TaskRunner:

  • FlowRunner takes a flow and attempts to run all its tasks, then collects the resulting task states and, if all are finished, returns the final state. If a task is unfinished, it loops through the tasks again and acts on each one based on its state (so a finished task will not be run again)
  • FlowRunner may also receive parameter values if they have been specified
  • TaskRunner executes a single task. It uses the initial state and any upstream states to determine whether the task should run. Mapping is also handled here, which generates dynamic task runners. Post-processing then determines whether a retry (or caching) is needed
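
The loop described above can be sketched in a few dozen lines. This is a deliberately simplified model, not Prefect’s FlowRunner or TaskRunner: states are plain strings, there is no mapping, caching or retry delay, and ToyTaskRunner and toy_flow_run are hypothetical names. It does show the two behaviours from the list: finished tasks are never re-run, and a failing task goes back into a Retrying state until its retries are used up.

```python
from typing import Callable, Dict, List, Sequence

FINISHED = {"Success", "Failed", "TriggerFailed"}


class ToyTaskRunner:
    """Hypothetical task runner: decides from its own and upstream states whether to run."""

    def __init__(self, name: str, fn: Callable[[], object],
                 upstream: Sequence[str] = (), max_retries: int = 0):
        self.name = name
        self.fn = fn
        self.upstream = list(upstream)
        self.max_retries = max_retries
        self.attempts = 0

    def run(self, states: Dict[str, str]) -> str:
        state = states[self.name]
        if state in FINISHED:
            return state  # finished tasks are never run again
        upstream_states = [states[u] for u in self.upstream]
        if any(s in ("Failed", "TriggerFailed") for s in upstream_states):
            return "TriggerFailed"  # an upstream failure blocks this task
        if any(s != "Success" for s in upstream_states):
            return state  # upstream still in progress: try again on the next pass
        self.attempts += 1
        try:
            self.fn()
            return "Success"
        except Exception:
            # Post-processing: retry if attempts remain, otherwise fail
            return "Retrying" if self.attempts <= self.max_retries else "Failed"


def toy_flow_run(runners: List[ToyTaskRunner], max_passes: int = 10) -> Dict[str, str]:
    """Hypothetical flow runner: loop over all tasks until every one is finished."""
    states = {r.name: "Pending" for r in runners}
    for _ in range(max_passes):
        for runner in runners:
            states[runner.name] = runner.run(states)
        if all(s in FINISHED for s in states.values()):
            break
    return states


calls = {"flaky": 0}


def flaky() -> None:
    """Fails on the first two attempts, then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise RuntimeError("transient error")


# "load" is listed first on purpose: early passes must skip it until "flaky" succeeds
runners = [
    ToyTaskRunner("load", lambda: None, upstream=["flaky"]),
    ToyTaskRunner("flaky", flaky, max_retries=2),
]
final_states = toy_flow_run(runners)
print(final_states)  # {'load': 'Success', 'flaky': 'Success'}
```

With max_retries=2 the flaky task succeeds on its third attempt and its downstream task then runs; with no retries, flaky would end up Failed and load would be TriggerFailed.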

Executors run the tasks and provide submit and wait methods. Various types are supported, from synchronous execution in a local process to a completely separate Dask cluster for parallel, asynchronous processing.
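
The submit/wait split is what lets the same flow code run either synchronously or in parallel. Here is a minimal sketch, assuming a standard-library thread pool as the concurrent backend (Prefect’s Dask-based executors are far more capable). ToyLocalExecutor, ToyThreadExecutor and run_all are hypothetical names; the synchronous variant reflects my understanding of how a local executor can work, with submit simply calling the function.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


class ToyLocalExecutor:
    """Synchronous: submit() runs the function immediately and wait() returns the results as-is."""

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        return fn(*args, **kwargs)

    def wait(self, futures: List[Any]) -> List[Any]:
        return futures  # already computed


class ToyThreadExecutor:
    """Concurrent: submit() returns a Future; wait() blocks until every result is ready."""

    def __init__(self, max_workers: int = 4):
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Future:
        return self._pool.submit(fn, *args, **kwargs)

    def wait(self, futures: List[Future]) -> List[Any]:
        return [f.result() for f in futures]  # results come back in submission order

    def shutdown(self) -> None:
        self._pool.shutdown()


def square(x: int) -> int:
    return x * x


def run_all(executor: Any, values: Iterable[int]) -> List[int]:
    """The calling code is identical whichever executor is plugged in."""
    futures = [executor.submit(square, v) for v in values]
    return executor.wait(futures)


print(run_all(ToyLocalExecutor(), [1, 2, 3]))  # [1, 4, 9]
threaded = ToyThreadExecutor()
print(run_all(threaded, [1, 2, 3]))  # [1, 4, 9]
threaded.shutdown()
```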

Give it a try!

All of this can be set up and run with a single Python package, with no extra setup or dependencies.

If you want to give it a try:

  • clone the Prefect repo (PrefectHQ/prefect on GitHub) and, in a terminal, change into examples/tutorial
  • install required packages including the Prefect package with pip install -r requirements.txt (preferably within a Python virtual environment)
  • run python 02_etl_flow.py

To get a feel for the functionality, I recommend following the full tutorial.

But wait, there’s more!

Perhaps you’re happy with a single flow running in a container. But you might want a few more things, such as:

  • managing and visualising multiple flows
  • a fancy UI for running and monitoring
  • a GraphQL API for integration with other services

And you can have it with Prefect Cloud or Prefect Server.

Prefect Cloud

This is a managed backend that offers the goodies above but also:

  • permission and authorization management
  • SLAs
  • agent monitoring

They offer a free tier with 3 users and 10,000 runs per month. Prefect Cloud also supports a hybrid configuration in which your code and data are kept private: execution is performed by agents within your own infrastructure.

Prefect Server

In many projects it’s not possible to connect to services outside an internal network. For this there’s Prefect Server, an open-source backend containing:

  • a scheduler
  • a web server
  • a metadata database

You can run this locally with ease. If you have Docker installed you can follow the tutorial below to get a feel for the functionality supported.

Testing out Prefect Server

  1. If you haven’t already, install Prefect with pip install prefect and clone the Prefect repo (PrefectHQ/prefect on GitHub)

  2. Run the following to change the backend to Prefect Server (the default is Prefect Cloud):

    prefect backend server
    
  3. In a terminal, start Prefect Server. This will spin up a number of containers:

    prefect server start
    
  4. In another terminal, start a local agent:

    prefect agent local start
    
  5. In another terminal, create a project with

    prefect create project "conditional"
    
  6. Ensure that the project has been created by going to localhost:8080 in a browser and selecting the “conditional” project from the dropdown

  7. Open the folder examples/tutorial and register the conditional flow with:

    prefect register --project conditional --path conditional.py --name "Example: Conditional Tasks"
    
  8. You should now be able to see the flow in the project under “FLOWS”: select it.

  9. A couple of things to try out:

    • Run the flow by selecting the flow and then “QUICK RUN” and watch the live updates.
    • Visualise the tasks of the flow by selecting “TASKS”.
    • Visualise the DAG by selecting “SCHEMATIC”.
    • See the details of a specific run by selecting “RUNS” and selecting a run.

Deployment

Their documentation describes the deployment architecture.

Prefect Cloud can be used for managing and visualising one’s flows, but this still requires provisioning the execution environment as well as additional storage and monitoring. They provide guidance on agent provisioning which should cover most cloud setups.

If deploying to a closed environment, one would instead need to deploy Prefect Server. This could run on a single node, much as it does locally (where it uses docker-compose under the hood). A more elaborate option is a Kubernetes cluster, for which they provide a Helm chart. It is also possible to configure Prefect Server to use a separate Postgres instance instead of the one running in a container; the Prefect Server documentation has the details.

Unfortunately I haven’t been able to test the deployment yet; hopefully I’ll cover it in a future post.

Connections

Any decent workflow management tool should have a plethora of options for connecting to external services, and Prefect is no exception. The Task Library has an overview, and more details can be found by selecting “prefect.tasks” on the API page.

Conclusion

What I liked

  • The minimally invasive goal: Prefect doesn’t force you to conform to a specific API or code style
  • Simplicity: I could quickly grasp the base concepts
  • The introductory tutorial: clearly explained, and I could get up and running quickly
  • The documentation in general: it’s extensive and well written

What I liked less

  • The community is still small: while searching online for details about the Shell Task I couldn’t find many posts. This will obviously change as adoption increases
  • I struggled to get Prefect Server up and running because the default backend is Cloud and I missed the command that switches it (prefect backend server)
  • The ETL example passes data between functions. This is good for demonstrating the data-passing possibilities, but I can’t see myself doing the same with large datasets. That can, of course, be handled with something like the Databricks Task, or perhaps a combination of Fivetran and dbt

Overall I enjoyed working with Prefect and look forward to using it in future projects.