Introduction
Frequently I’m requested to send data in two specific situations:
- When new data arrives.
- When the data changes in the raw tables.
The stakeholders only gave us the raw tables and the specific layout. ¿How do we do that?
The problem
In order to be more clear, we imagine a dummy scenario where we need to send data related to products.
DATA SOURCES
In this area, we need to identify the raw tables and the relationship between them
- CATEGORY_RAW
- PRICE_RAW
the tables are related by the sku column.
CREATE TABLES
CREATE TABLE BD.SALES.CATEGORY_RAW (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
sku VARCHAR(255) NOT NULL
);
CREATE TABLE BD.SALES.PRICE_RAW (
id INT PRIMARY KEY,
sku VARCHAR(255) NOT NULL,
amount VARCHAR(255) NOT NULL
);
POPULATE TABLES
INSERT INTO BD.SALES.CATEGORY_RAW(id, name, sku)
VALUES
(1,'Children Bicycles', 'CHI_BIC_25'),
(2,'Comfort Bicycles', 'COM_BIC_33'),
(3,'Cruisers Bicycles', 'CRU_BIC_25'),
(4,'Cyclocross Bicycles','CYC_BIC_25');
INSERT INTO BD.SALES.PRICE_RAW(id, sku, amount)
VALUES
(1,'CHI_BIC_25', 15000),
(2,'COM_BIC_33', 25000),
(3,'CRU_BIC_25', 13000),
(4,'CYC_BIC_25', 10000);
STAGING AREA
This area is where we join the raw sources, which normally it’s a big table.
CREATE OR REPLACE TRANSIENT TABLE BD.SALES.REPORT_STG COMMENT = 'STG table with raw data' AS (
SELECT
CR.ID
,CR.NAME
,PR.AMOUNT
FROM
BD.SALES.CATEGORY_RAW CR
INNER
JOIN BD.SALES.PRICE_RAW PR
ON
CR.SKU = PR.SKU
);
HISTORICAL TABLE
We need to define the layout of the final table because in this table we will be inserting the result of the merge strategy.
CREATE TABLE IF NOT EXISTS BD.SALES.REPORT_TARGET COMMENT = 'Table with product data to send' (
ACTION TEXT COMMENT 'Action to do with this register CREATE, UPDATE'
,CATEGORY_ID INT PRIMARY KEY COMMENT 'Unique identifier'
,CATEGORY_NAME VARCHAR(255) NOT NULL COMMENT 'Category name'
,AMOUNT DECIMAL(10,2) COMMENT 'Price'
,CREATED_AT DATE COMMENT 'Date when this register was inserted or updated'
,STATE TEXT COMMENT 'Register state, SEND, SENDED'
);
POPULATE
In this moment is when we apply the merge strategy
MERGE INTO BD.SALES.REPORT_TARGET T
USING BD.SALES.REPORT_STG S
ON
S.ID = T.CATEGORY_ID
WHEN NOT MATCHED
THEN INSERT
(
T.ACTION
,T.CATEGORY_ID
,T.CATEGORY_NAME
,T.AMOUNT
,T.STATE
,T.CREATED_AT
)
VALUES
(
'CREATE'
,S.ID
,S.NAME
,S.AMOUNT
,'SEND'
,CURRENT_DATE()
)
WHEN MATCHED AND T.STATE = 'SEND' THEN UPDATE SET
T.STATE = 'SENDED'
WHEN MATCHED AND
(
T.CATEGORY_NAME <> S.NAME
OR
T.AMOUNT <> S.AMOUNT
)
THEN
UPDATE SET
ACTION = 'UPDATE'
,T.CATEGORY_NAME = S.NAME
,T.AMOUNT = S.AMOUNT
,T.STATE = 'SEND'
,T.CREATED_AT = CURRENT_DATE()
;
SEND DATA
Normally I work with Snowflake DWH and in this particular example we send the data in json lines to S3, to achived that I create a stage in snowflake.
COPY INTO @STAGE_NAME/products/products_lists FROM (
SELECT
'{' || '"action"' || ':' || '"' || "ACTION" || '"' || ',' ||
'"id"' || ':' || '"' || "CATEGORY_ID" || '"' || ',' ||
'"name"' || ':' || '"' || "CATEGORY_NAME" || '"' || ',' ||
'"amount"' || ':' || '"' || "AMOUNT" || '"' ||
'}' AS "category_list"
FROM
BD.SALES.REPORT_TARGET
WHERE
1=1
AND STATE = 'SEND'
)