Flink Sql Redis Connector 新版本来袭
创始人
2024-11-05 23:07:35

1.新版本功能和性能介绍

1.1 流批一体

新版本使用了Flink最新的Source接口和SinkWriter接口,可以使用一套代码完成流式读取数据和批量读取数据

1.2 吞吐量大

新版本使用jedispipline和jedisClusterPipeline对数据进行写入和读取,每分钟可以达到千万级别的数据写入或者读取,且对机器要求较低

1.3 兼容所有版本的Flink

新版本使用新的接口重写不但可以适用旧版本的Flink,也兼容新版本的Flink

2.使用方式

使用方式还是和之前版本一样,但是新增了一些连接参数

1.使用案例和讲解 1.读取数据案例 CREATE TABLE orders (   `order_id` STRING,   `price` STRING,   `order_time` STRING,    PRIMARY KEY(order_id) NOT ENFORCED ) WITH (   'connector' = 'redis',   'mode' = 'single',   'single.host' = '192.168.10.101',   'single.port' = '6379',   'password' = 'xxxxxx',   'command' = 'hgetall',   'key' = 'orders' );     select * from orders         #集群模式 create table redis_sink ( site_id STRING, inverter_id STRING, start_time STRING, PRIMARY KEY(site_id) NOT ENFORCED ) WITH ( 'connector' = 'redis', 'mode' = 'cluster', 'cluster.nodes' = 'test3:7001,test3:7002,test3:7003,test3:8001,test3:8002,test3:8003', 'password' = '123123', 'command' = 'hgetall', 'key' = 'site_inverter' )   cluster.nodes用来定义集群ip和host,例如:host1:p1,host2:p2,host3:p3  注:redis表必须定义主键,可以是单个主键,也可以是联合主键  以下为sql读取结果,直接将redis数据解析成我们需要的表格形式  2.写入数据案例 1. generate source data CREATE TABLE order_source (   `order_number` BIGINT,   `price` DECIMAL(32,2),   `order_time` TIMESTAMP(3),    PRIMARY KEY(order_id) NOT ENFORCED ) WITH ( 'connector' = 'datagen', 'number-of-rows' = '5', 'fields.order_number.min' = '1', 'fields.order_number.max' = '20', 'fields.price.min' = '1001', 'fields.price.max' = '1100' );   2. define redis sink table    CREATE TABLE orders (   `order_number` STRING,   `price` STRING,   `order_time` STRING,    PRIMARY KEY(order_id) NOT ENFORCED ) WITH (   'connector' = 'redis',   'mode' = 'single',   'single.host' = '192.168.10.101',   'single.port' = '6379',   'password' = 'xxxxxx',   'command' = 'hmset',   'key' = 'orders' );   3. insert data to redis sink table (cast data type to string)   insert into redis_sink     select         cast(order_number as STRING) order_number,         cast(price as STRING) price,         cast(order_time as STRING) order_time     from orders

3.新增的连接参数

OptionRequiredDefaultTypeDescription
connectorrequirednoStringconnector name
moderequirednoStringredis cluster mode (single or cluster)
single.hostoptionalnoStringredis single mode machine host
single.portoptionalnointredis single mode running port
passwordoptionalnoStringredis database password
commandrequirednoStringredis write data or read data command
keyrequirednoStringredis key
expireoptionalnoIntset key ttl
fieldoptionalnoStringget a value with field when using hget command
cursoroptionalnoIntusing hscan command(e.g:1,2)
startoptional0Intread data when using lrange command
endoptional10Intread data when using lrange command
connection.max.wait-millsoptionalnoIntredis connection parameter
connection.timeout-msoptionalnoIntredis connection parameter
connection.max-totaloptionalnoIntredis connection parameter
connection.max-idleoptionalnoIntredis connection parameter
connection.test-on-borrowoptionalnoBooleanredis connection parameter
connection.test-on-returnoptionalnoBooleanredis connection parameter
connection.test-while-idleoptionalnoBooleanredis connection parameter
so.timeout-msoptionalnoIntredis connection parameter
max.attemptsoptionalnoIntredis connection parameter
sink.parallelismoptional1Intsink data parallelism
sink.delivery-guaranteeoptionalAT_LEAST_ONCEEnumAT_LEAST_ONCE or EXACTLY_ONCE
sink.buffer-flush.max-rowsoptional1000Intsink data row size
sink.buffer-flush.intervaloptional1sdurationSpecifies the batch flush interval

4.代码地址

Github: https://github.com/niuhu3/flink_sql_redis_connector/tree/0.1.0

目前该connector已提交给flink,详见:[FLINK-35588] flink sql redis connector - ASF JIRA (apache.org)

希望大家可以帮忙点个fork和stars,后面会持续更新这个连接器,欢迎大家试用,试用的时候遇到什么问题也可以给我反馈,或者在社区反馈,有什么好的想法也可以联系我哦。

后面会给大家更新写这个连接器的思路,也会试着去更新新的连接器。

相关内容

热门资讯

裸辞做“一人公司”,我后悔了 去年这个时候,一位以色列程序员正在东南亚旅行。他顺手把一个在脑子里转了很久的想法做成了产品,一个让任...
南京建成国内首个Pre-6G试... 4月21日,2026全球6G技术与产业生态大会在南京开幕。全息互动技术展台前,一名远在北京的工作人员...
超梵求职受邀参加“2025抖音... 超梵求职受邀参加“2025抖音巨量引擎成人教育行业生态大会”,探讨分享优质内容传播,服务万千学员。 ...
摩托罗拉Razr 2026(R... IT之家 4 月 22 日消息,摩托罗拉宣布新一代 Razr 折叠手机将于 4 月 29 日在美国发...
库克卸任,特纳斯领航:苹果新纪... 苹果首席执行官蒂姆·库克将卸任,硬件工程主管约翰·特纳斯将接任,苹果公司今天宣布此事。 库克将在夏季...