-- Flink SQL 知其所以然：改了改源码，实现了个 Batch lookup join（附源码）
-- Source table: randomly generated log events from the built-in datagen
-- connector, 10 rows per second.
-- `timestamp` and proctime are computed columns, not produced by datagen.
CREATE TABLE show_log (
    log_id BIGINT,
    -- Computed from CURRENT_TIMESTAMP and cast to TIMESTAMP(3).
    `timestamp` AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    user_id STRING,
    -- Processing-time attribute; the lookup join uses it in FOR SYSTEM_TIME AS OF.
    proctime AS PROCTIME()
) WITH (
    -- Flink requires WITH option keys and values to be quoted string literals.
    'connector' = 'datagen',
    'rows-per-second' = '10',
    -- Generated user_id strings are 1 character long
    -- (presumably to keep the key space small so lookups repeat).
    'fields.user_id.length' = '1',
    'fields.log_id.min' = '1',
    'fields.log_id.max' = '10'
);
-- Dimension table read through the redis connector; the INSERT query
-- looks rows up by user_id.
-- NOTE(review): 'redis' is not a built-in Flink connector. It presumably comes
-- from the custom connector this article implements, so the option names below
-- should be checked against that implementation.
CREATE TABLE user_profile (
    user_id STRING,
    age STRING,
    sex STRING
) WITH (
    -- Flink requires WITH option keys and values to be quoted string literals.
    'connector' = 'redis',
    'hostname' = '127.0.0.1',
    'port' = '6379',
    'format' = 'json',
    'lookup.cache.max-rows' = '500',
    -- NOTE(review): the TTL unit is defined by the connector (likely seconds) — confirm.
    'lookup.cache.ttl' = '3600',
    'lookup.max-retries' = '1'
);
-- Sink table: the built-in print connector writes each joined row out.
-- Column order matters: INSERT INTO maps query columns to these by position.
CREATE TABLE sink_table (
    log_id BIGINT,
    `timestamp` TIMESTAMP(3),
    user_id STRING,
    proctime TIMESTAMP(3),
    age STRING,
    sex STRING
) WITH (
    -- Flink requires WITH option keys and values to be quoted string literals.
    'connector' = 'print'
);
-- Lookup join query logic: enrich each show_log event with the user_profile row
-- for its user_id, read as of the event's processing time (s.proctime).
-- LEFT JOIN keeps events that have no matching profile; their age/sex are NULL.
-- Flink maps the SELECT list to sink_table's columns by POSITION, not by alias,
-- so the order below must match the sink declaration (..., age, sex).
-- Selecting sex before age would silently swap the two STRING columns.
INSERT INTO sink_table
SELECT
      s.log_id      AS log_id
    , s.`timestamp` AS `timestamp`
    , s.user_id     AS user_id
    , s.proctime    AS proctime
    , u.age         AS age
    , u.sex         AS sex
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
    ON s.user_id = u.user_id;
-- THE END