- 监听binlog上传到nsq
- nsq转kafka,
- flinksql获取kafkaTable
- 聚合将结果写到hbase
-- kafka
create table dw_dws_pay_stored_card_recharge
(
recharge_no varchar
, kdt_id varchar
, hq_kdt_id varchar
, operator_name varchar
, recharge_time varchar
, stored_card_recharge_amt bigint
) with (
'connector.type' = 'kafka'
, 'connector.version' = '0.10'
, 'connector.topic' = '对应的Topic'
, 'connector.properties.0.key' = 'group.id'
, 'connector.properties.0.value' = 'dw_dws_pay_stored_card_recharge'
, 'connector.properties.1.key' = 'bootstrap.servers'
, 'connector.properties.1.value' = 'ip1:9092,ip2:9092'
, 'connector.property-version' = '1'
, 'connector.startup-mode' = 'latest-offset'
, 'format.type' = 'json'
, 'format.property-version' = '1'
, 'format.derive-schema' = 'true'
, 'update-mode' = 'append'
);
create table dw_dwa_pay_stored_card_team_operator
(
rowkey varchar
, cf row (stored_card_recharge_amt bigint)
) with (
'connector.type' = 'hbase'
,'connector.version' = '1.4.3'
,'connector.table-name' = 'dw_dwa_pay_stored_card_team_operator'
,'connector.zookeeper.quorum' = 'ip1,ip2'
,'connector.property-version' = '1'
,'connector.write.buffer-flush.max-rows'='10'
,'connector.write.buffer-flush.interval'='3s'
);
insert into dw_dwa_pay_stored_card_team_operator
select
concat_ws('_', reverse(cast(hq_kdt_id as varchar)), '_', operator_name, '_', date_format(recharge_time, 'yyyyMMdd'))
, row (
sum(stored_card_recharge_amt)
)
from
dw_dws_pay_stored_card_recharge
where
recharge_time >= date_format(timestampadd(hour, -16, current_timestamp), 'yyyy-MM-dd')
group by
concat_ws('_', reverse(cast(hq_kdt_id as varchar)), '_', operator_name, '_', date_format(recharge_time, 'yyyyMMdd'));