Flink SQL 知其所以然:改了改源码,实现了个 Batch lookup join(附源码)

复制CREATETABLE show_log (      log_id BIGINT     `timestampascast(CURRENT_TIMESTAMPastimestamp(3)),      user_id STRING,      proctime AS PROCTIME()  WITH (    connector = datagen   rows-per-second = 10   fields.user_id.length = 1   fields.log_id.min = 1   fields.log_id.max = 10 );  CREATETABLE user_profile (      user_id STRING,      age STRING,      sex STRING      ) WITH (    connector = redis   hostname = 127.0.0.1   port = 6379   format = json   lookup.cache.max-rows = 500   lookup.cache.ttl = 3600   lookup.max-retries = 1 );  CREATETABLE sink_table (      log_id BIGINT     `timestampTIMESTAMP(3),      user_id STRING,      proctime TIMESTAMP(3),      age STRING,      sex STRING  WITH (    connector = print );  -- lookup join 的 query 逻辑 INSERTINTO sink_table  SELECT     s.log_id as log_id      , s.`timestampas `timestamp     , s.user_id as user_id      , s.proctime as proctime      , u.sex as sex      , u.age as age  FROM show_log AS s  LEFTJOIN user_profile FOR SYSTEM_TIME ASOF s.proctime AS u  ON s.user_id = u.user_id  1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.

THE END