flink的scala版本尝试,尚且有问题

This commit is contained in:
dingjiawen 2022-09-05 09:35:43 +08:00
parent 79f88b6081
commit 1e4764c544
1 changed files with 38 additions and 14 deletions

View File

@ -1,6 +1,9 @@
package day01.scala
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.util.Collector
object Example_scala1 {
@ -11,28 +14,49 @@ object Example_scala1 {
env.setParallelism(1)
//TODO 读取数据源
val stream = env.socketTextStream("localhost", 9999)
// stream.flatMap(
// words =>{
// val word = words.split(" ")
// val stream = env.socketTextStream("localhost", 9999)
val stream = env.fromElements("hello world", "hello world")
//
// word.map(
// word =>{
// WordWithCount(word,1L)
// val value: SingleOutputStreamOperator[WordWithCount] = stream.flatMap[WordWithCount] {
//
// case (inputData:String, out:Collector[WordWithCount]) => {
// val strings = inputData.split(" ")
// strings.foreach {
// word => {
// out.collect(new WordWithCount(word, 1L))
// }
// )
//
// }
// }
// )
// }
// val value: SingleOutputStreamOperator[WordWithCount] = stream.flatMap(new FlatMapFunction[String, WordWithCount] {
// override def flatMap(t: String, collector: Collector[WordWithCount]): Unit = {
// val strings = t.split(" ")
// strings.foreach {
// word => {
// collector.collect(new WordWithCount(word, 1L))
// }
// }
// }
// })
// value.print()
env.execute()
// stream.flatMap {
// words => {
// val word = words.split(" ")
// word.map(
// word => {
// WordWithCount(word, 1L)
// }
// )
// }
// }
}
case class WordWithCount(var word:String,var count:Long)
case class WordWithCount(var word: String, var count: Long)
}