大家好,首先我知道这个线程的存在,Spark 中的任务仅在一个执行器上运行 https://stackoverflow.com/questions/53425983/task-is-running-on-only-one-executor-in-spark.
但这不是我的情况,因为我正在使用repartition(n)
在我的数据框上。
基本上,我通过 Spark 从 ElasticSearch 索引中获取数据来加载 DataFrame,如下所示:
spark = SparkSession.builder \
.appName("elastic") \
.master("yarn")\
.config('spark.submit.deployMode','client')\
.config("spark.jars",pathElkJar) \
.enableHiveSupport() \
.getOrCreate()
es_reader = (spark.read
.format("org.elasticsearch.spark.sql")
.option("es.read.field.include",includeFieldsString)
.option("es.query",q)
.option("es.nodes",elasticClusterIP)
.option("es.port",port)
.option("es.resource",indexNameTable)
.option("es.nodes.wan.only" , 'true')
.option("es.net.ssl", 'true')
.option("es.net.ssl.cert.allow.self.signed", "true")
.option("es.net.http.auth.user" ,elkUser )
.option("es.net.http.auth.pass" , elkPassword)
.option("es.read.metadata", "false")
.option("es.read.field.as.array.include","system_auth_hostname")
#.option("es.mapping.exclude", "index")
#.option("es.mapping.id", "_id")
#.option("es.read.metadata._id","_id")
#.option("delimiter", ",")
#.option("inferSchema","true")
#.option("first_row_is_header","true")
)
df = es_reader.load()
默认情况下,YARN 正确地向我的应用程序添加了 2 个执行程序,因为我没有指定其他内容。从 ElasticSearch 加载数据时,DF 未分区,因此我运行以下命令来检查执行器行为:
df = df.repartition(2)
print('Number of partitions: {}'.format(df.rdd.getNumPartitions()))
>> Number of partitions: 2
df.count()
我期望从 Spark UI 中看到两个执行器都在处理count()
任务,但是我得到了一个奇怪的行为,我完成了三个任务,没有重新分区,它是一个操作的两个任务,其中第一个任务和较长的任务仅由一个执行程序运行,如下图所示:Count()-两个Executor-两个分区 https://i.stack.imgur.com/RGeis.png.
如果我从已经分区为两个的配置单元表中保存和加载(操作系统:linux/windows),事情就会按预期进行:
df.write.mode('overwrite').format("parquet").partitionBy('OS').saveAsTable('test_executors')
df2 = spark.read.load("path")
df2.count()
在这种情况下,我得到以下信息:Count()-twoExecutor-twoPartitions_loadFromHiveTable https://i.stack.imgur.com/S3SRy.png
我在哪里得到两个执行者同时工作的期望行为count()
task.
问题似乎出在repartition(n)
我认为正确地分区了 DF,我检查过df.rdd.getNumPartitions()
but doesn't执行者之间的并行工作。
如果有必要,我可以提供附图中任务的详细信息。提前致谢!