In addition to generating the DDL code for the deployment of Data Vault models and the ETL code to load these models, VaultSpeed has long offered out-of-the-box support for orchestration tools such as Airflow and Azure Data Factory. Now, with the introduction of the Generic FMC option for orchestration, VaultSpeed can generate the logic needed to create orchestration jobs to run on virtually any orchestration platform.

One of the first options we used to test the Generic FMC was Snowflake Tasks. This was an exciting option because it allows users to deploy all the VaultSpeed code components to a single, solid, and highly performant cloud data warehouse. In this post, I will walk you through one option for deploying and running VaultSpeed's load orchestration in Snowflake Tasks using just SQL.

The Model

Using the TPCH_SF1 schema in the SNOWFLAKE_SAMPLE_DATA database as my source, I created a basic Data Vault model with two hubs, a many-to-many link, and corresponding satellites. Then I generated and deployed all the DDL and ETL to my Snowflake data warehouse.
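VaultSpeed generated all of this DDL for me, but if you have not worked with Data Vault structures before, here is a rough sketch of the shape of a hub and one of its satellites. The schema, table, and column names below are hypothetical, illustrative only, and not VaultSpeed's actual generated DDL:

-- Illustrative shapes only; VaultSpeed applies its own naming and metadata conventions.
CREATE TABLE IF NOT EXISTS VS_DEMO.HUB_CUSTOMER (
    CUSTOMER_HKEY  BINARY(20)    NOT NULL, -- hash of the business key
    CUSTOMER_ID    NUMBER(38,0)  NOT NULL, -- business key from the TPCH CUSTOMER table
    LOAD_DATE      TIMESTAMP_NTZ NOT NULL,
    RECORD_SOURCE  VARCHAR(100)  NOT NULL
);

CREATE TABLE IF NOT EXISTS VS_DEMO.SAT_CUSTOMER (
    CUSTOMER_HKEY  BINARY(20)    NOT NULL, -- references HUB_CUSTOMER
    LOAD_DATE      TIMESTAMP_NTZ NOT NULL,
    C_NAME         VARCHAR(25),            -- descriptive attributes from the source
    C_ADDRESS      VARCHAR(40),
    RECORD_SOURCE  VARCHAR(100)  NOT NULL
);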

Next, I generated the FMC workflow for the initial load of my Raw Vault, making sure my system parameter FMC_TYPE was set to the new option "generic".

The Snowflake SQL Objects

After creating the FMC, I was ready to set up my Snowflake schema with the objects I needed to (a) assist with executing the load stored procedures generated in VaultSpeed and (b) create the Snowflake Task DAG. First, I created the schema, sequence object, and procedures needed to help run the DAG with the VaultSpeed ETL:

  1. A database schema named "TASKER".

CREATE OR REPLACE SCHEMA TASKER;
  2. A sequence object named "FMC_SEQ" for generating a load cycle id for each DAG execution.

CREATE OR REPLACE SEQUENCE TASKER.FMC_SEQ START = 1 INCREMENT = 1;
  1. A stored procedure named "GET_LOAD_CYCLE_ID()" for capturing the load cycle id as a variable that can be passed through the DAG. (OK, this is almost all SQL. I needed JavaScript for these procs.)

CREATE OR REPLACE PROCEDURE TASKER.GET_LOAD_CYCLE_ID()
RETURNS VARCHAR(16777216)
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
      var LoadCycleIdRslt=snowflake.execute({ sqlText: " SELECT TASKER.FMC_SEQ.NEXTVAL"});
      LoadCycleIdRslt.next();
      var LoadCycleId = LoadCycleIdRslt.getColumnValue(1);
      var lci_str="'"+LoadCycleId+"'";
      var sql_str="CALL SYSTEM$SET_RETURN_VALUE("+lci_str+")";
      var res = snowflake.execute({sqlText:sql_str});
      return "SUCCESS";
$$;
  4. A stored procedure named "RUN_MAPPING_PROC()" for running the load procedures with error handling. The catch block calls the failure-logging procedure that normally runs at the end of the DAG. (Note the procedure schema for the ETL is "VS_DEMO_PROC" in this example. Change it to the ETL schema specific to your Data Vault.) A quick smoke test follows the procedure code below.

CREATE OR REPLACE PROCEDURE TASKER.RUN_MAPPING_PROC("PROC_NAME" VARCHAR(50), "LOAD_CYCLE_ID" VARCHAR(16777216))
RETURNS VARCHAR(16777216)
LANGUAGE JAVASCRIPT
EXECUTE AS OWNER
AS 
$$
    var call_sp_sql = "CALL "+PROC_NAME+";
    try {
        snowflake.execute( {sqlText: call_sp_sql} );
        result = "SUCCESS";
        }
    catch (err)  {
        result =  "Failed: Code: " + err.code + "\\n  State: " + err.state;
        result += "\\n  Message: " + err.message;
        result += "\\nStack Trace:\\n" + err.stackTraceTxt;
        snowflake.execute( {sqlText: "CALL VS_DEMO_PROC.FMC_UPD_RUN_STATUS_FL_TPCH('"+LOAD_CYCLE_ID+"', '0')"} );
        }
    return result;
$$;
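Before wiring any tasks together, you can smoke-test the wrapper directly. In the sketch below, 'VS_DEMO_PROC.LOAD_HUB_CUSTOMER()' is a hypothetical procedure name; substitute one of your own generated load procedures:

-- A load cycle id can be pulled straight from the sequence for testing.
-- (GET_LOAD_CYCLE_ID() itself only works inside a task run, because
-- SYSTEM$SET_RETURN_VALUE can only be called from within a task.)
SELECT TASKER.FMC_SEQ.NEXTVAL AS LOAD_CYCLE_ID;

-- Run a single load procedure through the error-handling wrapper. The first
-- argument must be schema-qualified and include the trailing parentheses,
-- exactly as the DAG builder passes it.
CALL TASKER.RUN_MAPPING_PROC('VS_DEMO_PROC.LOAD_HUB_CUSTOMER()', '1');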

After creating these objects, I had all the SQL objects needed to run the ETL in the DAG.

But I still needed the DAG. Next, I created the table and the procedure that generate the DAG:

  1. In the existing schema "TASKER", a table named "TASK_MAPPING" with a variant column named "JSON_MAPPING" to store the JSON. (Below you will see how I queried it to flatten the JSON mappings into records we can use to build the DAG; a standalone sketch of that flattening query follows the procedure code.)

CREATE OR REPLACE TABLE TASKER.TASK_MAPPING ( ID NUMBER(38,0) AUTOINCREMENT, JSON_MAPPING VARIANT );
  1. A stored procedure named "CREATE_VS_FMC()". This is the main procedure that builds and deploys the tasks into a DAG. The warehouse variable needs to be set to your Snowflake warehouse. My warehouse was the default warehouse "COMPUTE_WH".

CREATE OR REPLACE PROCEDURE TASKER.CREATE_VS_FMC("TASK_SCHEMA" VARCHAR(50), "DAG_NAME" VARCHAR(50), "LOAD_SCHED" VARCHAR(20))
RETURNS VARCHAR(16777216)
LANGUAGE SQL
EXECUTE AS OWNER
AS
$$
DECLARE var_job VARCHAR(50) default '';             -- base name of load task 
        var_proc VARCHAR(50) default '';            -- task stored procedure name
        var_dep VARCHAR(50) default '';             -- task dependency
        var_seq INT default 0;                      -- load group sequence (EXT, STG, etc.)
        dep_arr ARRAY default ARRAY_CONSTRUCT();    -- array for dependencies
        dep_counter INTEGER default 0;              -- dependency counter
        dep_list VARCHAR(1000) default '';          -- dependency list for building CREATE TASK statement
        dep_last_task VARCHAR(50) default '';       -- last dependency (needed to pass load cycle id to last task in dag)
        proc_schema VARCHAR(50) default '';         -- schema containing load procedures
        task_sql VARCHAR(1000) default '';          -- single create task sql statement
        var_sql VARCHAR(150000) default '';         -- string containing sql statement to be executed
        task_counter INT default 0;                 -- counter tallying # of tasks
        warehouse VARCHAR(30) default 'COMPUTE_WH'; -- Snowflake warehouse
        /* params
        task_schema VARCHAR(50) default 'TASKER';   -- schema where tasks are created
        dag_name VARCHAR(50) default 'INIT_TPCH';   -- Dag name and base name of first task
        load_sched VARCHAR(20) default '60 MINUTE'; -- load schedule (must be in one of Snowflake's supported schedule formats)
        */
BEGIN
    --Create initial task for getting load cycle id
    var_sql := 'CREATE OR REPLACE TASK EXEC_'||UPPER(dag_name)||' WAREHOUSE = '||warehouse||' SCHEDULE = '''||load_sched||''' AS CALL '||UPPER(task_schema)||'.GET_LOAD_CYCLE_ID();';
    EXECUTE IMMEDIATE var_sql;
    task_counter := task_counter + 1;

    --Create temp table for task mappings (jobs)
    CREATE OR REPLACE TEMP TABLE JOBS (SEQ INT, JOB VARCHAR(50), PROC_SCHEMA VARCHAR(50), DEPENDENCY VARCHAR(50));

    /* Insert task mappings from the JSON into the JOBS temp table,
        assigning a number sequence to each load layer:
    1    = FMC start task
    2    = EXT layer
    3    = STG layer
    4    = DV layer
    5    = FMC end task */
    INSERT INTO JOBS (SEQ, JOB, PROC_SCHEMA, DEPENDENCY)
    WITH JD AS (
    SELECT j.KEY AS JOB
        ,j.VALUE:map_schema::string AS PROC_SCHEMA
        ,d.VALUE::string AS DEPENDENCY
    FROM TASK_MAPPING,
        LATERAL FLATTEN(input=> JSON_MAPPING) j, 
        LATERAL FLATTEN(input=> j.VALUE:dependencies, outer=> true) d
    )
    ,MTD AS (
        SELECT 1 AS SEQ, JOB, PROC_SCHEMA, DEPENDENCY 
        FROM JD
        WHERE DEPENDENCY is null
     )
    ,EXT AS (
        SELECT 2 AS SEQ, JOB, PROC_SCHEMA, DEPENDENCY 
        FROM JD
        WHERE DEPENDENCY IN (SELECT JOB FROM MTD)
    )
    ,STG AS (
        SELECT 3 AS SEQ, JOB, PROC_SCHEMA, DEPENDENCY 
        FROM JD
        WHERE DEPENDENCY IN (SELECT JOB FROM EXT)
    )
    ,DV AS (
        SELECT 4 AS SEQ, JOB, PROC_SCHEMA, DEPENDENCY 
        FROM JD
        WHERE DEPENDENCY IN (SELECT JOB FROM STG)
    )
    SELECT SEQ, JOB, PROC_SCHEMA, DEPENDENCY FROM MTD
    UNION ALL
    SELECT SEQ, JOB, PROC_SCHEMA, DEPENDENCY FROM EXT
    UNION ALL
    SELECT SEQ, JOB, PROC_SCHEMA, DEPENDENCY FROM STG
    UNION ALL
    SELECT SEQ, JOB, PROC_SCHEMA, DEPENDENCY FROM DV
    UNION ALL
    SELECT 5 AS SEQ, JOB, PROC_SCHEMA, DEPENDENCY 
    FROM JD 
    WHERE DEPENDENCY IN (SELECT JOB FROM DV)
    ORDER BY SEQ, JOB;

    -- Create task for logging start of FMC
    SELECT JOB INTO var_job FROM JOBS WHERE SEQ = 1;
    SELECT PROC_SCHEMA INTO proc_schema FROM JOBS WHERE SEQ = 1;
    var_sql := 'CREATE OR REPLACE TASK EXEC_'||UPPER(var_job)||' WAREHOUSE = '||warehouse||' AFTER EXEC_'||UPPER(dag_name)||' AS 
    BEGIN
        CALL '||UPPER(proc_schema)||'.'||UPPER(var_job)||'('''||dag_name||''', TO_VARCHAR(SYSTEM$GET_PREDECESSOR_RETURN_VALUE()), TO_VARCHAR(CURRENT_TIMESTAMP::timestamp));
        CALL SYSTEM$SET_RETURN_VALUE(SYSTEM$GET_PREDECESSOR_RETURN_VALUE());
    END;';
    EXECUTE IMMEDIATE var_sql;
    task_counter := task_counter + 1;

    -- Create FMC, EXT, STG, and RV load tasks in order of dependency
    DECLARE j1 cursor for SELECT DISTINCT JOB, PROC_SCHEMA, SEQ FROM JOBS WHERE SEQ > 1 ORDER BY SEQ;
    BEGIN
        FOR record in j1 do
            var_proc := record.JOB;
            proc_schema := record.PROC_SCHEMA;
            var_seq := record.SEQ;
            task_sql := 'CREATE OR REPLACE TASK EXEC_'||UPPER(var_proc)||' WAREHOUSE = '||warehouse||' AFTER ';

            -- Build dependency list
            SELECT ARRAY_AGG(TO_ARRAY(DEPENDENCY)) INTO dep_arr FROM JOBS WHERE JOB = :var_proc; 
            dep_list := '';
            dep_counter := 0;
            FOR dep in dep_arr DO
                dep_counter := dep_counter + 1;
                dep_list := dep_list||CASE WHEN :dep_counter > 1 THEN ', ' ELSE '' END||'EXEC_'||UPPER(ARRAY_TO_STRING(dep,', '));
                --Get the last dependency, ensuring only one dependency task is called to fetch the load cycle id.
                dep_last_task := 'EXEC_'||UPPER(ARRAY_TO_STRING(dep,', '));
            END FOR;

            -- Build create task statements
            var_sql := CASE WHEN :var_seq = 5 THEN  -- last task in DAG; SUCCESS
                                task_sql||dep_list||' AS CALL '||UPPER(proc_schema)||'.'||UPPER(var_proc)||'(TO_VARCHAR(SYSTEM$GET_PREDECESSOR_RETURN_VALUE('''||dep_last_task||''')), ''1'');'
                            ELSE -- all other tasks, run within handling procedure
                                task_sql||dep_list||' AS BEGIN 
                                                            CALL RUN_MAPPING_PROC('''||UPPER(proc_schema)||'.'||UPPER(var_proc)||'()'', TO_VARCHAR(SYSTEM$GET_PREDECESSOR_RETURN_VALUE('''||dep_last_task||'''))); 
                                                            CALL SYSTEM$SET_RETURN_VALUE(SYSTEM$GET_PREDECESSOR_RETURN_VALUE('''||dep_last_task||''')); 
                                                        END;'
                            END;
            EXECUTE IMMEDIATE var_sql;
            task_counter := task_counter + 1;
        END FOR;
    END;

    DROP TABLE JOBS;

    -- Enable all tasks in the dag
    SELECT SYSTEM$TASK_DEPENDENTS_ENABLE('EXEC_'||:dag_name);

    RETURN TO_VARCHAR(task_counter)||' tasks created for dag '''||'EXEC_'||:dag_name||'''.';
END;
$$;
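If you want to see how the flattening query inside "CREATE_VS_FMC()" turns the mapping JSON into job records, you can experiment with a miniature hand-made mapping first. The job names below are made up; the real JSON comes from VaultSpeed, and the TRUNCATE in the next section clears this test row before the real load:

-- Load a tiny, hypothetical mapping and flatten it the same way CREATE_VS_FMC() does.
INSERT INTO TASKER.TASK_MAPPING (JSON_MAPPING)
SELECT PARSE_JSON('{
  "FMC_UPD_RUN_STATUS_START": {"map_schema": "VS_DEMO_PROC", "dependencies": []},
  "EXT_LOAD_CUSTOMER": {"map_schema": "VS_DEMO_PROC", "dependencies": ["FMC_UPD_RUN_STATUS_START"]}
}');

-- One row per job/dependency pair; jobs with no dependencies get a NULL
-- dependency row thanks to outer=> true.
SELECT j.KEY AS JOB
    ,j.VALUE:map_schema::string AS PROC_SCHEMA
    ,d.VALUE::string AS DEPENDENCY
FROM TASKER.TASK_MAPPING,
    LATERAL FLATTEN(input=> JSON_MAPPING) j,
    LATERAL FLATTEN(input=> j.VALUE:dependencies, outer=> true) d;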

The FMC Ingestion and Transformation

After I set up my Snowflake objects, I downloaded my FMC zip file from VaultSpeed, unzipped it, and opened the file named "11_mappings_INIT_TPCH_20230117_063529.json". (You can copy the JSON from the attached "11_mappings_INIT_TPCH_20230117_063529.pdf" with Control/Command + A.) To keep things simple for this example, I used an INSERT statement to load the JSON into the TASK_MAPPING table. I copied the JSON text into the statement below and ran the insert. Before pasting, I needed to remove the escaped quotes (\") and the string qualifying the source schema with the database name ("SNOWFLAKE.", in this case):

TRUNCATE TASKER.TASK_MAPPING;
INSERT INTO TASKER.TASK_MAPPING (JSON_MAPPING)
SELECT TO_VARIANT(PARSE_JSON('{PASTE MAPPING JSON HERE}'));
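After the insert, a quick sanity check confirms the JSON parsed; the count should equal the number of jobs in your FMC mapping:

-- One row per job key in the mapping JSON.
SELECT COUNT(*) AS JOB_COUNT
FROM TASKER.TASK_MAPPING,
    LATERAL FLATTEN(input=> JSON_MAPPING) j;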

Once the JSON was loaded, I ran the main procedure "CREATE_VS_FMC()" with three parameters:

  1. task_schema - The schema where you will create the tasks/DAG. I created all the tasks and objects in "TASKER".

  2. dag_name - This is the basis for the name of the DAG. The root task that identifies the DAG is created with the "EXEC_" prefix. In this example, the dag_name parameter was 'INIT_TPCH', and the root task that identifies the DAG in Snowflake is 'EXEC_INIT_TPCH'. This corresponds to the name of the FMC in VaultSpeed.

  3. load_sched - This is the load schedule. It is a required parameter and must be in the format that Snowflake specifies. I set my schedule to '60 MINUTE'. See task schedules in the Snowflake docs for more details: https://docs.snowflake.com/en/sql-reference/sql/create-task.html

CALL TASKER.CREATE_VS_FMC('TASKER', 'INIT_TPCH', '60 MINUTE');

A Snowflake DAG for My Data Vault!

Fifteen new tasks appeared under Tasks in the schema "TASKER". These made up the DAG EXEC_INIT_TPCH. I clicked on the root task of the same name and launched its detail page to see the graph view and execute the DAG.
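Rather than launching it from the UI, you can also trigger the root task and watch the runs straight from SQL (this assumes you hold the OPERATE privilege, or ownership, on the task):

-- Kick off the DAG immediately instead of waiting for the schedule.
EXECUTE TASK EXEC_INIT_TPCH;

-- Review recent task runs for the DAG, most recent first.
SELECT NAME, STATE, SCHEDULED_TIME, COMPLETED_TIME, ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY())
WHERE NAME LIKE 'EXEC_%'
ORDER BY SCHEDULED_TIME DESC;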

I saw my hubs, links, and satellites populated with data, and my load control tables in the FMC schema updated with load history.

NOTE: This example only covers source-level loading, not object-specific loading.

The Generic FMC takes advantage of the native SQL now used to load the Data Vault tables and update the load control tables in each orchestration. These stored procedures and scripts make it possible to use the FMC in many different orchestration runtimes; this is just one example in Snowflake. Hopefully, this has your gears turning, and you will find more ways to implement this flexible orchestration logic.

In Part 2, I will describe how I deployed the Snowflake DAG automatically using the Custom Script deployment in VaultSpeed.
