1. reduce 简介
reduce 按照指定的规约函数，对 KeyedStream 中同一个 key 的元素进行滚动累积：每到达一个新元素，就把它与该 key 当前的累积结果合并，并输出合并后的新结果。例如可以用它实现累加计算。
示例:
import keyByNameTest.StockPrice
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object reduceTest {

  /**
   * One price tick of a stock: its id, the tick timestamp, and the price.
   *
   * NOTE: this local definition shadows the file-level
   * `import keyByNameTest.StockPrice`, so every `StockPrice` referenced inside
   * this object is this case class.
   */
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  /**
   * Demo of a rolling `reduce` with an inline lambda.
   *
   * Builds a small in-memory stream of price ticks and groups it by `stockId`.
   * For every key, it keeps the timestamp of the newer element and sums the prices.
   * Each reduced value is printed, and then the job is executed.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Streaming execution environment (local or cluster, depending on how the job is launched).
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a bounded in-memory collection of price ticks.
    val pricesList = List(
      StockPrice("stock1", 10, 1),
      StockPrice("stock1", 11, 2),
      StockPrice("stock2", 10, 666),
      StockPrice("stock3", 10, 888.23)
    )
    val ds = env.fromCollection(pricesList)

    // Key by stock id with a key-selector function rather than the deprecated
    // positional `keyBy(0)`; this also gives a typed String key.
    // The reducer keeps the newer element's timestamp and accumulates the price.
    val reducedDs = ds
      .keyBy(_.stockId)
      .reduce((acc, next) => acc.copy(timestamp = next.timestamp, price = acc.price + next.price))

    reducedDs.print()
    env.execute()
  }
}
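输出结果（并行度大于 1 时，print() 会在每行前加上 "子任务编号> " 前缀，且不同 key 之间的输出顺序不固定）：
StockPrice(stock1,10,1.0)
StockPrice(stock1,11,3.0)
StockPrice(stock2,10,666.0)
StockPrice(stock3,10,888.23)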
2. 自定义 reduce 函数
核心步骤：
1. 实现（继承）ReduceFunction 接口；
2. 重写 reduce 方法，定义两个元素如何合并为一个。
示例:
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object myReduceTest {

  /** One price tick of a stock: its id, the tick timestamp, and the price. */
  case class StockPrice(stockId: String, timestamp: Long, price: Double)

  /**
   * Custom reduce function for [[StockPrice]].
   *
   * Treats the first argument as the running accumulator and the second as the
   * newer element. It keeps the stock id, takes the newer element's timestamp, and
   * adds the two prices together.
   */
  class MyReduceFunc extends ReduceFunction[StockPrice] {

    /**
     * @param t  the accumulated value so far
     * @param t1 the element being merged in (its timestamp wins)
     * @return a new `StockPrice` with `t`'s id, `t1`'s timestamp and the summed price
     */
    override def reduce(t: StockPrice, t1: StockPrice): StockPrice =
      // Update the stock's timestamp to the newer one and accumulate the price.
      StockPrice(t.stockId, t1.timestamp, t.price + t1.price)
  }

  /**
   * Demo of a rolling `reduce` using the custom [[MyReduceFunc]].
   *
   * Groups an in-memory stream of price ticks by `stockId`, reduces each key with
   * `MyReduceFunc`, prints every reduced value, and then executes the job.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: a bounded in-memory collection of price ticks.
    val pricesList = List(
      StockPrice("stock1", 10, 1),
      StockPrice("stock1", 11, 2),
      StockPrice("stock2", 10, 666),
      StockPrice("stock3", 10, 888.23)
    )
    val ds = env.fromCollection(pricesList)

    // Key by stock id with a key-selector function rather than the deprecated positional `keyBy(0)`.
    val keyByedDs = ds.keyBy(_.stockId)

    // Apply the custom reduce function per key.
    val myReducedDs = keyByedDs.reduce(new MyReduceFunc)
    myReducedDs.print()
    env.execute()
  }
}
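输出结果（并行度大于 1 时，print() 会在每行前加上 "子任务编号> " 前缀，且不同 key 之间的输出顺序不固定）：
StockPrice(stock1,10,1.0)
StockPrice(stock1,11,3.0)
StockPrice(stock2,10,666.0)
StockPrice(stock3,10,888.23)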