Spark 中的简单矩阵乘法

2024-01-01

我正在努力处理一些非常基本的 Spark 代码。我想定义一个矩阵x有 2 列。这是我尝试过的:

scala> val s = breeze.linalg.linspace(-3,3,5)
s: breeze.linalg.DenseVector[Double] = DenseVector(-3.0, -1.5, 0.0, 1.5, 3.0) // in this case I want s to be both column 1 and column 2 of x

scala> val ss = s.toArray ++ s.toArray
ss: Array[Double] = Array(-3.0, -1.5, 0.0, 1.5, 3.0, -3.0, -1.5, 0.0, 1.5, 3.0)

scala> import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.RowMatrix

scala> val mat = new RowMatrix(ss, 5, 2)
<console>:17: error: type mismatch;
 found   : Array[Double]
 required: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector]
       val mat = new RowMatrix(ss, 5, 2)

我不明白如何获得正确的转换以便将值传递给分布式矩阵^

编辑: 也许我已经能够解决:

scala> val s = breeze.linalg.linspace(-3,3,5)
s: breeze.linalg.DenseVector[Double] = DenseVector(-3.0, -1.5, 0.0, 1.5, 3.0)

scala> val ss = s.to
toArray         toDenseMatrix   toDenseVector   toScalaVector   toString        
toVector        

scala> val ss = s.toArray ++ s.toArray
ss: Array[Double] = Array(-3.0, -1.5, 0.0, 1.5, 3.0, -3.0, -1.5, 0.0, 1.5, 3.0)

scala> val x = new breeze.linalg.Dense
DenseMatrix   DenseVector   

scala> val x = new breeze.linalg.DenseMatrix(5, 2, ss)
x: breeze.linalg.DenseMatrix[Double] = 
-3.0  -3.0  
-1.5  -1.5  
0.0   0.0   
1.5   1.5   
3.0   3.0   

scala> val xDist = sc.parallelize(x.toArray)
xDist: org.apache.spark.rdd.RDD[Double] = ParallelCollectionRDD[0] at parallelize at <console>:18

像这样的东西。此类型检查,但由于某种原因不会在我的 Scala 工作表中运行。

import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.linalg.distributed._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc= new SparkContext(conf)

// the values for the column in each row
val col = List(-3.0, -1.5, 0.0, 1.5, 3.0) ;

// make two rows of the column values, transpose it,
// make Vectors of the result
val t = List(col,col).transpose.map(r=>Vectors.dense(r.toArray))

// make an RDD from the resultant sequence of Vectors, and 
// make a RowMatrix from that.
val rm = new RowMatrix(sc.makeRDD(t));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark 中的简单矩阵乘法 的相关文章

  • 更改 Spark Streaming 中的输出文件名

    我正在运行一个 Spark 作业 就逻辑而言 它的性能非常好 但是 当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时 输出文件的名称格式为 part 00000 part 00001 等 有没有办法更改输出文件名 谢谢
  • Spark:导入UTF-8编码的文本文件

    我正在尝试处理一个包含很多特殊字符的文件 例如德语变音符号 o 等 如下所示 sc hadoopConfiguration set textinputformat record delimiter r n r n sc textFile f
  • 在 Scala 中扩展函数1

    在几个例子中 我看到一个对象或一个类扩展Function1 E g object Cash extends CashProduct gt String in Scala 的隐藏功能 https stackoverflow com quest
  • Slick:将操作与 DBIOAction 的 Seq 组合起来

    我有 工作 以下代码 val actions for lt slickUsers insertOrUpdate dbUser loginInfo lt loginInfoAction lt slickUserLoginInfos DBUse
  • 使用泛型全面实现特征

    我正在通过实现矩阵数学来练习 Rust 但遇到了一些障碍 我定义了我认为与矩阵相关的特征 trait Matrix
  • 函数式 Scala 中的选择排序

    我正在学习 Scala 编程 并编写了选择排序算法的快速实现 然而 由于我对函数式编程还不太了解 所以在转换为更 Scala 风格时遇到了困难 对于 Scala 程序员来说 如何使用 Lists 和 vals 来做到这一点 而不是回到我的命
  • Scala 中的条件未来

    给定这两个 future 仅当条件为真时我才需要运行第一个 future 请参阅if y gt 2 但我有一个例外Future filter predicate is not satisfied 这是什么意思以及如何修复该示例 object
  • Java时间转正常格式

    我有 Java 时间1380822000000 我想转换为我可以阅读的内容 import java util Date object Ws1 val a new Date 1380822000000 toString 导致异常 warnin
  • 如何将 Spark DataFrame 以 csv 格式保存在磁盘上?

    例如 这样的结果 df filter project en select title count groupBy title sum 将返回一个数组 如何将 Spark DataFrame 作为 csv 文件保存在磁盘上 Apache Sp
  • sh / Bash shell 脚本中 !# (bang-pound) 的含义是什么?

    我想了解这个 Scala 脚本是如何工作的 usr bin env bash exec scala 0 object HelloWorld def main args Array String println Hello world arg
  • “为 Apache Hadoop 2.7 及更高版本预构建”是什么意思?

    Apache Spark 下载页面上的 pre built for Apache Hadoop 2 7 and later 是什么意思 这是否意味着spark中HDFS必须有库 如果是这样 其他存储系统 例如 Cassandra s3 HB
  • 缩放数据框的每一列

    我正在尝试缩放数据框的每一列 首先 我将每一列转换为向量 然后使用 ml MinMax Scaler 除了简单地重复它之外 是否有更好 更优雅的方法将相同的函数应用于每一列 import org apache spark ml linalg
  • 使用spark-sql从oracle加载数据时如何增加默认精度和小数位数

    尝试从 oracle 表加载数据 其中我有几列保存浮点值 有时它最多保存 DecimalType 40 20 即点后 20 位数字 目前 当我使用加载其列时 var local ora df DataFrameReader ora df l
  • 伴随对象中的方法编译成scala中的静态方法?

    看起来 scala 将伴生对象中的方法编译为静态方法 这使得从 java 代码中调用它们变得更容易一些 例如 您可以编写 CompanionObject method 而不是 CompanionObject MODULE method 然而
  • DataFrame 分区到单个 Parquet 文件(每个分区)

    我想重新分区 合并我的数据 以便将其保存到每个分区的一个 Parquet 文件中 我还想使用 Spark SQL partitionBy API 所以我可以这样做 df coalesce 1 write partitionBy entity
  • 使用 Reader Monad 进行依赖注入

    我最近看到了谈话极其简单的依赖注入 http www youtube com watch v ZasXwtTRkio and 无需体操的依赖注入 http vimeo com 44502327关于 Monads 的 DI 并留下了深刻的印象
  • Scala 如何使用我的所有核心?

    object PrefixScan sealed abstract class Tree A case class Leaf A a A extends Tree A case class Node A l Tree A r Tree A
  • pyspark.sql.utils.AnalysisException:u'Path不存在

    我正在使用标准 hdfs 与 amazon emr 运行 Spark 作业 而不是 S3 来存储我的文件 我在 hdfs user hive warehouse 有一个配置单元表 但当我的 Spark 作业运行时找不到它 我配置了 Spar
  • 如何插入UUID的值?

    我在 Play Framework 2 3 支持的 postgresql 9 4 中使用 anorm 2 4 给出一个这样的模型 case class EmailQueue id UUID send from String send to
  • 如何访问 Spark Streaming 应用程序的统计端点?

    从 Spark 2 2 0 开始 API 中有新的端点用于获取有关流作业的信息 我在 EMR 集群上运行 Spark 在集群模式下使用 Spark 2 2 0 当我到达流作业的端点时 它给我的只是错误消息 没有附加到的流侦听器 我已经深入研

随机推荐