SpringBoot整合Flink CDC,实时追踪mysql数据变动
创始人
2024-12-18 05:09:13
0
❃博主首页 : 「码到三十五」 ,同名公众号 :「码到三十五」,wx号 : 「liwu0213」
☠博主专栏 : <源码解读> <面试攻关>
♝博主的话 :搬的每块砖,皆为峰峦之基;公众号搜索「码到三十五」关注这个爱发技术干货的coder,一起筑基

我们将整合Spring Boot和Apache Flink CDC(Change Data Capture)来实现实时数据追踪。下面是一个基本的实践流程代码,包括搭建Spring Boot项目、整合Flink CDC以及实现数据变动的实时追踪。

文章目录

      • 前言
      • 1. MySQL开启Binlog
      • 2. 创建Spring Boot项目
      • 3. 添加依赖
      • 4. 配置Flink和MySQL CDC
      • 5. 实现数据实时追踪
      • 6. 启动Spring Boot应用
      • 7. 运行并测试

前言

Flink CDC(Flink Change Data Capture)是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

Flink CDC的应用场景广泛,包括但不限于实时数据仓库更新、实时数据同步和迁移以及实时数据处理等。它还能确保数据一致性,并在数据发生变更时准确地进行捕获和处理。此外,Flink CDC支持与多种数据源进行集成,如MySQL、PostgreSQL、Oracle等,并提供了相应的连接器,便于数据的捕获和处理。

接下来,将详细介绍MySQL CDC的使用。MySQL CDC连接器允许从MySQL数据库中读取快照数据和增量数据。

1. MySQL开启Binlog

MySQL中开启binlog功能,需要修改配置文件中(如Linux的/etc/my.cnf或Windows的\my.ini)的[mysqld]部分设置相关参数:

[mysqld] server-id=1 # 设置日志格式为行级格式 binlog-format=Row # 设置binlog日志文件的前缀 log-bin=mysql-bin # 指定需要记录二进制日志的数据库 binlog_do_db=testjpa 

除了开启binlog功能外,还需要为Flink CDC配置相应的权限,以确保其能够正常连接到MySQL并读取数据。这包括授予Flink CDC连接MySQL的用户必要的权限,如SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。这些权限是Flink CDC读取数据和元数据所必需的。

检查是否已开启binlog功能:

mysql> SHOW VARIABLES LIKE 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin       | ON    | +---------------+-------+ 

至此,MySQL的相关配置已完成。

2. 创建Spring Boot项目

首先,你需要创建一个Spring Boot项目。可以使用Spring Initializr(https://start.spring.io/)来快速生成项目。

3. 添加依赖

pom.xml中添加Apache Flink和Flink CDC的依赖。以下是必要的依赖:

                   org.apache.flink         flink-java         1.14.0                   org.apache.flink         flink-streaming-java_2.12         1.14.0                   org.apache.flink         flink-connector-mysql-cdc         2.0.0                        org.springframework.boot         spring-boot-starter       

4. 配置Flink和MySQL CDC

在Spring Boot的application.ymlapplication.properties文件中配置Flink和MySQL数据库连接:

flink:   checkpoint:     interval: 10000   parallelism: 1  spring:   datasource:     url: jdbc:mysql://localhost:3306/your_database     username: your_username     password: your_password 

5. 实现数据实时追踪

创建一个服务类来实现数据的实时追踪:

import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.springframework.stereotype.Service;  @Service public class FlinkCdcService {      public void startDataStreaming() {         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);          // 使用Flink CDC连接MySQL         String name = "inventory";         tableEnv.executeSql("CREATE TABLE " + name + " (" +             "  id INT," +             "  name STRING," +             "  description STRING," +             "  weight DECIMAL(10, 3)" +             ") WITH (" +             "  'connector' = 'mysql-cdc'," +             "  'hostname' = 'localhost'," +             "  'port' = '3306'," +             "  'username' = 'your_username'," +             "  'password' = 'your_password'," +             "  'database-name' = 'your_database'," +             "  'table-name' = 'your_table'" +             ")");          // 查询并打印结果         DataStream dataStream = tableEnv.sqlQuery("SELECT * FROM " + name).execute().print();          try {             env.execute("Flink CDC Demo");         } catch (Exception e) {             e.printStackTrace();         }     } } 

6. 启动Spring Boot应用

在你的Spring Boot应用的启动类中调用FlinkCdcServicestartDataStreaming方法来启动数据追踪:

import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;  @SpringBootApplication public class FlinkCdcApplication implements CommandLineRunner {      @Autowired     private FlinkCdcService flinkCdcService;      public static void main(String[] args) {         SpringApplication.run(FlinkCdcApplication.class, args);     }      @Override     public void run(String... args) throws Exception {         flinkCdcService.startDataStreaming();     } } 

7. 运行并测试

运行Spring Boot应用,并在MySQL数据库中做出一些数据变动。你应该能在控制台看到实时打印的数据变动。


关注公众号[码到三十五]获取更多技术干货 !

相关内容

热门资讯

创建金花房间链接教程/玩链接牛... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来享受...
科普!微信金花链接从哪里买房卡... 微信游戏中心:牛牛房卡,添加微信【55051770】,进入游戏中心或相关小程序,搜索“微信牛牛房卡”...
终于发现!微信拼三张购买房卡方... 微信游戏中心:拼三张房卡,添加微信【56001354】,进入游戏中心或相关小程序,搜索“微信拼三张房...
终于知道”圣游怎么买房卡“新老... 第二也可以在游戏内商城:在游戏界面中找到 “微信金花,斗牛链接房卡”“商城”选项,选择房卡的购买选项...
微信金花链接房卡购买/金花链接... 金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡来享...
科普!微信九人金花房卡怎么买,... 微信游戏中心:拼三张房卡,添加微信【33903369】,进入游戏中心或相关小程序,搜索“微信拼三张房...
终于发现!微信炸金花购买房卡,... 微信游戏中心:炸金花房卡,添加微信【66336574】,进入游戏中心或相关小程序,搜索“微信炸金花房...
终于知道”蛮王大厅房卡充值“牛... 来教大家如何使用房卡充值房卡充值 添加房卡批售商:微【113857775】复制到微信搜索、直接添加房...
科普!微信金花房卡购买全攻略,... 微信游戏中心:牌九房卡,添加微信【8488009】,进入游戏中心或相关小程序,搜索“微信牌九房卡”,...
给大家讲解“微信上玩炸金花冲房... 新518互游是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房...
安卓系统不能玩nba吗 你有没有想过,为什么你的安卓手机上不能玩NBA游戏呢?是不是觉得这事儿有点奇怪,毕竟安卓系统那么强大...
终于发现!斗牛链接房卡怎么搞,... 微信游戏中心:斗牛房卡,添加微信【71319951】,进入游戏中心或相关小程序,搜索“微信斗牛房卡”...
终于知道”江山房卡购买“拼三张... 终于知道”江山房卡购买“拼三张房卡充值游戏中心打开微信,添加客服【113857776】,进入游戏中心...
科普!微信玩炸金花在哪购买房卡... 微信游戏中心:炸金花房卡,添加微信【55051770】,进入游戏中心或相关小程序,搜索“微信炸金花房...
一分钟推荐“金花链接如何创建房... 神皇大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡...
终于知道”新九神房卡获取方式“... 终于知道”新九神房卡获取方式“哪里买低价房卡!微信房卡充值 添加房卡批售商:微【113857776】...
终于发现!正版玩牛牛购买房卡,... 微信游戏中心:牛牛房卡,添加微信【56001354】,进入游戏中心或相关小程序,搜索“微信牛牛房卡”...
科普!微信在哪开炸金花房间房卡... 微信游戏中心:炸金花房卡,添加微信【33903369】,进入游戏中心或相关小程序,搜索“微信炸金花房...
终于知道”新众亿房卡详细充值“... 来教大家如何使用房卡详细充值房卡充值 添加房卡批售商:微【113857775】复制到微信搜索、直接添...
终于找到“金花建房软件哪个好/... 起点大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来...