Introduction#
Frequently I’m requested to send data in two specific situations:
- When new data arrives.
- When the data changes in the raw tables.
The stakeholders only gave us the raw tables and the specific layout. ¿How do we do that?
The problem#
In order to be more clear, we imagine a dummy scenario where we need to send data related to products.
DATA SOURCES#
In this area, we need to identify the raw tables and the relationship between them
the tables are related by the sku column.
CREATE TABLES#
1
2
3
4
5
6
7
8
9
10
11
| CREATE TABLE BD.SALES.CATEGORY_RAW (
id INT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
sku VARCHAR(255) NOT NULL
);
CREATE TABLE BD.SALES.PRICE_RAW (
id INT PRIMARY KEY,
sku VARCHAR(255) NOT NULL,
amount VARCHAR(255) NOT NULL
);
|
POPULATE TABLES#
1
2
3
4
5
6
7
8
9
10
11
12
13
| INSERT INTO BD.SALES.CATEGORY_RAW(id, name, sku)
VALUES
(1,'Children Bicycles', 'CHI_BIC_25'),
(2,'Comfort Bicycles', 'COM_BIC_33'),
(3,'Cruisers Bicycles', 'CRU_BIC_25'),
(4,'Cyclocross Bicycles','CYC_BIC_25');
INSERT INTO BD.SALES.PRICE_RAW(id, sku, amount)
VALUES
(1,'CHI_BIC_25', 15000),
(2,'COM_BIC_33', 25000),
(3,'CRU_BIC_25', 13000),
(4,'CYC_BIC_25', 10000);
|
STAGING AREA#
This area is where we join the raw sources, which normally it’s a big table.
1
2
3
4
5
6
7
8
9
10
11
12
| CREATE OR REPLACE TRANSIENT TABLE BD.SALES.REPORT_STG COMMENT = 'STG table with raw data' AS (
SELECT
CR.ID
,CR.NAME
,PR.AMOUNT
FROM
BD.SALES.CATEGORY_RAW CR
INNER
JOIN BD.SALES.PRICE_RAW PR
ON
CR.SKU = PR.SKU
);
|
HISTORICAL TABLE#
We need to define the layout of the final table because in this table we will be inserting the result of the merge strategy.
1
2
3
4
5
6
7
8
| CREATE TABLE IF NOT EXISTS BD.SALES.REPORT_TARGET COMMENT = 'Table with product data to send' (
ACTION TEXT COMMENT 'Action to do with this register CREATE, UPDATE'
,CATEGORY_ID INT PRIMARY KEY COMMENT 'Unique identifier'
,CATEGORY_NAME VARCHAR(255) NOT NULL COMMENT 'Category name'
,AMOUNT DECIMAL(10,2) COMMENT 'Price'
,CREATED_AT DATE COMMENT 'Date when this register was inserted or updated'
,STATE TEXT COMMENT 'Register state, SEND, SENDED'
);
|
POPULATE#
In this moment is when we apply the merge strategy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
| MERGE INTO BD.SALES.REPORT_TARGET T
USING BD.SALES.REPORT_STG S
ON
S.ID = T.CATEGORY_ID
WHEN NOT MATCHED
THEN INSERT
(
T.ACTION
,T.CATEGORY_ID
,T.CATEGORY_NAME
,T.AMOUNT
,T.STATE
,T.CREATED_AT
)
VALUES
(
'CREATE'
,S.ID
,S.NAME
,S.AMOUNT
,'SEND'
,CURRENT_DATE()
)
WHEN MATCHED AND T.STATE = 'SEND' THEN UPDATE SET
T.STATE = 'SENDED'
WHEN MATCHED AND
(
T.CATEGORY_NAME <> S.NAME
OR
T.AMOUNT <> S.AMOUNT
)
THEN
UPDATE SET
ACTION = 'UPDATE'
,T.CATEGORY_NAME = S.NAME
,T.AMOUNT = S.AMOUNT
,T.STATE = 'SEND'
,T.CREATED_AT = CURRENT_DATE()
;
|
SEND DATA#
Normally I work with Snowflake DWH and in this particular example we send the data in json lines to S3, to achived that I create a stage in snowflake.
1
2
3
4
5
6
7
8
9
10
11
12
13
| COPY INTO @STAGE_NAME/products/products_lists FROM (
SELECT
'{' || '"action"' || ':' || '"' || "ACTION" || '"' || ',' ||
'"id"' || ':' || '"' || "CATEGORY_ID" || '"' || ',' ||
'"name"' || ':' || '"' || "CATEGORY_NAME" || '"' || ',' ||
'"amount"' || ':' || '"' || "AMOUNT" || '"' ||
'}' AS "category_list"
FROM
BD.SALES.REPORT_TARGET
WHERE
1=1
AND STATE = 'SEND'
)
|