火花笛卡尔积

2023-12-04

我必须比较坐标才能获得距离。因此,我使用 sc.textFile() 加载数据并制作笛卡尔积。文本文件中大约有 2.000.000 行,因此需要比较 2.000.000 x 2.000.000 坐标。

我用大约 2000 个坐标测试了代码,几秒钟内就可以正常工作。但使用大文件似乎会在某个点停止,我不知道为什么。代码如下:

def concat(x,y):
    if(isinstance(y, list)&(isinstance(x,list))):
        return x + y
    if(isinstance(x,list)&isinstance(y,tuple)):
        return x + [y]
    if(isinstance(x,tuple)&isinstance(y,list)):
        return [x] + y
    else: return [x,y]

def haversian_dist(tuple):
    lat1 = float(tuple[0][0])
    lat2 = float(tuple[1][0])
    lon1 = float(tuple[0][2])
    lon2 = float(tuple[1][2])
    p = 0.017453292519943295
    a = 0.5 - cos((lat2 - lat1) * p)/2 + cos(lat1 * p) * cos(lat2 * p) * (1 - cos((lon2 - lon1) * p)) / 2
    print(tuple[0][1])
    return (int(float(tuple[0][1])), (int(float(tuple[1][1])),12742 * asin(sqrt(a))))

def sort_val(tuple):
    dtype = [("globalid", int),("distance",float)]
    a = np.array(tuple[1], dtype=dtype)
    sorted_mins = np.sort(a, order="distance",kind="mergesort")
    return (tuple[0], sorted_mins)


def calc_matrix(sc, path, rangeval, savepath, name):
    data = sc.textFile(path)
    data = data.map(lambda x: x.split(";"))
    data = data.repartition(100).cache()
    data.collect()
    matrix = data.cartesian(data)
    values = matrix.map(haversian_dist)
    values = values.reduceByKey(concat)
    values = values.map(sort_val)
    values = values.map(lambda x: (x[0], x[1][1:int(rangeval)].tolist()))
    values = values.map(lambda x: (x[0], [y[0] for y in x[1]]))
    dicti = values.collectAsMap()
    hp.save_pickle(dicti, savepath, name)

即使包含大约 15,000 个条目的文件也不起作用。我知道笛卡尔导致 O(n^2) 运行时间。但 Spark 不应该处理这个问题吗?或者有什么问题吗?唯一的起点是错误消息,但我不知道它是否与实际问题相关:

16/08/06 22:21:12 WARN TaskSetManager: Lost task 15.0 in stage 1.0 (TID 16, hlb0004): java.net.SocketException: Daten?bergabe unterbrochen (broken pipe)
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
    at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:440)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:452)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:452)
    at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:280)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:239)

16/08/06 22:21:12 INFO TaskSetManager: Starting task 15.1 in stage 1.0 (TID 17, hlb0004, partition 15,PROCESS_LOCAL, 2408 bytes)
16/08/06 22:21:12 WARN TaskSetManager: Lost task 7.0 in stage 1.0 (TID 8, hlb0004): java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:209)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:139)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.api.python.PairwiseRDD.compute(PythonRDD.scala:342)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

你用过data.collect()在您的代码中,基本上将所有数据调用到一台机器中。根据该机器上的内存,2,000,000 行数据可能不太适合。

另外,我尝试通过连接而不是使用来减少要完成的计算数量cartesian。 (请注意,我只是使用 numpy 生成随机数,这里的格式可能与您的格式不同。不过,主要思想是相同的。)

import numpy as np
from numpy import arcsin, cos, sqrt

# suppose my data consists of latlong pairs
# we will use the indices for pairing up values
data = sc.parallelize(np.random.rand(10,2)).zipWithIndex()
data = data.map(lambda (val, idx): (idx, val))

# generate pairs (e.g. if i have 3 pairs with indices [0,1,2],
# I only have to compute for distances of pairs (0,1), (0,2) & (1,2)
idxs = range(data.count())
indices = sc.parallelize([(i,j) for i in idxs for j in idxs if i < j])

# haversian func (i took the liberty of editing some parts of it)
def haversian_dist(latlong1, latlong2):
    lat1, lon1 = latlong1
    lat2, lon2 = latlong2
    p = 0.017453292519943295
    def hav(theta): return (1 - cos(p * theta))/2
    a = hav(lat2 - lat1) + cos(p * lat1)*cos(p * lat2)*hav(lon2 - lon1)
    return 12742 * arcsin(sqrt(a))

joined1 = indices.join(data).map(lambda (i, (j, val)): (j, (i, val)))
joined2 = joined1.join(data).map(lambda (j, ((i, latlong1), latlong2)): ((i,j), (latlong1, latlong2))
haversianRDD = joined2.mapValues(lambda (x, y): haversian_dist(x, y))
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

火花笛卡尔积 的相关文章

  • python future 和元组解包

    实现像使用 future 进行元组解包这样的事情的优雅 惯用的方法是什么 我有这样的代码 a b c f x y g a b z h y c 我想将其转换为使用期货 理想情况下我想写一些类似的东西 a b c ex submit f x y
  • Python模块可以访问英语词典,包括单词的定义[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找一个 python 模块 它可以帮助我从英语词典中获取单词的定义 当然有enchant 这可以帮助我检查该单词是否存在于英语中
  • 如何使用 Plotly 中的直方图将所有离群值分入一个分箱?

    所以问题是 我可以在 Plotly 中绘制直方图 其中所有大于某个阈值的值都将被分组到一个箱中吗 所需的输出 但使用标准情节Histogram类我只能得到这个输出 import pandas as pd from plotly import
  • 从 ffmpeg 获取实时输出以在进度条中使用(PyQt4,stdout)

    我已经查看了很多问题 但仍然无法完全弄清楚 我正在使用 PyQt 并且希望能够运行ffmpeg i file mp4 file avi并获取流式输出 以便我可以创建进度条 我看过这些问题 ffmpeg可以显示进度条吗 https stack
  • 通过列表理解压平列表列表

    我正在尝试使用 python 中的列表理解来展平列表 我的清单有点像 1 2 3 4 5 6 7 8 只是为了打印这个列表列表中的单个项目 我编写了这个函数 def flat listoflist for item in listoflis
  • Spark:导入UTF-8编码的文本文件

    我正在尝试处理一个包含很多特殊字符的文件 例如德语变音符号 o 等 如下所示 sc hadoopConfiguration set textinputformat record delimiter r n r n sc textFile f
  • 从零开始的 numpy 形状意味着什么

    好的 我发现数组的形状中可以包含 0 对于将 0 作为唯一维度的情况 这对我来说是有意义的 它是一个空数组 np zeros 0 但如果你有这样的情况 np zeros 0 100 让我很困惑 为什么这么定义呢 据我所知 这只是表达空数组的
  • 切片 Dataframe 时出现 KeyError

    我的代码如下所示 d pd read csv Collector Output csv df pd DataFrame data d dfa df copy dfa dfa rename columns OBJECTID Object ID
  • 使用 OLS 回归预测未来值(Python、StatsModels、Pandas)

    我目前正在尝试在 Python 中实现 MLR 但不确定如何将我找到的系数应用于未来值 import pandas as pd import statsmodels formula api as sm import statsmodels
  • 对图像块进行多重处理

    我有一个函数必须循环遍历图像的各个像素并计算一些几何形状 此函数需要很长时间才能运行 在 24 兆像素图像上大约需要 5 小时 但似乎应该很容易在多个内核上并行运行 然而 我一生都找不到一个有据可查 解释充分的例子来使用 Multiproc
  • Spark-1.6.1 上的 DMLC 的 XGBoost-4j

    我正在尝试在 Spark 1 6 1 上使用 DMLC 的 XGBoost 实现 我能够使用 XGBoost 训练我的数据 但在预测方面面临困难 我实际上想以在 Apache Spark mllib 库中完成的方式进行预测 这有助于计算训练
  • Numpy 过滤器平滑零区域

    我有一个 0 及更大整数的 2D numpy 数组 其中值代表区域标签 例如 array 9 9 9 0 0 0 0 1 1 1 9 9 9 9 0 7 1 1 1 1 9 9 9 9 0 2 2 1 1 1 9 9 9 8 0 2 2 1
  • 将 matplotlib 颜色图集中在特定值上

    我正在使用 matplotlib 颜色图 seismic 绘制绘图 并且希望白色以 0 为中心 当我在不进行任何更改的情况下运行脚本时 白色从 0 下降到 10 我尝试设置 vmin 50 vmax 50 但在这种情况下我完全失去了白色 关
  • 无法在 osx-arm64 上安装 Python 3.7

    我正在尝试使用 Conda 创建一个带有 Python 3 7 的新环境 例如 conda create n qnn python 3 7 我收到以下错误 Collecting package metadata current repoda
  • python Soap zeep模块获取结果

    我从 SOAP API 得到如下结果 client zeep Client wsdl self wsdl transport transport auth header lb E authenticate self login res cl
  • 创建嵌套字典单行

    您好 我有三个列表 我想使用一行创建一个三级嵌套字典 i e l1 a b l2 1 2 3 l3 d e 我想创建以下嵌套字典 nd a 1 d 0 e 0 2 d 0 e 0 3 d 0 e 0 b a 1 d 0 e 0 2 d 0
  • 使用yield 进行字典理解

    作为一个人为的例子 myset set a b c d mydict item yield join item s for item in myset and list mydict gives as cs bs ds a None b N
  • 默认情况下,Keras 自定义层参数是不可训练的吗?

    我在 Keras 中构建了一个简单的自定义层 并惊讶地发现参数默认情况下未设置为可训练 我可以通过显式设置可训练属性来使其工作 我无法通过查看文档或代码来解释为什么会这样 这是应该的样子还是我做错了什么导致默认情况下参数不可训练 代码 im
  • 您可以将操作直接应用于map/reduce/filter 中的参数吗?

    map and filter通常可以与列表理解互换 但是reduce并不那么容易被交换map and filter 此外 在某些情况下我仍然更喜欢函数语法 但是 当您需要对参数本身进行操作时 我发现自己正在经历语法体操 最终必须编写整个函数
  • 您可以使用关键字参数而不提供默认值吗?

    我习惯于在 Python 中使用这样的函数 方法定义 def my function arg1 None arg2 default do stuff here 如果我不供应arg1 or arg2 那么默认值None or default

随机推荐

  • fpdf“UnicodeEncodeError:'latin-1'编解码器无法对位置 88 中的字符 '\u2013' 进行编码:序数不在范围内(256)”

    我正在尝试在 Python 中将文本文件转换为 pdf 但出现错误 为什么会发生这种情况以及如何解决 这是我的代码 import fpdf from fpdf import FPDF pdf FPDF pdf add page pdf se
  • PHP:将本地时间转换为 UTC

    假设我得到一个像这样的字符串08 22 2015 10 56 PM并且该日期 时间字符串始终仅指一个特定时区 我需要能够将其转换为这种格式 Ymd THis Z 这是 iCal 格式 如何将该字符串转换为祖鲁时间并转换为 Ymd THis
  • 如何在 Xamarin iOS 上执行简单的后台任务

    在我们的应用程序中 用户可以跟踪并提交他们记录的旅程 我需要一种在 iOS 中创建任务的简单方法 我已经在 Android 上创建并测试了它 它的工作原理是 用户选择他们想要提交的旅程 点击同步并创建一个前台服务 将旅程同步到我们的 API
  • 用于仅插入/仅查询应用程序的 ORM 框架

    我已经使用 Hibernate 多年了 从来没有遇到过任何问题 但我刚刚意识到我的大部分工作都涉及 CRUD 方法 其中我需要数据保持持久化并随意修改 这样做的问题是 有人想要制作 2 个独立的应用程序 一个用于批量插入 另一个对插入的数据
  • 格式化斯坦福 Corenlp 的 NER 输出

    我正在与斯坦福 CoreNLP 合作并将其用于 NER 但是当我提取组织名称时 我看到每个单词都标有注释 因此 如果实体是 NEW YORK TIMES 那么它会被记录为三个不同的实体 NEW YORK 和 TIMES 我们是否可以在斯坦福
  • 重用PreparedStatement

    我在我们的代码库上运行了 findbugs 它指出还有两个语句仍然需要关闭 在这部分代码中 我们运行 preparedStatement connection prepareStatement query 对于3个不同的查询 重用prepa
  • 如何使用 Greasemonkey 脚本通过 XSLT 转换 XML 文件?

    我有一个搜索服务器 它提供一个测试页面 我可以在其中输入查询并以 XML 形式返回结果 我希望能够以更加用户友好的方式浏览结果 因此我开始使用 XSLT 现在我有了一个简单的样式表 可以将不知何故臃肿的 XML 转换为仅显示部分数据的简单表
  • 仅获取白色屏幕截图

    我可以读取条形码 但无法获取屏幕快照 getScreenImage 函数获取白屏 如何获取屏幕截图 包括我看到的相机视图的屏幕 谢谢 interface igViewController
  • 处理器如何读取内存?

    我正在尝试重新实现 malloc 我需要了解对齐的目的 据我了解 如果内存对齐 代码将执行得更快 因为处理器不必采取额外的步骤来恢复被剪切的内存位 我想我明白 64 位处理器读取 64 位乘 64 位内存 现在 让我们想象一下我有一个按顺序
  • 使用 BitBlt 进行的屏幕截图会在 Windows 10 上显示黑色图像

    我正在使用下面的代码来捕获当前活动窗口的屏幕截图 这段代码来自捕获屏幕截图 包括 NET 中的半透明窗口 有一些小的添加 即它使用 GetForegroundWindow 和一个计时器 以便我可以选择所需的窗口 在 Windows 10 x
  • 在 Java 8 流中捕获 UncheckedIOException

    编辑 这似乎不可能 请参阅https bugs openjdk java net browse JDK 8039910 我有一个帮助类 它提供了Stream
  • 类型错误:“datetime.date”对象没有属性“__getitem__”

    我在我的 models py 中使用 class Pedido models Model data pedido models DateField Data do pedido cliente models ForeignKey Clien
  • 谷歌地理编码不适用于数据库中带有特殊字符的地址

    我的谷歌地理编码数据库中的地址特殊字符有问题 但如果我对它们进行硬编码则不会 简单的地理编码代码 url http maps googleapis com maps api geocode json address address sens
  • TabControl 处理非活动选项卡上的控件

    我正在为我的应用程序使用 MVVM 模式 主窗口包括一个TabControl与DataContext映射到 ViewModel
  • 如何将 Lua 模块作为字符串而不是文件加载?

    我正在使用 LuaJava 和 Lua 的 C 代码 我想做的是读取在Android应用程序中存储为资源字符串的Lua源代码 以便可以执行读入的Lua源代码 我需要知道如何使用 LuaJava 或 C 语言来做到这一点 我想知道如何使用字符
  • Compact Framework 中的 MAC 地址

    如何仅使用紧凑框架获取 MAC 地址 1 4 的 OpenNETCF 代码从以下 P Invoke 调用中获取信息 DllImport iphlpapi dll SetLastError true public static extern
  • NgAnimate 页面加载 hack

    在更新 1 4 1 中 AngularJs Animate 不再像以前那样在页面加载时触发 我的旧解决方案类似对此 笨蛋 found here并一直工作到 v1 3 9
  • CSS 字体 Unicode 范围

    font face font family Nanum Barun Gothic src url NanumBarunGothic ttf unicode range U AC00 D7A3 U 1100 11FF U 3130 318F
  • 将新的拟合阶段添加到现有 PipelineModel 中,无需再次拟合

    我想将几个经过训练的管道连接到一个 这类似于 Spark 将新的拟合阶段添加到现有 PipelineModel 中 无需再次拟合 但是下面的解决方案适用于 PySpark gt pipe model new PipelineModel st
  • 火花笛卡尔积

    我必须比较坐标才能获得距离 因此 我使用 sc textFile 加载数据并制作笛卡尔积 文本文件中大约有 2 000 000 行 因此需要比较 2 000 000 x 2 000 000 坐标 我用大约 2000 个坐标测试了代码 几秒钟