Using the Airflow Experimental REST API to trigger a DAG

The Airflow experimental API allows you to trigger a DAG over HTTP. This comes in handy if you are integrating with cloud storage such as Azure Blob Store. Although Airflow has the concept of Sensors, an external trigger allows you to avoid polling for a file to appear. In this blog post, I will show how we use Azure Functions to trigger a DAG when a file is uploaded to an Azure Blob Store.

Experimental API

The experimental API allows you to fetch information regarding DAGs and tasks, but also to trigger and even delete a DAG. In this blog post we will use it to trigger a DAG. By default the experimental API is unsecured, so before we continue we should define an auth_backend which secures it. There are multiple options available; in this blog post we use the password_auth backend, which implements HTTP Basic Authentication. Because Basic Authentication sends the credentials unencrypted with every request, you should only use it over HTTPS.

Enabling the password_auth backend is a small change to your Airflow config file:

[api]
auth_backend = airflow.contrib.auth.backends.password_auth

Next, use the Airflow web interface to create a new user to be used by the Azure Function to trigger the DAG.
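Once the backend is enabled and the user exists, it can help to see what a trigger call looks like before wiring up Azure. The snippet below is a minimal sketch in Python using only the standard library. The function names build_trigger_request and send are made up for this illustration, and the conf value is sent as a JSON-encoded string because that is how the Azure Function later in this post sends it; your Airflow version may behave differently.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url, dag_id, user, password, conf):
    """Build (but do not send) a POST request that triggers a DAG run.

    Hypothetical helper for illustration; not part of Airflow.
    """
    url = "{}/api/experimental/dags/{}/dag_runs".format(base_url.rstrip("/"), dag_id)
    # Assumption: conf is passed as a JSON-encoded string inside the JSON body,
    # mirroring the payload the Azure Function in this post sends.
    body = json.dumps({"conf": json.dumps(conf)}).encode("utf-8")
    # HTTP Basic Authentication: base64 of "user:password".
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url,
        data=body,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Basic " + token,
        },
        method="POST",
    )


def send(request):
    """Send the request and return the HTTP status code."""
    with urllib.request.urlopen(request) as response:
        return response.status
```

Calling send(build_trigger_request("https://AIRFLOW_URL", "DAG_NAME", "AIRFLOW_USER", "AIRFLOW_PASSWORD", {"filename": "test.csv"})) should then start a run, assuming the placeholders are replaced with real values.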

Azure Functions

Azure allows you to define small snippets of code which can be triggered by a whole range of other Azure products. Examples are a message arriving on an EventHub, or in this case a file appearing on a Blob Store. There are a couple of different languages to choose from, and in this case I was a bit lazy and went for JavaScript in the BlobTrigger wizard.

BlobTrigger wizard

You need to link a storage account, and define the path you want to monitor. In my case I configured it to monitor my-data/{name}, where my-data is the name of the container within the storage account.

Next, you're presented with a small example. I've extended their example a bit, which resulted in the following piece of code:

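var request = require('request');

module.exports = function (context, myBlob) {
    context.log("JavaScript blob trigger function processed blob \n Name:", context.bindingData.name, "\n Blob Size:", myBlob.length, "Bytes");
    // Name of the uploaded blob, taken from the {name} part of the monitored path.
    var filename = context.bindingData.name;
    context.log("Triggering DAG");

    request({
        url: "https://AIRFLOW_URL/api/experimental/dags/DAG_NAME/dag_runs",
        method: "POST",
        // conf is sent as a JSON-encoded string; JSON.stringify escapes special characters in the filename.
        json: {'conf': JSON.stringify({'filename': filename})},
        auth: {
            'user': 'AIRFLOW_USER',
            'pass': 'AIRFLOW_PASSWORD',
            'sendImmediately': true
        }
    }, function (error, response) {
        if (error) {
            context.log("Failed to trigger DAG:", error);
        } else {
            context.log(response.statusCode);
        }
        // Signal completion only after the HTTP request has finished.
        context.done(error);
    });
};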

You need to adjust AIRFLOW_URL, DAG_NAME, AIRFLOW_USER, and AIRFLOW_PASSWORD. The nice thing here is that I'm actually passing the filename of the new file to Airflow, which I can use in the DAG later on.

Airflow DAG

I won't post the full Airflow DAG, but the excerpt below shows how to use the filename in the DAG.

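from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def copy_blob(templates_dict, **kwargs):
    # templates_dict['filename'] holds the rendered value of the template set on the operator
    if templates_dict['filename']:
        #do something with the filename
        pass
    else:
        #no filename specified, probably a manual run
        pass

with DAG(....) as dag:
    # Named copy_blob_task so the operator does not shadow the copy_blob function
    copy_blob_task = PythonOperator(task_id='copy_blob',
                                    provide_context=True,
                                    python_callable=copy_blob,
                                    templates_dict={'filename': "{{dag_run.conf['filename']}}"})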
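As an alternative to templating, the callable can also read the payload directly from the task context, since provide_context=True passes dag_run as a keyword argument. The following is a minimal sketch of that idea, not what we run in production. The helper name filename_from_context is made up, and the SimpleNamespace objects only stand in for a real DagRun so the snippet runs without Airflow.

```python
from types import SimpleNamespace


def filename_from_context(**context):
    """Return the filename passed in the trigger's conf, or None if absent."""
    dag_run = context.get("dag_run")
    # conf can be missing or None when the run was not triggered with a payload.
    conf = getattr(dag_run, "conf", None) or {}
    return conf.get("filename")


# Stand-in objects for illustration; in Airflow, dag_run comes from the task context.
triggered = SimpleNamespace(conf={"filename": "data.csv"})
manual = SimpleNamespace(conf=None)
print(filename_from_context(dag_run=triggered))  # data.csv
print(filename_from_context(dag_run=manual))     # None
```

Returning None for a run without a payload keeps the "probably a manual run" branch explicit instead of relying on how an empty template renders.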

Concluding

So in this blog post I've shown how to use an Azure Function to trigger an Airflow DAG using the experimental API. I very much like the fact that it removes the need to poll for a file, and that it allows us to integrate Airflow nicely in an otherwise cloud-native setup.

By the way, we offer an Airflow course to teach you the internals, terminology, and best practices of working with Airflow, with hands-on experience in writing and maintaining data pipelines.
