实现一个java UDF并从pyspark调用它

2024-04-29

我需要创建一个在 pyspark python 中使用的 UDF,它使用 java 对象进行内部计算。

如果它是一个简单的 python 我会做类似的事情:

def f(x):
    return 7
fudf = pyspark.sql.functions.udf(f,pyspark.sql.types.IntegerType())

并使用以下方式调用它:

df = sqlContext.range(0,5)
df2 = df.withColumn("a",fudf(df.id)).show()

然而,我需要的功能的实现是在java中而不是在python中。我需要以某种方式包装它,这样我就可以从 python 中以类似的方式调用它。

我的第一次尝试是实现 java 对象,然后将其包装在 pyspark 中的 python 中,并将其转换为 UDF。由于序列化错误而失败。

Java代码:

package com.test1.test2;

public class TestClass1 {
    Integer internalVal;
    public TestClass1(Integer val1) {
        internalVal = val1;
    }
    public Integer do_something(Integer val) {
        return internalVal;
    }    
}

pyspark代码:

from py4j.java_gateway import java_import
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
java_import(sc._gateway.jvm, "com.test1.test2.TestClass1")
a = sc._gateway.jvm.com.test1.test2.TestClass1(7)
audf = udf(a,IntegerType())

error:

---------------------------------------------------------------------------
Py4JError                                 Traceback (most recent call last)
<ipython-input-2-9756772ab14f> in <module>()
      4 java_import(sc._gateway.jvm, "com.test1.test2.TestClass1")
      5 a = sc._gateway.jvm.com.test1.test2.TestClass1(7)
----> 6 audf = udf(a,IntegerType())

/usr/local/spark/python/pyspark/sql/functions.py in udf(f, returnType)
   1595     [Row(slen=5), Row(slen=3)]
   1596     """
-> 1597     return UserDefinedFunction(f, returnType)
   1598 
   1599 blacklist = ['map', 'since', 'ignore_unicode_prefix']

/usr/local/spark/python/pyspark/sql/functions.py in __init__(self, func, returnType, name)
   1556         self.returnType = returnType
   1557         self._broadcast = None
-> 1558         self._judf = self._create_judf(name)
   1559 
   1560     def _create_judf(self, name):

/usr/local/spark/python/pyspark/sql/functions.py in _create_judf(self, name)
   1565         command = (func, None, ser, ser)
   1566         sc = SparkContext.getOrCreate()
-> 1567         pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command, self)
   1568         ctx = SQLContext.getOrCreate(sc)
   1569         jdt = ctx._ssql_ctx.parseDataType(self.returnType.json())

/usr/local/spark/python/pyspark/rdd.py in _prepare_for_python_RDD(sc, command, obj)
   2297     # the serialized command will be compressed by broadcast
   2298     ser = CloudPickleSerializer()
-> 2299     pickled_command = ser.dumps(command)
   2300     if len(pickled_command) > (1 << 20):  # 1M
   2301         # The broadcast will have same life cycle as created PythonRDD

/usr/local/spark/python/pyspark/serializers.py in dumps(self, obj)
    426 
    427     def dumps(self, obj):
--> 428         return cloudpickle.dumps(obj, 2)
    429 
    430 

/usr/local/spark/python/pyspark/cloudpickle.py in dumps(obj, protocol)
    644 
    645     cp = CloudPickler(file,protocol)
--> 646     cp.dump(obj)
    647 
    648     return file.getvalue()

/usr/local/spark/python/pyspark/cloudpickle.py in dump(self, obj)
    105         self.inject_addons()
    106         try:
--> 107             return Pickler.dump(self, obj)
    108         except RuntimeError as e:
    109             if 'recursion' in e.args[0]:

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in dump(self, obj)
    222         if self.proto >= 2:
    223             self.write(PROTO + chr(self.proto))
--> 224         self.save(obj)
    225         self.write(STOP)
    226 

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    566         write(MARK)
    567         for element in obj:
--> 568             save(element)
    569 
    570         if id(obj) in memo:

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/usr/local/spark/python/pyspark/cloudpickle.py in save_function(self, obj, name)
    191         if islambda(obj) or obj.__code__.co_filename == '<stdin>' or themodule is None:
    192             #print("save global", islambda(obj), obj.__code__.co_filename, modname, themodule)
--> 193             self.save_function_tuple(obj)
    194             return
    195         else:

/usr/local/spark/python/pyspark/cloudpickle.py in save_function_tuple(self, func)
    234         # create a skeleton function object and memoize it
    235         save(_make_skel_func)
--> 236         save((code, closure, base_globals))
    237         write(pickle.REDUCE)
    238         self.memoize(func)

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save_tuple(self, obj)
    552         if n <= 3 and proto >= 2:
    553             for element in obj:
--> 554                 save(element)
    555             # Subtle.  Same as in the big comment below.
    556             if id(obj) in memo:

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    284         f = self.dispatch.get(t)
    285         if f:
--> 286             f(self, obj) # Call unbound method with explicit self
    287             return
    288 

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save_list(self, obj)
    604 
    605         self.memoize(obj)
--> 606         self._batch_appends(iter(obj))
    607 
    608     dispatch[ListType] = save_list

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in _batch_appends(self, items)
    637                 write(MARK)
    638                 for x in tmp:
--> 639                     save(x)
    640                 write(APPENDS)
    641             elif n:

/home/mendea3/anaconda2/lib/python2.7/pickle.pyc in save(self, obj)
    304             reduce = getattr(obj, "__reduce_ex__", None)
    305             if reduce:
--> 306                 rv = reduce(self.proto)
    307             else:
    308                 reduce = getattr(obj, "__reduce__", None)

/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    811         answer = self.gateway_client.send_command(command)
    812         return_value = get_return_value(
--> 813             answer, self.gateway_client, self.target_id, self.name)
    814 
    815         for temp_arg in temp_args:

/usr/local/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     43     def deco(*a, **kw):
     44         try:
---> 45             return f(*a, **kw)
     46         except py4j.protocol.Py4JJavaError as e:
     47             s = e.java_exception.toString()

/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JError(
    311                     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
--> 312                     format(target_id, ".", name, value))
    313         else:
    314             raise Py4JError(

Py4JError: An error occurred while calling o18.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

EDIT:我也尝试使java类可序列化,但没有成功。

我的第二次尝试是首先在 java 中定义 UDF,但失败了,因为我不确定如何正确包装它:

java代码: 包com.test1.test2;

import org.apache.spark.sql.api.java.UDF1;

public class TestClassUdf implements UDF1<Integer, Integer> {

    Integer retval;

    public TestClassUdf(Integer val) {
        retval = val;
    }

    @Override
    public Integer call(Integer arg0) throws Exception {
        return retval;
    }   
}

但我该如何使用它呢? 我试过:

from py4j.java_gateway import java_import
java_import(sc._gateway.jvm, "com.test1.test2.TestClassUdf")
a = sc._gateway.jvm.com.test1.test2.TestClassUdf(7)
dfint = sqlContext.range(0,15)
df = dfint.withColumn("a",a(dfint.id))

但我得到:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-514811090b5f> in <module>()
      3 a = sc._gateway.jvm.com.test1.test2.TestClassUdf(7)
      4 dfint = sqlContext.range(0,15)
----> 5 df = dfint.withColumn("a",a(dfint.id))

TypeError: 'JavaObject' object is not callable

我尝试使用 a.call 而不是 a:

df = dfint.withColumn("a",a.call(dfint.id))

但得到: -------------------------------------------------- ------------------------ 类型错误回溯(最近一次调用最后一次) 在 () 3 a = sc._gateway.jvm.com.test1.test2.TestClassUdf(7) 4 dfint = sqlContext.range(0,15) ----> 5 df = dfint.withColumn("a",a.call(dfint.id))

/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
    796     def __call__(self, *args):
    797         if self.converters is not None and len(self.converters) > 0:
--> 798             (new_args, temp_args) = self._get_args(args)
    799         else:
    800             new_args = args

/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in _get_args(self, args)
    783                 for converter in self.gateway_client.converters:
    784                     if converter.can_convert(arg):
--> 785                         temp_arg = converter.convert(arg, self.gateway_client)
    786                         temp_args.append(temp_arg)
    787                         new_args.append(temp_arg)

/usr/local/spark/python/lib/py4j-0.9-src.zip/py4j/java_collections.py in convert(self, object, gateway_client)
    510         HashMap = JavaClass("java.util.HashMap", gateway_client)
    511         java_map = HashMap()
--> 512         for key in object.keys():
    513             java_map[key] = object[key]
    514         return java_map

TypeError: 'Column' object is not callable

任何帮助,将不胜感激。


我在以下人员的帮助下完成了这个工作你自己的另一个问题(和答案) https://stackoverflow.com/questions/35868276/wrapping-a-java-function-in-pyspark关于 UDAF。

Spark 提供了一个udf()包装Scala的方法FunctionN,所以我们可以将 Java 函数包装在 Scala 中并使用它。您的 Java 方法需要是静态的或者位于一个类上implements Serializable.

package com.example

import org.apache.spark.sql.UserDefinedFunction
import org.apache.spark.sql.functions.udf

class MyUdf extends Serializable {
  def getUdf: UserDefinedFunction = udf(() => MyJavaClass.MyJavaMethod())
}

PySpark 中的用法:

def my_udf():
    from pyspark.sql.column import Column, _to_java_column, _to_seq
    pcls = "com.example.MyUdf"
    jc = sc._jvm.java.lang.Thread.currentThread() \
        .getContextClassLoader().loadClass(pcls).newInstance().getUdf().apply
    return Column(jc(_to_seq(sc, [], _to_java_column)))

rdd1 = sc.parallelize([{'c1': 'a'}, {'c1': 'b'}, {'c1': 'c'}])
df1 = rdd1.toDF()
df2 = df1.withColumn('mycol', my_udf())

与您其他问题和答案中的 UDAF 一样,我们可以将列传递到其中return Column(jc(_to_seq(sc, ["col1", "col2"], _to_java_column)))

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

实现一个java UDF并从pyspark调用它 的相关文章

  • Python:“直接”调用方法是否实例化对象?

    我是 Python 新手 在对我的对象进行单元测试时 我注意到一些 奇怪 的东西 class Ape object def init self print ooook def say self s print s def main Ape
  • Jupyter笔记本突然变得很慢

    我以前在anaconda环境下运行jupyter运行得很好 显示警告后 IOPub data rate exceeded The notebook server will temporarily stop sending output to
  • 无需递归即可对可观察结果进行分页 - RxJava

    我有一个非常标准的 API 分页问题 您可以通过一些简单的递归来处理 这是一个捏造的例子 public Observable
  • 数据库中的持久日期不等于检索日期

    我有一个具有 Date 属性的简单实体类 此属性对应于 MySQL 日期时间列 Entity public class Entity Column name start date Temporal TemporalType TIMESTAM
  • Java 8 Stream,获取头部和尾部

    Java 8 引入了Stream http download java net jdk8 docs api java util stream Stream html类似于 Scala 的类Stream http www scala lang
  • 从三点求圆心的算法是什么?

    我在圆的圆周上有三个点 pt A A x A y pt B B x B y pt C C x C y 如何计算圆心 在Processing Java 中实现它 我找到了答案并实施了一个可行的解决方案 pt circleCenter pt A
  • 如何处理 Tkinter 中的窗口关闭事件?

    如何在 Python Tkinter 程序中处理窗口关闭事件 用户单击 X 按钮 Tkinter 支持一种称为协议处理程序 http web archive org web 20201111215134 http effbot org tk
  • 如何在 Python 中跟踪日志文件?

    我想在 Python 中提供 tail F 或类似内容的输出 而无需阻塞或锁定 我找到了一些非常旧的代码来做到这一点here http code activestate com recipes 436477 filetailpy 但我认为现
  • 文本视图不显示全文

    我正在使用 TableLayout 和 TableRow 创建一个简单的布局 其中包含两个 TextView 这是代码的一部分
  • RuntimeError(f"目录 '{directory}' 不存在") RuntimeError: 目录 'app/static' 不存在

    当我运行 server py 文件时出现错误 File C Users nawin AppData Local Programs Python Python38 lib site packages starlette staticfiles
  • 在 for 循环中访问 itertools 产品的元素

    我有一个列表列表 是附加 itertools 产品的一些其他结果的结果 我想要的是能够使用 for 循环访问列表列表中列表的每个元素 但我无法访问所有元素 我只能访问最后一个列表的元素 结果是一个非常巨大的列表列表 例如 1 2 4 3 6
  • 使用 Sphinx 时,如何记录没有文档字符串的成员?

    我正在为我发布的包编写文档 我发现您的文档越全面 人们就越容易找到您的包来使用 废话 实际上 我在充满爱心地编写代码的所有功能和细节方面获得了很多乐趣 然而 我对如何为类级变量编写与 Sphinx 兼容的文档感到完全困惑 特别是 我有一些e
  • 我所有的 java 应用程序现在都会抛出 java.awt.headlessException

    所以几天前我有几个工作Java应用程序使用Swing图书馆 JFrame尤其 他们都工作得很好 现在他们都抛出了这个异常 java awt headlessexception 我不知道是什么改变了也许我的Java版本不小心更新了 谢谢你尽你
  • 如何使用logging.conf文件使用RotatingFileHandler将所有内容记录到文件中?

    我正在尝试使用RotatingHandler用于 Python 中的日志记录目的 我将备份文件保留为 500 个 这意味着我猜它将创建最多 500 个文件 并且我设置的大小是 2000 字节 不确定建议的大小限制是多少 如果我运行下面的代码
  • Android ScrollView,检查当前是否滚动

    有没有办法检查标准 ScrollView 当前是否正在滚动 方向是向上还是向下并不重要 我只需要检查它当前是否正在滚动 ScrollView当前形式不提供用于检测滚动事件的回调 有两种解决方法可用 1 Use a ListView并实施On
  • 使用 Numpy 进行多维批量图像卷积

    在图像处理和分类网络中 一个常见的任务是输入图像与一些固定滤波器的卷积或互相关 例如 在卷积神经网络 CNN 中 这是一种极其常见的操作 我已将通用版本任务减少为 Given 一批 N 个图像 N H W D 和一组 K 个滤镜 K H W
  • 如何在supervisord中设置组?

    因此 我正在设置 Supervisord 并尝试控制多个进程 并且一切正常 现在我想设置一个组 以便我可以启动 停止不同的进程集 而不是全部或全无 这是我的配置文件的片段 group tapjoy programs tapjoy game1
  • 在 python 中使用高精度时间戳

    嘿 我正在使用 python 处理日期时间 我想知道解析这个时间戳的最佳方法是什么 时间戳是ISO标准 这里是一个例子 2010 06 19T08 17 14 078685237Z 现在到目前为止我已经使用过 time datetime d
  • Java 的“&&”与“&”运算符

    我使用的示例来自 Java Herbert Schildt 的完整参考文献 第 12 版 Java 是 14 他给出了以下 2 个示例 如果阻止 第一个是好的 第二个是错误的 因此发表评论 public class PatternMatch
  • Spring 作为 JNDI 提供者?

    我想使用 Spring 作为 JNDI 提供程序 这意味着我想在 Spring 上下文中配置一个 bean 可以通过 JNDI 访问该 bean 这看起来像这样

随机推荐

  • Android - 如何创建布局选择器(如 ImageButton 选择器)

    我有一个 ImageButton 和一个包裹该按钮的 LinearLayout 如下所示
  • Rxjava 中“背压”一词是什么意思?

    我是 RxJava 的初学者 我很好奇 背压 这是否意味着生产者在背后给消费者施压 或者这是否意味着消费者正在向生产者施加压力 反方向施压 RxJava 背压 当你有一个 observable 发射物品的速度太快 以至于消费者无法跟上流量
  • 循环遍历类为“blah”的所有元素并找到最高的 id 值

    我有很多元素 例如 div class blah div 我想循环遍历所有这些并获得最高的 ID 即 123 这个怎么做 以下是正确的和最好的方法吗 blah each function var id this attr id split
  • 在 Windows 应用商店应用程序中进行模拟

    我可能不是第一个出于测试目的而在 Windows 商店应用程序中处理模拟的人 我想测试我的 ViewModel 并使用一些模拟框架来模拟它们 当然 所有可用的 通用 框架都不能在 Windows 应用商店应用程序项目中使用 我有一个想法如何
  • 返回“application/xml”而不是“text/plain”ASP.NET Core Web API

    我有一个 XML 字符串 我需要将其作为 XML 文档返回 默认情况下 返回的内容类型为text plain 内容已呈现 但我需要内容类型application xml 我启用了 RespectBrowserAcceptHeader 选项
  • C++ 是否可以延迟常量静态成员的初始化?

    我正在使用 Qt 但这是一个通用的 C 问题 我的情况很简单 我有一个课程Constants它有一个常量静态成员 我希望在进行某些函数调用后对其进行初始化 常量 h ifndef CONSTANTS H define CONSTANTS H
  • iPhone 上 NSString 的 AES 加密

    任何人都可以为我指明正确的方向 以便能够加密字符串 并返回带有加密数据的另一个字符串吗 我一直在尝试使用 AES256 加密 我想编写一种需要两个 NSString 实例的方法 一个是要加密的消息 另一个是用于加密它的 密码 我怀疑我必须生
  • 读取 C/C++ 的 EOF

    我正在使用 NetBeans MinGW 编译简单的 C 程序 我是新手 我的问题是我有这个简单的代码 include
  • 将分区扩展到另一级

    根据下图来自春季批量文档 http docs spring io spring batch reference html scalability html partitioning 主步骤被划分为六个从步骤 它们是主步骤的相同副本 我的问题
  • 计算5个城市之间的地理距离以及每个城市所有可能的组合

    所以我有一个 csv 文件 其中包含 3 列 城市 纬度 经度 我已经使用此代码从这个 csv 文件在 python 中创建了一个数据框 data pd read csv lat long csv nrows 10 Lat data lat
  • 2+3 是否被视为文字?

    假设我有类似的东西 int x 2 3 Is x被认为是字面意思吗 x是一个符号 2 3是一个表达式 2 and 3是文字
  • 如何从不知道要卸载的工件名称或工件组 ID 的脚本中执行与 mvn install 相反的操作? [复制]

    这个问题在这里已经有答案了 这听起来应该很容易 但我还没有找到答案 如果我使用 mvn install 安装一个工件 如何删除该工件 我尝试使用 dependency purge local repository 但它只删除依赖项 而不是实
  • 我的 matlab 图中需要不同的颜色

    这是我的情节代码 问题是我的图中的两条线具有相同的颜色 我需要为图中的每条线 总共 4 条线 分配一个特殊的颜色 for i 1 nFolderContents data hdrload folderContents i if size f
  • 如何在cordova中动态加载CSS

    我正在尝试通过 xhr 请求在 cordova 中动态加载 CSS CSS 的加载不是问题 我可以通过 xhr 加载它并通过 HTML5 文件 API 将其存储到文件系统 然后我就可以得到一个完美的 URL 但是如果我通过 javascri
  • 如何模拟嵌套函数?

    我想模拟特定函数中的一些嵌套函数 tools py def cpu count def get cpu quota return int load sys fs cgroup cpu cpu cfs quota us def get cpu
  • c 中的指针、双指针和三重指针[重复]

    这个问题在这里已经有答案了 可能的重复 有人可以告诉我给定代码中第二个 printf 语句中的引用流吗 https stackoverflow com questions 4638527 can someone tell me the fl
  • 无法从外部 bash 脚本正确设置 MySQL 密码

    我有两个脚本 主要的一个脚本执行一些不同的操作并调用第二个脚本 第二个脚本安装 MySQL 从我的主脚本中我做了这样的事情 read p Set the password for the database min 4 characters
  • TestNG 与 DataProvider 并行执行

    我有一个从数据提供者接收数据的测试 我希望此测试与数据提供者的不同值并行运行 我尝试了这样的方法 public class IndependentTest Test dataProvider dp1 threadPoolSize 3 inv
  • 如何检查 lat long 是否在城市范围内

    如何检查我的纬度 经度是否在城市范围内 或者例如 大伦敦包含在 bbox 0 489 51 28 0 236 51 686 Source http wiki openstreetmap org wiki Bounding Box http
  • 实现一个java UDF并从pyspark调用它

    我需要创建一个在 pyspark python 中使用的 UDF 它使用 java 对象进行内部计算 如果它是一个简单的 python 我会做类似的事情 def f x return 7 fudf pyspark sql functions