所有执行器均已死亡 MinHash LSH PySpark approxSimilarityJoin EMR 集群上的自连接

2024-02-12

在 (name_id, name) 组合的数据帧上调用 Spark 的 MinHashLSH 的 approxSimilarityJoin 时,我遇到了问题。

我尝试解决的问题的摘要:

我有一个包含大约 3000 万个公司名称唯一(name_id、name)组合的数据框。其中一些名称指的是同一家公司,但 (i) 拼写错误,和/或 (ii) 包含其他名称。对每个组合执行模糊字符串匹配是不可能的。为了减少模糊字符串匹配组合的数量,我在 Spark 中使用 MinHashLSH。我的预期方法是使用具有相对较大 Jaccard 阈值的 approxSimilarityJoin(自连接),这样我就能够对匹配的组合运行模糊匹配算法,以进一步改善歧义消除。

我所采取的步骤摘要:

  1. 使用 CountVectorizer 为每个名称创建字符计数向量,
  2. Used MinHashLSH and its approxSimilarityJoin with the following settings:
    • 哈希表数=100
    • 阈值=0.3(approxSimilarityJoin 的杰卡德阈值)
  3. 在 approxSimilarityJoin 之后,我删除重复的组合(其中认为存在匹配的组合 (i,j) 和 (j,i),然后删除 (j,i))
  4. 删除重复的组合后,我使用 FuzzyWuzzy 包运行模糊字符串匹配算法,以减少记录数量并提高名称的歧义性。
  5. 最终,我在剩余的边 (i,j) 上运行连接组件算法,以匹配哪些公司名称属于一起。

使用的部分代码:

    id_col = 'id'
    name_col = 'name'
    num_hastables = 100
    max_jaccard = 0.3
    fuzzy_threshold = 90
    fuzzy_method = fuzz.token_set_ratio

    # Calculate edges using minhash practices
    edges = MinHashLSH(inputCol='vectorized_char_lst', outputCol='hashes', numHashTables=num_hastables).\
        fit(data).\
        approxSimilarityJoin(data, data, max_jaccard).\
        select(col('datasetA.'+id_col).alias('src'),
               col('datasetA.clean').alias('src_name'),
               col('datasetB.'+id_col).alias('dst'),
               col('datasetB.clean').alias('dst_name')).\
        withColumn('comb', sort_array(array(*('src', 'dst')))).\
        dropDuplicates(['comb']).\
        rdd.\
        filter(lambda x: fuzzy_method(x['src_name'], x['dst_name']) >= fuzzy_threshold if x['src'] != x['dst'] else False).\
        toDF().\
        drop(*('src_name', 'dst_name', 'comb'))

解释一下计划edges

== Physical Plan ==
*(5) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
+- Exchange hashpartitioning(datasetA#232, datasetB#263, 200)
   +- *(4) HashAggregate(keys=[datasetA#232, datasetB#263], functions=[])
      +- *(4) Project [datasetA#232, datasetB#263]
         +- *(4) BroadcastHashJoin [entry#233, hashValue#234], [entry#264, hashValue#265], Inner, BuildRight, (UDF(datasetA#232.vectorized_char_lst, datasetB#263.vectorized_char_lst) < 0.3)
            :- *(4) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#225) AS datasetA#232, entry#233, hashValue#234]
            :  +- *(4) Filter isnotnull(hashValue#234)
            :     +- Generate posexplode(hashes#225), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#225], false, [entry#233, hashValue#234]
            :        +- *(1) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#225]
            :           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
            :                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
            :                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
            :                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
            :                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
            :                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                   +- Exchange hashpartitioning(name#11, 200)
            :                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
            :                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
            :                                            +- Exchange RoundRobinPartitioning(8)
            :                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
            :                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>
            +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false], input[2, vector, true]))
               +- *(3) Project [named_struct(id, id#10, name, name#11, clean, clean#90, char_lst, char_lst#95, vectorized_char_lst, vectorized_char_lst#107, hashes, hashes#256) AS datasetB#263, entry#264, hashValue#265]
                  +- *(3) Filter isnotnull(hashValue#265)
                     +- Generate posexplode(hashes#256), [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, hashes#256], false, [entry#264, hashValue#265]
                        +- *(2) Project [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107, UDF(vectorized_char_lst#107) AS hashes#256]
                           +- InMemoryTableScan [char_lst#95, clean#90, id#10, name#11, vectorized_char_lst#107]
                                 +- InMemoryRelation [id#10, name#11, clean#90, char_lst#95, vectorized_char_lst#107], StorageLevel(disk, memory, deserialized, 1 replicas)
                                       +- *(4) Project [id#10, name#11, pythonUDF0#114 AS clean#90, pythonUDF2#116 AS char_lst#95, UDF(pythonUDF2#116) AS vectorized_char_lst#107]
                                          +- BatchEvalPython [<lambda>(name#11), <lambda>(<lambda>(name#11)), <lambda>(<lambda>(name#11))], [id#10, name#11, pythonUDF0#114, pythonUDF1#115, pythonUDF2#116]
                                             +- SortAggregate(key=[name#11], functions=[first(id#10, false)])
                                                +- *(3) Sort [name#11 ASC NULLS FIRST], false, 0
                                                   +- Exchange hashpartitioning(name#11, 200)
                                                      +- SortAggregate(key=[name#11], functions=[partial_first(id#10, false)])
                                                         +- *(2) Sort [name#11 ASC NULLS FIRST], false, 0
                                                            +- Exchange RoundRobinPartitioning(8)
                                                               +- *(1) Filter AtLeastNNulls(n, id#10,name#11)
                                                                  +- *(1) FileScan csv [id#10,name#11] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:<path>, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:string,name:string>

How data looks:

+-------+--------------------+--------------------+--------------------+--------------------+
|     id|                name|               clean|            char_lst| vectorized_char_lst|
+-------+--------------------+--------------------+--------------------+--------------------+
|3633038|MURATA MACHINERY LTD|    MURATA MACHINERY|[M, U, R, A, T, A...|(33,[0,1,2,3,4,5,...|
|3632811|SOCIETE ANONYME D...|SOCIETE ANONYME D...|[S, O, C, I, E, T...|(33,[0,1,2,3,4,5,...|
|3632655|FUJIFILM CORPORATION|            FUJIFILM|[F, U, J, I, F, I...|(33,[3,10,12,13,2...|
|3633318|HEINE OPTOTECHNIK...|HEINE OPTOTECHNIK...|[H, E, I, N, E,  ...|(33,[0,1,2,3,4,5,...|
|3633523|SUNBEAM PRODUCTS INC|    SUNBEAM PRODUCTS|[S, U, N, B, E, A...|(33,[0,1,2,4,5,6,...|
|3633300|           HIVAL LTD|               HIVAL|     [H, I, V, A, L]|(33,[2,3,10,11,21...|
|3632657|             NSK LTD|                 NSK|           [N, S, K]|(33,[5,6,16],[1.0...|
|3633240|REHABILITATION IN...|REHABILITATION IN...|[R, E, H, A, B, I...|(33,[0,1,2,3,4,5,...|
|3632732|STUDIENGESELLSCHA...|STUDIENGESELLSCHA...|[S, T, U, D, I, E...|(33,[0,1,2,3,4,5,...|
|3632866|ENERGY CONVERSION...|ENERGY CONVERSION...|[E, N, E, R, G, Y...|(33,[0,1,3,5,6,7,...|
|3632895|ERGENICS POWER SY...|ERGENICS POWER SY...|[E, R, G, E, N, I...|(33,[0,1,3,4,5,6,...|
|3632897| MOLI ENERGY LIMITED|         MOLI ENERGY|[M, O, L, I,  , E...|(33,[0,1,3,5,7,8,...|
|3633275| NORDSON CORPORATION|             NORDSON|[N, O, R, D, S, O...|(33,[5,6,7,8,14],...|
|3633256|  PEROXIDCHEMIE GMBH|       PEROXIDCHEMIE|[P, E, R, O, X, I...|(33,[0,3,7,8,9,11...|
|3632695|      POWER CELL INC|          POWER CELL|[P, O, W, E, R,  ...|(33,[0,1,7,8,9,10...|
|3633037|        ERGENICS INC|            ERGENICS|[E, R, G, E, N, I...|(33,[0,3,5,6,8,9,...|
|3632878|  FORD MOTOR COMPANY|          FORD MOTOR|[F, O, R, D,  , M...|(33,[1,4,7,8,13,1...|
|3632573|    SAFT AMERICA INC|        SAFT AMERICA|[S, A, F, T,  , A...|(33,[0,1,2,3,4,6,...|
|3632852|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632698|   KRUPPKOPPERS GMBH|        KRUPPKOPPERS|[K, R, U, P, P, K...|(33,[0,6,7,8,12,1...|
|3633150|ALCAN INTERNATION...| ALCAN INTERNATIONAL|[A, L, C, A, N,  ...|(33,[0,1,2,3,4,5,...|
|3632761|AMERICAN TELEPHON...|AMERICAN TELEPHON...|[A, M, E, R, I, C...|(33,[0,1,2,3,4,5,...|
|3632757|HITACHI KOKI COMP...|        HITACHI KOKI|[H, I, T, A, C, H...|(33,[1,2,3,4,7,9,...|
|3632836|HUGHES AIRCRAFT C...|     HUGHES AIRCRAFT|[H, U, G, H, E, S...|(33,[0,1,2,3,4,6,...|
|3633152|            SOSY INC|                SOSY|        [S, O, S, Y]|(33,[6,7,18],[2.0...|
|3633052|HAMAMATSU PHOTONI...|HAMAMATSU PHOTONI...|[H, A, M, A, M, A...|(33,[1,2,3,4,5,6,...|
|3633450|       AKZO NOBEL NV|          AKZO NOBEL|[A, K, Z, O,  , N...|(33,[0,1,2,5,7,10...|
|3632713| ELTRON RESEARCH INC|     ELTRON RESEARCH|[E, L, T, R, O, N...|(33,[0,1,2,4,5,6,...|
|3632533|NEC ELECTRONICS C...|     NEC ELECTRONICS|[N, E, C,  , E, L...|(33,[0,1,3,4,5,6,...|
|3632562| TARGETTI SANKEY SPA| TARGETTI SANKEY SPA|[T, A, R, G, E, T...|(33,[0,1,2,3,4,5,...|
+-------+--------------------+--------------------+--------------------+--------------------+
only showing top 30 rows

使用的硬件:

  1. 主节点:m5.2xlarge 8 个 vCore、32 GiB 内存、仅 EBS 存储 EBS存储:128 GiB
  2. 从节点 (10x):m5.4xlarge 16 个 vCore、64 GiB 内存、仅 EBS 存储 EBS存储:500 GiB

使用的 Spark 提交设置:

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.shuffle.partitions=2000" --conf "spark.executor.cores=4" --conf "spark.executor.memory=14g" --conf "spark.driver.memory=14g" --conf "spark.driver.maxResultSize=14g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py

Web UI 中的任务错误

ExecutorLostFailure (executor 21 exited caused by one of the running tasks) Reason: Slave lost
ExecutorLostFailure (executor 31 exited unrelated to the running tasks) Reason: Container marked as failed: container_1590592506722_0001_02_000002 on host: ip-172-31-47-180.eu-central-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node.

(部分)执行者日志:


20/05/27 16:29:09 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:29:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:15 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:17 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (0  time so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:28 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:29:33 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:29:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (1  time so far)
20/05/27 16:29:42 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:29:46 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:29:53 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:29:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (2  times so far)
20/05/27 16:30:00 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:30:05 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:10 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:15 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (3  times so far)
20/05/27 16:30:19 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:30:22 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:29 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:32 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (4  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:30:39 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:30:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:30:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (5  times so far)
20/05/27 16:30:55 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:30:59 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:31:03 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (6  times so far)
20/05/27 16:31:13 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:31:22 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:24 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (7  times so far)
20/05/27 16:31:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:31:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:31:41 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:31:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (8  times so far)
20/05/27 16:31:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:31:48 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:32:02 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (9  times so far)
20/05/27 16:32:04 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:08 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:32:19 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:20 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:21 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (10  times so far)
20/05/27 16:32:26 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:32:37 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (11  times so far)
20/05/27 16:32:38 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:32:45 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:32:51 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:32:56 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (12  times so far)
20/05/27 16:32:58 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:03 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:33:08 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:13 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (13  times so far)
20/05/27 16:33:15 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:20 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:33:26 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:30 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:33:31 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (14  times so far)
20/05/27 16:33:36 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:33:46 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:33:47 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:33:51 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (15  times so far)
20/05/27 16:33:54 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:34:03 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:04 INFO ShuffleExternalSorter: Thread 146 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:08 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (16  times so far)
20/05/27 16:34:14 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:34:16 INFO PythonUDFRunner: Times: total = 774701, boot = 3, init = 10, finish = 774688
20/05/27 16:34:21 INFO ShuffleExternalSorter: Thread 147 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (17  times so far)
20/05/27 16:34:30 INFO PythonUDFRunner: Times: total = 773372, boot = 2, init = 9, finish = 773361
20/05/27 16:34:32 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:34:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (18  times so far)
20/05/27 16:34:46 INFO ShuffleExternalSorter: Thread 89 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:34:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (19  times so far)
20/05/27 16:35:01 INFO PythonUDFRunner: Times: total = 776905, boot = 3, init = 11, finish = 776891
20/05/27 16:35:05 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (20  times so far)
20/05/27 16:35:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (21  times so far)
20/05/27 16:35:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (22  times so far)
20/05/27 16:35:52 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (23  times so far)
20/05/27 16:36:10 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (24  times so far)
20/05/27 16:36:29 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (25  times so far)
20/05/27 16:36:47 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (26  times so far)
20/05/27 16:37:06 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (27  times so far)
20/05/27 16:37:25 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (28  times so far)
20/05/27 16:37:44 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (29  times so far)
20/05/27 16:38:03 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (30  times so far)
20/05/27 16:38:22 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (31  times so far)
20/05/27 16:38:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (32  times so far)
20/05/27 16:38:59 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (33  times so far)
20/05/27 16:39:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (34  times so far)
20/05/27 16:39:39 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (35  times so far)
20/05/27 16:39:58 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (36  times so far)
20/05/27 16:40:18 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (37  times so far)
20/05/27 16:40:38 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (38  times so far)
20/05/27 16:40:57 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (39  times so far)
20/05/27 16:41:16 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (40  times so far)
20/05/27 16:41:35 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (41  times so far)
20/05/27 16:41:55 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1988.0 MB to disk (42  times so far)
20/05/27 16:42:19 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (43  times so far)
20/05/27 16:42:41 INFO ShuffleExternalSorter: Thread 145 spilling sort data of 1992.0 MB to disk (44  times so far)
20/05/27 16:42:59 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL TERM
20/05/27 16:42:59 INFO DiskBlockManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Shutdown hook called
20/05/27 16:42:59 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1590592506722_0001/spark-73af8e3b-f428-47d4-9e13-fed4e19cc2cd
2020-05-27T16:41:16.336+0000: [GC (Allocation Failure) 2020-05-27T16:41:16.336+0000: [ParNew: 272234K->242K(305984K), 0.0094375 secs] 9076907K->8804915K(13188748K), 0.0094895 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:34.686+0000: [GC (Allocation Failure) 2020-05-27T16:41:34.686+0000: [ParNew: 272242K->257K(305984K), 0.0084179 secs] 9076915K->8804947K(13188748K), 0.0084840 secs] [Times: user=0.09 sys=0.01, real=0.01 secs] 
2020-05-27T16:41:35.145+0000: [GC (Allocation Failure) 2020-05-27T16:41:35.145+0000: [ParNew: 272257K->1382K(305984K), 0.0095541 secs] 9076947K->8806073K(13188748K), 0.0096080 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.077+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.077+0000: [ParNew: 273382K->2683K(305984K), 0.0097177 secs] 9078073K->8807392K(13188748K), 0.0097754 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:41:55.513+0000: [GC (Allocation Failure) 2020-05-27T16:41:55.513+0000: [ParNew: 274683K->3025K(305984K), 0.0093345 secs] 9079392K->8807734K(13188748K), 0.0093892 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:05.481+0000: [GC (Allocation Failure) 2020-05-27T16:42:05.481+0000: [ParNew: 275025K->4102K(305984K), 0.0092950 secs] 9079734K->8808830K(13188748K), 0.0093464 secs] [Times: user=0.12 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:18.711+0000: [GC (Allocation Failure) 2020-05-27T16:42:18.711+0000: [ParNew: 276102K->2972K(305984K), 0.0098928 secs] 9080830K->8807700K(13188748K), 0.0099510 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:36.493+0000: [GC (Allocation Failure) 2020-05-27T16:42:36.493+0000: [ParNew: 274972K->3852K(305984K), 0.0094324 secs] 9079700K->8808598K(13188748K), 0.0094897 secs] [Times: user=0.11 sys=0.00, real=0.01 secs] 
2020-05-27T16:42:40.880+0000: [GC (Allocation Failure) 2020-05-27T16:42:40.880+0000: [ParNew: 275852K->2568K(305984K), 0.0111794 secs] 9080598K->8807882K(13188748K), 0.0112352 secs] [Times: user=0.13 sys=0.00, real=0.01 secs] 
Heap
 par new generation   total 305984K, used 261139K [0x0000000440000000, 0x0000000454c00000, 0x0000000483990000)
  eden space 272000K,  95% used [0x0000000440000000, 0x000000044fc82cf8, 0x00000004509a0000)
  from space 33984K,   7% used [0x00000004509a0000, 0x0000000450c220a8, 0x0000000452ad0000)
  to   space 33984K,   0% used [0x0000000452ad0000, 0x0000000452ad0000, 0x0000000454c00000)
 concurrent mark-sweep generation total 12882764K, used 8805314K [0x0000000483990000, 0x0000000795e63000, 0x00000007c0000000)
 Metaspace       used 77726K, capacity 79553K, committed 79604K, reserved 1118208K
  class space    used 10289K, capacity 10704K, committed 10740K, reserved 1048576K

执行者截图 https://i.stack.imgur.com/MsKzB.png

我尝试过的:

  • 改变spark.sql.shuffle.partitions
  • 改变spark.default.parallelism
  • 重新分区数据框

我该如何解决这个问题?

提前致谢!

Thijs


@lokk3r 的回答确实帮助我找到了正确的方向。然而,在我能够无错误地运行该程序之前,我还必须做一些其他事情。我将分享它们来帮助遇到类似问题的人:

  • 首先,我用过NGrams正如 @lokk3r 建议的那样,不要仅使用单个字符,以避免 MinHashLSH 算法内出现极端的数据偏差。当使用 4 克时,data好像:
+------------------------------+-------+------------------------------+------------------------------+------------------------------+
|                          name|     id|                         clean|                   ng_char_lst|           vectorized_char_lst|
+------------------------------+-------+------------------------------+------------------------------+------------------------------+
|     SOCIETE ANONYME DITE SAFT|3632811|     SOCIETE ANONYME DITE SAFT|[  S O C, S O C I, O C I E,...|(1332,[64,75,82,84,121,223,...|
|          MURATA MACHINERY LTD|3633038|              MURATA MACHINERY|[  M U R, M U R A, U R A T,...|(1332,[55,315,388,437,526,5...|
|HEINE OPTOTECHNIK GMBH AND ...|3633318|    HEINE OPTOTECHNIK GMBH AND|[  H E I, H E I N, E I N E,...|(1332,[23,72,216,221,229,34...|
|          FUJIFILM CORPORATION|3632655|                      FUJIFILM|[  F U J, F U J I, U J I F,...|(1332,[157,179,882,1028],[1...|
|          SUNBEAM PRODUCTS INC|3633523|              SUNBEAM PRODUCTS|[  S U N, S U N B, U N B E,...|(1332,[99,137,165,175,187,1...|
| STUDIENGESELLSCHAFT KOHLE MBH|3632732| STUDIENGESELLSCHAFT KOHLE MBH|[  S T U, S T U D, T U D I,...|(1332,[13,14,23,25,43,52,57...|
|REHABILITATION INSTITUTE OF...|3633240|REHABILITATION INSTITUTE OF...|[  R E H, R E H A, E H A B,...|(1332,[20,44,51,118,308,309...|
|           NORDSON CORPORATION|3633275|                       NORDSON|[  N O R, N O R D, O R D S,...|(1332,[45,88,582,1282],[1.0...|
|     ENERGY CONVERSION DEVICES|3632866|     ENERGY CONVERSION DEVICES|[  E N E, E N E R, N E R G,...|(1332,[54,76,81,147,202,224...|
|           MOLI ENERGY LIMITED|3632897|                   MOLI ENERGY|[  M O L, M O L I, O L I  ,...|(1332,[438,495,717,756,1057...|
|    ERGENICS POWER SYSTEMS INC|3632895|        ERGENICS POWER SYSTEMS|[  E R G, E R G E, R G E N,...|(1332,[6,10,18,21,24,35,375...|
|                POWER CELL INC|3632695|                    POWER CELL|[  P O W, P O W E, O W E R,...|(1332,[6,10,18,35,126,169,3...|
|            PEROXIDCHEMIE GMBH|3633256|                 PEROXIDCHEMIE|[  P E R, P E R O, E R O X,...|(1332,[326,450,532,889,1073...|
|            FORD MOTOR COMPANY|3632878|                    FORD MOTOR|[  F O R, F O R D, O R D  ,...|(1332,[156,158,186,200,314,...|
|                  ERGENICS INC|3633037|                      ERGENICS|[  E R G, E R G E, R G E N,...|(1332,[375,642,812,866,1269...|
|              SAFT AMERICA INC|3632573|                  SAFT AMERICA|[  S A F, S A F T, A F T  ,...|(1332,[498,552,1116],[1.0,1...|
|   ALCAN INTERNATIONAL LIMITED|3632598|           ALCAN INTERNATIONAL|[  A L C, A L C A, L C A N,...|(1332,[20,434,528,549,571,7...|
|             KRUPPKOPPERS GMBH|3632698|                  KRUPPKOPPERS|[  K R U, K R U P, R U P P,...|(1332,[664,795,798,1010,114...|
|       HUGHES AIRCRAFT COMPANY|3632752|               HUGHES AIRCRAFT|[  H U G, H U G H, U G H E,...|(1332,[605,632,705,758,807,...|
|AMERICAN TELEPHONE AND TELE...|3632761|AMERICAN TELEPHONE AND TELE...|[  A M E, A M E R, M E R I,...|(1332,[19,86,91,126,128,134...|
+------------------------------+-------+------------------------------+------------------------------+------------------------------+

请注意,我在名称上添加了前导和尾随空格,以确保名称中的单词顺序对于NGrams: 'XX YY'有 3 克'XX ', 'X Y', ' YY', while 'YY XX'有 3 克'YY ', 'Y X', ' XX'。这意味着两者共享 6 个唯一值中的 0 个NGrams。如果我们使用前导和尾随空格:' XX YY '有 3 克' XX', 'XX ', 'X Y', ' YY', 'YY ', while ' YY XX '有 3 克' YY', 'YY ', 'Y X', ' XX', 'XX '。这意味着两者共享 6 个独特项中的 4 个NGrams。这意味着在 MinHashLSH 期间两条记录在同一个存储桶中结束的可能性要大得多。

  • 我尝试了不同的值n- 输入参数NGrams。我发现两者n=2 and n=3仍然会产生如此多的数据偏差,以至于一些 Spark 作业花费的时间太长,而其他作业则在几秒钟内完成。所以你最终会在程序继续之前等待很久。我现在用n=4,这仍然会产生很大的偏差,但它是可行的。

  • 为了进一步减少数据倾斜的影响,我使用了一些额外的过滤来过滤过于(不)频繁发生的数据NGrams in the CountVectorizerSpark 的方法。我已经设定minDF=2这样它就过滤掉了NGrams仅以一个名称出现的情况。我这样做是因为你无法根据NGram无论如何,这种情况只出现在一个名字中。另外,我设置maxDF=0.001这样它就过滤掉了NGrams出现在超过 0.1% 的名称中。这意味着对于大约 3000 万个名字来说,NGrams出现频率超过 30000 个名称的名称将被过滤掉。我认为出现得太频繁了NGram无论如何都不会提供有关哪些名称可以匹配的有用信息。

  • 我通过过滤掉非拉丁(扩展)名称,将唯一名称的数量(首先是 3000 万)减少到 1500 万。我注意到(例如阿拉伯语和中文)字符也会导致数据出现很大的偏差。由于我主要对消除这些公司名称的歧义不感兴趣,因此我从数据集中忽略了它们。我使用以下正则表达式匹配进行过滤:

re.fullmatch('[\u0020-\u007F\u00A0-\u00FF\u0100-\u017F\u0180-\u024F]+'.encode(), string_to_filter.encode())
  • 这是一个有点直接的建议,但我因为没有看到它而遇到了一些问题。确保在将数据集提供给数据集之前对数据集运行过滤器MinHashLSH算法过滤掉没有的记录NGrams由于设置而剩余minDF and maxDF或者只是因为它是一个小名字。显然这不适用于MinHashLSH算法。

  • 最后,关于设置spark-submit命令和 EMR 集群的硬件设置,我发现我不需要像论坛上的一些答案所建议的那样更大的集群。所有上述更改使程序在集群上完美运行,并使用我原来的帖子中提供的设置。减少spark.shuffle.partitions, the spark.driver.memoryspark.driver.maxResultSize大大提高了程序的运行时间。这spark-submit我提交的是:

spark-submit --master yarn --conf "spark.executor.instances=40" --conf "spark.default.parallelism=640" --conf "spark.executor.cores=4" --conf "spark.executor.memory=12g" --conf "spark.driver.memory=8g" --conf "spark.driver.maxResultSize=8g" --conf "spark.dynamicAllocation.enabled=false" --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11 run_disambiguation.py
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

所有执行器均已死亡 MinHash LSH PySpark approxSimilarityJoin EMR 集群上的自连接 的相关文章

随机推荐

  • 设置 PEP 代理

    我一直在研究 PEP Proxy Steelskin 以便我可以为我的 Orion Context 提供一些安全层 但是 有一些问题阻碍了我的进展 我想使用 IDM 和 Keystone 全局实例 我已按照相应的指示成功安装了 pepPro
  • PHP GD imagecreatefromjpeg 无法处理大尺寸图像?

    我的项目是当我自动上传图像时我的程序将创建拇指大小 如果图片大小约为 1024x768 我的程序可以正常工作 但是当我上传大小为 1576x2379 的图片时 显示如下错误 允许的内存大小 8388608 字节已耗尽 尝试分配 1576 字
  • SwiftUI 导航到 NavigationView 堆栈的底部

    我进行了以下设置 其中父视图包含NavigationView它显示一系列页面 A B 和 C 在页面 C 上有一个隐藏导航视图的按钮 我想要它 以便当再次显示导航视图时 它会自动导航到页面 A 但是我不确定如何使用 SwiftUI 执行此操
  • d3.js:具有多个 y 轴值的数据集数组

    我是 d3 js 的初学者 所以请友善 考虑这个 jsbin 示例 http jsbin com edatol 1 edit 我有以下数据集 var dataset d3 time hour utc offset now 5 1 10 d3
  • 如何将多个文件复制到docker数据卷中

    这听起来可能微不足道 但我找不到一种简单的方法将多个文件复制到 Docker 卷的根文件夹中 我正在使用Ubuntu仙尼尔 16 04 and 泊坞窗1 12 1 例如 如果我有一个带有卷的 Ubuntu 容器 my data docker
  • 使用 Supervisord 运行 PostgreSQL

    我想在 Ubuntu 10 04 上使用 Supervisor 运行 PostgreSQL 9 1 目前 我使用 init 脚本手动启动 PostgreSQL etc init d postgresql start 根据这篇文章 http
  • 类型错误:push() 不是一个函数

    我正在尝试将一个项目推送到数组 但它不起作用 当我运行代码时 我收到此错误 未捕获的类型错误 data allItems type push 不是函数 var data allItems exp inc totals exp 0 inc 0
  • 如何在后台运行 Solr Jetty

    我正在使用 Solr 附带的 Jetty Solr 构建 并且希望在后台而不是在终端中运行它 现在我开始它java jar start jar但我希望它记录到一个文件并在服务器的后台运行 以便我可以关闭终端窗口 我确信有一些我找不到的 ja
  • 使用PyInstaller将.py和.txt文件封装成.exe文件

    我有 2 个文件想要放入 exe 文件中 其中一个文件是 py 另一个是 txt我找不到如何获取多个文件并将其转换为可执行文件 请帮忙 您应该将 txt 文件放在一个文件夹中 然后将文件夹的名称放在 标签中 pyinstaller onef
  • 使用 Javascript 访问 CSS 自定义变量

    我在 css 文件中有以下样式 galleryImages position absolute top 24px left 41px width 900px moving false 当我尝试通过 Javascript 访问它时 它返回未定
  • JMeter(活动?)FTP 到 VLTrader

    情况 我正在使用 JMeter 来加载测试我的通信应用程序 Cleo VLTrader 我是 JMeter 的新手 并且能够使 HTTP 通信工作 但不能使 FTP 工作 当我尝试使用 JMeter FTP 请求采样器时 我可以在服务器端看
  • Nodejs如何为每个请求设置内容类型标头

    我想知道如何设置标题 Content Type application json 对于每个进来的 Nodejs Express 请求 我尝试了这两行 但如果我自己不添加标头 我的调用仍然失败 app use function req res
  • 如何创建一个Looper线程,然后立即向其发送消息?

    我有一个工作线程位于后台 处理消息 像这样的事情 class Worker extends Thread public volatile Handler handler actually private of course public v
  • Clojure 中的结构共享

    我不清楚 Clojure 中的结构共享 下面是一个函数 xconj 取自 Joy of Clojure 顺便说一句 很棒的书 Building a naive binary search tree using recursion defn
  • 如何禁用
     块的 Prettier 以便保留新行(换行符)?                
                

    prettier config js module exports arrowParens always bracketSpacing true endOfLine auto printWidth 180 semi true singleQ
  • 如何使用请求模块缓冲 HTTP 响应?

    我想将 HTTP 响应的内容流式传输到变量 我的目标是通过获取图像request 并将其存储在 MongoDB 中 但图像总是损坏 这是我的代码 request http google com doodle png function err
  • 如何在使用 boto3 create_presigned_post 时添加元数据?

    想要将自定义元数据添加到我上传的文件中create presigned post来自boto3 我正在运行以下代码 但收到 403 响应 下面的代码借用自here https boto3 amazonaws com v1 documenta
  • 如何将 BytesIO 与 matplotlib 和 pyqt5 一起使用?

    我在 matplotlib 中制作了一个图表 并希望将其放入图像中并在我的 pyqt5 应用程序中使用它 有人建议我为此使用 BytesIO 到目前为止 这是我的代码 绘制我的图表 plt axis equal buff io BytesI
  • 应该是“安排-断言-行动-断言”吗?

    关于经典的测试模式安排 执行 断言 http c2 com cgi wiki ArrangeActAssert 我经常发现自己在 Act 之前添加了反断言 这样我就知道传递的断言确实是作为操作的结果传递的 我认为它类似于红绿重构中的红色 只
  • 所有执行器均已死亡 MinHash LSH PySpark approxSimilarityJoin EMR 集群上的自连接

    在 name id name 组合的数据帧上调用 Spark 的 MinHashLSH 的 approxSimilarityJoin 时 我遇到了问题 我尝试解决的问题的摘要 我有一个包含大约 3000 万个公司名称唯一 name id n