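I'm trying to write a simple program that creates a schema with a couple of tables, inserts some rows, then reads some of the data back and prints it. However, I'm getting an error. I'm using the DataStax Cassandra Spark connector. I've been using these two examples to help me get there: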
https://gist.github.com/jacek-lewandowski/278bfc936ca990bee35a
http://www.datastax.com/documentation/developer/java-driver/1.0/java-driver/quick_start/qsSimpleClientAddSession_t.html
However, the second example doesn't use the Cassandra Spark connector, or Spark at all.
Here is my code:
package com.angel.testspark.test;

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;
import static com.datastax.spark.connector.CassandraJavaUtil.*;

public class App
{
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);
        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        // Prepare the schema
        try (Session session = connector.openSession()) {
            session.execute("DROP KEYSPACE IF EXISTS tester");
            session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
            session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
            session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");
            session.execute(
                "INSERT INTO tester.emp (id, fname, lname, role) " +
                "VALUES (" +
                "0001," +
                "'Angel'," +
                "'Pay'," +
                "'IT Engineer'" +
                ");");
            session.execute(
                "INSERT INTO tester.emp (id, fname, lname, role) " +
                "VALUES (" +
                "0002," +
                "'John'," +
                "'Doe'," +
                "'IT Engineer'" +
                ");");
            session.execute(
                "INSERT INTO tester.emp (id, fname, lname, role) " +
                "VALUES (" +
                "0003," +
                "'Jane'," +
                "'Doe'," +
                "'IT Analyst'" +
                ");");
            session.execute(
                "INSERT INTO tester.dept (id, dname) " +
                "VALUES (" +
                "1553," +
                "'Commerce'" +
                ");");
            ResultSet results = session.execute("SELECT * FROM tester.emp " +
                "WHERE role = 'IT Engineer';");
            for (Row row : results) {
                System.out.print(row.getString("fname"));
                System.out.print(" ");
                System.out.print(row.getString("lname"));
                System.out.println();
            }
            System.out.println();
        }
    }

    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }
        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);
        App app = new App(conf);
        app.run();
    }
}
Here is my error:
14/09/18 11:22:18 WARN util.Utils: Your hostname, APAY-M-R03K resolves to a loopback address: 127.0.0.1; using 10.150.79.164 instead (on interface en0)
14/09/18 11:22:18 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
14/09/18 11:22:18 INFO slf4j.Slf4jLogger: Slf4jLogger started
14/09/18 11:22:18 INFO Remoting: Starting remoting
14/09/18 11:22:18 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:50506]
14/09/18 11:22:18 INFO Remoting: Remoting now listens on addresses: [akka.tcp://[email protected]:50506]
14/09/18 11:22:18 INFO spark.SparkEnv: Registering BlockManagerMaster
14/09/18 11:22:18 INFO storage.DiskBlockManager: Created local directory at /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-local-20140918112218-2c8d
14/09/18 11:22:18 INFO storage.MemoryStore: MemoryStore started with capacity 2.1 GB.
14/09/18 11:22:18 INFO network.ConnectionManager: Bound socket to port 50507 with id = ConnectionManagerId(10.150.79.164,50507)
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Trying to register BlockManager
14/09/18 11:22:18 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager 10.150.79.164:50507 with 2.1 GB RAM
14/09/18 11:22:18 INFO storage.BlockManagerMaster: Registered BlockManager
14/09/18 11:22:18 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:18 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:18 INFO server.AbstractConnector: Started [email protected]:50508
14/09/18 11:22:18 INFO broadcast.HttpBroadcast: Broadcast server started at http://10.150.79.164:50508
14/09/18 11:22:19 INFO spark.SparkEnv: Registering MapOutputTracker
14/09/18 11:22:19 INFO spark.HttpFileServer: HTTP File server directory is /var/folders/57/8s5fx3ks06bd2rzkq7yg1xs40000gn/T/spark-a0dc4491-1901-4a7a-86f4-4adc181fe45c
14/09/18 11:22:19 INFO spark.HttpServer: Starting HTTP Server
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO server.AbstractConnector: Started [email protected]:50509
14/09/18 11:22:19 INFO server.Server: jetty-7.6.8.v20121106
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage/rdd,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/storage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/stage,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages/pool,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/stages,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/environment,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/executors,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/metrics/json,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/static,null}
14/09/18 11:22:19 INFO handler.ContextHandler: started o.e.j.s.h.ContextHandler{/,null}
14/09/18 11:22:19 INFO server.AbstractConnector: Started [email protected]:4040
14/09/18 11:22:19 INFO ui.SparkUI: Started Spark Web UI at http://10.150.79.164:4040
14/09/18 11:22:19 WARN core.FrameCompressor: Cannot find LZ4 class, you should make sure the LZ4 library is in the classpath if you intend to use it. LZ4 compression will not be available for the protocol.
14/09/18 11:22:19 INFO core.Cluster: New Cassandra host /127.0.0.1:9042 added
14/09/18 11:22:19 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
Exception in thread "main" com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
at com.datastax.driver.core.DefaultResultSetFuture.getUninterruptibly(DefaultResultSetFuture.java:172)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:52)
at com.datastax.driver.core.AbstractSession.execute(AbstractSession.java:36)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:33)
at com.sun.proxy.$Proxy6.execute(Unknown Source)
at com.angel.testspark.test.App.createSchema(App.java:85)
at com.angel.testspark.test.App.run(App.java:38)
at com.angel.testspark.test.App.main(App.java:109)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: No indexed columns present in by-columns clause with Equal operator
at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
at com.datastax.driver.core.DefaultResultSetFuture.onSet(DefaultResultSetFuture.java:108)
at com.datastax.driver.core.RequestHandler.setFinalResult(RequestHandler.java:235)
at com.datastax.driver.core.RequestHandler.onSet(RequestHandler.java:367)
at com.datastax.driver.core.Connection$Dispatcher.messageReceived(Connection.java:584)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.oneone.OneToOneDecoder.handleUpstream(OneToOneDecoder.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:462)
at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:443)
at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/18 11:22:20 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster
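I believe this may just be a syntax error; I'm just not sure where it is or what it is.
Any help would be great, thanks. I've searched the internet but haven't found a simple example of inserting and reading back data in Java with Cassandra and Spark.
****** Edit: @BryceAtNetwork23 and @mikea were right about my syntax error, so I have edited the question and fixed it. I am now getting a new error, so I have pasted the new error and updated the code.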
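For context on the new error: "No indexed columns present in by-columns clause with Equal operator" is the message Cassandra usually returns when a WHERE clause filters on a column that is neither part of the primary key nor covered by a secondary index. In tester.emp the only key column is id, so `role` falls into that category. Below is a minimal sketch of one possible way around it, assuming a secondary index on `role` is acceptable for a small test table like this one. The class name `EmpQueries` and the index name `emp_role_idx` are hypothetical, introduced only for illustration; the helper just builds CQL strings and does not talk to Cassandra itself.

```java
// Hypothetical helper (not part of the connector or the driver):
// it only assembles CQL strings for the tester.emp table from the question.
final class EmpQueries {
    private EmpQueries() {}

    // Assumed index name; any unused identifier would work.
    static final String ROLE_INDEX = "emp_role_idx";

    /** CQL that adds a secondary index on the non-key column role. */
    static String createRoleIndex() {
        return "CREATE INDEX " + ROLE_INDEX + " ON tester.emp (role)";
    }

    /** CQL that selects employees by role; single quotes in the value are doubled. */
    static String selectByRole(String role) {
        String escaped = role.replace("'", "''");
        return "SELECT fname, lname FROM tester.emp WHERE role = '" + escaped + "'";
    }
}
```

In createSchema, something like session.execute(EmpQueries.createRoleIndex()) could go right after the CREATE TABLE tester.emp statement and before the inserts, and the final query could then become session.execute(EmpQueries.selectByRole("IT Engineer")). I have not verified this against the exact Cassandra and connector versions shown in the log.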