pyspark作为生产者发送消息(推送数据)到kafka

2023-11-09

pyspark作为生产者发送消息到kafka

网上大部分的案例都是pyspark作为消费者消费kafka的消息,但是作为生产者生产消息发送给kafka的很少,下面把pyspark如何创建数据(或读取数据)作为生产者发送消息给kafka作为案例进行分享。

pyspark创建DataFrame数据集

调用spark.createDataFrame()方法创建两条测试数据。

from pyspark.sql import SparkSession, Row

def get_data():
    """这个测试数据是创建的dataframe,如果需要可以修改成读取数据库取数据"""
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([
        Row(row_code='H111', msg_code='6019975', state='graft', state_num=1, decision_date='2022-07-01'),
        Row(row_code='H112', msg_code='6001458', state='adopt', state_num=2, decision_date='2022-07-15'),
    ])

创建好数据要发送给kafka,为了方便消费将数据转换成json格式,需要调用pyspark.sql.functionsto_json方法。

from pyspark.sql.functions import to_json, col, struct

json_data = to_json(struct([col(c).alias(c) for c in df.columns])).alias('value')

最后为什么要.alias('value')名称为value,因为kafka需要接受key:value数据,所以数据列命名需要是value,这个需要注意一下,要不然数据发送不过去。

发送消息到kafka

发送消息也就是推送dataframe数据到kafka,ETLConfig都是动态配置的内容,下面对每项内容配置说明:

如果kafka开启了sasl认证需要配置前三项,这个根据自己的kafka配置进行填写,因为每个kafka的配置不同这里就写具体的值了:

  • kafka.sasl.mechanism
  • kafka.security.protocol
  • kafka.sasl.jaas.config

配置kafka基础信息:

  • kafka.bootstrap.servers:配置kafka连接信息(node1:9092,node2:9092,node3:9092)
  • topic:配置接收消息的topic名

如果遇到数据量大,推送数据失败可以添加下面两个配置:

  • kafka.batch.size:批量推送消息的数量设置,大量数据时分批推送会加快速率
  • kafka.request.timeout.ms:请求超时时间设置

具体代码如下,如果运行失败参考最后的注意事项,看是否缺少jar包

# 获取上面创建的测试数据
df = get_data()

# 推送到kafka,这里集成了to_json格式转换
df.select(to_json(struct([col(c).alias(c) for c in df.columns])).alias('value'))\
    .write.format("kafka")\
    .option("kafka.sasl.mechanism", ETLConfig.KAFKA_SASL_MECHANISM) \
    .option("kafka.security.protocol", ETLConfig.KAFKA_SECURITY_PROTOCOL) \
    .option("kafka.sasl.jaas.config", ETLConfig.KAFKA_SASL_JAAS_CONFIG) \
    .option('kafka.batch.size', 5000)\
    .option("kafka.bootstrap.servers", ETLConfig.KAFKA_BOOTSTRAP_SERVERS)\
    .option('kafka.request.timeout.ms', 120000)\
    .option("topic", "lz_mm_return_warehouse").save()

到这里基本就结束了,推送数据使用了pyspark.sql.DataFrameWriter.format.save方法,通过将数据转换成kafka格式然后配置对应的连接信息即可进行推送,具体扩展使用可以自行查阅。

注意

spark发送消息到kafka需要配置的jar包,如果之前没有导入过对应的jar包会运行失败,这个需要根据你的spark版本到公开的仓库进行下载,导入spark即可:

  • spark-sql-kafka-0-10_2.11-2.4.7.jar:比如我的spark是2.4.7所以下载对应的版本
  • kafka-clients-3.1.0.jar

小结

虽然内容很简短,但也是经过很多的查阅和试验总结的成果,希望能帮助到大家。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

pyspark作为生产者发送消息(推送数据)到kafka 的相关文章

随机推荐