Loading Data to a Google BigQuery Data Warehouse

By default, the Pipeline loads the Source data to your Google Cloud Storage (GCS) buckets every five minutes, from where it is loaded to the BigQuery Destination.

This section describes the queries for loading data into a Google BigQuery data warehouse. It assumes that you are familiar with Hevo’s process for Loading Data to a Data Warehouse.

Note: The queries listed here have been simplified to facilitate understanding of the overall data deduplication and loading process.

Loading Data without Primary Keys

If primary keys are not present in the Destination tables, Hevo directly appends the data to the target tables in the Destination data warehouse using the following steps:

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table; create them if they do not exist.

  3. Copy files into the target table:

   load <gs://path_to/file> to <dataset>.<target_table>
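
For illustration only, the append-only flow above can be sketched in Python. This is a simplified model, not Hevo's implementation (Hevo performs the load with BigQuery load jobs); the row fields other than the metadata columns are hypothetical:

```python
import time

def append_rows(target_table, ingested_rows):
    """Append-only load used when no primary keys exist: every ingested
    Event is added to the target table without any deduplication."""
    loaded_at = int(time.time())
    for row in ingested_rows:
        row = dict(row)                       # do not mutate the source batch
        row["__hevo__loaded_at"] = loaded_at  # stamped at load time
        target_table.append(row)
    return target_table

# Two Events for the same order are both retained, since there is no
# primary key to deduplicate on.
target_table = []
batch = [
    {"order_id": 7, "amount": 10, "__hevo__ingested_at": 100},
    {"order_id": 7, "amount": 12, "__hevo__ingested_at": 101},
]
append_rows(target_table, batch)
```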

Loading Data with Primary Keys

If the Destination tables have primary keys, Hevo performs the following steps to deduplicate the data and load it to the data warehouse:

  1. Apply the __hevo__ingested_at timestamp to each Event at the time of ingestion from the Source. This column is also retained in the Destination table.

  2. Ensure that the __hevo__ingested_at and, if required, __hevo__loaded_at columns are present in the Destination table; create them if they do not exist.

  3. Create a temporary staging table with the same schema as the Destination table, along with the additional metadata columns. The ingested data is loaded into this table, and all deduplication steps are performed on it. Read Creating and using tables in Google BigQuery.

  4. Copy files to the staging table:

     load <gs://path_to/file> to <dataset>.<stage_table>
    
  5. Check if the staging table contains any entries for deleted Events:

      SELECT COUNT(*) AS deleted_count
      FROM
        <dataset>.<stage_table>
      WHERE
        __hevo__marked_deleted = TRUE;
    
  6. If the deleted_count value obtained in Step 5 is greater than 0, perform this step; else, skip to Step 7. In this step:

    1. Create a temporary table (for example, delete_table) with the most recent deleted Events from the staging table:

      CREATE TABLE
        <dataset>.<delete_table> AS
      SELECT
        row.<PK1>,
        row.<PK2>,
        ...
        row.<PKn>,
        row.__hevo__ingested_at,
        row.__he__msg_seq_id,
        row.__hevo__consumption_id
      FROM (
        SELECT
          ARRAY_AGG (T
          ORDER BY
            __hevo__ingested_at DESC, __he__msg_seq_id DESC, __hevo__consumption_id DESC
          LIMIT 1)
          [OFFSET (0)] AS row
        FROM
          <dataset>.<stage_table> AS T
        WHERE
          __hevo__marked_deleted = TRUE
        GROUP BY <PK1, PK2,...,PKn>);
      
    2. Remove the entries for all deleted Events from the staging table:

      DELETE
      FROM
        <dataset>.<stage_table>
      WHERE
        __hevo__marked_deleted = TRUE;
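
The logic of Steps 6.1 and 6.2 can be sketched in Python. This is an illustrative model of the SQL above, not Hevo's implementation; rows are represented as dicts and `pk_cols` lists the primary key columns:

```python
def process_deleted_events(stage_rows, pk_cols):
    """Step 6.1: keep the most recent deleted Event per primary key, ranked by
    (__hevo__ingested_at, __he__msg_seq_id, __hevo__consumption_id).
    Step 6.2: remove all deleted Events from the staging table."""
    def order_key(row):
        return (row["__hevo__ingested_at"],
                row["__he__msg_seq_id"],
                row["__hevo__consumption_id"])

    delete_table = {}
    for row in stage_rows:
        if not row["__hevo__marked_deleted"]:
            continue
        pk = tuple(row[c] for c in pk_cols)
        if pk not in delete_table or order_key(row) > order_key(delete_table[pk]):
            delete_table[pk] = row

    # Step 6.2: drop every deleted Event from the staging table.
    remaining = [r for r in stage_rows if not r["__hevo__marked_deleted"]]
    return list(delete_table.values()), remaining
```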
      
  7. Delete duplicate data from the staging table on the basis of __hevo__ingested_at, __he__msg_seq_id, and __hevo__consumption_id:

    DELETE
    FROM
      <dataset>.<stage_table>
    WHERE
      STRUCT(<PK1, PK2, ..., PKn>, __hevo__ingested_at)
        NOT IN (SELECT AS STRUCT <PK1, PK2, ..., PKn>, MAX(__hevo__ingested_at)
                FROM
                  <dataset>.<stage_table>
                GROUP BY <PK1, PK2, ..., PKn>);
    
    DELETE
    FROM
      <dataset>.<stage_table>
    WHERE
      STRUCT(<PK1, PK2, ..., PKn>, __he__msg_seq_id)
        NOT IN (SELECT AS STRUCT <PK1, PK2, ..., PKn>, MAX(__he__msg_seq_id)
                FROM
                  <dataset>.<stage_table>
                GROUP BY <PK1, PK2, ..., PKn>);

    DELETE
    FROM
      <dataset>.<stage_table>
    WHERE
      STRUCT(<PK1, PK2, ..., PKn>, __hevo__consumption_id)
        NOT IN (SELECT AS STRUCT <PK1, PK2, ..., PKn>, MAX(__hevo__consumption_id)
                FROM
                  <dataset>.<stage_table>
                GROUP BY <PK1, PK2, ..., PKn>);
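
The net effect of these three cascading deletes is to keep, for each primary key, only the row with the greatest (__hevo__ingested_at, __he__msg_seq_id, __hevo__consumption_id) combination. A Python sketch of that outcome (illustrative only; rows as dicts):

```python
def dedup_stage(stage_rows, pk_cols):
    """Keep only the most recent row per primary key, ranked by
    (__hevo__ingested_at, __he__msg_seq_id, __hevo__consumption_id)."""
    best = {}
    for row in stage_rows:
        pk = tuple(row[c] for c in pk_cols)
        rank = (row["__hevo__ingested_at"],
                row["__he__msg_seq_id"],
                row["__hevo__consumption_id"])
        # Retain the row with the highest rank seen so far for this key.
        if pk not in best or rank > best[pk][0]:
            best[pk] = (rank, row)
    return [row for _, row in best.values()]
```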
    
  8. If the delete_table was created in Step 6, then:

    1. Load data from the delete_table into the staging table.

      Note: Google BigQuery uses a single MERGE statement to handle all three operations of deletion, insertion, and update on the staging table.

      MERGE INTO
        <dataset>.<stage_table> S
      USING
        <dataset>.<delete_table> D
      ON
        S.<PK1> = D.<PK1> AND
        S.<PK2> = D.<PK2> AND
        ...
        S.<PKn> = D.<PKn>
      WHEN MATCHED AND
          ((D.__hevo__ingested_at > S.__hevo__ingested_at) OR
           (D.__hevo__ingested_at = S.__hevo__ingested_at AND
            D.__he__msg_seq_id > S.__he__msg_seq_id) OR
           (D.__hevo__ingested_at = S.__hevo__ingested_at AND
            D.__he__msg_seq_id = S.__he__msg_seq_id AND
            D.__hevo__consumption_id > S.__hevo__consumption_id)) OR
          S.__hevo__ingested_at IS NULL
      THEN UPDATE SET S.__hevo__is_merged = TRUE,
                      S.__hevo__marked_deleted = TRUE,
                      S.__hevo__ingested_at = D.__hevo__ingested_at
      WHEN NOT MATCHED
        THEN INSERT (<PK1>,
                     <PK2>,
                      ...
                     <PKn>,
                     __hevo__consumption_id,
                     __he__msg_seq_id,
                     __hevo__ingested_at,
                     __hevo__marked_deleted)
             VALUES (D.<PK1>,
                     D.<PK2>,
                     ...
                     D.<PKn>,
                     D.__hevo__consumption_id,
                     D.__he__msg_seq_id,
                     D.__hevo__ingested_at,
                     D.__hevo__marked_deleted);
      
    2. Drop the temporary delete_table.
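
The MERGE in Step 8.1 can be modeled in Python as follows. This is an illustrative sketch of the statement's semantics, not Hevo's implementation: a delete record wins over a staged row only when it is strictly newer, and the winning row is flagged with __hevo__is_merged:

```python
def merge_deletes_into_stage(stage_rows, delete_rows, pk_cols):
    """Fold the delete_table back into the deduplicated staging table."""
    def order_key(row):
        return (row["__hevo__ingested_at"],
                row["__he__msg_seq_id"],
                row["__hevo__consumption_id"])

    by_pk = {tuple(r[c] for c in pk_cols): r for r in stage_rows}
    for d in delete_rows:
        pk = tuple(d[c] for c in pk_cols)
        s = by_pk.get(pk)
        if s is None:
            by_pk[pk] = dict(d)          # WHEN NOT MATCHED: insert the record
        elif order_key(d) > order_key(s):
            # WHEN MATCHED and the delete record is newer: mark the staged
            # row as deleted and flag it as merged.
            s["__hevo__is_merged"] = True
            s["__hevo__marked_deleted"] = True
            s["__hevo__ingested_at"] = d["__hevo__ingested_at"]
    return list(by_pk.values())
```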

  9. Load the data from the staging table into the Destination table:

    Note: Google BigQuery uses a single MERGE statement to handle all three operations of deletion, insertion, and update on the Destination table.

    MERGE INTO
      <dataset>.<destination_table> AS T
    USING
      <dataset>.<stage_table> AS S
    ON
      T.<PK1> = S.<PK1> AND
      T.<PK2> = S.<PK2> AND
      ...
      T.<PKn> = S.<PKn>
    WHEN MATCHED AND
      (S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
      (S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
      (S.__hevo__is_merged IS NULL)
      THEN UPDATE SET
        T.__hevo__marked_deleted = TRUE,
        T.__hevo__ingested_at = S.__hevo__ingested_at,
        T.__hevo__loaded_at = <loaded_at>
    WHEN MATCHED AND
      (S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
      (((S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
      (S.__hevo__is_merged = TRUE)) OR
      (S.__hevo__marked_deleted IS NULL OR S.__hevo__marked_deleted = FALSE))
      THEN UPDATE SET
        T.field_1 = S.field_1,
        T.field_2 = S.field_2,
        ...
        T.field_n = S.field_n,
        T.__hevo__ingested_at = S.__hevo__ingested_at,
        T.__hevo__marked_deleted = S.__hevo__marked_deleted,
        T.__hevo__loaded_at = <loaded_at>
    WHEN NOT MATCHED
      THEN INSERT (<PK1>,
                   <PK2>,
                    ...
                   <PKn>,
                   field_1,
                   field_2,
                    ...
                   field_n,
                   __hevo__ingested_at,
                   __hevo__marked_deleted,
                   __hevo__loaded_at)
           VALUES (S.<PK1>,
                   S.<PK2>,
                    ...
                   S.<PKn>,
                   S.field_1,
                   S.field_2,
                    ...
                   S.field_n,
                   S.__hevo__ingested_at,
                   S.__hevo__marked_deleted,
                   <loaded_at>);
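
The semantics of this final MERGE can be modeled in Python. This is an illustrative sketch only; field_1 stands in for any non-key column, and loaded_at corresponds to the <loaded_at> timestamp:

```python
def merge_stage_into_destination(dest_rows, stage_rows, pk_cols, loaded_at):
    """Apply the staged rows to the Destination table: a staged delete that
    was never merged with a newer Event (no __hevo__is_merged flag) only
    flips the deletion flag; every other staged row overwrites the field
    values; unmatched rows are inserted."""
    by_pk = {tuple(r[c] for c in pk_cols): r for r in dest_rows}
    for s in stage_rows:
        pk = tuple(s[c] for c in pk_cols)
        t = by_pk.get(pk)
        if t is None:                                  # WHEN NOT MATCHED
            t = dict(s)
            t.pop("__hevo__is_merged", None)
            t["__hevo__loaded_at"] = loaded_at
            by_pk[pk] = t
        # Default of 0 approximates the "T.__hevo__ingested_at IS NULL" case.
        elif s["__hevo__ingested_at"] >= t.get("__hevo__ingested_at", 0):
            if s.get("__hevo__marked_deleted") and not s.get("__hevo__is_merged"):
                t["__hevo__marked_deleted"] = True     # pure delete
            else:
                t.update({k: v for k, v in s.items()
                          if k != "__hevo__is_merged"})  # update fields
            t["__hevo__ingested_at"] = s["__hevo__ingested_at"]
            t["__hevo__loaded_at"] = loaded_at
    return list(by_pk.values())
```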
    
  10. Drop the staging table.

This completes the loading of data to the Google BigQuery data warehouse.



Revision History

Refer to the following table for the list of key updates made to this page:

Date          Release  Description of Change
Jan-24-2022   1.80     Updated the section, Loading Data with Primary Keys, to document the changed deduplication process.
Last updated on 07 Mar 2022