An FMC implementation for dbt Cloud native scheduling

With the introduction of the Generic FMC, it is now possible to execute the VaultSpeed code with most scheduling platforms, as long as they support a few basic features.
This post describes how I went about implementing an FMC solution for dbt Cloud's scheduler.
The code for using this integration can be found on our GitHub.

Step 1: Scheduling

We start with the basics: how do we schedule things in dbt Cloud?
In dbt Cloud, we can create Jobs; each job can be set to run on a specific schedule and executes a series of dbt commands. More info can be found in the dbt docs (https://docs.getdbt.com/docs/deploy/dbt-cloud-job).
We will create a job for each of our FMC flows.
These Jobs can also be created via an API, which we will use in our implementation.

So our first step consists of writing a script that can parse the Generic FMC JSON files and convert them into the appropriate API calls.
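As a rough sketch, the parsing side could look something like this; the directory layout and the exact file naming are assumptions on my part, while the field names match the ones used later in this post:

import glob
import json

def load_fmc_info(fmc_dir="fmc"):
    # Read every generated *_info.json file and yield its contents;
    # each file describes one flow (dag name, flow type, load type, ...).
    for path in sorted(glob.glob(f"{fmc_dir}/*_info.json")):
        with open(path) as f:
            yield json.load(f)

for fmc_info in load_fmc_info():
    print(fmc_info["dag_name"], fmc_info["flow_type"], fmc_info["load_type"])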

In the GitHub repository mentioned above, you will find a Python script, dbt_cloud_fmc.py. This script contains all the logic to deploy our FMC to dbt Cloud.
In particular, it includes a function create_job, which uses the dbt Cloud API to create a job. As input, we need three things.
First, a job name. For this, we will use the dag_name property from the FMC_info JSON files.
Secondly, an array of commands that we need to execute. In the following steps, we will figure out which commands we need to load the VaultSpeed code correctly.
Lastly, a schedule definition. Our initial loads only need to run once, so they don't get a schedule. For our incremental loads, we can enter a JSON schedule definition that conforms to the dbt Cloud API spec in the Flow definitions in VaultSpeed; these values will then be present in the INFO JSON file. For example, an hourly run would look like this: {'date': {'type': 'every_day'}, 'time': {'type': 'every_hour', 'interval': 1}}. The possible options can be found at https://docs.getdbt.com/dbt-cloud/api-v2#tag/Jobs/operation/createJob.
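To give an idea of what the API side looks like, here is a minimal sketch of a create_job-style function using the requests library. I have kept only the essential payload fields from the API documentation linked above; the account, project and environment ids and the token handling are assumptions, and the actual implementation in dbt_cloud_fmc.py may differ.

import requests

API_BASE = "https://cloud.getdbt.com/api/v2"

def create_job(account_id, project_id, environment_id, api_token,
               job_name, commands, schedule=None):
    # Build the job definition: a name, the dbt commands to execute,
    # and (for incremental flows) a schedule. Initial loads get no schedule.
    payload = {
        "id": None,  # a null id tells the API to create a new job
        "account_id": account_id,
        "project_id": project_id,
        "environment_id": environment_id,
        "name": job_name,
        "execute_steps": commands,
        "state": 1,
        "triggers": {"github_webhook": False, "schedule": schedule is not None},
    }
    if schedule is not None:
        payload["schedule"] = schedule

    response = requests.post(
        f"{API_BASE}/accounts/{account_id}/jobs/",
        headers={"Authorization": f"Token {api_token}"},
        json=payload,
    )
    response.raise_for_status()
    return response.json()["data"]

The returned data contains the job id, which we will need later if we want to trigger a run of that job through the API.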

Note that since the initial loads do not have a schedule, they must be executed manually or via the API. There currently doesn't seem to be an option to run a job just once at a specific time.
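Triggering such a run through the API boils down to one call to the job's run endpoint. A minimal sketch, assuming the same token handling as above:

import requests

API_BASE = "https://cloud.getdbt.com/api/v2"

def trigger_job(account_id, job_id, api_token, cause="FMC initial load"):
    # Queue a single run of an existing job; dbt Cloud returns the run metadata.
    response = requests.post(
        f"{API_BASE}/accounts/{account_id}/jobs/{job_id}/run/",
        headers={"Authorization": f"Token {api_token}"},
        json={"cause": cause},
    )
    response.raise_for_status()
    return response.json()["data"]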

Step 2: FMC metadata

In general, a VaultSpeed FMC flow contains the following three parts.

  1. Setting FMC metadata. The first task in a pipeline consists of calling a procedure that does all our loading window management and loading history tracking.

  2. Executing the Mapping to load the Raw Vault or Business Vault. This will be discussed in the next step.

  3. Calling a procedure that saves the status of the load to the history table(s).

Let's start with part 1. For this, we need to figure out how to execute a Snowflake procedure from dbt and pass it the required runtime information (load date and load cycle id).
With the run-operation command in dbt, we can execute a macro and give it some parameters as input. Inside this macro, we could then call our Snowflake procedure.

These macros would look something like this:

{% macro set_fmc_mtd_fl_init(dv_name, dag_name, proc_schema, proc_name, start_date) -%}
    {%- set lci_table = dag_name + '_lci' -%}
    {% set query -%}
        begin transaction;
        drop table if exists {{lci_table}};
        create table {{lci_table}} as select {{dv_name}}_load_cycle_seq.nextval as lci;
        call "{{ proc_schema }}"."{{proc_name}}"(
            '{{ dag_name }}',
            (select lci from {{lci_table}}),
            '{{ start_date }}'
        );
        commit;
    {%- endset %}

    {% do run_query(query) %}

{% endmacro %}

We build a Snowflake query and then run it using run_query. I made these macros as generic as possible so that they can be deployed to dbt as is; all project-specific info is passed in as arguments.

The dbt Cloud Jobs don't provide us with anything we can use as the load cycle id or load date, so we will have to manage those ourselves. For the load date, we will use the start date from the info file for initial loads and the current_timestamp for incremental loads.
For the load cycle id, we will create our own sequence, but since we also need this id for the status update at the end of the load, we need to persist it somewhere. We do this by creating a table specific to the current flow and storing the id there.

In our main Python script, we can generate the commands to execute these macros as follows:

set_mtd_cmd = f"""dbt run-operation set_fmc_mtd_{fmc_info['flow_type'].lower()}_init --args '{{dv_name: {fmc_info['dv_code']}, dag_name: {fmc_info['dag_name']}, proc_schema: {proc_schema}, proc_name: {proc_name}, start_date: "{fmc_info['start_date'].replace('T', ' ')}"}}'"""

For the final part of our pipeline, we need to execute a Snowflake procedure and provide it with the status of the load.
We can achieve this in dbt by specifying a macro in the on-run-end property of the project. This macro has access to the results of the dbt command, and by parsing those results, we can determine whether any of the mappings failed.

The status update procedure can be called by the following macro:

{% macro fmc_upd_run_status_fl(dag_name, proc_schema, proc_name, success_flag) -%}
    {%- set lci_table = dag_name + '_lci' -%}
    {% set query -%}
        begin transaction;
        call "{{ proc_schema }}"."{{proc_name}}"(
           (select lci from {{lci_table}}),
           '{{success_flag}}'
        );
        commit;
    {%- endset %}

    {% do run_query(query) %}

{% endmacro %}

Note that we retrieve the correct load cycle id from the table we created in the first part.

To provide this macro with the necessary parameters, we will build a second macro on top of it. This second macro has to be generated dynamically, since we cannot pass extra parameters to the on-run-end macro from our dbt command.
The Python script will generate a custom macro based on the generic FMC data and save it into the macros directory of the generated files.
This macro does two things: it determines the first three parameters for the fmc_upd_run_status_fl macro based on the runtime variables, and it determines the status of the run.

The dag_name, proc_schema and proc_name can be generated as follows:

{% if var("source") == 'moto_sales' %}
    {%- set proc_name = 'fmc_upd_run_status_fl_ms' -%}

    {% if var("load_type") == 'INCR' %}
        {%- set dag_name = 'moto_sales_incr' -%}
    {% endif %}
    {% if var("load_type") == 'INIT' %}
        {%- set dag_name = 'moto_sales_init' -%}
    {% endif %}
{% endif %}

While the status can be determined by:

{% set vars = {'success_flag': 1} %}

{% for res in results %}

    {% if res.status == 'error' %}
        {# workaround: set statements inside a Jinja for loop do not persist after the loop ends, so we update a dict instead #}
        {% if vars.update({'success_flag': 0}) %} {% endif %}
    {% endif %}

{% endfor %}

Note that we cannot simply write {% set success_flag = 0 %} inside the loop, since set statements inside a for loop do not persist outside of it. Thus we have to use this slightly more convoluted syntax.

This macro will be generated by the generate_status_update_macro function in our Python script.
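A simplified sketch of what that generation step could look like is shown below. The generated macro name, the output file name and the status_proc_name field are illustrative; the real generate_status_update_macro in the repository derives these from the generic FMC data.

import os

# Jinja template for the generated on-run-end macro; __BRANCHES__ is filled
# with one if-block per flow so the macro can resolve its parameters from
# the runtime variables, mirroring the snippets above.
MACRO_TEMPLATE = """{% macro fmc_upd_run_status(results) -%}
__BRANCHES__
    {# track the flag in a dict, since set statements inside a loop do not persist #}
    {% set vars = {'success_flag': 1} %}
    {% for res in results %}
        {% if res.status == 'error' %}
            {% if vars.update({'success_flag': 0}) %} {% endif %}
        {% endif %}
    {% endfor %}
    {% do fmc_upd_run_status_fl(dag_name, proc_schema, proc_name, vars['success_flag']) %}
{%- endmacro %}
"""

BRANCH_TEMPLATE = """    {% if var("source") == '__SRC__' and var("load_type") == '__LOAD_TYPE__' %}
        {%- set dag_name = '__DAG__' -%}
        {%- set proc_schema = '__PROC_SCHEMA__' -%}
        {%- set proc_name = '__PROC_NAME__' -%}
    {% endif %}"""


def generate_status_update_macro(fmc_infos, proc_schema, output_dir="macros"):
    # Build one branch per flow and write the finished macro into the
    # macros directory of the generated dbt project.
    branches = []
    for info in fmc_infos:
        branches.append(
            BRANCH_TEMPLATE
            .replace("__SRC__", info["src_name"])
            .replace("__LOAD_TYPE__", info["load_type"])
            .replace("__DAG__", info["dag_name"])
            .replace("__PROC_SCHEMA__", proc_schema)
            .replace("__PROC_NAME__", info["status_proc_name"])  # field name is illustrative
        )
    macro_text = MACRO_TEMPLATE.replace("__BRANCHES__", "\n".join(branches))
    with open(os.path.join(output_dir, "fmc_upd_run_status.sql"), "w") as f:
        f.write(macro_text)

The generated macro is then referenced from the on-run-end property of the project, as described above.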

Step 3: Mapping execution

The command for executing the actual mappings of our flow is relatively straightforward; we call the dbt run command, select the models by tag (the source name, or BV for Business Vault flows), and provide the necessary variables:

if fmc_info["flow_type"] == "FL": 
    flow_cmd = f"dbt run --select tag:{fmc_info['src_name']} --vars '{{load_type: {fmc_info['load_type']}, source: {fmc_info['src_name']}}}'"
else:
    flow_cmd = f"dbt run --select tag:BV --vars '{{load_type: {fmc_info['load_type']}, source: BV}}'"

Step 4: The Business Vault

The unique part of the Business Vault flows is that they should not run while the Raw Vault is still being loaded. Since all flows are separate jobs, each with its own schedule, and there is currently no way to specify dependencies between jobs, we will have to build something ourselves.

The trick we use in cases like this is to create a task that keeps checking the loading history table for records where the success flag is null, meaning those flows are still running, and that only finishes once all flows have ended.

We can create such a procedure in Snowflake:

CREATE OR REPLACE PROCEDURE wait_for_running_flows()
RETURNS VARCHAR
LANGUAGE SQL
AS
BEGIN
  LET running := 0;
  REPEAT
    CALL SYSTEM$WAIT(10);
    select count(1) into :running from "moto_fmc"."fmc_loading_history" where "success_flag" is null;
  UNTIL (running = 0)
  END REPEAT;
  RETURN 'All flows are finished';
END;

This procedure checks every 10 seconds whether any flow is still running.

We can then add this procedure to our macro that sets the metadata:

{% macro set_fmc_mtd_bv_incr(dv_name, dag_name, proc_schema, proc_name) -%}
    {%- set lci_table = dag_name + '_lci' -%}
    {% set query -%}
        begin transaction;
        drop table if exists {{lci_table}};
        create table {{lci_table}} as select {{dv_name}}_load_cycle_seq.nextval as lci;
        call "{{ proc_schema }}"."{{proc_name}}"(
            '{{ dag_name }}',
            (select lci from {{lci_table}}),
            TO_VARCHAR(TO_TIMESTAMP_NTZ(current_timestamp))
        );
        commit;
    {%- endset %}

    {% set wait_query -%}
        call wait_for_running_flows();
    {%- endset %}

    {# This will wait for all other flows to finish, only after it ends can we start the BV loading process #}
    {% do run_query(wait_query) %}

    {% do run_query(query) %}

{% endmacro %}

Note that the wait_for_running_flows procedure is currently not generated automatically; it has to be adjusted manually to use the correct object names. The procedure can be found in the snowflake_fmc_prerequisites.sql file in the GitHub repository.

The Result

Now we have all the components for an FMC integration. We have a Python script that will parse our FMC JSONs and create the jobs, as well as a set of extra macros to add to our dbt project.

Let's execute our Python script to deploy the jobs.

We can now see our jobs in dbt Cloud and start the initial loads manually; the incremental loads will begin running automatically on their schedule.

Thanks for reading; I hope this post gave you some insight into how we approach building a new FMC integration and how our dbt Cloud integration works under the hood.

If you are a dbt expert with ideas for improvements, feel free to implement them and set up a pull request.
Or, build your own integration for a different scheduling platform and add it to our repository.
