从 Spark-Shell (pyspark) 查询 Spark 流应用程序

2024-04-29

我正在关注这个example http://cdn2.hubspot.net/hubfs/438089/notebooks/spark2.0/Structured%20Streaming%20using%20Python%20DataFrames%20API.html in the pyspark控制台一切正常。

之后我将其编写为 PySpark 应用程序,如下所示:

# -*- coding: utf-8 -*-

import sys

import click

import logging

from pyspark.sql import SparkSession

from pyspark.sql.types import *


@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
    spark = SparkSession \
            .builder \
            .master(master)\
            .appName("stream-test")\
            .getOrCreate()

    spark.sparkContext.setLogLevel('ERROR')

    some_schema = ....  # Schema removed 

    some_stream    = spark\
                     .readStream\
                     .option("sep", ",")\
                     .schema(some_schema)\
                     .option("maxFilesPerTrigger", 1)\
                     .csv("/data/some_stream", header=True)

    streaming_counts = (
        linkage_stream.groupBy(some_stream.field_1).count()
    )

    query = streaming_counts.writeStream\
                            .format("memory")\
                            .queryName("counts")\
                            .outputMode("complete")\
                            .start()



    query.awaitTermination()

if __name__ == "__main__":
    logging.getLogger("py4j").setLevel(logging.ERROR)
    most_idiotic_bi_query()

该应用程序执行如下:

spark-submit test_stream.py --master spark://master:7077

现在,如果我在另一个终端中打开一个新的 Spark 驱动程序:

pyspark --master spark://master:7077

并尝试运行:

spark.sql("select * from counts")

它失败了:

During handling of the above exception, another exception occurred:

AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()

/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
    541         [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    542         """
--> 543         return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
    544 
    545     @since(2.0)

/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: 'Table or view not found: counts; line 1 pos 14'

我不明白发生了什么事。


这是预期的行为。如果你检查文档 https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks对于内存接收器:

输出作为内存表存储在内存中。支持附加和完整输出模式。这应该用于低数据量的调试目的,因为收集了整个输出并存储在驾驶员的内存中。因此,请谨慎使用。

正如您所看到的,内存接收器不会创建持久表或全局临时视图,而是创建仅限于驱动程序的本地结构。因此无法从另一个 Spark 应用程序查询它。

因此,必须从写入内存的驱动程序中查询内存输出。例如你可以模仿console模式如下图。

一个虚拟作家:

import pandas as pd
import numpy as np
import tempfile
import shutil

def producer(path):
    temp_path = tempfile.mkdtemp()

    def producer(i):
        df = pd.DataFrame({
          "group": np.random.randint(10, size=1000)
        }) 
        df["val"] = (
            np.random.randn(1000) + 
            np.random.random(1000) * df["group"] + 
            np.random.random(1000) * i % 7
        )
        f = tempfile.mktemp(dir=temp_path)
        df.to_csv(f, index=False)
        shutil.move(f, path)
    return producer

火花应用:

from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField

schema = StructType([
   StructField("group", IntegerType()),
   StructField("val", DoubleType())
])

path = tempfile.mkdtemp()
query_name = "foo"

stream = (spark.readStream
    .schema(schema)
    .format("csv")
    .option("header", "true")
    .load(path))

query = (stream
    .groupBy("group")
    .avg("val")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .outputMode("complete")
    .start())

以及一些事件:

from rx import Observable

timer = Observable.timer(5000, 5000)
timer.subscribe(producer(path))
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())

query.awaitTermination()
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 Spark-Shell (pyspark) 查询 Spark 流应用程序 的相关文章

随机推荐

  • Spring-MVC 3.1:如何映射带有尾部斜杠的 URL?

    我正在将旧版 servlet 应用程序转换为 Spring 3 1 在此过程中 一些 URL 现在已过时 我们的网络存在一些问题 短期内不会得到解决 我的老板不想相信他们的重定向将始终有效 因此 她要求我将自己的重定向放入网络应用程序中 一
  • 为现有数据库/sql 视图创建 django 模型?

    我已在 template dir sql someTableName sql 文件中插入视图的定义 创建或替换视图 所以每次我运行syncdb 创建数据库视图 我可以在 models py 中创建一个访问该视图的 python 类吗 使用
  • 如何进行不区分大小写的字符串比较?

    如何使下面的行不区分大小写 drUser Enrolled enrolledUsers FindIndex x gt x Username string drUser Username 1 今天早些时候我得到了一些建议 建议我使用 x Us
  • 将静态链接的 elf 二进制文件转换为动态链接的

    我有一个 elf 二进制文件 它已静态链接到 libc 我无权访问其 C 代码 我想使用 OpenOnload 库 它在用户空间中实现了套接字 因此与标准 libc 版本相比提供了更低的延迟 OpenOnload 实现标准套接字 api 并
  • CSS 网格行垂直溢出其容器

    我想在页面上有一个网格布局 其中网格延伸到整个视口 并且行具有最小高度 最简单的示例是具有单个单元格的网格 请参见下面的代码片段 我遇到的问题是 当视口的高度小于定义的最小行高时 该行垂直溢出其容器 通过在下面的示例中添加红色和绿色边框 可
  • iPhone 相机访问权限?

    我想知道如何访问 iPhone 相机并实时使用它 例如 仅在相机视图上绘图 另一个相关问题 可以显示吗同时 4 个摄像机视图就像 Mac 上的 Photo Booth 一样 您可以使用 AVFoundation 来做到这一点 void in
  • 从布伦特里汇款

    我使用 Braintree 作为我的网站的付款方式 收款时没问题 但现在我想将钱转入特定客户帐户 不退款 请帮我 谢谢你 Full disclosure I work at Braintree If you have any further
  • 如何让wildfly localhost连接自动变成https?

    我需要在本地主机上使用 https 协议测试我的 Web 应用程序 我怎样才能在wildfly上配置设置以使https localhost 8443 myapp html works New 我将其添加到我的安全领域
  • 使用 Google 地图对 geoJson 文件中的点进行自定义标记

    我使用 GeoJSON 作为 Google 地图的数据源 我使用 API v3 创建数据层 如下所示
  • Node.js 中的缓冲区是什么?

    正如您可以在有关 Buffer 类的 Node js 文档 http nodejs org api buffer html 一个缓冲区 类似于整数数组 但对应于 V8 堆外部的原始内存分配 到目前为止 一切都很好 现在让我困惑的是 从技术上
  • 查找其他列表项中列表项的列表索引

    我有一个长字符串列表 我想获取与另一个列表中的字符串子字符串匹配的列表元素的索引 使用列表理解可以轻松检查列表项是否包含列表中的单个字符串 例如这个问题 https stackoverflow com questions 4843158 c
  • AJAX 加载 WordPress 内容

    我一直在遵循 AJAX 教程来尝试将我的 WordPress 帖子内容加载到我网站的主页上 而无需重新加载页面 我不知道为什么 但是当单击链接时 它仍然导航到页面 而不是将内容加载到我指定的 div 中 不管怎样 这对我来说有点太多了 我希
  • [UIScreen mainScreen].bounds 与 [UIApplcation sharedApplication].keyWindow.bounds?

    我认为我想覆盖整个屏幕 我想将其框架设置为覆盖整个屏幕 浏览堆栈溢出 我发现设置视图框架覆盖屏幕的两种不同方法 UIScreen mainScreen bounds UIApplcation sharedApplication keyWin
  • 为什么 XDebug 忽略 NetBeans 6.8 中的断点?

    我在 Ubuntu 10 04 笔记本电脑上运行 PHP 5 3 2 Apache 2 2 14 和 xdebug 2 2 0rc1 并且尝试在 Netbeans 6 8 中的本地主机上设置调试 我的问题是我在 Netbeans 中设置的断
  • 尝试从 assetForURL:resultBlock 内部分配 __block ALAsset 时出错:

    我正在尝试创建一个方法 该方法将返回给定资产 url 的 ALAsset 我需要稍后上传资产 并希望在结果块之外执行此操作 ALAsset assetForPhoto Photo photo ALAssetsLibrary library
  • 如何在 NSUserDefaults 中存储 Swift 枚举值

    我有一个这样的枚举 enum Environment case Production case Staging case Dev 我想在 NSUserDefaults 中保存一个实例 如下所示 func saveEnvironment en
  • 如何使用 forEach 删除列表中的元素?

    var people alex jason matt people forEach function p if p length gt 4 REMOVE THIS PERSON or pop it out of the list or wh
  • 从恐慌中恢复的程序未按预期退出

    根据我的理解 当恐慌恢复时 我期望程序退出并表现出正常行为 但事实并非如此 我期望最后一行打印 程序结束 是正确的吗 如果出现运行时错误 它不会被打印 是吗 package main import fmt func main defer f
  • 最快的 SQL Server 协议?

    最快的 SQL Server 连接协议是什么 相关 哪些协议可以远程使用 哪些可以本地使用 这会影响最快协议的选择吗 VIA 这是最快的 SQL 协议 它在专用硬件上运行 并用于执行 SQL Server 基准测试记录 请注意 VIA 协议
  • 从 Spark-Shell (pyspark) 查询 Spark 流应用程序

    我正在关注这个example http cdn2 hubspot net hubfs 438089 notebooks spark2 0 Structured 20Streaming 20using 20Python 20DataFrame