From 1e4764c544a750fb5d6728bd4f1b48ed061acf29 Mon Sep 17 00:00:00 2001 From: dingjiawen <745518019@qq.com> Date: Mon, 5 Sep 2022 09:35:43 +0800 Subject: [PATCH] =?UTF-8?q?flink=E7=9A=84scala=E7=89=88=E6=9C=AC=E5=B0=9D?= =?UTF-8?q?=E8=AF=95=EF=BC=8C=E5=B0=9A=E4=B8=94=E6=9C=89=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/day01/scala/Example_scala1.scala | 52 ++++++++++++++----- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala b/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala index 0012541..d18b520 100644 --- a/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala +++ b/Big_data_example/Flink/src/main/java/day01/scala/Example_scala1.scala @@ -1,6 +1,9 @@ package day01.scala +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.util.Collector object Example_scala1 { @@ -11,28 +14,49 @@ object Example_scala1 { env.setParallelism(1) //TODO 读取数据源 - val stream = env.socketTextStream("localhost", 9999) - -// stream.flatMap( -// words =>{ -// val word = words.split(" ") + // val stream = env.socketTextStream("localhost", 9999) + val stream = env.fromElements("hello world", "hello world") // -// word.map( -// word =>{ -// WordWithCount(word,1L) +// val value: SingleOutputStreamOperator[WordWithCount] = stream.flatMap[WordWithCount] { +// +// case (inputData:String, out:Collector[WordWithCount]) => { +// val strings = inputData.split(" ") +// strings.foreach { +// word => { +// out.collect(new WordWithCount(word, 1L)) // } -// ) -// +// } // } -// ) - - +// } + // val value: SingleOutputStreamOperator[WordWithCount] = stream.flatMap(new FlatMapFunction[String, WordWithCount] { + // override def flatMap(t: String, collector: Collector[WordWithCount]): Unit = { + // val strings = t.split(" ") + // strings.foreach { + // word => { + // collector.collect(new WordWithCount(word, 1L)) + // } + // } + // } + // }) +// value.print() + env.execute() + // stream.flatMap { + // words => { + // val word = words.split(" ") + // word.map( + // word => { + // WordWithCount(word, 1L) + // } + // ) + // } + // } } - case class WordWithCount(var word:String,var count:Long) + + case class WordWithCount(var word: String, var count: Long) }