[Spark Streaming] 读取 Kafka 消息, 插入到 MySQL
创始人
2024-11-14 10:05:23

以下是一个简单的使用 Spark Streaming 读取 Kafka 消息、统计数据后插入到 MySQL 中的 Scala 代码示例:

import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils import java.sql.DriverManager  object KafkaToMysql {    def main(args: Array[String]): Unit = {     // 创建 SparkConf     val conf = new SparkConf().setAppName("KafkaToMysql")     // 创建 StreamingContext     val ssc = new StreamingContext(conf, Seconds(5))      // 设置 Kafka 相关参数     val kafkaParams = Map[String, String](       "bootstrap.servers" -> "your_kafka_broker:9092",       "group.id" -> "your_group_id"     )      // 定义要读取的 Kafka 主题     val topics = Array("your_topic")      // 使用 KafkaUtils 创建 DStream     val kafkaStream = KafkaUtils.createDirectStream[String, String](       ssc,       LocationStrategies.PreferConsistent,       ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)     )      // 解析和统计数据     val data = kafkaStream.map(_.value()).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)      // 定义数据库连接相关信息     val url = "jdbc:mysql://your_host:your_port/your_database"     val username = "your_username"     val password = "your_password"      // 将统计结果插入到 MySQL 中     data.foreachRDD(rdd => {       rdd.foreachPartition(partition => {         Class.forName("com.mysql.jdbc.Driver")         val connection = DriverManager.getConnection(url, username, password)         val statement = connection.createStatement()         partition.foreach { case (word, count) =>           val sql = s"INSERT INTO your_table (word, count) VALUES ('$word', $count)"           statement.executeUpdate(sql)         }         connection.close()       })     })      ssc.start()     ssc.awaitTermination()   } } 

相关内容

热门资讯

裸辞做“一人公司”,我后悔了 去年这个时候,一位以色列程序员正在东南亚旅行。他顺手把一个在脑子里转了很久的想法做成了产品,一个让任...
南京建成国内首个Pre-6G试... 4月21日,2026全球6G技术与产业生态大会在南京开幕。全息互动技术展台前,一名远在北京的工作人员...
超梵求职受邀参加“2025抖音... 超梵求职受邀参加“2025抖音巨量引擎成人教育行业生态大会”,探讨分享优质内容传播,服务万千学员。 ...
摩托罗拉Razr 2026(R... IT之家 4 月 22 日消息,摩托罗拉宣布新一代 Razr 折叠手机将于 4 月 29 日在美国发...
库克卸任,特纳斯领航:苹果新纪... 苹果首席执行官蒂姆·库克将卸任,硬件工程主管约翰·特纳斯将接任,苹果公司今天宣布此事。 库克将在夏季...