Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询执行时间较长

2023-12-28

我们期待使用 Spark Streaming(带有 Flume)和带有窗口的 Spark SQL 来实现一个用例,使我们能够对一组数据执行 CEP 计算。(有关如何捕获和使用数据的信息,请参阅下文)。这个想法是使用 SQL 来执行一些符合某些条件的操作。 。基于每个传入事件批次执行查询似乎非常慢(随着它的进展)。

这里慢意味着我配置了 600 秒的窗口大小和 20 秒的批处理间隔。 (以每 2 秒 1 个输入的速度泵送数据)因此,在 10 分钟后,传入的输入将保持不变,因此执行 SQL 查询应该花费相同的时间。

但时间过去后,它开始花费更多时间并逐渐增加,因此对于大约 300 条记录, select count(*) 查询最初需要 1 秒,后来在 15 分钟后开始花费 2 到 3 秒并逐渐增加。

如果有人能提出更好的方法来实现这个用例,我将不胜感激。以下是我们为实现这一目标而执行的步骤 -

    //Creating spark and streaming context
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext ssc = new JavaStreamingContext(sc, 20);
    JavaReceiverInputDStream<SparkFlumeEvent> flumeStream; = FlumeUtils.createStream(ssc, "localhost", 55555);

    //Adding the events on window
    JavaDStream<SparkFlumeEvent> windowDStream =
        flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);

    // sc is an existing JavaSparkContext.
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

    windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
    {

        public Void call(JavaRDD<SparkFlumeEvent> eventsData)
        throws Exception
        {
            long t2 = System.currentTimeMillis();
            lTempTime = System.currentTimeMillis();

            JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
            {
                @Override
                public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
                {
                ...
                    return avevent;
                }
            });
            DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
            schemaevents.registerTempTable("avevents" + lTempTime);
            sqlContext.cacheTable("avevents" + lTempTime);

            // here the time taken by query is increasing gradually
            long t4 = System.currentTimeMillis();
            Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
            System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds \n");

            sqlContext.dropTempTable("avevents"  + lTempTime);
            sqlContext.clearCache();

            return null;

        }
    });

例如,假设我们想要根据日志级别确定一段时间内的事件计数。在 SQL 中,我们会发出以下形式的查询:

SELECT level, COUNT(1) from ambari GROUP BY level

但是使用 Scala Data Frame API,您可以发出以下查询:

ambari.groupBy("level").count()

此时,可以使用与本机 SQL 非常接近的东西进行查询,例如:

sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")

这将返回与 DataFrame API 中返回的数据结构相同的数据结构。返回的数据结构本身就是一个数据框。

此时,还没有执行:数据帧上的操作被映射到 RDD 上的适当操作(在本例中)

RDD.groupBy(...).aggregateByKey(...))

我们可以通过对结果执行collect() 来强制执行,将执行结果放入驱动程序内存中。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询执行时间较长 的相关文章

随机推荐

  • 执行 aws s3 cp 命令时出现全局名称“ssl”未定义错误

    我正在尝试使用 AWS CLI 命令将文件上传到 AWS S3 我使用的是RedHat 4操作系统的系统 Python版本是2 7 9 OpenSSL 版本为 0 9 8v 2012 年 4 月 19 日 我已经安装了 AWS CLI 当我
  • Code::Blocks 出现 GLUT 编译错误

    一周前 当我第一次尝试使用 GLUT 并遵循本教程 http www sci brooklyn cuny edu goetz codeblocks glut 它工作正常 没有任何错误 现在 一周后 我安装了相同的软件 相同的库 但每次我尝试
  • 将数据复制到同一个表中或从同一个表中复制数据,并将复制的数据中某一列的值更改为指定值

    我想复制 SQL Server 2008 数据库中单个表中的一些数据 并将其复制到同一个表中 并将复制数据的一列中的值更改为单个指定数字 这是一个例子 在下面的名为Metric的表中 数据是 Key Name MetricValue 112
  • 在 WebView 中检查互联网连接

    各位程序员大家好 我的 WebView 应用程序出现问题 我想在启动时添加一个 连接检查 当没有可用的 Internet 连接或 WebView 超时时 该检查会弹出错误 我无法自己完成它 因为我是 Android 编程新手 我还想添加一个
  • Java 中 HashMap 的迭代器

    我尝试在 Java 中迭代 hashmap 这应该是一件相当容易的事情 但是 以下代码给我带来了一些问题 HashMap hm new HashMap hm put 0 zero hm put 1 one Iterator iter Ite
  • 如何清除 Ruby 中的终端?

    我想知道如何在 Ruby 中做我能做的事情system clear 在C 我写了一个像这样的程序 puts amit system clear 我希望执行此命令后清除控制台 但它不起作用 如果你想要一些不太便携的东西 你可以尝试 syste
  • WCF 和 ColdFusion

    我有一个 WCF WebService 我想使用 ColdFusion 来使用 常规过程是使用 CFHTTP 来处理 WSDL 并在正文中包含 SOAP 请求 通常情况下 这是有效的 一切都工作正常
  • 将 Spring Security 3 与 cookie 结合使用

    我有一个用 Java 构建的需要授权的应用程序 但是 身份验证部分由不同 单独的应用程序 不是 Java 处理 用户登录到身份验证应用程序 该应用程序会设置 cookie 如果用户被授权访问 Java 应用程序 他们将被授权应用程序重定向到
  • 需要在 Android 中将图像发布到 Tumblr 博客

    我需要将图片发布到 Tumblr 我读了这个http www tumblr com docs en api v2 auth http www tumblr com docs en api v2 auth我开始知道我需要获取用户信息才能获取博
  • Rails Mailer Net::SMTPServerBusy

    在我的 Rails 站点上 当我尝试通过 GMail 发送邮件时 它工作得很好 但是当我尝试通过 MandrillApp 发送它时 它给出以下错误 RController create 是调用交付命令的地方 Net SMTPServerBu
  • 合并排序数组[重复]

    这个问题在这里已经有答案了 可能的重复 合并两个排序列表 https stackoverflow com questions 2348374 merging two sorted lists N路合并算法 https stackoverfl
  • 一个大的包含文件还是几个较小的包含文件?

    我正在编写一些jsp 我想知道是否最好有一个大的包含文件 我将包含在每个页面中 或者几个较小的文件 我将根据需要仅包含在某些页面上 任何给定页面最多只需要调用几个方法 我主要关心的是性能 其次是维护 我们正在使用 jsp include 指
  • 通过接口使用 Function<> ?

    我已经有一个现有的泛型类 public class Foo
  • 如何一点点构建一个Linq to Sql where子句?

    我正在参数类中传递一组查询字符串参数 用于查询图像数据库 每次调用时 某些参数可能为空 所以在 sql 中我会建立这样的查询 if parameters Value1 null sql Append sql where clause if
  • Android 位图内存问题 - ARGB_4444 与 RGB_565

    将图像加载到位图中时 哪种方法消耗更多内存 ARGB 4444 与 RGB 565 Thanks 它们占用相同数量的内存 看到这些数字了吗 它们告诉您每个分量 A R G B 的位数 将它们相加即可得到每个像素的总位数 不过 如果您不需要这
  • 使用 Win32 的带有主题的透明单选按钮控件

    我正在尝试在启用主题时仅使用 Win32 制作具有透明背景的单选按钮控件 这样做的原因是允许将单选按钮放置在图像上并显示图像 而不是灰色的默认控件背景 开箱即用的是 该控件将具有灰色的默认控件背景以及通过处理以下任一内容来更改此背景的标准方
  • 查找曲线中的增加和减少趋势 MATLAB

    a 2 3 6 7 2 1 0 01 6 8 10 12 15 18 9 6 5 4 2 这是一个数组 我需要提取增加和减少趋势开始的确切值 数组的输出a将 2 first element 2 6 9 a 2 3 6 7 2 1 0 01
  • svn check out 有问题无法读取块大小

    我遇到了以下错误 它检查了一段时间然后向我抛出一个错误 SVN 1 6 请让我知道具体应该是什么配置 REPORT of svn svn vcc default Could not read chunk size connection wa
  • 在 OnLoad 或 OnInit 期间动态添加控件?

    我想根据某些特定条件向页面添加更多控件 这些控件不需要任何 ViewState 或绑定数据 它就像静态链接一样简单 我想知道我可以在哪里编写代码 里面OnLoad or OnInit方法 为什么 如果我把它放在里面OnLoad 我应该添加以
  • Spark SQL + Window + Streaming 问题 - 使用 Spark Streaming 运行时,Spark SQL 查询执行时间较长

    我们期待使用 Spark Streaming 带有 Flume 和带有窗口的 Spark SQL 来实现一个用例 使我们能够对一组数据执行 CEP 计算 有关如何捕获和使用数据的信息 请参阅下文 这个想法是使用 SQL 来执行一些符合某些条