Loading Data to a Snowflake Data Warehouse

The Pipeline stages your data in Hevo’s S3 bucket, from where it is finally loaded to your Snowflake Destination.

This section describes the queries for loading data into a Snowflake data warehouse. It assumes that you are familiar with Hevo’s process for Loading Data to a Data Warehouse.

Note: The queries listed here have been simplified to facilitate understanding of the overall data deduplication and loading process.

Loading Data with Higher Precision and Scale

For decimal (numeric) data types, the total number of digits in a number is called the precision, and the number of digits to the right of the decimal point is called the scale. For example, the number 123.456 has a precision of 6 and a scale of 3.

Snowflake supports a maximum precision and scale of 38 and 37, respectively. The precision does not affect storage because it is always fixed, whereas the scale can impact storage because the column has to be sized accordingly. Hence, while loading data to the Destination table, Hevo sets the precision of every number column to the maximum value of 38 and the scale of each column to the highest scale found in the Events. This is done across all Pipelines with Snowflake as their Destination.
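
For example, if the highest scale observed in the Events of a number column is 5, the corresponding Destination column would be sized as shown below. The orders table and amount column names here are hypothetical and only for illustration:

    -- Precision is fixed at the Snowflake maximum (38); the scale (5 here) is the
    -- highest scale found in the ingested Events.
    CREATE TABLE orders (amount NUMBER(38,5));

    -- A value such as 1234567.89012 (precision 12, scale 5) fits in this column.
    INSERT INTO orders (amount) VALUES (1234567.89012);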

Scenario 1: Source data with precision greater than 38.

Let us suppose you have a column in your Destination table with precision and scale of 38 and 5, respectively, and the Source data contains an Event with precision and scale of 43 and 5, respectively.

When Hevo tries to load this data, it results in an error since Snowflake does not support a precision greater than 38. You must manually reduce the precision to less than or equal to 38 in the Source to load the data to the Destination successfully.
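
Because Snowflake caps the NUMBER precision at 38, Hevo cannot widen the Destination column to accommodate such Events. As a minimal sketch of this limit, using a hypothetical orders table:

    -- Rejected: Snowflake does not allow a precision greater than 38.
    CREATE TABLE orders (amount NUMBER(43,5));

    -- Accepted: the precision is within the supported maximum of 38.
    CREATE TABLE orders (amount NUMBER(38,5));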

Scenario 2: Source data with a scale higher than the current scale of the Destination column.

Let us suppose you have a column in your Destination table with precision and scale of 38 and 5, respectively, and the Source data contains an Event with precision and scale of 38 and 10, respectively.

When Hevo tries to load this data, it results in an error, since Snowflake does not support increasing the scale of an existing column. Read Alter Table. For such cases, from Release 1.82 onwards, Hevo tries to resize the scale of the column in your Destination table for all new and existing Pipelines with Snowflake as a Destination. You need to contact Hevo Support to enable this feature.

Note: Resizing the scale may incur extra costs at the Destination level due to its effect on storage.

After enabling the feature, Hevo tries to resize the scale using the following steps:

  1. Add a temporary column in the Destination table:

    ALTER TABLE <table_name> ADD COLUMN <temp_column_name> decimal(38,10);
    
  2. Copy the data from the original column to the temporary column:

    UPDATE <table_name> SET <temp_column_name> = <column_name>;
    
  3. Drop the original column:

    ALTER TABLE <table_name> DROP COLUMN <column_name>;
    
  4. Rename the temporary column to the original column:

    ALTER TABLE <table_name> RENAME COLUMN <temp_column_name> TO <column_name>;
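
As a concrete illustration of this sequence, suppose a hypothetical orders table whose amount column currently has a scale of 5 and needs a scale of 10. The table and column names are illustrative only:

    -- 1. Add a temporary column with the higher scale.
    ALTER TABLE orders ADD COLUMN amount_tmp DECIMAL(38,10);

    -- 2. Copy the existing values into the temporary column.
    UPDATE orders SET amount_tmp = amount;

    -- 3. Drop the original column.
    ALTER TABLE orders DROP COLUMN amount;

    -- 4. Rename the temporary column to the original name.
    ALTER TABLE orders RENAME COLUMN amount_tmp TO amount;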
    

Loading Data without Primary Keys

If primary keys are not present in the Destination tables, Hevo directly appends the data to the target tables in the Destination warehouse using the following steps:

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table; create them if they are not.

  3. Create a temporary variant table:

     CREATE TEMP TABLE <variant_stage_table> ("data" variant);
    
  4. Copy files into the variant table:

     COPY INTO <variant_stage_table> FROM '<s3://path_to/stage/>' ...;
    
  5. Insert from the variant table directly into the target table (see the sketch after this list):
     INSERT INTO <target_table>
       SELECT data:field_1,
              data:field_2,
              …
              data:field_n
         FROM <variant_stage_table>;
    
  6. Drop the temporary variant table:
     DROP TABLE <variant_stage_table>;
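
As a concrete sketch of the flattening step (step 5 above), assume a hypothetical contacts target table with id and name columns. The COPY INTO step is omitted here, and the explicit casts are illustrative; they are not part of Hevo's simplified queries, and the actual types depend on your schema:

    -- A single-column variant table holds the raw records copied from the stage.
    CREATE TEMP TABLE contacts_variant_stage (data VARIANT);

    -- Flatten the variant records directly into the target table, casting each
    -- extracted path to the type of the corresponding target column.
    INSERT INTO contacts (id, name)
      SELECT data:id::NUMBER,
             data:name::VARCHAR
        FROM contacts_variant_stage;

    DROP TABLE contacts_variant_stage;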
    

Loading Data with Primary Keys

If the Destination tables have primary keys, Hevo performs the following steps to deduplicate the data and load it to the data warehouse.

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table; create them if they are not.

  3. Create a temporary variant staging table. The ingested data is loaded to this table:

    CREATE TEMP TABLE <variant_stage_table> ("data" variant);
    
  4. Create a temporary, flat staging table with the same schema as the target table. All the deduplication steps are performed on this table:
    CREATE TEMP TABLE <stage_table> LIKE <target_table>;
    
  5. Add the Hevo-reserved meta columns to the staging table. These are used to identify the latest Event in corner cases and special scenarios:

    ALTER TABLE <stage_table> ADD COLUMN __HE__MSG_SEQ_ID BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __HEVO__CONSUMPTION_ID BIGINT DEFAULT 0;
    
    ALTER TABLE <stage_table> ADD COLUMN __HEVO__MARKED_DELETED BOOLEAN DEFAULT NULL;
    
  6. Copy the data from the variant staging table to the flat staging table:

    INSERT INTO <stage_table>
      SELECT data:field_1,
             data:field_2,
             …
             data:field_n
         FROM <variant_stage_table>;
    
  7. Drop the variant staging table:
    DROP TABLE <variant_stage_table>;
    
  8. Delete duplicate data from the staging table on the basis of __hevo__ingested_at, __he__msg_seq_id, and __hevo__consumption_id (a concrete single-key sketch follows this list):

    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HEVO__INGESTED_AT)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HEVO__INGESTED_AT)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HE__MSG_SEQ_ID)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HE__MSG_SEQ_ID)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
    DELETE FROM <stage_table>
     WHERE (<PK1, PK2, ...PKn>, __HEVO__CONSUMPTION_ID)
             NOT IN (SELECT <PK1, PK2, ...PKn>, max(__HEVO__CONSUMPTION_ID)
                       FROM <stage_table>
                     GROUP BY <PK1, PK2, ...PKn>);
    
  9. Check if duplicates are still present:

    SELECT COUNT(*) AS DUPLICATE_COUNT
      FROM (SELECT <PK1, PK2, ...PKn>
              FROM <stage_table>
             GROUP BY <PK1, PK2, ...PKn> HAVING COUNT(*) >= 2);
    
  10. Load the data into the target table:

    Note: A single Snowflake MERGE statement handles all three operations: deletions (applied as soft deletes through the __hevo__marked_deleted column), insertions, and updates to the target table.

    MERGE INTO <target_table> AS T
      USING <stage_table> AS S
        ON T.PK1 = S.PK1 AND
           T.PK2 = S.PK2 AND
           ...
           T.PKn = S.PKn
          WHEN MATCHED AND
               S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
               S.__HEVO__MARKED_DELETED IS NOT NULL AND
               S.__HEVO__MARKED_DELETED = true
            THEN UPDATE SET T.__HEVO__MARKED_DELETED = true,
                            T.__HEVO__INGESTED_AT = S.__HEVO__INGESTED_AT,
                            T.__HEVO__LOADED_AT = <loaded_at>
          WHEN MATCHED AND
               S.__HEVO__INGESTED_AT >= T.__HEVO__INGESTED_AT AND
               (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
            THEN UPDATE SET T.FIELD_1 = S.FIELD_1,
                            T.FIELD_2 = S.FIELD_2,
                            ...
                            T.FIELD_n = S.FIELD_n,
                            T.__HEVO__LOADED_AT = <loaded_at>
          WHEN NOT MATCHED AND
               (S.__HEVO__MARKED_DELETED IS NULL OR S.__HEVO__MARKED_DELETED = false)
            THEN INSERT (FIELD_1,
                         FIELD_2,
                         ...
                         FIELD_n,
                         __HEVO__LOADED_AT)
                  VALUES (S.FIELD_1,
                          S.FIELD_2,
                          ...
                          S.FIELD_n,
                          <loaded_at>);
    
  11. Drop the staging table:

    DROP TABLE <stage_table>;
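
As a compact illustration of the deduplication and check in steps 8 and 9, assume a hypothetical staging table contacts_stage with a single primary key column id; the names are illustrative only:

    -- Keep only the most recently ingested row for each id.
    DELETE FROM contacts_stage
     WHERE (id, __HEVO__INGESTED_AT)
             NOT IN (SELECT id, MAX(__HEVO__INGESTED_AT)
                       FROM contacts_stage
                      GROUP BY id);

    -- Confirm that no id appears more than once before the MERGE is run.
    SELECT COUNT(*) AS DUPLICATE_COUNT
      FROM (SELECT id
              FROM contacts_stage
             GROUP BY id
            HAVING COUNT(*) >= 2);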
    

This completes the loading of data to the Snowflake data warehouse.



Revision History

Refer to the following table for the list of key updates made to this page:

Date          Release   Description of Change
Feb-21-2022   1.82      Added section, Loading Data with Higher Precision and Scale.
Feb-22-2021   NA        Updated the page overview to state that the Pipeline stages the ingested data in Hevo's S3 bucket, from where it is finally loaded to the Destination.
Last updated on 09 Mar 2022