Loading Data to a Google BigQuery Data Warehouse
By default, the Pipeline loads the Source data every five minutes to your Google Cloud Storage (GCS) buckets, from where it is loaded to the BigQuery Destination.
This page describes the queries that Hevo runs in your BigQuery data warehouse to load data to non-partitioned and partitioned tables. Hevo uses primary keys, if any, to deduplicate the data before loading it to the target tables; in the absence of primary keys, Hevo appends the data to the target tables. For tables partitioned by a time-unit column or an integer range, Hevo optimizes its queries using the partitioning column, whereas it runs the same queries for non-partitioned tables and tables partitioned by ingestion time. Read Loading Data to a Data Warehouse for a comprehensive understanding of Hevo's data loading process.
Note: The queries listed here have been simplified to facilitate understanding the overall data deduplication and loading process.
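For context, the following is a minimal, illustrative sketch (the table and column names are examples, not Hevo-generated objects) of the two partitioned table types referenced above:

-- Illustrative DDL only: a table partitioned by a time-unit (DATE) column.
CREATE TABLE <dataset>.orders_by_date (
  order_id   INT64,
  order_date DATE
)
PARTITION BY order_date;

-- Illustrative DDL only: a table partitioned by an integer range.
CREATE TABLE <dataset>.orders_by_id (
  order_id   INT64,
  order_date DATE
)
PARTITION BY RANGE_BUCKET(order_id, GENERATE_ARRAY(0, 1000000, 10000));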
Loading Data without Primary Keys
If primary keys are not present in the Destination tables, Hevo directly appends the data to the target tables in the Destination data warehouse using the following steps:

1. Apply the `__hevo__ingested_at` timestamp to each Event at the time of ingestion from the Source. This column is retained in the Destination table.

2. Ensure that the `__hevo__ingested_at` and, if required, `__hevo__loaded_at` columns are present in the Destination table; else, create them.

3. Copy the files into the target table:

   load <gs://path_to/file> to <dataset>.<target_table>
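   Hevo performs this copy internally. As an illustration only (the JSON file format is an assumption, and this may not be the exact mechanism Hevo uses), the equivalent operation can be expressed with BigQuery's LOAD DATA statement:

   -- Illustrative only: appends staged GCS files to the target table.
   LOAD DATA INTO <dataset>.<target_table>
   FROM FILES (
     format = 'JSON',  -- assumed newline-delimited JSON
     uris = ['gs://path_to/file']
   );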
Loading Data with Primary Keys
If the Destination tables have primary keys, Hevo performs the following steps to deduplicate the data and load it to the data warehouse:

1. Apply the `__hevo__ingested_at` timestamp to each Event at the time of ingestion from the Source. This column is retained in the Destination table.

2. Ensure that the `__hevo__ingested_at` and, if required, `__hevo__loaded_at` columns are present in the Destination table; else, create them.

3. Create a temporary staging table with the same schema as the Destination table and the additional metadata columns. The ingested data is loaded to this table, and all the steps for deduplicating it are performed on this table. Read Creating and using tables in Google BigQuery. A sketch of this step is shown below.
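   Hevo creates this table internally; as an illustration only (the exact DDL Hevo uses may differ), such a staging table could be created as follows:

   -- Illustrative only: clone the Destination table's schema.
   CREATE TABLE <dataset>.<stage_table>
   LIKE <dataset>.<destination_table>;

   -- Add the extra metadata column used during deduplication.
   ALTER TABLE <dataset>.<stage_table>
   ADD COLUMN IF NOT EXISTS __hevo__is_merged BOOL;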
4. Copy the files to the staging table:

   load <gs://path_to/file> to <dataset>.<stage_table>
5. Check if the staging table contains any entries for deleted Events:

   SELECT COUNT(*) AS deleted_count
   FROM <dataset>.<stage_table>
   WHERE __hevo__marked_deleted = TRUE;
6. If the value of deleted_count obtained in Step 5 is greater than 0, perform this step; else, skip to Step 7. In this step:

   1. Create a temporary table (for example, delete_table) with the most recent deleted Events from the staging table:

      CREATE TABLE <dataset>.<delete_table> AS
      SELECT row.<PK1>, row.<PK2>, ... row.<PKn>,
             row.__hevo__ingested_at,
             row.__he__msg_seq_id,
             row.__hevo__consumption_id
      FROM (
        SELECT ARRAY_AGG(T
                 ORDER BY __hevo__ingested_at DESC,
                          __he__msg_seq_id DESC,
                          __hevo__consumption_id DESC
                 LIMIT 1)[OFFSET(0)] AS row
        FROM <dataset>.<stage_table> AS T
        WHERE __hevo__marked_deleted = TRUE
        GROUP BY <PK1, PK2, ..., PKn>);

   2. Remove the entries for all deleted Events from the staging table:

      DELETE FROM <dataset>.<stage_table>
      WHERE __hevo__marked_deleted = TRUE;
7. Delete duplicate data from the staging table based on `__hevo__ingested_at`, `__he__msg_seq_id`, and `__hevo__consumption_id`:

   DELETE FROM <dataset>.<stage_table>
   WHERE STRUCT(<PK1, PK2, ..., PKn>, __hevo__ingested_at) NOT IN
     (SELECT AS STRUCT <PK1, PK2, ..., PKn>, MAX(__hevo__ingested_at)
      FROM <dataset>.<stage_table>
      GROUP BY <PK1, PK2, ..., PKn>);

   DELETE FROM <dataset>.<stage_table>
   WHERE STRUCT(<PK1, PK2, ..., PKn>, __he__msg_seq_id) NOT IN
     (SELECT AS STRUCT <PK1, PK2, ..., PKn>, MAX(__he__msg_seq_id)
      FROM <dataset>.<stage_table>
      GROUP BY <PK1, PK2, ..., PKn>);

   DELETE FROM <dataset>.<stage_table>
   WHERE STRUCT(<PK1, PK2, ..., PKn>, __hevo__consumption_id) NOT IN
     (SELECT AS STRUCT <PK1, PK2, ..., PKn>, MAX(__hevo__consumption_id)
      FROM <dataset>.<stage_table>
      GROUP BY <PK1, PK2, ..., PKn>);
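   The net effect of these three statements is to retain, for each primary key, only the latest Event. As an illustration only (not a query Hevo runs), the surviving rows can be previewed with a single window-function query:

   -- Illustrative only: previews the rows that survive deduplication.
   SELECT *
   FROM <dataset>.<stage_table>
   WHERE TRUE  -- BigQuery requires WHERE/GROUP BY/HAVING alongside QUALIFY
   QUALIFY ROW_NUMBER() OVER (
     PARTITION BY <PK1, PK2, ..., PKn>
     ORDER BY __hevo__ingested_at DESC,
              __he__msg_seq_id DESC,
              __hevo__consumption_id DESC) = 1;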
8. If the delete_table was created in Step 6, perform this step; else, skip to Step 9. In this step:

   1. Load data from the delete_table into the staging table.

      Note: Google BigQuery uses a single MERGE statement to handle the deletes, inserts, and updates to the staging table.

      MERGE INTO <dataset>.<stage_table> S
      USING <dataset>.<delete_table> D
      ON S.PK1 = D.PK1 AND
         S.PK2 = D.PK2 AND
         ...
         S.PKn = D.PKn
      WHEN MATCHED AND
        ((D.__hevo__ingested_at > S.__hevo__ingested_at) OR
         (D.__hevo__ingested_at = S.__hevo__ingested_at AND
          D.__he__msg_seq_id > S.__he__msg_seq_id) OR
         (D.__hevo__ingested_at = S.__hevo__ingested_at AND
          D.__he__msg_seq_id = S.__he__msg_seq_id AND
          D.__hevo__consumption_id > S.__hevo__consumption_id)) OR
        S.__hevo__ingested_at IS NULL
      THEN UPDATE SET
        S.__hevo__is_merged = TRUE,
        S.__hevo__marked_deleted = TRUE,
        S.__hevo__ingested_at = D.__hevo__ingested_at
      WHEN NOT MATCHED
      THEN INSERT (<PK1>, <PK2>, ... <PKn>,
                   __hevo__consumption_id,
                   __he__msg_seq_id,
                   __hevo__ingested_at,
                   __hevo__marked_deleted)
      VALUES (D.<PK1>, D.<PK2>, ... D.<PKn>,
              D.__hevo__consumption_id,
              D.__he__msg_seq_id,
              D.__hevo__ingested_at,
              D.__hevo__marked_deleted);

   2. Drop the temporary delete_table.
9. Load the data from the staging table into the Destination table. Hevo runs one of the following queries based on the target table type.

   Note: Google BigQuery uses a single MERGE statement to handle the deletes, inserts, and updates to the Destination table.

   Refer to the section Merge queries for loading data to the Destination table below for the query that Hevo runs for each of these target table types:

   - Non-partitioned tables and tables partitioned by ingestion time
   - Tables partitioned by a date column
   - Tables partitioned by a datetime column
   - Tables partitioned by a timestamp column
   - Tables partitioned by an integer column
10. Drop the staging table.
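    Hevo drops this table internally; as an illustration only, this corresponds to BigQuery's DROP TABLE statement:

    -- Illustrative only: remove the temporary staging table.
    DROP TABLE IF EXISTS <dataset>.<stage_table>;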
This completes the loading of data to the Google BigQuery data warehouse.
Merge queries for loading data to the Destination table
Hevo runs these queries in your BigQuery data warehouse to load data from the staging table to the target table.
Non-partitioned tables and tables partitioned by ingestion time
MERGE INTO
<dataset>.<destination_table> AS T
USING
<dataset>.<stage_table> AS S
ON
T.PK1 = S.PK1 AND
T.PK2 = S.PK2 AND
...
T.PKn = S.PKn
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged IS NULL)
THEN UPDATE SET
T.__hevo__marked_deleted = TRUE,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__loaded_at = <loaded_at>
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(((S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged = TRUE)) OR
(S.__hevo__marked_deleted IS NULL OR S.__hevo__marked_deleted = FALSE))
THEN UPDATE SET
T.field_1 = S.field_1,
T.field_2 = S.field_2,
...
T.field_n = S.field_n,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__marked_deleted = S.__hevo__marked_deleted,
T.__hevo__loaded_at = <loaded_at>
WHEN NOT MATCHED
THEN INSERT (<PK1>,
<PK2>,
...
<PKn>,
field_1,
field_2,
...
field_n,
__hevo__ingested_at,
__hevo__marked_deleted,
__hevo__loaded_at)
VALUES (S.<PK1>,
S.<PK2>,
...
S.<PKn>,
S.field_1,
S.field_2,
...
S.field_n,
S.__hevo__ingested_at,
S.__hevo__marked_deleted,
<loaded_at>);
Tables partitioned by a date column
MERGE INTO
<dataset>.<destination_table> AS T
USING
<dataset>.<stage_table> AS S
ON
T.PK1 = S.PK1 AND
T.PK2 = S.PK2 AND
...
T.PKn = S.PKn AND
((T.PRK >= "<date_value1>" AND T.PRK <= "<date_value2>") OR T.PRK IS NULL)
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged IS NULL)
THEN UPDATE SET
T.__hevo__marked_deleted = TRUE,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__loaded_at = <loaded_at>
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(((S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged = TRUE)) OR
(S.__hevo__marked_deleted IS NULL OR S.__hevo__marked_deleted = FALSE))
THEN UPDATE SET
T.field_1 = S.field_1,
T.field_2 = S.field_2,
...
T.field_n = S.field_n,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__marked_deleted = S.__hevo__marked_deleted,
T.__hevo__loaded_at = <loaded_at>
WHEN NOT MATCHED
THEN INSERT (<PK1>,
<PK2>,
...
<PKn>,
field_1,
field_2,
...
field_n,
__hevo__ingested_at,
__hevo__marked_deleted,
__hevo__loaded_at)
VALUES (S.<PK1>,
S.<PK2>,
...
S.<PKn>,
S.field_1,
S.field_2,
...
S.field_n,
S.__hevo__ingested_at,
S.__hevo__marked_deleted,
<loaded_at>);
Tables partitioned by a datetime column
MERGE INTO
<dataset>.<destination_table> AS T
USING
<dataset>.<stage_table> AS S
ON
T.PK1 = S.PK1 AND
T.PK2 = S.PK2 AND
...
T.PKn = S.PKn AND
((T.PRK >= "<datetime_value1>" AND T.PRK <= "<datetime_value2>") OR
T.PRK IS NULL)
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged IS NULL)
THEN UPDATE SET
T.__hevo__marked_deleted = TRUE,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__loaded_at = <loaded_at>
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(((S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged = TRUE)) OR
(S.__hevo__marked_deleted IS NULL OR S.__hevo__marked_deleted = FALSE))
THEN UPDATE SET
T.field_1 = S.field_1,
T.field_2 = S.field_2,
...
T.field_n = S.field_n,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__marked_deleted = S.__hevo__marked_deleted,
T.__hevo__loaded_at = <loaded_at>
WHEN NOT MATCHED
THEN INSERT (<PK1>,
<PK2>,
...
<PKn>,
field_1,
field_2,
...
field_n,
__hevo__ingested_at,
__hevo__marked_deleted,
__hevo__loaded_at)
VALUES (S.<PK1>,
S.<PK2>,
...
S.<PKn>,
S.field_1,
S.field_2,
...
S.field_n,
S.__hevo__ingested_at,
S.__hevo__marked_deleted,
<loaded_at>);
Tables partitioned by a timestamp column
MERGE INTO
<dataset>.<destination_table> AS T
USING
<dataset>.<stage_table> AS S
ON
T.PK1 = S.PK1 AND
T.PK2 = S.PK2 AND
...
T.PKn = S.PKn AND
((T.PRK >= TIMESTAMP_SECONDS(<timestamp_value1>) AND T.PRK <= TIMESTAMP_SECONDS(<timestamp_value2>)) OR T.PRK IS NULL)
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged IS NULL)
THEN UPDATE SET
T.__hevo__marked_deleted = TRUE,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__loaded_at = <loaded_at>
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(((S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged = TRUE)) OR
(S.__hevo__marked_deleted IS NULL OR S.__hevo__marked_deleted = FALSE))
THEN UPDATE SET
T.field_1 = S.field_1,
T.field_2 = S.field_2,
...
T.field_n = S.field_n,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__marked_deleted = S.__hevo__marked_deleted,
T.__hevo__loaded_at = <loaded_at>
WHEN NOT MATCHED
THEN INSERT (<PK1>,
<PK2>,
...
<PKn>,
field_1,
field_2,
...
field_n,
__hevo__ingested_at,
__hevo__marked_deleted,
__hevo__loaded_at)
VALUES (S.<PK1>,
S.<PK2>,
...
S.<PKn>,
S.field_1,
S.field_2,
...
S.field_n,
S.__hevo__ingested_at,
S.__hevo__marked_deleted,
<loaded_at>);
Tables partitioned by an integer column
MERGE INTO
<dataset>.<destination_table> AS T
USING
<dataset>.<stage_table> AS S
ON
T.PK1 = S.PK1 AND
T.PK2 = S.PK2 AND
...
T.PKn = S.PKn AND
((T.PRK >= <integer_value1> AND T.PRK <= <integer_value2>) OR
T.PRK IS NULL)
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged IS NULL)
THEN UPDATE SET
T.__hevo__marked_deleted = TRUE,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__loaded_at = <loaded_at>
WHEN MATCHED AND
(S.__hevo__ingested_at >= T.__hevo__ingested_at OR T.__hevo__ingested_at IS NULL) AND
(((S.__hevo__marked_deleted IS NOT NULL AND S.__hevo__marked_deleted = TRUE) AND
(S.__hevo__is_merged = TRUE)) OR
(S.__hevo__marked_deleted IS NULL OR S.__hevo__marked_deleted = FALSE))
THEN UPDATE SET
T.field_1 = S.field_1,
T.field_2 = S.field_2,
...
T.field_n = S.field_n,
T.__hevo__ingested_at = S.__hevo__ingested_at,
T.__hevo__marked_deleted = S.__hevo__marked_deleted,
T.__hevo__loaded_at = <loaded_at>
WHEN NOT MATCHED
THEN INSERT (<PK1>,
<PK2>,
...
<PKn>,
field_1,
field_2,
...
field_n,
__hevo__ingested_at,
__hevo__marked_deleted,
__hevo__loaded_at)
VALUES (S.<PK1>,
S.<PK2>,
...
S.<PKn>,
S.field_1,
S.field_2,
...
S.field_n,
S.__hevo__ingested_at,
S.__hevo__marked_deleted,
<loaded_at>);
Revision History
Refer to the following table for the list of key updates made to this page:
| Date | Release | Description of Change |
|---|---|---|
| Aug-28-2023 | NA | Added subsection, Merge queries for loading data to the Destination table to document the queries run for loading data to non-partitioned and partitioned tables. |
| Jan-24-2022 | 1.80 | Updated section, Loading Data with Primary Keys to document the changed deduplication process. |