如何在 pyspark pandas_udf 中记录/打印消息?

2023-11-25

我已经测试过logger and print无法打印消息pandas_udf,无论是在集群模式还是客户端模式。

测试代码:

import sys
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import logging

logger = logging.getLogger('test')

spark = (SparkSession
.builder
.appName('test')
.getOrCreate())


df = spark.createDataFrame(pd.DataFrame({
    'y': np.random.randint(1, 10, (20,)),
    'ds': np.random.randint(1000, 9999, (20,)),
    'store_id' : ['a'] * 10 + ['b'] *7 + ['q']*3,
    'product_id' : ['c'] * 5 + ['d'] *12 + ['e']*3,
    })
)


@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])


df1 = df.groupby(['store_id', 'product_id']).apply(train_predict)

另请注意:

log4jLogger = spark.sparkContext._jvm.org.apache.log4j
LOGGER = log4jLogger.LogManager.getLogger(__name__)
LOGGER.info("#"*50)

你不能用这个pandas_udf,因为此日志超出了 Spark 上下文对象,因此您无法在 udf 中引用 Spark 会话/上下文。

我知道的唯一方法是使用Excetion正如我在下面写的答案。 但这很棘手并且有缺点。 我想知道是否有任何方法可以在 pandas_udf 中打印消息。


目前,我尝试了 Spark 2.4 中的所有方法。

如果没有日志,就很难调试有问题的 pandas_udf。我知道可以在 pandas_udf 中打印错误消息的唯一可行方法是raise Exception。所以这样调试确实很费时间,但我知道没有更好的方法了。

@pandas_udf('y int, ds int, store_id string, product_id string', PandasUDFType.GROUPED_MAP)
def train_predict(pdf):
    print('#'*100)
    logger.info('$'*100)
    logger.error('&'*100)
    raise Exception('@'*100)  # The only way I know can print message but would break execution 
    return pd.DataFrame([], columns=['y', 'ds','store_id','product_id'])

缺点是打印消息后无法保持 Spark 运行。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 pyspark pandas_udf 中记录/打印消息? 的相关文章

随机推荐