如何在 Spark 2.0+ 中编写单元测试?

2024-01-30

我一直在尝试寻找一种合理的测试方法SparkSession使用 JUnit 测试框架。虽然似乎有很好的例子SparkContext,我不知道如何获得相应的示例SparkSession,即使它在内部的多个地方使用火花测试基地 https://github.com/holdenk/spark-testing-base/。如果这不是真正正确的方法,我很乐意尝试一个不使用 Spark-testing-base 的解决方案。

简单的测试用例(完整的MWE项目 https://github.com/bbarker/ProjectGists/tree/master/Scala/SparkSessionTester with build.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession


class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

使用 JUnit 运行的结果是加载线处的 NPE:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

请注意,正在加载的文件是否存在并不重要;在正确配置的 SparkSession 中,会抛出更明智的错误 https://issues.apache.org/jira/browse/SPARK-20497.


感谢您提出这个悬而未决的问题。出于某种原因,当谈到 Spark 时,每个人都过于专注于分析,以至于忘记了过去 15 年左右出现的优秀软件工程实践。这就是为什么我们在课程中重点讨论测试和持续集成(以及 DevOps 等其他内容)。

术语简介

A true单元测试意味着您可以完全控制测试中的每个组件。不能与数据库、REST 调用、文件系统甚至系统时钟进行交互;正如 Gerard Mezaros 所说,一切都必须“加倍”(例如嘲笑、存根等)xUnit 测试模式 http://www.gerardmeszaros.com/。我知道这看起来像是语义学,但它确实很重要。不理解这一点是您在持续集成中看到间歇性测试失败的主要原因之一。

我们仍然可以进行单元测试

因此,鉴于这种理解,单元测试RDD是不可能的。然而,在开发分析时仍然需要进行单元测试。

考虑一个简单的操作:

rdd.map(foo).map(bar)

Here foo and bar都是简单的函数。这些可以以正常方式进行单元测试,并且它们应该包含尽可能多的极端情况。毕竟,为什么他们关心从哪里获得输入,无论是测试装置还是测试装置?RDD?

不要忘记 Spark Shell

这不是测试per se,但在这些早期阶段,您还应该在 Spark shell 中进行试验,以找出您的转换,尤其是您的方法的结果。例如,您可以使用许多不同的功能来检查物理和逻辑查询计划、分区策略和保存以及数据的状态,例如toDebugString, explain, glom, show, printSchema, 等等。我会让你探索这些。

您还可以将您的主人设置为local[2]在 Spark shell 和测试中识别仅在开始分发工作后可能出现的任何问题。

使用 Spark 进行集成测试

现在来说说有趣的事情。

为了集成测试当您对辅助功能的质量充满信心并且RDD/DataFrame转换逻辑,做一些事情很关键(无论构建工具和测试框架如何):

  • 增加 JVM 内存。
  • 启用分叉但禁用并行执行。
  • 使用您的测试框架将 Spark 集成测试累积到套件中,并初始化SparkContext在所有测试之前并在所有测试之后停止它。

使用 ScalaTest,您可以混合使用BeforeAndAfterAll(我通常更喜欢)或BeforeAndAfterEach正如 @ShankarKoirala 所做的那样初始化和拆除 Spark 工件。我知道这是一个合理的例外,但我真的不喜欢那些可变的var但你必须使用。

贷款模式

另一种方法是使用贷款模式 https://stackoverflow.com/questions/20762240/loaner-pattern-in-scala.

例如(使用 ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
} 

正如您所看到的,贷款模式利用高阶函数来“贷款”SparkContext进行测试,然后在完成后进行处理。

面向痛苦的编程(谢谢,Nathan)

这完全是一个偏好问题,但我更喜欢使用贷款模式,并在引入另一个框架之前尽可能长时间地自己进行连接。除了试图保持轻量级之外,框架有时还会添加很多“魔力”,使调试测试失败变得难以推理。所以我采取面向痛苦的编程 http://nathanmarz.com/blog/suffering-oriented-programming.html方法——我避免添加新框架,直到无法承受没有它的痛苦。但同样,这取决于你。

该替代框架的最佳选择当然是火花测试基地 https://github.com/holdenk/spark-testing-base正如@ShankarKoirala 提到的。在这种情况下,上面的测试将如下所示:

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
    
          total shouldBe 1000
        }
      }
 }

请注意我不需要做任何事情来处理SparkContext. SharedSparkContext给了我这一切——sc as the SparkContext- 免费。就我个人而言,我不会仅仅为了这个目的而引入这种依赖关系,因为贷款模式正是我所需要的。此外,由于分布式系统存在如此多的不可预测性,当持续集成中出现问题时,必须追溯第三方库源代码中发生的魔力可能是一件非常痛苦的事情。

现在在哪里火花测试基地真正令人眼前一亮的是基于 Hadoop 的助手,例如HDFSClusterLike and YARNClusterLike。将这些特征混合在一起确实可以为您省去很多设置的麻烦。它的另一个闪光点是斯卡拉检查 https://www.scalacheck.org/- 就像属性和生成器 - 当然假设您了解基于属性的测试如何工作以及它为什么有用。但同样,我个人会推迟使用它,直到我的分析和测试达到这种复杂程度。

“只有西斯才会做到绝对。” ——欧比旺·克诺比

当然,您也不必选择其中之一。也许您可以在大多数测试中使用贷款模式方法火花测试基地仅用于一些更严格的测试。选择不是二元的;而是二元的。你可以两者都做。

与 Spark Streaming 的集成测试

最后,我只想展示一个带有内存值的 Spark Streaming 集成测试设置的片段,如果没有火花测试基地:

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

这比看起来更简单。它实际上只是将一系列数据转换成一个队列以提供给DStream。其中大部分实际上只是与 Spark API 配合使用的样板设置。无论如何,您可以将其与StreamingSuiteBase 如发现于 https://github.com/holdenk/spark-testing-base/wiki/StreamingSuiteBase 火花测试基地来决定您更喜欢哪一个。

这可能是我有史以来最长的帖子,所以我将其留在这里。我希望其他人提出其他想法,通过与改进所有其他应用程序开发相同的敏捷软件工程实践来帮助提高我们的分析质量。

对无耻的插件表示歉意,您可以查看我们的课程使用 Apache Spark 进行软件工程 https://www.vidyasource.com/courses/software-engineering-with-apache-spark,我们在这里讨论了很多这样的想法以及更多。我们希望尽快推出在线版本。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 Spark 2.0+ 中编写单元测试? 的相关文章

随机推荐