Spark实时(三):Structured Streaming入门案例
创始人
2024-12-15 21:36:33
0

文章目录

Structured Streaming入门案例

一、Scala代码如下

二、Java 代码如下

三、以上代码注意点如下


Structured Streaming入门案例

我们使用Structured Streaming来监控socket数据统计WordCount。这里我们使用Spark版本为3.4.3版本,首先在Maven pom文件中导入以下依赖:

         UTF-8     1.8     1.8     3.4.3                        org.apache.spark       spark-core_2.12       ${spark.version}                      org.apache.spark       spark-sql_2.12       ${spark.version}                      org.apache.spark       spark-hive_2.12       ${spark.version}                      mysql       mysql-connector-java       5.1.47                      org.apache.spark       spark-streaming_2.12       ${spark.version}                       org.apache.spark       spark-sql-kafka-0-10_2.12       ${spark.version}                       org.apache.kafka       kafka-clients       2.8.0                       org.scala-lang       scala-library       2.12.15                 org.scala-lang       scala-compiler       2.12.15                 org.scala-lang       scala-reflect       2.12.15                 log4j       log4j       1.2.12                 com.google.collections       google-collections       1.0         

一、Scala代码如下

package com.lanson.structuredStreaming  /**  *  Structured Streaming 实时读取Socket数据  */  import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}  /**  * Structured Streaming 读取Socket数据  */ object SSReadSocketData {   def main(args: Array[String]): Unit = {      //1.创建SparkSession对象     val spark: SparkSession = SparkSession.builder()       .master("local")       .appName("StructuredSocketWordCount")       //默认200个并行度,由于源头数据量少,可以设置少一些并行度       .config("spark.sql.shuffle.partitions",1)       .getOrCreate()      import spark.implicits._      spark.sparkContext.setLogLevel("Error")      //2.读取Socket中的每行数据,生成DataFrame默认列名为"value"     val lines: DataFrame = spark.readStream       .format("socket")       .option("host", "node3")       .option("port", 9999)       .load()      //3.将每行数据切分成单词,首先通过as[String]转换成Dataset操作     val words: Dataset[String] = lines.as[String].flatMap(line=>{line.split(" ")})      //4.按照单词分组,统计个数,自动多一个列count     val wordCounts: DataFrame = words.groupBy("value").count()      //5.启动流并向控制台打印结果     val query: StreamingQuery = wordCounts.writeStream       //更新模式设置为complete       .outputMode("complete")       .format("console")       .start()     query.awaitTermination()    }  } 

 

二、Java 代码如下

package com.lanson.structuredStreaming;  import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.TimeoutException; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQuery; import org.apache.spark.sql.streaming.StreamingQueryException;  public class SSReadSocketData01 {      public static void main(String[] args) throws StreamingQueryException, TimeoutException {         SparkSession spark = SparkSession.builder().master("local")             .appName("SSReadSocketData01")             .config("spark.sql.shuffle.partitions", 1)             .getOrCreate();                  spark.sparkContext().setLogLevel("Error");          Dataset lines = spark.readStream().format("socket")             .option("host", "node3")             .option("port", 9999)             .load();          Dataset words = lines.as(Encoders.STRING())             .flatMap(new FlatMapFunction() {                 @Override                 public Iterator call(String line) throws Exception {                     return Arrays.asList(line.split(" ")).iterator();                 }             }, Encoders.STRING());          Dataset wordCounts = words.groupBy("value").count();          StreamingQuery query = wordCounts.writeStream()             .outputMode("complete")             .format("console")             .start();                  query.awaitTermination();     } } 

 

以上代码编写完成之后,在node3节点执行“nc -lk 9999”启动socket服务器,然后启动代码,向socket中输入以下数据:

第一次输入:a b c 第二次输入:d a c 第三次输入:a b c

可以看到控制台打印如下结果:

------------------------------------------- Batch: 1 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ |    c|    1| |    b|    1| |    a|    1| +-----+-----+  ------------------------------------------- Batch: 2 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ |    d|    1| |    c|    2| |    b|    1| |    a|    2| +-----+-----+  ------------------------------------------- Batch: 3 ------------------------------------------- +-----+-----+ |value|count| +-----+-----+ |    d|    1| |    c|    3| |    b|    2| |    a|    3| +-----+-----+  

三、以上代码注意点如下

  • SparkSQL 默认并行度为200,这里由于数据量少,可以将并行度通过参数“spark.sql.shuffle.partitions”设置少一些。
  • StructuredStreaming读取过来数据默认是DataFrame,默认有“value”名称的列
  • 对获取的DataFrame需要通过as[String]转换成Dataset进行操作
  • 结果输出时的OutputMode有三种输出模式:Complete Mode、Append Mode、Update Mode。

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

相关内容

热门资讯

正版授权“金花链接房卡在哪里弄... 众乐大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来...
微信开金花群房卡到哪里买/金花... 金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来享受...
hi8pro安卓系统,功能解析... 亲爱的数码爱好者们,你是否对安卓系统有着浓厚的兴趣?今天,我要给你带来一款特别引人注目的产品——hi...
秒懂教程“如何创建金花房间链接... 新九哥牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房...
金花链接房卡怎么创建房间/微信... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡来享...
牛牛在哪里购买房卡/微信金花房... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来享受...
好用的安卓系统电脑版,好用功能... 你有没有想过,手机上的安卓系统那么好用,要是能直接在电脑上用,那该多方便啊!没错,今天就要给你揭秘一...
微信炸金花链接怎样弄/炸金花房... 微信炸金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:15984933许多玩家在游戏中会购买房卡...
微信牛牛房卡招代理/微信链接牛... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡来享...
秒懂教程“金花客服代理房卡获取... 随意玩是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来享...
微信链接金花房卡怎么弄/微信金... 金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来享受...
给大家讲解“金花房卡在哪里能弄... 乐酷大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来...
微信开金花群房卡到哪里买/微信... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来享受...
在哪里买炸金花房卡哪家便宜/链... 金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来享受...
房卡必备教程“在哪里买炸金花房... 新道游是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来享...
一分钟了解“金花游戏房卡怎么获... 新众亿金花是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:15984933许多玩家在游戏中会购买房卡...
安卓怎么刷其他的系统,安卓设备... 你有没有想过给你的安卓手机换换口味呢?是的,你没听错,就是刷机!刷机,顾名思义,就是将手机系统进行更...
上下分金花牛牛房卡怎么冲/微信... 斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:44346008许多玩家在游戏中会购买房卡来享受...
一分钟了解“炸金花房卡链接怎么... 大圣大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来...
微信牛牛房卡要怎么弄/微信牛牛... 牛牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:15984933许多玩家在游戏中会购买房卡来享受...