在Java Maven项目中通过Spark查询Cassandra中的数据

2023-12-07

我正在尝试编写一个简单的代码,在其中创建一个架构,插入一些表,然后提取一些信息并将其打印出来。但是,我收到错误。我正在使用 Datastax cassandra Spark 连接器。我一直在使用这两个例子来帮助我尝试实现这一目标:

https://gist.github.com/jacek-lewandowski/278bfc936ca990bee35a

http://www.datastax.com/documentation/developer/java-driver/1.0/java-driver/quick_start/qsSimpleClientAddSession_t.html

但是,第二个示例不使用 cassandra Spark 连接器或一般 Spark。

这是我的代码:

package com.angel.testspark.test;


import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }
    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS tester");
            session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
            session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
            session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0001," +
                          "'Angel'," +
                          "'Pay'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0002," +
                          "'John'," +
                          "'Doe'," +
                          "'IT Engineer'" +
                          ");");
            session.execute(
                      "INSERT INTO tester.emp (id, fname, lname, role) " +
                      "VALUES (" +
                          "0003," +
                          "'Jane'," +
                          "'Doe'," +
                          "'IT Analyst'" +
                          ");");
                session.execute(
                      "INSERT INTO tester.dept (id, dname) " +
                      "VALUES (" +
                          "1553," +
                          "'Commerce'" +
                          ");");

                ResultSet results = session.execute("SELECT * FROM tester.emp " +
                        "WHERE role = 'IT Engineer';");
            for (Row row : results) {
                System.out.print(row.getString("fname"));
                System.out.print(" ");
                System.out.print(row.getString("lname"));
                System.out.println(); 
            }
                System.out.println();
            }

        }

    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

这是我的错误:

14/09/18 11:22:18 WARN util.Utils: Your hostname, APAY-M-R03K resolves to a loopback address: 127.0.0.1; using 10.150.79.164 instead (on interface en0)
14/09/18 11:22:18 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/09/18 11:22:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/18 11:22:18 INFO Remoting: Starting remoting
14/09/18 11:22:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:50506]
14/09/18 11:22:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:50506]
14/09/18 11:22:18 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/18 11:22:18 INFO storage.DiskBlockManager: Created local directory at /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-local-20140918112218-2c8d
14/09/18 11:22:18 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
14/09/18 11:22:18 INFO network.ConnectionManager: Bound socket to port 50507 with id = ConnectionManagerId(10.150.79.164,50507)
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/18 11:22:18 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 10.150.79.164:50507 with 2.1 GB RAM
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/18 11:22:18 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:18 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:18 INFO server.AbstractConnector: Started [email protected]:50508
14/09/18 11:22:18 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.150.79.164:50508
14/09/18 11:22:19 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/18 11:22:19 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-a0dc4491-1901-4a7a-86f4-4adc181fe45c
14/09/18 11:22:19 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO server.AbstractConnector: Started [email protected]:50509
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/09/18 11:22:19 INFO server.AbstractConnector: Started [email protected]:4040
14/09/18 11:22:19 INFO ui.SparkUI: Started Spark Web UI at http://10.150.79.164:4040
14/09/18 11:22:19 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
14/09/18 11:22:19 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added
14/09/18 11:22:19 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
    at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
    at com.sun.proxy.$Proxy6.execute(Unknown Source)
    at com.angel.testspark.test.App.createSchema(App.java:85)
    at com.angel.testspark.test.App.run(App.java:38)
    at com.angel.testspark.test.App.main(App.java:109)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108)
    at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
    at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367)
    at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:584)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/18 11:22:20 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

我相信这可能只是一个语法错误,我只是不确定它在哪里以及是什么。

任何帮助都会很棒,谢谢。我搜索过互联网,但没有找到使用 cassandra 和 Spark 在 java 中插入数据和提取数据的简单示例。

******编辑:@BryceAtNetwork23 和 @mikea 对于我的语法错误是正确的,所以我编辑了问题并修复了它。我收到了一个新错误,所以我粘贴了新错误并更新了代码


尝试通过 cqlsh 运行 CQL,您应该会得到相同/类似的错误:

aploetz@cqlsh:stackoverflow> CREATE TABLE dept (id INT PRIMARY KEY, dname TEXT);
aploetz@cqlsh:stackoverflow> INSERT INTO dept (id, dname) VALUES (1553,Commerce);
<ErrorMessage code=2000 [Syntax error in CQL query] message="line 1:50 no viable alternative at
input ')' (... dname) VALUES (1553,Commerce[)]...)">

在“Commerce”两边加上单引号,它应该可以工作:

session.execute(
                  "INSERT INTO tester.dept (id, dname) " +
                  "VALUES (" +
                      "1553," +
                      "'Commerce'" +
                      ");");

但现在我收到了一个新错误...

还可以尝试从 cqlsh 运行它。

aploetz@cqlsh:stackoverflow> SELECT * FROM emp WHERE role = 'IT Engineer';
code=2200 [Invalid query] message="No indexed columns present in by-columns clause with Equal operator"

发生这种情况是因为role未定义为您的主键。 Cassandra 不允许您按任意列值进行查询。解决这个问题的最佳方法是创建一个名为 empByRole 的附加查询表,其中role作为分区键。像这样:

CREATE TABLE empByRole 
    (id INT, fname TEXT, lname TEXT, role TEXT,
    PRIMARY KEY (role,id)
);

aploetz@cqlsh:stackoverflow> INSERT INTO empByRole (id, fname, lname, role) VALUES (0001,'Angel','Pay','IT Engineer');
aploetz@cqlsh:stackoverflow> SELECT * FROM empByRole WHERE role = 'IT Engineer';

 role        | id | fname | lname
-------------+----+-------+-------
 IT Engineer |  1 | Angel |   Pay

(1 rows)
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在Java Maven项目中通过Spark查询Cassandra中的数据 的相关文章

  • HttpSession 内的同步是否可行?

    UPDATE 问题后立即解决 问题 通常 同步是在 JVM 内序列化并行请求 例如 private static final Object LOCK new Object public void doSomething synchroniz
  • Spring @Validated 在服务层

    Hej 我想使用 Validated group Foo class 在执行方法之前验证参数的注释 如下所示 public void doFoo Foo Validated groups Foo class foo 当我将此方法放入 Spr
  • Java:无安全管理器:RMI 类加载器已禁用

    您好 我有 RMI 应用程序 现在我尝试从客户端调用服务器上的一些方法 我有以下代码 public static void main final String args try Setting the security manager Sy
  • 无法将 INode 类型值分配给 类型变量。为什么?

    我想知道为什么以下代码无法工作 public static
  • 想要从java中的char数组创建字符流

    我想从 char 数组构造一个流以使用 java 8 功能 例如过滤器和映射 char list a c e Stream
  • Apache Commons VFS - 无法解析文件

    VFS 方法无法处理此 URI jboss server temp dir local outgoing配置在jboss beans xml这是决心 C Download jboss eap 5 1 1 server default tmp
  • 限制 JPQL 中的结果数量

    如何限制从数据库检索结果的数量 select e from Entity e I need only 10 results for instance 您可以尝试像这样给出 10 个要显式获取的结果 entityManager createQ
  • pyspark 数据框中的自定义排序

    是否有推荐的方法在 pyspark 中实现分类数据的自定义排序 我理想地寻找 pandas 分类数据类型提供的功能 因此 给定一个数据集Speed列 可能的选项是 Super Fast Fast Medium Slow 我想实现适合上下文的
  • 欧拉项目 45

    我还不是一名熟练的程序员 但我认为这是一个有趣的问题 我想我应该尝试一下 三角形 五边形 六边形 数字由以下生成 公式 三角形 T n n n 1 2 1 3 6 10 15 五边形 P n n 3n 1 2 1 5 12 22 35 六角
  • 生产者程序中的 kafka 网络处理器错误(ArrayIndexOutOfBoundsException:18)

    我有下面的 kafka Producer Api 程序 我对 kafka 本身是新手 下面的代码从 API 之一获取数据并将消息发送到 kafka 主题 package kafka Demo import java util Propert
  • 如何构建和使用 TimeSeriesCollections

    我想在图表的 X 轴上显示一些日期 并且here https stackoverflow com questions 5118684 jfreechart histogram with dates据说我必须使用 TimeSeriesColl
  • JPA 的 Hibernate 查询提示

    我一直在尝试为所有可以通过设置的提示找到一个明确的资源Query setHint String Object JPA 中的方法调用 但我一无所获 有人知道一个好的参考吗 See 3 4 1 7 查询提示 http docs jboss or
  • 单元测试、集成测试还是设计中的问题?

    我编写了我的第一个单元测试 我认为它过于依赖其他模块 我不确定是否是因为 这是一个复杂的测试 我实际上已经编写了集成测试或 我的设计有问题 我首先要说的是 虽然我有大约 4 年的开发经验 但我从未学过 也没有人教过自动化测试 我刚刚使用 H
  • 如何加载图像文件到ImageView?

    我试图在从文件选择器中选择图像文件后立即显示该图像文件 文件选择器仅限于 png 和 jpg 文件 所选文件存储在文件类型的变量中 为此 我设置了一个 ImageView 我希望用这个新文件设置图像 唯一的问题是它的类型是文件而不是图像 如
  • java中的第三个布尔状态是什么?

    虽然我知道根据定义 布尔值仅包含两种状态 真或假 我想知道布尔值在用这些状态之一初始化之前有什么值 它默认为 false http java sun com docs books tutorial java nutsandbolts dat
  • Java给定长度的随机数

    我需要在 Java 中生成一个恰好 6 位数字的随机数 我知道我可以在随机发生器上循环 6 次 但是在标准 Java SE 中还有其他方法可以做到这一点吗 要生成 6 位数字 Use Random http download oracle
  • 内部类的访问修饰符[重复]

    这个问题在这里已经有答案了 可能的重复 受保护 公共内部类 https stackoverflow com questions 595179 protected public inner classes 我确信这个问题已经被问过 但我找不到
  • 一个类中有多个具有相同参数类型的方法

    我知道 至少已经有了关于这个主题的一个问题 https stackoverflow com questions 5561436 can two java methods have same name with different retur
  • 如何在apache POI中读取excel文件的准确单元格内容

    当我读取单元格的内容时 例如如果它是日期格式 它会转换为另一个值 例如 12 31 2099 gt 46052 和 50 00 gt 50 和 50 00 gt 0 5 但我想要的是获取每个单元格的确切字符串值 我的代码是这样的 cell
  • 通过向上转换将 Java.sql.date 转换为 Java.util.date 安全吗?

    java sql date 扩展了 java util date 那么通过将 java sql date 转换为 java util date 是否可以在两者之间进行转换 或者有其他方法可以转换它们吗 您不一定需要强制转换 您可以将 SQL

随机推荐