从 kafka-Spark-Streaming 读取数据时获取空集

2024-03-03

大家好,我是 Spark Streaming 的新手。我正在尝试读取 xml 文件并将其发送到 kafka 主题。这是我的 Kafka 代码,它将数据发送到 Kafka-console-consumer。

Code:

package org.apache.kafka.Kafka_Producer;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutionException;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

@SuppressWarnings("unused")
public class KafkaProducer { 
   private static String sCurrentLine;
   public static void main(String args[]) throws InterruptedException, ExecutionException{ 
       try (BufferedReader br = new BufferedReader(new FileReader("/Users/sreeharsha/Downloads/123.txt")))
       {
           while ((sCurrentLine = br.readLine()) != null) {
               System.out.println(sCurrentLine);
               kafka(sCurrentLine);
           }
       } catch (FileNotFoundException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();}
   }
   public static void kafka(String sCurrentLine)  {
       Properties props = new Properties();
       props.put("metadata.broker.list", "localhost:9092");
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       props.put("partitioner.class","kafka.producer.DefaultPartitioner");
       props.put("request.required.acks", "1");
       ProducerConfig config = new ProducerConfig(props);
       Producer<String, String> producer = new Producer<String, String>(config);
       producer.send(new KeyedMessage<String, String>("sample",sCurrentLine));
       producer.close();
   }
}

我可以在 Kafka-Console-Consumer 中接收数据。在下面的屏幕截图中,您可以看到我发送到该主题的数据。

现在我需要使用 Spark-Streaming 将发送到 kafka-console-consumer 的数据进行流式传输。这是代码。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class SparkStringConsumer {

   public static void main(String[] args) {

       SparkConf conf = new SparkConf()
               .setAppName("kafka-sandbox")
               .setMaster("local[*]");
       JavaSparkContext sc = new JavaSparkContext(conf);
       JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

       Map<String, String> kafkaParams = new HashMap<>();
       kafkaParams.put("metadata.broker.list", "localhost:9092");
       Set<String> topics = Collections.singleton("sample");

       JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
       String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
       directKafkaStream.foreachRDD(rdd -> {
       System.out.println("--- New RDD with " + rdd.partitions().size()
           + " partitions and " + rdd.count() + " records");
       rdd.foreach(record -> System.out.println(record._2));
       });
       ssc.start();
       ssc.awaitTermination();
   }
}

在提交我的作业时得到空集,如下所示:

./spark-submit --class org.apache.spark_streaming.Spark_Kafka_Streaming.SparkStringConsumer --master local[4] Spark_Kafka_Streaming-0.0.1-SNAPSHOT.jar

下面您可以看到数据接收情况的屏幕截图:

使用以下版本:

火花 - 2.0.0

动物园管理员-3.4.6

卡夫卡 - 0.8.2.1

有什么建议请,


最后在网上冲浪后我找到了这些解决方案。

不要同时使用“Spark-Submit”和“SetMaster”。

  • 如果您从 IDE 运行代码,请在代码中使用 SetMaster
  • 如果您通过“Spark-Submit”运行 jar,请勿将 setMaster 放入代码中

还有一件事首先运行/提交你的 Spark jar,然后将数据发送到 Kafka-Console-Consumer

工作正常。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

从 kafka-Spark-Streaming 读取数据时获取空集 的相关文章

随机推荐

  • 通过 PHP 使用 EPL 打印:存储图像

    背景信息 Mac OS X Lion 10 7 3 11D50b EPL http en wikipedia org wiki Eltron Programming Language http en wikipedia org wiki E
  • 向形状/线条添加渐变

    我正在尝试使用 VBA 将渐变添加到 Excel 中的线条形状 此功能可在Line Color下的部分Format Shape选项 尽管此功能存在于Format Shape选项 我无法在 VBA 中重现该功能 我的代码是 With Acti
  • 我可以通过 Lucene 在 Orchard 中搜索/索引自定义数据源吗?

    我目前正在开发一个网站 允许用户搜索自定义产品目录 我一直在寻找并希望利用 Orchard CMS 来帮助我开发这个网站 我目前已经经历了罗恩 彼得森的 YouTube 系列 http www youtube com watch v Iv7
  • 输入助手 valueBinding 已弃用 - 有什么替代方案?

    我有一些像这样的文本输入助手 input type text valueBinding name focus out focusOutName 我刚刚将 Ember 升级到 1 11 0 现在收到此弃用警告 弃用 您尝试通过将 valueB
  • 如何通过ajax将值传递给php变量

    这是我的 JavaScript 代码 function category row dataparam oper delete row row ajax type POST url multiupload php data dataparam
  • Helm Charts 中的秘密管理

    我正在尝试使用Helm charts在 Kubernetes 集群中安装应用程序 有人可以建议什么是更好的秘密管理解决方案吗 使用helm secrets是个好主意或者Hashicorp Vault Vault 在技术上非常棒 但它可能会成
  • 如何创建类似谷歌纵横的标记?

    在我的 HTML5 应用程序中 我使用 Google Map v3 并在地图上添加多个标记 放置新标记和更改图标很容易 但我希望能够构建像谷歌纬度中使用的标记一样的标记 这些标记设置有图标图像和漂亮的边框 关于如何做到这一点有什么想法吗 您
  • 使用 bootstrap 无法在 angularjs 中打开模式窗口

    这是我的 app js 文件 const app angular module CurseTransport ui router ui bootstrap ngMessages raceModule app config stateProv
  • Azure函数应用程序-在执行和间歇性运行的旧代码之间共享全局变量

    目前 我在 Azure 函数应用程序中面临两个问题 我已提供以下详细信息 1 全局变量内容在执行之间共享 我使用了并发字典 它是一个全局变量 私有的和静态的 该变量在队列触发器中使用 private static readonly Conc
  • iphone开发:验证来自https url的证书信息

    当用户使用网络浏览器 Safari Chrome等 连接到 https url 时 例如 https encrypted google com 则用户可以获得有关证书相关的信息到这样的 https url 也就是说 在连接到url http
  • 使用 HTML 表单和 PHP 更新 MySQL 数据库字段数据

    所以我试图使用 html 表单和一些 PHP 代码更新数据库字段 但我无法让它工作 它不会抛出任何错误 但不会更新该字段 我不确定它是否因为我也回显该字段网页 它似乎所做的就是打印失败消息 HTML
  • MSSQL - 将一个字段拆分为 3 个字段

    我有一个由 1 列组成的结果集 在本例中为 2 行 单列 ProductDescription 是一个 varchar 字段 其中包含 3 条信息 我没有设计它 我需要将这三条信息分成 3 个使用查询的附加字段 before Product
  • 在Bootstrap组件中单独加载

    我正在开发一个涉及许多开发人员的大型 Web 项目 我想精简 Bootstrap3 的包并仅保留我们正在使用的内容 基本上 这个想法是在页面加载到浏览器中时减少任何额外的开销 所以我可以通过两种方法来做到这一点 我也可以 a 从库中删除任何
  • 如何使用 lambda 表达式创建扩展方法

    目前我正在创建一个接受参数的扩展方法 使用下面的示例 如何使用 lambda 表达式对其进行转换 public static decimal ChangePercentage this IEnumerable
  • 根据前缀对目录中的文件进行分组

    我有一个包含图片的文件夹 文件夹 1 Files ABC 138923 ABC 3223 ABC 33489 ABC 3111 CBA 238923 CBA 1313 CBA 1313 DAC 38932 DAC 1111 DAC 1389
  • 使用 Microsoft Graph API 获取 SharePoint Online 团队网站

    我正在尝试访问组织的 SharePoint 团队网站 我使用 Microsoft Graph API 因为它是 Office 365 最完整的 API 我了解如何获取访问令牌以及如何使用它来发出请求 我知道它有效 因为我可以获得组列表 但是
  • 获取ejs模板中的url参数

    我试图根据 URL 参数创建一个 ejs 条件 例如 如果测试参数存在于 localhost 3000 page test 则显示一个 div 否则不显示它 我的 ejs 模板看起来像这样 div class row div div div
  • 如何捕获事件调度线程 (EDT) 异常?

    我正在使用一个名为MyExceptionHandler实现Thread UncaughtExceptionHandler处理我的项目中的正常异常 据我了解 这个类无法捕获 EDT 异常 所以我尝试在main 处理EDT异常的方法 publi
  • jQuery 测试 element1 是否是 element2 的后代

    有谁知道一种好方法来测试存储在 var 中的一个元素是否是另一个也存储在 var 中的元素的后代 我不需要element1 isChildOf selector 这很容易 I need element1 isChildOf element2
  • 从 kafka-Spark-Streaming 读取数据时获取空集

    大家好 我是 Spark Streaming 的新手 我正在尝试读取 xml 文件并将其发送到 kafka 主题 这是我的 Kafka 代码 它将数据发送到 Kafka console consumer Code package org a