我们期待使用 Spark Streaming(带有 Flume)和带有窗口的 Spark SQL 来实现一个用例,使我们能够对一组数据执行 CEP 计算。(有关如何捕获和使用数据的信息,请参阅下文)。这个想法是使用 SQL 来执行一些符合某些条件的操作。 。基于每个传入事件批次执行查询似乎非常慢(随着它的进展)。
这里慢意味着我配置了 600 秒的窗口大小和 20 秒的批处理间隔。 (以每 2 秒 1 个输入的速度泵送数据)因此,在 10 分钟后,传入的输入将保持不变,因此执行 SQL 查询应该花费相同的时间。
但时间过去后,它开始花费更多时间并逐渐增加,因此对于大约 300 条记录, select count(*) 查询最初需要 1 秒,后来在 15 分钟后开始花费 2 到 3 秒并逐渐增加。
如果有人能提出更好的方法来实现这个用例,我将不胜感激。以下是我们为实现这一目标而执行的步骤 -
//Creating spark and streaming context
JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, 20);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream; = FlumeUtils.createStream(ssc, "localhost", 55555);
//Adding the events on window
JavaDStream<SparkFlumeEvent> windowDStream =
flumeStream.window(WINDOW_LENGTH, SLIDE_INTERVAL);
// sc is an existing JavaSparkContext.
SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
windowDStream.foreachRDD(new Function<JavaRDD<SparkFlumeEvent>, Void>()
{
public Void call(JavaRDD<SparkFlumeEvent> eventsData)
throws Exception
{
long t2 = System.currentTimeMillis();
lTempTime = System.currentTimeMillis();
JavaRDD<AVEventPInt> inputRDD1 = eventsData.map(new Function<SparkFlumeEvent, AVEventPInt>()
{
@Override
public AVEventPInt call(SparkFlumeEvent eventsData) throws Exception
{
...
return avevent;
}
});
DataFrame schemaevents = sqlContext.createDataFrame(inputRDD1, AVEventPInt.class);
schemaevents.registerTempTable("avevents" + lTempTime);
sqlContext.cacheTable("avevents" + lTempTime);
// here the time taken by query is increasing gradually
long t4 = System.currentTimeMillis();
Long lTotalEvent = sqlContext.sql("SELECT count(*) FROM avevents" + lTempTime).first().getLong(0);
System.out.println("time for total event count: " + (System.currentTimeMillis() - t4) / 1000L + " seconds \n");
sqlContext.dropTempTable("avevents" + lTempTime);
sqlContext.clearCache();
return null;
}
});
例如,假设我们想要根据日志级别确定一段时间内的事件计数。在 SQL 中,我们会发出以下形式的查询:
SELECT level, COUNT(1) from ambari GROUP BY level
但是使用 Scala Data Frame API,您可以发出以下查询:
ambari.groupBy("level").count()
此时,可以使用与本机 SQL 非常接近的东西进行查询,例如:
sqlContext.sql("SELECT level, COUNT(1) from ambari group by level")
这将返回与 DataFrame API 中返回的数据结构相同的数据结构。返回的数据结构本身就是一个数据框。
此时,还没有执行:数据帧上的操作被映射到 RDD 上的适当操作(在本例中)
RDD.groupBy(...).aggregateByKey(...))
我们可以通过对结果执行collect() 来强制执行,将执行结果放入驱动程序内存中。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)