Table of Contents
Structured Streaming Getting Started Example
1. The Scala code is as follows
2. The Java code is as follows
3. Notes on the above code
We will use Structured Streaming to monitor socket data and compute a WordCount. The Spark version used here is 3.4.3. First, add the following dependencies to the Maven pom file:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <spark.version>3.4.3</spark.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.47</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-reflect</artifactId>
        <version>2.12.15</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.12</version>
    </dependency>
    <dependency>
        <groupId>com.google.collections</groupId>
        <artifactId>google-collections</artifactId>
        <version>1.0</version>
    </dependency>
</dependencies>
1. The Scala code is as follows

package com.lanson.structuredStreaming

import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

/**
 * Structured Streaming reads Socket data in real time
 */
object SSReadSocketData {
  def main(args: Array[String]): Unit = {
    //1. Create the SparkSession
    val spark: SparkSession = SparkSession.builder()
      .master("local")
      .appName("StructuredSocketWordCount")
      //The default is 200 shuffle partitions; since the source data volume is small, use fewer
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import spark.implicits._
    spark.sparkContext.setLogLevel("Error")

    //2. Read each line from the socket; the resulting DataFrame has a single default column named "value"
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "node3")
      .option("port", 9999)
      .load()

    //3. Split each line into words; first convert to Dataset[String] via as[String]
    val words: Dataset[String] = lines.as[String].flatMap(line => line.split(" "))

    //4. Group by word and count; a "count" column is added automatically
    val wordCounts: DataFrame = words.groupBy("value").count()

    //5. Start the stream and print the results to the console
    val query: StreamingQuery = wordCounts.writeStream
      //Set the output mode to complete
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
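The example above uses the default trigger (a new micro-batch starts as soon as the previous one finishes) and the complete output mode. As a minimal sketch only, assuming the same wordCounts DataFrame as in the code above, the query could instead be configured with an explicit processing-time trigger and the update output mode, so that each batch prints only the rows whose counts changed; the 5-second interval is an arbitrary choice, not something from the original example:

// A minimal sketch, assuming the wordCounts DataFrame from the example above.
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

val updateQuery: StreamingQuery = wordCounts.writeStream
  // "update" emits only the rows whose aggregate values changed in this micro-batch
  .outputMode("update")
  // start a micro-batch roughly every 5 seconds instead of as fast as possible
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .format("console")
  .start()

updateQuery.awaitTermination()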
2. The Java code is as follows

package com.lanson.structuredStreaming;

import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.TimeoutException;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class SSReadSocketData01 {
    public static void main(String[] args) throws StreamingQueryException, TimeoutException {
        //1. Create the SparkSession
        SparkSession spark = SparkSession.builder().master("local")
                .appName("SSReadSocketData01")
                .config("spark.sql.shuffle.partitions", 1)
                .getOrCreate();

        spark.sparkContext().setLogLevel("Error");

        //2. Read each line from the socket; the default column name is "value"
        Dataset<Row> lines = spark.readStream().format("socket")
                .option("host", "node3")
                .option("port", 9999)
                .load();

        //3. Split each line into words
        Dataset<String> words = lines.as(Encoders.STRING())
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                }, Encoders.STRING());

        //4. Group by word and count
        Dataset<Row> wordCounts = words.groupBy("value").count();

        //5. Start the stream and print the results to the console
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}
3. Notes on the above code

After the code above is written, run "nc -lk 9999" on the node3 node to start a socket server, then start the program and type the following data into the socket:
First input: a b c
Second input: d a c
Third input: a b c
The console prints the following results:
-------------------------------------------
Batch: 1
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    c|    1|
|    b|    1|
|    a|    1|
+-----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    2|
|    b|    1|
|    a|    2|
+-----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----+-----+
|value|count|
+-----+-----+
|    d|    1|
|    c|    3|
|    b|    2|
|    a|    3|
+-----+-----+
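Because the query runs in complete output mode, every batch prints the full accumulated counts over all data received so far, which is why the counts for a, b, and c keep growing across batches. If the query needs to recover this state after a restart, Structured Streaming allows a checkpoint location to be configured on the sink. The sketch below is only an illustration, again assuming the wordCounts DataFrame from the example; the path /tmp/ss-wordcount-ckpt is an arbitrary placeholder, not something from the original article:

// A minimal sketch, assuming the wordCounts DataFrame from the example above.
// checkpointLocation persists the query's progress and aggregation state,
// so the accumulated counts can be restored when the query is restarted.
import org.apache.spark.sql.streaming.StreamingQuery

val recoverableQuery: StreamingQuery = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/ss-wordcount-ckpt")
  .start()

recoverableQuery.awaitTermination()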