以下是一个简单的使用 Spark Streaming 读取 Kafka 消息、统计数据后插入到 MySQL 中的 Scala 代码示例:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// The direct-stream API used below (LocationStrategies / ConsumerStrategies)
// lives in the kafka010 integration module, not the old 0.8 `kafka` package.
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.kafka.common.serialization.StringDeserializer
import java.sql.DriverManager

/**
 * Reads messages from Kafka with Spark Streaming, computes a per-batch
 * word count, and inserts the (word, count) pairs into a MySQL table.
 */
object KafkaToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaToMysql")
    // 5-second micro-batches
    val ssc = new StreamingContext(conf, Seconds(5))

    // kafka010 requires Map[String, Object] and explicit deserializers.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "your_kafka_broker:9092",
      "group.id"          -> "your_group_id",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer]
    )
    val topics = Array("your_topic")

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // Split each message on spaces and count word occurrences per batch.
    val data = kafkaStream
      .map(_.value())
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    // Database connection settings (created per partition on the executors;
    // they must be serializable-free locals, hence plain strings here).
    val url = "jdbc:mysql://your_host:your_port/your_database"
    val username = "your_username"
    val password = "your_password"

    data.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // JDBC 4+ auto-loads the driver; no Class.forName needed.
        // One connection per partition, closed even if an insert throws.
        val connection = DriverManager.getConnection(url, username, password)
        try {
          // PreparedStatement: prevents SQL injection and handles quoting
          // (the original interpolated `'$word'` broke on words with quotes).
          val statement = connection.prepareStatement(
            "INSERT INTO your_table (word, count) VALUES (?, ?)")
          try {
            partition.foreach { case (word, cnt) =>
              statement.setString(1, word)
              statement.setInt(2, cnt)
              statement.addBatch()
            }
            // One round-trip per partition instead of one per row.
            statement.executeBatch()
          } finally {
            statement.close()
          }
        } finally {
          connection.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}