如何使用scala或python在apache Spark中运行多线程作业?

2024-01-12

我面临着与 Spark 并发相关的问题,这阻止了我在生产中使用它,但我知道有一种解决方法。我正在尝试使用订单历史记录对 700 万用户的 10 亿种产品运行 Spark ALS。首先,我获取不同用户的列表,然后对这些用户运行循环以获取推荐,这是一个非常缓慢的过程,需要几天时间才能为所有用户获取推荐。我尝试使用笛卡尔用户和产品来一次性获得所有人的推荐,但再次将其提供给elasticsearch,我必须为每个用户过滤和排序记录,只有这样我才能将其提供给elasticsearch以供其他API使用。

因此,请向我推荐一个在此类用例中具有相当可扩展性的解决方案,并且可以在生产中使用并提供实时建议。

这是我在 scala 中的代码片段,它可以让您了解我目前如何解决该问题:

  //    buy_values -> RDD with Rating(<int user_id>, <int product_id>, <double rating>)
  def recommend_for_user(user: Int): Unit = {
      println("Recommendations for User ID: " + user);
      // Product IDs which are not bought by user 
      val candidates = buys_values
        .filter(x => x("customer_id").toString.toInt != user)
        .map(x => x("product_id").toString.toInt)
        .distinct().map((user, _))
      // find 30 products with top rating
      val recommendations = bestModel.get
        .predict(candidates)
        .takeOrdered(30)(Ordering[Double].reverse.on(x => x.rating))

      var i = 1
      var ESMap = Map[String, String]()
      recommendations.foreach { r =>
        ESMap += r.product.toString -> bitem_ids.value(r.product)
      }
      //  push to elasticsearch with user as id
      client.execute {
        index into "recommendation" / "items" id user fields ESMap
      }.await
      // remove candidate RDD from memory
      candidates.unpersist()
  }
  // iterate on each user to get recommendations for the user [slow process]
  user_ids.foreach(recommend_for_user)

很明显,程序中的瓶颈是搜索candidates。鉴于 Spark 架构,它严重限制了并行化的能力,并通过为每个用户启动 Spark 作业增加了大量开销。

假设典型场景,700 万用户 and 十亿种产品大多数时候,您会预测整个范围的产品,减去用户已经购买的少数产品。至少在我看来,重要的问题是为什么还要费心过滤。即使你推荐以前买过的产品,真的有害吗?

除非你有非常严格的要求,否则我会简单地忽略这个问题并使用MatrixFactorizationModel.recommendProductsForUsers https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.recommendation.MatrixFactorizationModel它几乎可以为您完成所有工作(不包括数据导出)。之后,您可以执行批量导出,一切顺利。

现在假设您有明确的无重复政策。假设典型用户仅购买相对少量的产品,您可以从为每个用户获取一组产品开始:

val userProdSet = buy_values
    .map{case (user, product, _) => (user, product)} 
    .aggregateByKey(Set.empty[Int])((s, e) => s + e, (s1, s2) => s1 ++ s2)

接下来你可以简单地映射userProdSet获得预测:

// Number of predictions for each user
val nPred = 30;

userProdSet.map{case (user, prodSet) => {
    val recommended = model
         // Find recommendations for user
        .recommendProducts(_, nPred + prodSet.size))
        // Filter to remove already purchased 
        .filter(rating => !prodSet.contains(rating.product))
        // Sort and limit
        .sortBy(_.rating)
        .reverse
        .take(nPred)
    (user, recommended)
}}

您可以通过使用可变集进行聚合和广播模型来进一步改进,但这只是一般想法。

如果用户数量user_ids小于整组用户数(buy_values)你可以简单地过滤userProdSet仅保留一部分用户。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何使用scala或python在apache Spark中运行多线程作业? 的相关文章

  • 函数名称未定义

    我有一段代码 看起来像这样 if name main main def main print hello 但是 当我尝试运行此代码时 出现错误 NameError 名称 main 未定义 我是否没有在函数 def main 的第一行定义名称
  • Spark LDA 困境 - 预测和 OOM 问题

    我正在评估 Spark 1 6 0 来构建大型 数百万个文档 数百万个特征 数千个主题 LDA 模型并进行预测 这是我可以使用 Yahoo 轻松完成的任务 LDA 从小处开始 按照 Java 示例 我使用分布式模型 EM 优化器构建了 10
  • 有什么好的适用于 Google App Engine 应用程序的 AJAX 框架吗? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在尝试在我的 Google App Engine 应用程序中实现 AJAX 因此我正在寻找一个好的
  • 使用ideone时如何传入命令行参数?

    我正在使用 ideone 在线解释器 http ideone com http ideone com 来测试一些 C 和 Python 程序 如何指定命令行参数而不是使用 STDIN 输入 看起来你不能 但是快速破解应该做的伎俩 stati
  • pandas read_csv 之前预处理数据文件

    我使用 SAP 的数据输出 但它既不是 CSV 因为它不引用包含其分隔符的字符串 也不是固定宽度 因为它具有多字节字符 它是一种 固定宽度 字符 为了将其放入 pandas 我当前读取文件 获取分隔符位置 对分隔符周围的每一行进行切片 然后
  • 如何在Python + Selenium中获取元素的值

    我在我的 Python 3 6 3 代码中得到了这个 HTML 元素 作为 Selenium网页元素当然 span class ocenaCzastkowa masterTooltip style color 000000 alt 5 sp
  • Discord.py 斜线命令在 cogs 中不起作用

    我正在构建一个不和谐的机器人 并且想要在 cogs 内使用斜杠命令 但这些命令不显示或工作 这是代码 cog guild ids 858573429787066368 861507832934563851 class Slash comma
  • DataFrame.loc 的“索引器太多”

    我读了关于切片器的文档 http pandas pydata org pandas docs stable advanced html using slicers一百万次 但我从来没有理解过它 所以我仍在试图弄清楚如何使用loc切片Data
  • 无法使用Python请求会话模块登录网站

    我刚刚开始进行网络抓取 对于我的第一个项目 我尝试使用 requests Session 登录 artofproblemsolving com 并访问另一个用户的帐户 这是我的代码 import requests LOGIN URL htt
  • 对法语文本进行词形还原[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我有一些法语文本需要以某种方式进行处理 为此 我需要 首先 将文本标记为单词 然后对这些单词进行词形还原以避免多次处理相同的词根 据我
  • 如何在Python中手动对数字列表进行排序?

    规格 Ubuntu 13 04 Python 3 3 1 背景 Python的初学者 遇到了这个 手动排序 问题 我被要求做的事情 让用户输入 3 个数值并将它们存储在 3 个不同的变量中 不使用列表或排序算法 手动将这 3 个数字从小到大
  • Python FTP下载550错误

    我编写了一个 ftp 爬虫来下载特定文件 它会一直工作 直到找到要下载的特定文件 然后抛出此错误 ftplib error perm 550 该文件存在于我的下载文件夹中 但文件大小为 0 kb 我需要转换某些内容才能下载吗 我可以访问 f
  • Python GTK3 Treeview 向上或向下移动选择

    如何在树视图中向上或向下移动所选内容 我的想法是 我可以使用向上和向下按钮将选择向上移动一行或向下移动一行 我的 Treeview 使用 ListStore 不确定这是否重要 首先 我将使用我熟悉的 C 代码 如果您在将其翻译为 Pytho
  • 为什么我无法杀死 k8s pod 中的 python 进程?

    我试图杀死一个 python 进程 ps aux grep python root 1 12 6 2 1 2234740 1332316 Ssl 20 04 19 36 usr bin python3 batch run py root 4
  • 列表中的特定范围(python)

    我有一个从文本字符串中提取的整数列表 因此当我打印该列表 我称之为test I get 135 2256 1984 3985 1991 1023 1999 我想打印或制作一个仅包含特定范围内的数字的新列表 例如1000 2000之间 我尝试
  • 如何在类型提示中定义元组或列表的大小

    有没有办法在参数的类型提示中定义元组或列表的大小 目前我正在使用这样的东西 from typing import List Optional Tuple def function name self list1 List Class1 if
  • 如何将Python包从旧版本安装到新版本?

    我正在使用 python 3 7 最近在 Linux 中安装了 python 3 8 是否有任何 bash 命令或脚本可以获取 3 7 的所有软件包列表并在 3 8 版本中一一安装 我想避免每个包裹都手工完成 注意 我将它们安装在我的系统中
  • 带 Qt 的菜单栏/系统托盘应用程序

    我是 Qt PyQt 的新手 我正在尝试制作一个应用程序 其功能将从菜单栏 系统托盘执行 这里展示了一个完美的例子 我找不到关于如何做到这一点的好资源 有人可以建议吗 Thanks 我认为您正在寻找与QMenu and QMainWindo
  • 如何创建简单的梯度下降算法

    我正在研究简单的机器学习算法 从简单的梯度下降开始 但在尝试用 python 实现它时遇到了一些麻烦 这是我试图重现的示例 我获得了有关房屋的数据 居住面积 以英尺为单位 和卧室数量 以及最终的价格 居住面积 英尺2 2104 卧室 3 价
  • 如何使用 keras.backend.gradients() 获取梯度值

    我试图获得 Keras 模型的输出相对于模型输入 x 而不是权重 的导数 似乎最简单的方法是使用 keras backend 中的 梯度 它返回梯度张量 https keras io backend https keras io backe

随机推荐

  • 存储库模式 - 如何正确处理 JOIN 和复杂查询?

    我对存储库模式有疑问 如何在多个存储库之间执行 JOIN 操作 在这个项目中 我们使用了MVC EF DDD 我知道这种问题已经出现过好几次了 我稍后会在本问题中引用这些问题 在通用存储库模型 IRepository 和特定存储库模型之间
  • Python:类型错误:“NoneType”对象不可下标

    Uses python3 import sys def max dot product a b n a a sort reverse True b b sort reverse True res 0 for i in range n res
  • 在 Python 2 中按定义顺序迭代枚举

    我正在使用 python 3 4 和 python 2 7 的反向移植 Enum 功能 gt python version Python 2 7 6 gt pip install enum34 Installs version 1 0 根据
  • Swift:尝试导入 UIKit 时出错

    当我尝试在我的 swift 文件中导入 UIKit 时 我遇到了这个非常奇怪的错误 我的代码很简单 import UIKit class Test NSObject 我在 导入 UIKit 时收到的错误是 未知类型名称 导入 预期的 在顶级
  • 是否可以从 iPhone safari 浏览器获取 UDID?

    我需要从 iPhone safari 浏览器检索 UDID 或任何其他设备唯一 ID 通过 cookie 或请求 响应标头或任何其他方式 如果可以的话可以怎样做 谢谢 Z 您可以使用 mobileconfig 看http whatsmyud
  • WCF 数据服务或 RIA 服务可以用 NHibernate 实现吗?

    从何而来我在网上读到 http wildermuth com 2009 09 29 Choosing a Data Access Layer for Silverlight 3 WCF 数据服务似乎通过服务器上的实体框架绑定到数据访问 有没
  • 从 Android 活动转到主屏幕

    我正在 android 中制作一个应用程序 我想实现一个按钮 以便每当按下它时 我只需返回主屏幕 我知道我们有硬件键和软键 当没有硬件键时 可以实现此功能 但我想为此应用程序添加此功能 有人知道该怎么做吗 谢谢 Try this Inten
  • Android 方向变化

    我有一个简单的Activity called SingleTouchTest理解屏幕触摸 奇怪的是SingleTouchTest从我所处的任何方向开始 但旋转设备不会导致屏幕旋转 我的测试设备是运行 Android 4 0 3 的 Acer
  • 如何以编程方式合并具有潜在冲突的文本文件(ala git 或 svn 等)?

    作为较大项目的一部分 我希望能够获取两个文本主体并将它们交给合并算法 该算法返回自动合并结果 在更改不冲突的情况下 或抛出错误并且 可能 生成一个文本文档 其中突出显示冲突的更改 基本上 我只是想要一种编程方式来完成地球上每个源代码控制系统
  • 如何仅在精确的 url 匹配时重定向?

    我正在尝试使用 apache htaccess 进行重定向 我有以下代码 redirectMatch 301 user http clients mydomain com 它工作得很好 但我不想要 用户登录 被引导至 http client
  • JSF 中 的范围是什么?

    我曾两次看到先前浏览的页面中的变量可能会干扰或替换下游查看的页面中的变量 例如 h datatable var 那么 ui param 的范围是什么 有没有办法遏制它 它基本上在 EL 上下文中设置一个新的变量映射 也可以看看的源代码Par
  • mysql 和 mysql2 gem 有什么区别

    我曾经与mysql宝石 但现在我经常听说mysql2宝石 此外 它还包含在Gemfile默认情况下Rails 3 0 x 使用有什么区别和优点mysql2 gem 这是来自mysql2 宝石页面 https github com brian
  • 当到达 gdb 中的断点时是否可以停止所有其他线程的执行?

    那么 一旦我在某个线程中遇到断点 是否可以暂停其他线程直到我继续 在全停止模式下 当前发布版本支持的唯一模式 只要任何线程停止 由于断点或信号 GDB 就会停止所有线程 当您继续停止的线程时 所有其他线程也将继续 除非您这样做set sch
  • 如何将 EasyMock 模拟注入测试类私有字段

    我正在使用 EasyMock 创建模拟 它是测试类中的私有参数 没有设置器 之一 我尝试使用反射 但它不能正常工作 public class TestedClassTest Test public void test TestedClass
  • 如何将 Bitmap 转换为 Base64 字符串?

    我正在尝试捕获屏幕 然后将其转换为 Base64 字符串 这是我的代码 Rectangle bounds Screen GetBounds Point Empty Bitmap bitmap new Bitmap bounds Width
  • 将 SQL 注释添加到 Linq 生成的查询中,以便它在 SQL 探查器中可见

    我们想要在一个项目中使用 Linq to SQL 这是我们第一次使用 Linq 通常我们只使用存储过程调用 到目前为止 一切都运行良好 但 DBA 询问我们是否可以以在 Profiler 中可见的方式标记 Linq 生成的 SQL 查询 我
  • 使用 google_oauth2 获取“错误:redirect_uri_mismatch”

    该网址似乎是正确的 昨天更新 文件也是 omn iauth rb provider google oauth2 MY CLIENT ID apps googleusercontent com MY CLIENT SECRET scope g
  • create-react-app 子文件夹项目不会 lint

    在子文件夹中使用 create react app 引导的项目不会出现 lint 但是 如果我在 VSCode 中以 root 身份打开项目子文件夹 如下图所示 或者在 root 下设置一个新的 create react app 项目 则
  • 从 vlcj 播放器数组中播放视频

    我正在尝试播放以字符串形式提供的 MRL 列表中的视频 问题是 当我尝试运行该类时 面板列表显示带有按钮 只有一个面板工作 但播放按钮不起作用 其他面板也不起作用 尽管我故意留下了停止按钮 因为我没有向它们添加动作侦听器 我想要实现的是 当
  • 如何使用scala或python在apache Spark中运行多线程作业?

    我面临着与 Spark 并发相关的问题 这阻止了我在生产中使用它 但我知道有一种解决方法 我正在尝试使用订单历史记录对 700 万用户的 10 亿种产品运行 Spark ALS 首先 我获取不同用户的列表 然后对这些用户运行循环以获取推荐