flink学习day02::datasource、transforma和sink

2023-11-04

datasource

批处理中常见是两类source

基于集合

1.使用env.fromElements()支持Tuple,自定义对象等复合形式。

2.使用env.fromCollection()支持多种Collection的具体类型

3.使用env.generateSequence()支持创建基于Sequence的DataSet

参考代码:

package cn.itcast.batch.source

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink中dataset的常见datasource
 */
object DataSourceDemo {
  def main(args: Array[String]): Unit = {
    /*
    dataset api中datasource主要有两类
    1.基于集合
    2.基于文件
     */
    //1 获取executionenviroment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2 source操作
    // 2.1     1.基于集合
    /*
    1.使用env.fromElements()支持Tuple,自定义对象等复合形式。
    2.使用env.fromCollection()支持多种Collection的具体类型
    3.使用env.generateSequence()支持创建基于Sequence的DataSet
     */
    // 使用env.fromElements()
    val eleDs: DataSet[String] = env.fromElements("spark", "hadoop", "flink")

    // 使用env.fromCollection()
    val collDs: DataSet[String] = env.fromCollection(Array("spark", "hadoop", "flink"))
    //使用env.generateSequence()
    val seqDs: DataSet[Long] = env.generateSequence(1, 9)

    // 3 转换 可以没有转换
    //4 sink 输出
    eleDs.print()
    collDs.print()
    seqDs.print()

    // 5 启动  在批处理中: 如果sink操作是'count()', 'collect()', or 'print()',最后不需要执行execute操作,否则会报错
    //    env.execute()
  }

}

完整版本:

package cn.itcast.batch.source

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

/*
基于集合创建dataset 完整版
 */
object DataSourceDemo2 {
  def main(args: Array[String]): Unit = {
    //获取env
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._

    //0.用element创建DataSet(fromElements)
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()

    //1.用Tuple创建DataSet(fromElements)
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()

    //2.用Array创建DataSet
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()

    //3.用ArrayBuffer创建DataSet
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()

    //4.用List创建DataSet
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()

    //5.用ListBuffer创建DataSet
    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()

    //6.用Vector创建DataSet
    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
    ds6.print()

    //7.用Queue创建DataSet
    val ds7: DataSet[String] = env.fromCollection(mutable.Queue("spark", "flink"))
    ds7.print()

    //8.用Stack创建DataSet
    val ds8: DataSet[String] = env.fromCollection(mutable.Stack("spark", "flink"))
    ds8.print()

    //9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)
    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
    ds9.print()

    //10.用Seq创建DataSet
    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
    ds10.print()

    //11.用Set创建DataSet
    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
    ds11.print()

    //12.用Iterable创建DataSet
    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    ds12.print()

    //13.用ArraySeq创建DataSet
    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()

    //14.用ArrayStack创建DataSet
    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()

    //15.用Map创建DataSet
    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    ds15.print()

    //16.用Range创建DataSet
    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
    ds16.print()

    //17.用fromElements创建DataSet
    val ds17: DataSet[Long] = env.generateSequence(1, 9)
    ds17.print()
  }
}


基于文件

  1. 读取本地文件数据 readTextFile

  2. 读取HDFS文件数据

  3. 读取CSV文件数据

  4. 读取压缩文件

  5. 遍历目录

参考代码:

package cn.itcast.batch.source

import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/*
演示flink 批处理基于文件创建dataset
 */
object DataSourceDemo3 {
  def main(args: Array[String]): Unit = {
    //1 获取env
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 2 基于文件来创建dataset
    //2.1读取本地文本文件
    val wordsDs: DataSet[String] = env.readTextFile("E:/data/words.txt")
    // 2.2读取hdfs文件
    val hdfsDs: DataSet[String] = env.readTextFile("hdfs://node1:8020/wordcount/input/words.txt")
    // 2.3读取csv文件
    //读取csv文件需要准备一个case class
    case class Subject(id: Int, name: String)
    val subjectDs: DataSet[Subject] = env.readCsvFile[Subject]("E:/data/subject.csv")
    // 2.4 读取压缩文件
    val compressDs: DataSet[String] = env.readTextFile("E:/data/wordcount.txt.gz")
    // 2.5 遍历读取文件夹数据
    val conf = new Configuration()
    conf.setBoolean("recursive.file.enumeration",true)
    val folderDs: DataSet[String] = env.readTextFile("E:/data/wc/").withParameters(conf)
    //打印输出结果
//    wordsDs.print()
//    print("------------------------------------------")
//    hdfsDs.print()
//    println("=====================")
//    subjectDs.print()
//    println("=====================")
//    compressDs.print()
//    println("===============")
    folderDs.print()
  }
}

注意:

1 读取压缩文件,对于某些压缩文件flink可以直接读取,不能并行读取
在这里插入图片描述

2 如果读取的是文件夹,想要遍历读取需要设置属性;recursive.file.enumeration进行递归读取

transforma

map&mappartition

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink map与mappartition的operator
 */
/*
需求:
示例
使用map操作,将以下数据转换为一个scala的样例类。
"1,张三", "2,李四", "3,王五", "4,赵六"
 */
object MapAndMapPartitionDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val sourceDs: DataSet[String] = env.fromCollection(List("1,张三", "2,李四", "3,王五", "4,赵六"))

    // 3 转换操作
    // 3.1 定义case class
    case class Student(id: Int, name: String)
    // 3.2 map操作
    val stuDs: DataSet[Student] = sourceDs.map(
      line => {
        val arr: Array[String] = line.split(",")
        Student(arr(0).toInt, arr(1))
      }
    )
    // 3.3 mappartition操作
    val stuDs2: DataSet[Student] = sourceDs.mapPartition(
      
      iter => { //迭代器
        //todo 做一些昂贵的动作,比如开启连接
        //遍历迭代器数据转为case class类型然后返回
        iter.map(
          it => {
            val arr: Array[String] = it.split(",")
            Student(arr(0).toInt, arr(1))
          }
        )
        //todo 做一些昂贵的动作,关闭连接
      }
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    stuDs.print()
    println("====================")
    stuDs2.print()
    // 5 执行
  }
}

总结:

map与mappartition最终效果实际是一样的,但是对于mappartition可以让我们有机会对整个分区的数据看做一个整体进行处理,此外还给我们创建了针对当前分区只需做一次的昂贵动作的机会。

flatMap

在这里插入图片描述

针对需求如果是数据变多的时候就要考虑是不是要使用flatmap进行操作,注意flatMap返回值类型要是一个可迭代类型;

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink flatMap的operator
 */
/*
需求:
示例
分别将以下数据,转换成国家、省份、城市三个维度的数据。
将以下数据
    张三,中国,江西省,南昌市
    李四,中国,河北省,石家庄市
转换为
    (张三,中国)
    (张三,中国,江西省)
    (张三,中国,江西省,南昌市)
    (李四,中国)
    (李四,中国,河北省)
    (李四,中国,河北省,石家庄市)
 */
object FlatMapDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val sourceDs: DataSet[String] = env.fromCollection(List(
      "张三,中国,江西省,南昌市",
      "李四,中国,河北省,石家庄市"
    ))

    // 3 转换操作 使用flatMap:分为map和flat,先执行map操作针对每个数据切分,切分后按照要求组成所谓元组数据(3)装入list中,最后执行flat操作去掉
    //list的外壳
    val faltMapDs: DataSet[Product with Serializable] = sourceDs.flatMap(
      line => {
        // 3.1对数据进行切分操作
        val arr: Array[String] = line.split(",")
        // 3.2 组装数据装入list中
        List(
          //(张三,中国)
          (arr(0), arr(1)),
          // (张三,中国,江西省)
          (arr(0), arr(1), arr(2)),
          // (张三,中国,江西省,南昌市)
          (arr(0), arr(1), arr(2), arr(3))
        )
      }
    )


    // 4 输出 (如果直接打印无需进行启动,执行execute)
    faltMapDs.print()
    // 5 执行
  }
}

filter

对数据进行过滤操作,保留下来结果为true的数据

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink filter的operator
 */
/*
需求:
示例:
过滤出来以下以长度>4的单词。
"hadoop", "hive", "spark", "flink"
 */
object FilterDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val wordsDs: DataSet[String] = env.fromCollection(List("hadoop", "hive", "spark", "flink"))

    // 3 转换操作 使用filter过滤单词长度大于4的单词
    val moreThan4Words: DataSet[String] = wordsDs.filter(_.length > 4)

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    moreThan4Words.print()
    // 5 执行
  }
}

reduce操作

reduce聚合操作的算子,

注意针对groupby之后的数据流我们可以使用reduce进行聚合,不用考虑按照那种方式分组,reduce都可以实现;

但是如果想要使用sum()进行聚合前面分组指定key必须是按照索引才可以。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink filter的operator
 */
/*
需求:
示例1
请将以下元组数据,使用reduce操作聚合成一个最终结果
 ("java" , 1) , ("java", 1) ,("java" , 1)
将上传元素数据转换为("java",3)
示例2
请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object ReduceDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    //    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("java", 1)))
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("scala", 1)))
    //    // 3 转换操作 使用reduce计算单词次数
    //    val resultDs: DataSet[(String, Int)] = wordsDs.reduce(
    //      (w1, w2) => { // w1是一个初始值,第一次的时候是:(单词,0),之后都是之前的累加结果
    //        //3.1 进行次数累加
    //        (w1._1, w1._2 + w2._2)
    //      }
    //    )

    //    // 3 转换操作,需要对单词进行分组,分组之后再进行次数的累加
    val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(_._1)
    // 3.2 进行次数累加
    val resultDs: DataSet[(String, Int)] = groupDs.reduce(
      (w1, w2) => { // w1是一个初始值,第一次的时候是:(单词,0),之后都是之前的累加结果
        //3.1 进行次数累加
        (w1._1, w1._2 + w2._2)
      }
    )
    // 3 转换操作,按照索引分组,使用sum操作
    //    wordsDs.groupBy(0).reduce(
    //      (w1, w2) => { // w1是一个初始值,第一次的时候是:(单词,0),之后都是之前的累加结果
    //                //3.1 进行次数累加
    //                (w1._1, w1._2 + w2._2)
    //              }
    //    ).print()
    // 这种方式不允许,会报错:ava.lang.UnsupportedOperationException: Aggregate does not support grouping with KeySelector functions, yet.
    //    wordsDs.groupBy(_._1).sum(1).print()
    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  }
}

reduceGroup

原理分析:
在这里插入图片描述

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink filter的operator
 */
/*
需求:

示例
请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object ReduceGroupDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    //    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("java", 1)))
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("scala", 1)))

    // 3 转换 使用reducegroup实现单词计数
    val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(_._1)
    val resultDs: DataSet[(String, Int)] = groupDs.reduceGroup(
      iter => {
        //参数是一个迭代器
        iter.reduce(//再对迭代器进行reduce聚合操作
          (w1, w2) => (w1._1, w1._2 + w2._2)
        )
      }
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  }
}

aggregate操作

aggregate只能作用于元组类型的数据,并且对分组方式有要求只能是按照索引或者字段名称方式分组的聚合计算。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.java.aggregation.Aggregations
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink filter的operator
 */
/*
需求:

示例
请将以下元组数据,下按照单词使用groupBy进行分组,再使用reduce操作聚合成一个最终结果
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object AggregateDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    //    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("java", 1)))
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 1), ("scala", 1)))

    // 3 转换 使用aggregate实现单词计数
//   val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(_._1) //aggregation聚合方式不支持这种分组方式
    val groupDs: GroupedDataSet[(String, Int)] = wordsDs.groupBy(0)
    val resultDs: AggregateDataSet[(String, Int)] = groupDs.aggregate(Aggregations.SUM,1)

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  }
}

distinct

对数据进行去重操作,可以指定对某个字段进行去重

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink distinct的operator
 */
/*
需求:

示例
请将以下元组数据,对数据进行去重操作
("java" , 1) , ("java", 1) ,("scala" , 1)
转换为
("java", 2), ("scala", 1)
 */
object DistinctDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val wordsDs = env.fromCollection(List(("java", 1), ("java", 2), ("scala", 1)))

    // 3 转换 使用distinct实现去重
//    wordsDs.distinct().print() //是对整个元组进行去重
    wordsDs.distinct(0).print()  //指定按照某个字段进行去重操作

    // 4 输出 (如果直接打印无需进行启动,执行execute)
//    resultDs.print()
    // 5 执行
  }
}

join操作

类似于sql中的inner join,只展示join成功的数据。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink join的operator
 */
/*
需求:
示例
有两个csv文件,有一个为score.csv,一个为subject.csv,分别保存了成绩数据以及学科数据。
需要将这两个数据连接到一起,然后打印出来。
 */
//定义case class
// 成绩  学生id,学生的姓名,学科id,分数
case class Score(id: Int, name: String, subjectId: Int, score: Double)

//学科 学科id,学科名称
case class Subject(id: Int, name: String)

object JoinDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source 加载数据
    val scoreDs: DataSet[Score] = env.readCsvFile[Score]("E:\\data\\score.csv")
    val sbujectDs: DataSet[Subject] = env.readCsvFile[Subject]("E:\\data\\subject.csv")
    // 3 转换 使用join实现两个dataset关联
    val joinDs: JoinDataSet[Score, Subject] = scoreDs.join(sbujectDs).where(2).equalTo(0)
    // 4 输出 (如果直接打印无需进行启动,执行execute)
    joinDs.print()
    // 5 执行
  }
}

leftouterjoin

类似表的左外关联

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink LeftOuterJoin的operator
 */
/*
需求:用户dataset左关联城市数据
示例
请将以下元组数据(用户id,用户姓名)
(1, "zhangsan") , (2, "lisi") ,(3 , "wangwu")
元组数据(用户id,所在城市)
(1, "beijing"), (2, "shanghai"), (4, "guangzhou")
返回如下数据:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
 */


object LeftOuterJoinDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source 加载数据
    //2.1用户数据
    val userDs: DataSet[(Int, String)] = env.fromCollection(List((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
    //2.2城市数据
    val cityDs: DataSet[(Int, String)] = env.fromCollection(List((1, "beijing"), (2, "shanghai"), (4, "guangzhou")))
    // 3 转换 使用userDs leftouterjoin cityDs
    /*
    
      OPTIMIZER_CHOOSES:将选择权交予Flink优化器;
      BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
      BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
      REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于
      第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
      REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。
      该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
      REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

     */
   //我们一般如果不明确数据集情况,就使用OPTIMIZER_CHOOSES
    val leftJoinAssigner: JoinFunctionAssigner[(Int, String), (Int, String)] = userDs.leftOuterJoin(cityDs,
      JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)
    //3.1 使用apply方法解析JoinFunctionAssigner中的数据处理逻辑
    val resultDs: DataSet[(Int, String, String)] = leftJoinAssigner.apply(
      (left, right) => {
        if (right == null) { //cityds中结果为null
          //返回的数据
          (left._1, left._2, "null")
        } else {
          //返回的数据
          (left._1, left._2, right._2)
        }
      }
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  }
}

注意:

1 leftjoin返回结果不是dataset而是leftjoinassginer,需要我们通过apply进行逻辑处理的实现;

2 关join的策略问题,优先使用joinhint.OPTIMIZER_CHOOSES

joinhit策略:

 OPTIMIZER_CHOOSES:将选择权交予Flink优化器;
 BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
 BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
 REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
 REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
 REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

rightouterjoin

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink RightOuterJoin的operator
 */
/*
需求:用户dataset左关联城市数据
示例
请将以下元组数据(用户id,用户姓名)
(1, "zhangsan") , (2, "lisi") ,(3 , "wangwu")
元组数据(用户id,所在城市)
(1, "beijing"), (2, "shanghai"), (4, "guangzhou")
返回如下数据:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
 */


object RighterOuterJoinDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source 加载数据
    //2.1用户数据
    val userDs: DataSet[(Int, String)] = env.fromCollection(List((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
    //2.2城市数据
    val cityDs: DataSet[(Int, String)] = env.fromCollection(List((1, "beijing"), (2, "shanghai"), (4, "guangzhou")))
    // 3 转换 使用userDs leftouterjoin cityDs
    /*

      OPTIMIZER_CHOOSES:将选择权交予Flink优化器;
      BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
      BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
      REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于
      第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
      REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。
      该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
      REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

     */
    //我们一般如果不明确数据集情况,就使用OPTIMIZER_CHOOSES
    val leftJoinAssigner: JoinFunctionAssigner[(Int, String), (Int, String)] = userDs.rightOuterJoin(cityDs,
      JoinHint.OPTIMIZER_CHOOSES).where(0).equalTo(0)
    //3.1 使用apply方法解析JoinFunctionAssigner中的数据处理逻辑
    val resultDs: DataSet[(Int, String, String)] = leftJoinAssigner.apply(
      (left, right) => {
        if (left == null) { //cityds中结果为null
          //返回的数据
          (right._1, right._2, "null")
        } else {
          //返回的数据
          (right._1, right._2, left._2)
        }
      }
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  }
}

fullOuterJoin

参考代码

package cn.itcast.batch.transformation

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink FullOuterJoin的operator
 */
/*
需求:用户dataset FullOuterJoin城市数据
示例
请将以下元组数据(用户id,用户姓名)
(1, "zhangsan") , (2, "lisi") ,(3 , "wangwu")
元组数据(用户id,所在城市)
(1, "beijing"), (2, "shanghai"), (4, "guangzhou")
返回如下数据:
(3,wangwu,null)
(1,zhangsan,beijing)
(2,lisi,shanghai)
 */


object FullOuterJoinDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source 加载数据
    //2.1用户数据
    val userDs: DataSet[(Int, String)] = env.fromCollection(List((1, "zhangsan"), (2, "lisi"), (3, "wangwu")))
    //2.2城市数据
    val cityDs: DataSet[(Int, String)] = env.fromCollection(List((1, "beijing"), (2, "shanghai"), (4, "guangzhou")))
    // 3 转换 使用userDs leftouterjoin cityDs
    /*

      OPTIMIZER_CHOOSES:将选择权交予Flink优化器;
      BROADCAST_HASH_FIRST:广播第一个输入端,同时基于它构建一个哈希表,而第二个输入端作为探索端,选择这种策略的场景是第一个输入端规模很小;
      BROADCAST_HASH_SECOND:广播第二个输入端并基于它构建哈希表,第一个输入端作为探索端,选择这种策略的场景是第二个输入端的规模很小;
      REPARTITION_HASH_FIRST:该策略会导致两个输入端都会被重分区,但会基于第一个输入端构建哈希表。该策略适用于第一个输入端数据量小于
      第二个输入端的数据量,但这两个输入端的规模仍然很大,优化器也是当没有办法估算大小,没有已存在的分区以及排序顺序可被使用时系统默认采用的策略;
      REPARTITION_HASH_SECOND:该策略会导致两个输入端都会被重分区,但会基于第二个输入端构建哈希表。
      该策略适用于两个输入端的规模都很大,但第二个输入端的数据量小于第一个输入端的情况;
      REPARTITION_SORT_MERGE:输入端被以流的形式进行连接并合并成排过序的输入。该策略适用于一个或两个输入端都已排过序的情况;

     */
    //我们一般如果不明确数据集情况,就使用OPTIMIZER_CHOOSES
    val leftJoinAssigner: JoinFunctionAssigner[(Int, String), (Int, String)] = userDs.fullOuterJoin(cityDs,
      JoinHint.REPARTITION_SORT_MERGE).where(0).equalTo(0)
    //3.1 使用apply方法解析JoinFunctionAssigner中的数据处理逻辑
    val resultDs: DataSet[(Int, String, String)] = leftJoinAssigner.apply(
      (left, right) => {
        if (left == null) { //cityds中结果为null
          //返回的数据
          (right._1,  "null",right._2)
        } else if (right == null) {
          (left._1,left._2,  "null")
        }  else {
          //返回的数据
          (left._1, left._2, right._2)
        }
      }
    )

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resultDs.print()
    // 5 执行
  }
}

union

针对两个dataset去并集,注意结果不去重,并且要求两个数据集的数据类型必须一致。

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

/*
演示flink union的operator
 */
/*
需求:
示例
将以下数据进行取并集操作
数据集1
"hadoop", "hive", "flume"
数据集2
"hadoop", "hive", "spark"
 */

object UnionDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source 加载数据
    val ds1: DataSet[String] = env.fromCollection(List("hadoop",
      "hive",
      "flume"))
    val ds2: DataSet[String] = env.fromCollection(List("hadoop",
      "hive",
      "azkaban"))
//val ds2: DataSet[Int] = env.fromCollection(List(1,
//  2,
//  3))
    // union算子要求两个ds中的数据类型必须一致!!
    // 3 转换 使用union获取两个ds的并集
    val unionDs: DataSet[String] = ds1.union(ds2)

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    unionDs.print()
    // 5 执行
  }
}

rebalance

rabalance可以对数据集进行数据的均匀分布,内部使用round robin(轮询策略)进行数据分发。
在这里插入图片描述

rebalance 参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._

/*
演示flink rebalance的operator
 */
/*
需求:
演示rebalance分区均衡数据
 */
object RebalanceDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment

    //2 source 加载数据
    val sourceDs: DataSet[Long] = env.generateSequence(0, 100)
    //过滤掉部分数据保证倾斜现象明显
    val filterDs: DataSet[Long] = sourceDs.filter(_ > 8)


    // 3 转换操作 使用filter过滤单词长度大于4的单词
    // 3.1 如何观测到每个分区的数据量?使用richmapfunciton来观察数据
    val subTaskDs: DataSet[(Long, Long)] = filterDs.map(
      new RichMapFunction[Long, (Long, Long)] { //rich为代表的富函数可以获取到运行的上下文对象
        //自己定义map逻辑,
        override def map(value: Long): (Long, Long) = {
          //获取到当前任务编号信息(以此来代表分区)
          val subtask: Long = getRuntimeContext.getIndexOfThisSubtask
          (subtask, value)
        }
      }
    )
    // 3.2使用rebalance解决数据倾斜问题
    val rebalanceDs: DataSet[Long] = filterDs.rebalance()

    val rebDs: DataSet[(Long, Long)] = rebalanceDs.map(
      new RichMapFunction[Long, (Long, Long)] { //rich为代表的富函数可以获取到运行的上下文对象
        //自己定义map逻辑,
        override def map(value: Long): (Long, Long) = {
          //获取到当前任务编号信息(以此来代表分区)
          val subtask: Long = getRuntimeContext.getIndexOfThisSubtask
          (subtask, value)
        }
      }
    )
    // 4 输出 (如果直接打印无需进行启动,执行execute)
    subTaskDs.print()
    println("================================")
    rebDs.print()
    // 5 执行
  }
}

总结:

reblance通过轮询的方式大致保证每个分区的数据量时均衡分布(不一定完全均衡)。

技术点:

1 通过subtask编号来代指分区,需要通过上下文件对象获取到subtask编号

2 算子普通方法中无法获取运行时上下文,需要通过富函数来获取,富函数就是flink提供的拥有更丰富权限的一类方法;

在这里插入图片描述

flink dataset 分区策略

hash分区:根据key的hash值进行分区

range分区:根据key的范围进行分区的划分

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.core.fs.FileSystem

import scala.collection.mutable
import scala.util.Random

/*
演示flink dataset的分区策略
 */


object PartitionDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 演示效果明显,设置并行度为2,设置全局并行度为2
    env.setParallelism(2)
    //2 source 加载数据
    val datas = new mutable.MutableList[(Int, Long, String)]
    datas.+=((1, 1L, "Hello"))
    datas.+=((2, 2L, "Hello"))
    datas.+=((3, 2L, "Hello"))
    datas.+=((4, 3L, "Hello"))
    datas.+=((5, 3L, "Hello"))
    datas.+=((6, 3L, "hehe"))
    datas.+=((7, 4L, "hehe"))
    datas.+=((8, 4L, "hehe"))
    datas.+=((9, 4L, "hehe"))
    datas.+=((10, 4L, "hehe"))
    datas.+=((11, 5L, "hehe"))
    datas.+=((12, 5L, "hehe"))
    datas.+=((13, 5L, "hehe"))
    datas.+=((14, 5L, "hehe"))
    datas.+=((15, 5L, "hehe"))
    datas.+=((16, 6L, "hehe"))
    datas.+=((17, 6L, "hehe"))
    datas.+=((18, 6L, "hehe"))
    datas.+=((19, 6L, "hehe"))
    datas.+=((20, 6L, "hehe"))
    datas.+=((21, 6L, "hehe"))
    val orginalDs: DataSet[(Int, Long, String)] = env.fromCollection(Random.shuffle(datas))

    // 3 转换 使用分区规则进行分区
    //3.1 hash分区
    val hashDs: DataSet[(Int, Long, String)] = orginalDs.partitionByHash(_._3)
    //3.2 range 分区
    val rangeDs: DataSet[(Int, Long, String)] = orginalDs.partitionByRange(_._1)
    //对分区进行排序 sortpartition
    val sortDs: DataSet[(Int, Long, String)] = hashDs.sortPartition(_._2, Order.ASCENDING)

    // 4 输出 (如果直接打印无需进行启动,执行execute)
    hashDs.writeAsText("e:/data/partitionbyhash/", FileSystem.WriteMode.OVERWRITE)
    rangeDs.writeAsText("e:/data/partitionbyrange", FileSystem.WriteMode.OVERWRITE)

    sortDs.writeAsText("e:/data/sortpartition", FileSystem.WriteMode.OVERWRITE)
    // 5 执行
    env.execute()
  }
}

使用partitionbyhash可以对数据集按照指定字段的hash值进行数据分分散,

partitionbyrange可以对数据集按照指定字段的范围进行划分;

sortpartition就是分区内的数据按照指定字段以及指定规则进行排序操作

minby/maxby

求取最小或者最大值

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable
import scala.util.Random
import org.apache.flink.api.scala._

/*
演示flink minby,maxby操作
 */
/*
计算每个学科下最大和最小成绩
 */

object MinMaxDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 演示效果明显,设置并行度为2,设置全局并行度为2
    val scores = new mutable.MutableList[(Int, String, Double)]
    scores.+=((1, "yuwen", 90.0))
    scores.+=((2, "shuxue", 20.0))
    scores.+=((3, "yingyu", 30.0))
    scores.+=((4, "wuli", 40.0))
    scores.+=((5, "yuwen", 50.0))
    scores.+=((6, "wuli", 60.0))
    scores.+=((7, "yuwen", 70.0))
    val scoreDs = env.fromCollection(Random.shuffle(scores))

    // 3 转换  计算每个学科下最大和最小成绩
    val groupDs: GroupedDataSet[(Int, String, Double)] = scoreDs.groupBy(1)
    //调用min
    val aggMinDs: AggregateDataSet[(Int, String, Double)] = groupDs.min(2)
//调用minby
    val minByDs: DataSet[(Int, String, Double)] = groupDs.minBy(2)
    // 4 输出 (如果直接打印无需进行启动,执行execute)
    aggMinDs.print()
    println("====================")
    minByDs.print()
    // 5 执行
//    env.execute()
  }
}

注意:

min也可以计算最小值,但是返回的数据是包含最小值字段的数据,但是有可能其它字段是不正确的,所以想要获取最小值要使用minby,max与maxby同理。

cross 笛卡尔积

参考代码:

package cn.itcast.batch.transformation

import org.apache.flink.api.scala.{ExecutionEnvironment, _}

import scala.collection.mutable
import scala.util.Random

/*
演示flink cross笛卡尔积
 */
/*

 */

object CrossDemo {
  def main(args: Array[String]): Unit = {

    // 1 获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source 加载数据
    val students = new mutable.MutableList[(Int, String)]
    //学生
    students.+=((1, "张三"))
    students.+=((2, "李四"))
    students.+=((3, "王五"))
    students.+=((4, "赵六"))

    val subjects = new mutable.MutableList[(Int, String)]
    //课程
    subjects.+=((1, "Java"))
    subjects.+=((2, "Python"))
    subjects.+=((3, "前端"))
    subjects.+=((4, "大数据"))
    val stuDs = env.fromCollection(Random.shuffle(students))
    val subjectDs = env.fromCollection(Random.shuffle(subjects))
    // 3 转换 进行笛卡尔积操作,类似偏函数(scala)
    val resDs: DataSet[(Int, String, Int, String)] = stuDs.cross(subjectDs) {
      (item1, item2) => {
        (item1._1, item1._2, item2._1, item2._2)
      }
    }


    // 4 输出 (如果直接打印无需进行启动,执行execute)
    resDs.print()
    // 5 执行
  }
}

sink

参考代码

package cn.itcast.batch.sink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._

/*
演示flink dataset 中的sink operator
 */
object SinkDemo {
  def main(args: Array[String]): Unit = {
    //1 创建入口对象,获取运行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
    //2 source
    val wordsDs: DataSet[String] = env.fromElements("spark", "hadoop", "flink")

    // 3 转换

    // 4 sink (如果是print无需执行下面的execute)

    // 4.1  基于集合
    //标准输出
//    wordsDs.print()
//    println("==========================")
//    //错误输出
//    wordsDs.printToErr() //高亮显示
//    //collect 到本地
//    println("==========================")
//
//    println(wordsDs.collect())

    // 4.2 基于文件,保存结果到文件, 可以直接调整operator的并行度
    wordsDs.setParallelism(1).writeAsText("e:/data/sink/words")
    // 5 执行
    env.execute()
  }
}

注意:

最后保存数据到文件时,如果只有一个并行度最终会保存为一个文件,如果是多个并行度最后是在文件中生成多个并行度对应的文件。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink学习day02::datasource、transforma和sink 的相关文章

随机推荐

  • Ubuntu下chgrp的用法

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 每天一个linux命令链接 http www cnblogs com peida archive 2012 12 03 2799003 html 实例1 改变文件的群组属性
  • Elasticsearch中 match、match_phrase、query_string和term的区别

    一 text字段和keyword字段的区别 以下给出一个例子 首先建立一个索引和类型 引入一个keywork的字段 PUT my index mappings products properties name type keyword 然后
  • 华为OD机试 Java 实现【计算日期到天数转换】【牛客练习题】

    一 题目描述 根据输入的日期 计算是这一年的第几天 保证年份为4位数且日期合法 二 输入描述 输入一行 每行空格分割 分别是年 月 日 三 输出描述 输出是这一年的第几天 四 Java算法源码 public static void main
  • 200行代码实现Mini ASP.NET Core

    前言 在学习ASP NET Core源码过程中 偶然看见蒋金楠老师的ASP NET Core框架揭秘 不到200行代码实现了ASP NET Core Mini框架 针对框架本质进行了讲解 受益匪浅 本文结合ASP NET Core Mini
  • 逆矩阵的概念、应用和求解

    目录 逆矩阵的概念 求解逆矩阵 应用例子 可能没有逆矩阵 求解逆 方法1 初等行运算 高斯 若尔当 求解逆 方法2 余子式 代数余子式和伴随 求解逆 方法3 程序库 逆矩阵的概念 矩阵运算中 是没有除法的 也就是不能除以一个矩阵 这时就需要
  • C++ 遍历驱动列表(应用层下)

    上代码咯 include stdafx h include
  • (android地图开发) 高德地图手势切换

    效果截图 相关布局文件 手势滑动第一个界面
  • Apache-Arrow是什么?

    Apache Arrow是什么 Arrow是一种数据存储格式 以及对这个格式的一系列API及多语言的SDK 当上层应用使用这个接口访问这些数据的时候 大家就不用在不同的私有格式之间转换 从而省去了大量的序列化和反序列化的计算资源 基础概念
  • PVE虚拟化平台之安装openEuler系统

    PVE虚拟化平台之安装openEuler系统 一 openEuler介绍 1 1 openEuler简介 1 2 openEuler的硬件要求 物理机的安装要求 虚拟机的安装要求 二 下载openEuler系统镜像 2 1 官方网址 2 2
  • MAVEN利器:一文带你了解MAVEN以及如何配置

    前言 强大的构建工具 Maven 作为Java生态系统中的重要组成部分 Maven为开发人员提供了一种简单而高效的方式来构建 管理和发布Java项目 无论是小型项目还是大型企业级应用 Maven都能帮助开发人员轻松处理依赖管理 编译 测试和
  • Jquery添加元素(append,prepend,after,before四种方法区别对比)

    jquery是一个平常比较喜欢用的js框架 因为上手比较简单吧 哈哈 下面呢 就介绍一下Jquery中如何添加元素 jquery添加元素一共有四个语句 分别是append prepend after before append的用法 这个方
  • 神经网络激活函数sigmoid relu tanh 为什么sigmoid 容易梯度消失

    什么是激活函数 为什么要用 都有什么 sigmoid ReLU softmax 的比较 如何选择 1 什么是激活函数 如下图 在神经元中 输入的 inputs 通过加权 求和后 还被作用了一个函数 这个函数就是激活函数 Activation
  • Python爬取58同城广州房源+可视化分析

    感谢关注天善智能 走好数据之路 欢迎关注天善智能 我们是专注于商业智能BI 人工智能AI 大数据分析与挖掘领域的垂直社区 学习 问答 求职一站式搞定 对商业智能BI 大数据分析挖掘 机器学习 python R等数据领域感兴趣的同学加微信 t
  • 如何在Mac电脑上编译Unity项目至iOS simulator (ipad/iphone)

    如何将Unity项目编译成iOS app 并在虚拟的ipad或者iphone上运行呢 大体步骤分为三步 使用Unity生成 xcodeproj 文件 在Xcode中运行 simulator 通过Xcode编译 xcodeproj 文件 并安
  • 结构体指针数组 内存分配 释放

    include
  • 【js】dayjs的用法

    导入dayjs库 import dayjs from dayjs 创建一个dayjs对象 const today dayjs 今天 console log today today 2023 08 28T05 19 09 296Z const
  • 如何实现人机融合的“即时聚优 ”

    即时聚优是指在某个特定时间段内 通过聚合和优选信息 为用户提供实时的 个性化的显示内容 即时聚优的目的是为了在短时间内向用户提供最有价值和最相关的信息 以提高用户体验和效率 它通常基于用户的偏好 历史行为 上下文信息等来进行内容的聚合和筛选
  • Spring Boot+AOP记录日志

    1 pom xml中加入web依赖
  • AD画PCB的时候,板子找不到了该如何应对?

    按下V F或V D 不要太惊呆了 记得不要忘了英文输入法哦 第一次按下后 怎奈唯有 WC 哈哈哈 然后写下第一篇CSDN 此文章如有不合适的地方 请多多指教 评论或私信留言 谢谢
  • flink学习day02::datasource、transforma和sink

    datasource 批处理中常见是两类source 基于集合 1 使用env fromElements 支持Tuple 自定义对象等复合形式 2 使用env fromCollection 支持多种Collection的具体类型 3 使用e