Loading Data to a Snowflake Data Warehouse

This section describes the queries for loading data into a Snowflake Destination. It assumes that you are familiar with Hevo’s process for Loading Data to a Data Warehouse.

Note: The queries listed here have been simplified to facilitate understanding of the overall data deduplication and loading process.

Loading Data without Primary Keys

If primary keys are not present in the Destination tables, Hevo directly appends the data to the target tables in the Destination data warehouse using the following steps:

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table, and create them if they do not exist.
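
     For illustration, if the meta columns are missing, they could be added with statements of the following form. This is a sketch that assumes TIMESTAMP columns; the exact statements Hevo issues may differ.

     ALTER TABLE <target_table> ADD COLUMN __HEVO__INGESTED_AT TIMESTAMP;

     ALTER TABLE <target_table> ADD COLUMN __HEVO__LOADED_AT TIMESTAMP;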

  3. Create a temporary variant table:

     CREATE TEMP TABLE <variant_stage_table> ("data" variant);
    
  4. Copy the files into the variant table.

     COPY INTO <variant_stage_table> FROM '<s3://path_to/stage/>' ...;
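
     A fuller form of this statement, assuming JSON-formatted files in the stage location, might look as follows. The file format is an assumption for this example, and the access options (credentials or storage integration) are omitted.

     COPY INTO <variant_stage_table>
       FROM '<s3://path_to/stage/>'
       FILE_FORMAT = (TYPE = 'JSON');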
    
  5. Insert the data from the variant table directly into the target table.
     INSERT INTO <target_table>
       SELECT data:field_1,
              data:field_2,
              …
              data:field_n
         FROM <variant_stage_table>;
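
     For illustration, with a hypothetical target table carrying id, name, and __hevo__ingested_at columns, the insert might resolve to the following. The column names and casts are assumptions for this example; Hevo derives the actual columns and types from the mapped schema.

     INSERT INTO <target_table>
       SELECT data:id::NUMBER,
              data:name::STRING,
              data:__hevo__ingested_at::TIMESTAMP
         FROM <variant_stage_table>;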
    
  6. Drop the temporary variant table.
     DROP TABLE <variant_stage_table>;
    

Loading Data with Primary Keys

If primary keys are present in the Destination tables, Hevo performs the following steps to deduplicate the data and load it to the data warehouse.

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table, and create them if they do not exist.

  3. Create a temporary variant staging table. The ingested data is first loaded into this table:

    CREATE TEMP TABLE <variant_stage_table> ("data" variant);
    
  4. Create a temporary, flat staging table with the same schema as the target table. All the deduplication steps are performed on this table.
    CREATE TEMP TABLE <stage_table> LIKE <target_table>;
    
  5. Add the Hevo-reserved meta columns to the staging table. These are used to identify the latest Event in corner cases and special scenarios.

    ALTER TABLE <stage_table> ADD COLUMN __HE__MSG_SEQ_ID BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __HEVO__CONSUMPTION_ID BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __HEVO__MARKED_DELETED BOOLEAN DEFAULT NULL;
    
  6. Copy the data from the variant staging table to the flat staging table.

    INSERT INTO <stage_table>
      SELECT data:field_1,
             data:field_2,
             …
             data:field_n
         FROM <variant_stage_table>;
    
  7. Drop the variant staging table.
    DROP TABLE <variant_stage_table>;
    
  8. Delete duplicate data from the staging table on the basis of __hevo__ingested_at, __he__msg_seq_id, and __hevo__consumption_id.

    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HEVO__INGESTED_AT)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HEVO__INGESTED_AT)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HE__MSG_SEQ_ID)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HE__MSG_SEQ_ID)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HEVO__CONSUMPTION_ID)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HEVO__CONSUMPTION_ID)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
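
    For illustration, with a hypothetical orders_stage table whose primary key is order_id, the first of these statements resolves to the following; the remaining two follow the same pattern with __HE__MSG_SEQ_ID and __HEVO__CONSUMPTION_ID.

    DELETE FROM orders_stage
     WHERE (order_id, __HEVO__INGESTED_AT)
             NOT IN (SELECT order_id, max(__HEVO__INGESTED_AT)
                       FROM orders_stage
                     GROUP BY order_id);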
    
  9. Check if duplicates are still present.

    SELECT COUNT(*) AS DUPLICATE_COUNT
      FROM (SELECT <PK1, PK2, ...PKn>
              FROM <stage_table>
             GROUP BY <PK1, PK2, ...PKn> HAVING COUNT(*) >= 2);
    
  10. Load the data into the target table.

    Note: Snowflake uses a single MERGE statement to handle all three operations of deletion, insertion, and update on the target table.

    MERGE INTO <target_table> AS T
      USING <stage_table> AS S
        ON T.PK1 = S.PK1 AND
           T.PK2 = S.PK2 AND
           ...
           T.PKn = S.PKn
          WHEN MATCHED AND
               S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
               S.__HEVO__MARKED_DELETED IS NOT NULL AND
               S.__HEVO__MARKED_DELETED = true
            THEN UPDATE SET T.__HEVO__MARKED_DELETED = true,
                            T.__HEVO__INGESTED_AT = S.__HEVO__INGESTED_AT,
                            T.__HEVO__LOADED_AT = <loaded_at>
          WHEN MATCHED AND
               S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
               (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
            THEN UPDATE SET T.FIELD_1 = S.FIELD_1,
                            T.FIELD_2 = S.FIELD_2,
                            ...
                            T.FIELD_n = S.FIELD_n,
                            T.__HEVO__LOADED_AT = <loaded_at>
          WHEN NOT MATCHED AND
               (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
            THEN INSERT (FIELD_1,
                         FIELD_2,
                         ...
                         FIELD_n,
                         __HEVO__LOADED_AT)
                 VALUES (FIELD_1,
                         FIELD_2,
                         ...
                         FIELD_n,
                         <loaded_at>);
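
    For illustration, with a hypothetical orders table keyed on order_id and carrying a single amount field, the MERGE above might resolve to the following sketch. The table and column names are assumptions, and CURRENT_TIMESTAMP() stands in for the <loaded_at> value; the statement Hevo generates covers every mapped column.

    MERGE INTO orders AS T
      USING orders_stage AS S
        ON T.order_id = S.order_id
      -- A newer Event marked as deleted: flag the target row as deleted.
      WHEN MATCHED AND
           S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
           S.__HEVO__MARKED_DELETED = true
        THEN UPDATE SET T.__HEVO__MARKED_DELETED = true,
                        T.__HEVO__INGESTED_AT = S.__HEVO__INGESTED_AT,
                        T.__HEVO__LOADED_AT = CURRENT_TIMESTAMP()
      -- A newer Event that is not deleted: update the target row.
      WHEN MATCHED AND
           S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
           (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
        THEN UPDATE SET T.amount = S.amount,
                        T.__HEVO__LOADED_AT = CURRENT_TIMESTAMP()
      -- A new, non-deleted Event: insert it into the target table.
      WHEN NOT MATCHED AND
           (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
        THEN INSERT (order_id, amount, __HEVO__INGESTED_AT, __HEVO__LOADED_AT)
             VALUES (S.order_id, S.amount, S.__HEVO__INGESTED_AT, CURRENT_TIMESTAMP());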
    
  11. Drop the staging table.

    DROP TABLE <stage_table>;
    

This completes the loading of data to the Snowflake data warehouse.

