import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue
import org.apache.flink.util.Collector
import trigger.StockPrice
import java.{lang, util}
import java.time.Duration
import java.util.Properties
object evictor {
def main(args: Array[String]): Unit = {
  // Streaming environment; parallelism 1 keeps the printed output in a single ordered stream.
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  // Deprecated since Flink 1.12 (event time became the default there); kept for older runtimes.
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Kafka source configuration.
  val kafkaProps = new Properties()
  kafkaProps.setProperty("bootstrap.servers", "10.10.10.10:9092")
  kafkaProps.setProperty("group.id", "gksk-bigdata")
  val kafkaSource = new FlinkKafkaConsumer[String]("stockPrice", new SimpleStringSchema, kafkaProps)
  // Read the topic from the beginning; commit offsets back to Kafka on each checkpoint.
  kafkaSource.setStartFromEarliest()
  kafkaSource.setCommitOffsetsOnCheckpoints(true)

  val ds = env.addSource(kafkaSource)

  // Parse "stockId,timeStamp,price" CSV records into StockPrice objects.
  // NOTE(review): a malformed record will throw in toLong/toDouble and fail the job — confirm
  // whether upstream guarantees well-formed records or a filter/side-output is wanted.
  val stockPriceDS = ds
    .map(line => line.split(","))
    // fields(0) is already a String — the original redundant .toString was removed.
    .map(fields => StockPrice(fields(0), fields(1).toLong, fields(2).toDouble))

  // Event-time watermarks with zero out-of-orderness tolerance; the event timestamp is taken
  // from the StockPrice record itself.
  val sumedDS = stockPriceDS
    .assignTimestampsAndWatermarks(
      WatermarkStrategy
        .forBoundedOutOfOrderness[StockPrice](Duration.ofSeconds(0))
        .withTimestampAssigner(new SerializableTimestampAssigner[StockPrice] {
          override def extractTimestamp(t: StockPrice, recordTimestamp: Long): Long = t.timeStamp
        })
    )
    .keyBy(s => s.stockId)
    // 2-second tumbling event-time window per stock (timeWindow is deprecated in newer Flink;
    // retained here to avoid changing the job's API surface).
    .timeWindow(Time.seconds(2))
    // Drop records with non-positive prices before the window function runs.
    .evictor(new MyEvictor())
    .process(new MyProcessWindowsFunc())

  sumedDS.print()
  env.execute()
}
class MyEvictor() extends Evictor[StockPrice, TimeWindow]() {

  /**
   * Runs before the window function: walks the window's buffered elements and removes
   * every record whose price is zero or negative, so only valid prices reach processing.
   */
  override def evictBefore(iterable: lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
    // Java iterator over (value, timestamp) pairs; remove() deletes from the window state.
    val records: util.Iterator[TimestampedValue[StockPrice]] = iterable.iterator()
    while (records.hasNext) {
      val current: TimestampedValue[StockPrice] = records.next()
      println("now the stockPrice is :" + current.getValue().price)
      // A non-positive price cannot be valid market data — evict it.
      if (current.getValue.price <= 0) {
        println("illegal price :" + current.getValue.price)
        records.remove()
      }
    }
  }

  /** Runs after the window function; no post-processing eviction is needed. */
  override def evictAfter(iterable: lang.Iterable[TimestampedValue[StockPrice]], i: Int, w: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
    // do nothing
  }
}
// Computes each stock's average price over the window.
class MyProcessWindowsFunc() extends ProcessWindowFunction[StockPrice, (String, Double), String, TimeWindow]() {

  /**
   * Emits (stockId, average price) once per key when a window closes.
   *
   * ProcessWindowFunction buffers every element of the window in `elements`, so for
   * high-volume streams an incremental aggregate (reduce/aggregate) is preferable.
   *
   * @param key      the stock id this window belongs to
   * @param context  window metadata (unused here)
   * @param elements all StockPrice records buffered for this window
   * @param out      collector for the (stockId, avgPrice) result
   */
  override def process(key: String, context: Context, elements: Iterable[StockPrice], out: Collector[(String, Double)]): Unit = {
    // The evictor may have removed every record from the window; the original code would
    // then emit (key, NaN) because 0.0 / 0 is NaN for Double — skip empty windows instead.
    if (elements.nonEmpty) {
      val sumPrice = elements.iterator.map(_.price).sum
      out.collect((key, sumPrice / elements.size))
    }
  }
}
}