Loading Data to an Amazon Redshift Data Warehouse
The Pipeline stages your data in Hevo’s S3 bucket, from where it is finally loaded to your Amazon Redshift Destination.
This section describes the queries for loading data into an Amazon Redshift data warehouse. It assumes that you are familiar with Hevo’s process for Loading Data to a Data Warehouse.
Note: The queries listed here have been simplified to facilitate understanding of the overall data deduplication and loading process.
Loading Data without Primary Keys
If primary keys are not present in the Destination tables, Hevo directly appends the data to the target tables in the Destination warehouse using the following steps:
1. Apply the `__hevo__ingested_at` timestamp to each Event at the time of ingestion from the Source. This column is retained in the Destination table.

2. Ensure that the `__hevo__ingested_at` and, if required, the `__hevo__loaded_at` columns are present in the Destination table; create them if they do not exist.

3. Copy the data directly into the target table:

   ```sql
   COPY <target_table> FROM '<s3://path/to/manifest-file>'...;
   ```
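For illustration, a fuller form of this COPY command might look as follows. The table name, bucket path, IAM role, and format options here are hypothetical placeholders; the actual values depend on how the Pipeline stages the files:

```sql
-- Hypothetical example: load all files listed in an S3 manifest into the target table.
COPY public.orders
FROM 's3://example-bucket/hevo/orders/load.manifest'
IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-load'
MANIFEST
FORMAT AS CSV
GZIP;
```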
Loading Data with Primary Keys
If the Destination tables have primary keys, Hevo performs the following steps to deduplicate the data and load it to the data warehouse:
1. Apply the `__hevo__ingested_at` timestamp to each Event at the time of ingestion from the Source. This column is retained in the Destination table.

2. Ensure that the `__hevo__ingested_at` and, if required, the `__hevo__loaded_at` columns are present in the Destination table; create them if they do not exist.

3. Create a temporary staging table in the Destination with the same schema as the Destination table. The ingested data is loaded to this table, and all the deduplication steps are performed on it:

   ```sql
   CREATE TEMP TABLE <stage_table> (LIKE <target_table>);
   ```
4. Add the Hevo-reserved meta columns to the staging table:

   ```sql
   ALTER TABLE <stage_table> ADD COLUMN __he__msg_seq_id BIGINT DEFAULT 0;
   ALTER TABLE <stage_table> ADD COLUMN __hevo__consumption_id BIGINT DEFAULT 0;
   ALTER TABLE <stage_table> ADD COLUMN __hevo__marked_deleted BOOLEAN DEFAULT NULL;
   ```

5. Copy the ingested data along with the new columns into the staging table:

   ```sql
   COPY <stage_table> FROM '<s3://path/to/manifest-file>'...;
   ```
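   As a concrete, hypothetical walk-through of steps 3-5, suppose the target table is `public.orders`; the staging table name, bucket path, and IAM role below are illustrative placeholders:

   ```sql
   -- The staging table mirrors the target schema, plus the Hevo-reserved meta columns.
   CREATE TEMP TABLE orders_stage (LIKE public.orders);

   ALTER TABLE orders_stage ADD COLUMN __he__msg_seq_id BIGINT DEFAULT 0;
   ALTER TABLE orders_stage ADD COLUMN __hevo__consumption_id BIGINT DEFAULT 0;
   ALTER TABLE orders_stage ADD COLUMN __hevo__marked_deleted BOOLEAN DEFAULT NULL;

   -- Load the staged files listed in the manifest into the staging table.
   COPY orders_stage
   FROM 's3://example-bucket/hevo/orders/load.manifest'
   IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-load'
   MANIFEST;
   ```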
6. Ensure that the `__hevo__marked_deleted` column is present if data is streamed from a Source that captures deleted Events, for example, MySQL BinLog.
7. Identify and remove duplicates:

   1. Get the count of duplicate Events:

      ```sql
      SELECT COUNT(*) AS duplicate_count
      FROM (SELECT <PK1, PK2, ...PKn>
            FROM <stage_table>
            GROUP BY <PK1, PK2, ...PKn>
            HAVING COUNT(*) >= 2) AS duplicates;
      ```

   2. If duplicate data exists, delete the duplicates from the staging table on the basis of `__hevo__ingested_at`, `__he__msg_seq_id`, and `__hevo__consumption_id`:

      ```sql
      DELETE FROM <stage_table>
      WHERE (<PK1, PK2, ...PKn>, __hevo__ingested_at) NOT IN
            (SELECT <PK1, PK2, ...PKn>, MAX(__hevo__ingested_at)
             FROM <stage_table>
             GROUP BY <PK1, PK2, ...PKn>);

      DELETE FROM <stage_table>
      WHERE (<PK1, PK2, ...PKn>, __he__msg_seq_id) NOT IN
            (SELECT <PK1, PK2, ...PKn>, MAX(__he__msg_seq_id)
             FROM <stage_table>
             GROUP BY <PK1, PK2, ...PKn>);

      DELETE FROM <stage_table>
      WHERE (<PK1, PK2, ...PKn>, __hevo__consumption_id) NOT IN
            (SELECT <PK1, PK2, ...PKn>, MAX(__hevo__consumption_id)
             FROM <stage_table>
             GROUP BY <PK1, PK2, ...PKn>);
      ```
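   For reference, the same keep-the-latest-Event rule can also be expressed in a single pass with a window function. This is an illustrative alternative, not the query Hevo runs; `id` stands in for the primary key column(s), and it assumes the tie-breaker columns uniquely identify a row within each key:

   ```sql
   -- Rank rows per primary key, newest first, using the same tie-breaker
   -- order as above, and delete every row other than the top-ranked one.
   DELETE FROM <stage_table>
   WHERE (id, __hevo__ingested_at, __he__msg_seq_id, __hevo__consumption_id) IN
         (SELECT id, __hevo__ingested_at, __he__msg_seq_id, __hevo__consumption_id
          FROM (SELECT id, __hevo__ingested_at, __he__msg_seq_id, __hevo__consumption_id,
                       ROW_NUMBER() OVER (PARTITION BY id
                                          ORDER BY __hevo__ingested_at DESC,
                                                   __he__msg_seq_id DESC,
                                                   __hevo__consumption_id DESC) AS rn
                FROM <stage_table>) AS ranked
          WHERE rn > 1);
   ```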
8. Remove stale data from the staging table, that is, Events whose `__hevo__ingested_at` timestamp is earlier than that of the corresponding Destination Event. The `IS NOT NULL` condition ensures that target rows without a recorded ingestion timestamp do not cause the staged Event to be discarded:

   ```sql
   DELETE FROM <stage_table> AS S
   USING <target_table> AS T
   WHERE T.PK1 = S.PK1 AND T.PK2 = S.PK2 ... AND T.PKn = S.PKn
     AND T.__hevo__ingested_at IS NOT NULL
     AND T.__hevo__ingested_at > S.__hevo__ingested_at;
   ```
9. Clean up the data in the staging table for loading by dropping the now-redundant Hevo-reserved columns:

   ```sql
   ALTER TABLE <stage_table> DROP COLUMN __he__msg_seq_id;
   ALTER TABLE <stage_table> DROP COLUMN __hevo__consumption_id;
   ```
10. Update, delete, and insert the eligible Events using the following queries. (A sketch of running these merge steps in a single transaction follows this procedure.)

    Note: After deduplication of the data, if the latest record is a delete record, the `__hevo__marked_deleted` column is set to TRUE for it in the Destination table.

    1. Check if there are deleted Events:

       ```sql
       SELECT COUNT(*) AS deleted_count
       FROM <stage_table>
       WHERE __hevo__marked_deleted = TRUE;
       ```

    2. Mark the deleted rows in the target table:

       ```sql
       UPDATE <target_table> AS T
       SET __hevo__marked_deleted = TRUE,
           __hevo__ingested_at = S.__hevo__ingested_at,
           __hevo__loaded_at = <loaded_at>
       FROM <stage_table> AS S
       WHERE S.__hevo__marked_deleted = TRUE
         AND S.PK1 = T.PK1 AND S.PK2 = T.PK2 ... AND S.PKn = T.PKn;
       ```

    3. Remove the deleted rows from the staging table:

       ```sql
       DELETE FROM <stage_table> WHERE __hevo__marked_deleted = TRUE;
       ```

    4. Delete records from the target table that are also present in the staging table:

       ```sql
       DELETE FROM <target_table> AS T
       USING <stage_table> AS S
       WHERE S.PK1 = T.PK1 AND S.PK2 = T.PK2 ... AND S.PKn = T.PKn;
       ```

    5. Insert the remaining rows into the target table:

       ```sql
       INSERT INTO <target_table> (field_1, field_2, ... field_n, __hevo__loaded_at)
       (SELECT field_1, field_2, ... field_n, <loaded_at> FROM <stage_table>);
       ```
11. Drop the staging table:

    ```sql
    DROP TABLE <stage_table>;
    ```

This completes the loading of data to the Amazon Redshift data warehouse.
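Because step 10 replaces matching target rows with a delete followed by an insert, the merge steps are best thought of as one atomic unit. The following is a minimal, hypothetical sketch of wrapping them in a single transaction; the table names (`public.orders`, `orders_stage`), key column (`id`), and data columns (`field_1`, `field_2`) are placeholders, and this illustrates the idea rather than Hevo's actual implementation:

```sql
-- Run the delete-and-insert merge atomically so that concurrent readers
-- never observe target rows that have been deleted but not yet re-inserted.
BEGIN;

DELETE FROM public.orders
USING orders_stage
WHERE orders_stage.id = public.orders.id;

INSERT INTO public.orders (id, field_1, field_2, __hevo__loaded_at)
SELECT id, field_1, field_2, GETDATE()
FROM orders_stage;

COMMIT;
```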
Revision History
Refer to the following table for the list of key updates made to this page:
| Date | Release | Description of Change |
| --- | --- | --- |
| Feb-22-2021 | NA | Updated the page overview to state that the Pipeline stages the ingested data in Hevo’s S3 bucket, from where it is finally loaded to the Destination. |