如何连接Kafka和Elasticsearch?

2024-04-21

我是Kafka的新手,我使用kafka通过logstash收集netflow(可以),并且我想将数据从kafka发送到elasticsearch,但是存在一些问题。
我的问题是如何将 Kafka 与 Elasticsearch 连接起来? netflow 到 kafka Logstash 配置:

input{
    udp{
        host => "120.127.XXX.XX"
        port => 5556
        codec => netflow
    }
}
    filter{

    }
output {
  kafka {
    bootstrap_servers => "localhost:9092"    
    topic_id => "test"    
  }
  stdout{codec=> rubydebug}
}

kafka 到 Elasticsearch Logstash:

input {
      kafka { }
    }
    output {
        elasticsearch {
            hosts => ["120.127.XXX.XX:9200"]
        }
        stdout{codec=> rubydebug}
    }

log:

D:\ELK\logstash-6.1.1\bin>logstash -f kafkatoES.conf --path.data D:\ELK\logstash-6.1.1\datatest
Sending Logstash's logs to D:/ELK/logstash-6.1.1/logs which is now configured via log4j2.properties
[2018-02-01T18:52:59,713][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"D:/ELK/logstash-6.1.1/modules/fb_apache/configuration"}
[2018-02-01T18:52:59,728][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"D:/ELK/logstash-6.1.1/modules/netflow/configuration"}
[2018-02-01T18:53:00,072][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2018-02-01T18:53:01,070][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.1.1"}
[2018-02-01T18:53:01,804][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9601}
[2018-02-01T18:53:09,024][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://120.127.XX.XX:9200/]}}
[2018-02-01T18:53:09,040][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://120.127.XX.XX:9200/, :path=>"/"}
[2018-02-01T18:53:09,305][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://120.127.XX.XX:9200/"}
[2018-02-01T18:53:09,383][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>nil}
[2018-02-01T18:53:09,383][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-02-01T18:53:09,415][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-02-01T18:53:09,430][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"default"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-02-01T18:53:09,493][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//120.127.XXX.XX:9200"]}
[2018-02-01T18:53:09,524][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>2000, :thread=>"#<Thread:0x45e62903 run>"}
[2018-02-01T18:53:09,609][INFO ][logstash.pipeline ] Pipeline started {"pipeline.id"=>"main"}
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/D:/ELK/logstash-6.1.1/logstash-core/lib/org/apache/logging/log4j/log4j-slf4j-impl/2.6.2/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/D:/ELK/logstash-6.1.1/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/vendor/jar-dependencies/runtime-jars/log4j-slf4j-impl-2.8.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
[2018-02-01T18:53:09,789][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-02-01T18:53:09,852][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9092]
check.crcs = true
client.id = logstash-0
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

[2018-02-01T18:53:09,945][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 0.11.0.0
[2018-02-01T18:53:09,945][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : cb8625948210849f
[2018-02-01T18:53:10,149][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Discovered coordinator winoc-netflow:9092 (id: 2147483647 rack: null) for group logstash.
[2018-02-01T18:53:10,164][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Revoking previously assigned partitions [] for group logstash
[2018-02-01T18:53:10,164][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (Re-)joining group logstash
[2018-02-01T18:53:10,180][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] Successfully joined group logstash with generation 6
[2018-02-01T18:53:10,180][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] Setting newly assigned partitions [logstash-0] for group logstash

先感谢您!


我建议使用 Kafka Connect 及其Elasticsearch 接收器 https://docs.confluent.io/current/connect/connect-elasticsearch/docs/。我昨晚实际上就这个主题进行了演讲:)这是幻灯片 https://speakerdeck.com/rmoff/building-streaming-data-pipelines-with-elasticsearch-apache-kafka-and-ksql.

你可以看一个详细的例子here https://www.confluent.io/blog/blogthe-simplest-useful-kafka-connect-data-pipeline-in-the-world-or-thereabouts-part-2/.

2020 年 5 月更新:另请参阅

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何连接Kafka和Elasticsearch? 的相关文章

  • Elasticsearch 中的嵌套与对象

    有人可以解释 Elasticsearch 文档中 对象 和 嵌套 字段之间的区别吗 我知道默认情况下字段被定义为对象 我还知道我可以用这样的点访问对象字段 my field name my field title 等 对象的文档 http
  • 如何提高elasticsearch中的索引类型?

    我以前是这样搜索的 curl XGET localhost 9200 users search 但 users 包含用户 a b c 如下所示 curl XGET localhost 9200 users a b c search user
  • 如何在弹性搜索中生成多个布尔查询的查询

    我想使用 spring 框架在 elasticsearch 中动态生成多个布尔运算的查询 我在elasticsearch中的数据就像 masterID
  • 在 Elasticsearch 中对具有一个值的属性进行多个值查询

    我正在尝试在这个查询的基础上进行一些构建 我正在搜索的索引还有一个带有 id 的 实体 字段 因此 一些记录将具有 实体 16 实体 156 等 具体取决于实体的 ID 我需要以这样的方式扩展此查询 以便可以传递数组或某些值列表 例如 te
  • 如何使用elasticsearch进行分页?来自 vs 滚动 API

    我使用elasticsearch作为数据库来存储大量日志数据 我知道有两种方法可以进行分页 使用大小并来自 API 使用滚动API 现在我使用 from 进行分页 从前端和后端获取页面和大小参数 Java searchSourceBuild
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • Elasticsearch:如何查询连接数?

    如何询问我的 Elasticsearch 服务器现在有多少个连接 这与插座数量相同吗 我也不知道如何获得这些数字 这与客户端的数量不同 对吧 因为每个客户端可能打开多个连接 找不到任何相关信息 但我确实发现您可以在 Elasticsearc
  • 从中间部分匹配完成建议elasticsearch

    我有一个名为搜索建议具有以下 search suggest type completion analyzer simple payloads true preserve separators false preserve position
  • ElasticSearch 多滚动 Java API

    我想从索引中获取所有数据 由于项目数量对于内存来说太大 我使用滚动 很好的功能 client prepareSearch index setTypes myType setSearchType SearchType SCAN setScro
  • 即使在kafka机器重新启动后,如何保留kafka保留字节和kafka保留段[重复]

    这个问题已经存在了 we set retention bytes价值 104857600对于主题 topic test root confluent01 kafka topics zookeeper localhost 2181 alter
  • 全文搜索 DynamoDB

    以下情况 我正在为我的客户将元素存储在 DyanmoDb 中 HashKey 是元素 ID Range Key 是客户 ID 除了这些字段之外 我还存储一个字符串数组 gt 标签 例如 Pets House 和多行文本 我想在我的应用程序中
  • 从 App Engine 连接到 Kubernetes 引擎

    我们希望使用应用程序引擎灵活的流程来更新位于 Google Kubernetes Engine 上的 ElasticSearch 索引 我们需要通过 http s 地址连接到 ElasticSearch 推荐的方法是什么 我们不想将集群暴露
  • 如何在不更改设置的情况下不区分大小写排序

    我的索引名称是 data new 下面是插入索引的代码 test id 1 name A professor Bill Cage accounting id 2 name AB professor Gregg Payne engineeri
  • ElasticSearch 定义自定义映射与默认“_doc”映射冲突

    尝试创建自定义映射类型时会发生此问题 为第一个插入弹性创建自定义映射后想要创建 doc映射类型和冲突就发生在这里 第一步我创建一个映射 mappings properties field1 type keyword field2 type
  • Elasticsearch 单个字段的多个分析器

    我使用严格的预定义映射将不同类型的文档存储在单个索引中 它们都有一些字段 例如 body 但我希望在索引时对它们进行稍微不同的分析 例如 对特定文档使用不同的标记过滤器 并在搜索时以相同的方式处理 据我所知 分析器不能按文档指定 我还考虑使
  • Kafka REST 代理 API 有哪些好处?

    我不知道Kafka REST Proxy API的优点 它是一个 REST API 所以我知道它对于管理来说很方便 人们为什么使用 Kafka REST 代理 API 添加对生产者或消费者的 Maven 依赖是否很麻烦 另外 我知道kafk
  • 我们可以同时使用拼音标记和同义词吗?

    我正在尝试同时启用语音分析器和同义词 这似乎不起作用 它们一起使用有错吗 在下面的实现中 我希望使用同义词转换搜索查询 然后使用语音分析器来检索结果 但我的同义词在这里完全被忽略了 如果我在创建索引时删除语音分析器 那么同义词就可以正常工作
  • 弹性搜索文档计数

    我正在运行 2 2 版本的 Elastic 搜索 我已经创建了索引并加载了示例文档 我发现其中有些问题 当我给予 GET index type count 我得到了正确的答案 count 9998 shards total 5 succes
  • Nest Elastic - 构建动态嵌套查询

    我必须使用 Nest 查询嵌套对象 但是查询是以动态方式构建的 下面的代码演示了以静态方式对嵌套 书籍 进行查询 QueryContainer qry qry new QueryStringQuery DefaultField name D
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入

随机推荐

  • 错误 1062。mysql 中的重复条目

    我有一个 MySQL 表 其架构中的列 1 是主键 我有一个 tsv 文件 需要将其插入到该表中 现在 tsv 具有主键重复 因此当我尝试将其插入 MySQL 表时 会出现错误 ERROR 1062 23000 Duplicate entr
  • 使用 Python/PIL 比较(相似)图像

    我正在尝试计算相似 read 编辑距离 的两个图像 使用 Python 2 6 和 PIL 我计划使用python levenshtein http pypi python org pypi python Levenshtein 0 10
  • 是否可以将布局 xml 放在 /res/layout 下的子目录中?

    我的布局文件越来越大 所以我想到了将它们放在子目录中的想法 是否可以 简单代码
  • 如何从 bash 查看二进制文件?

    我想从命令行查看当前目录中文件的内容 但以二进制形式查看 我怎样才能实现这个目标 xxd https linux die net man 1 xxd既可以是二进制 也可以是十六进制 bin xxd b file hex xxd file
  • Facebook:不安全的 JavaScript 问题(document.domain 值应该相同)

    我的是一个基于canvas的FB应用程序 在 Chrome 和 Firefox 上都面临问题 尽管通常是 Chrome 1 当我在新的隐身 Chrome 窗口上点击我批准的 Facebook 应用程序安全 URL 时 https apps
  • 使用 Win32 / C++ 读取 PNG

    没有simple在本机 Windows 中将 PNG 文件读入内存位图的方法似乎是不可能的 但经过多次谷歌搜索后我开始相信它 libpng zlib 组合太大了 有任何第三方库 Win32 库可以读取 PNG 吗 编辑 boost gil
  • 当 contentType=application/json 时 Jquery JQGrid 中断?

    我必须使用 ajaxSetup 将 contentType 全局更改为 application json ajaxSetup contentType application json charset utf 8 请参阅此问题了解为什么我必须
  • 通过 OpenXml SDK 的 XLSX 文件有效和无效

    我有一个程序将 System Data DataTable 导出到 XLSX OpenXml 电子表格 最后让它大部分工作 但是 当在 Excel 中打开电子表格时 Excel 抱怨文件无效 需要修复 并给出此消息 我们发现 中的某些内容存
  • 将 CMSampleBuffer 转换为 UIImage

    这是一个将 CMSampleBuffer 转换为 UIImage 的函数 来自 Apple 文档的代码 func imageFromSampleBuffer sampleBuffer CMSampleBuffer gt UIImage Ge
  • 如何在 Python 中将多级列表转换为字符串列表?

    我有一个看起来像这样的列表 a A V C A D D 我想创建另一个可以转换的列表a into AVC ADD 我将如何继续这样做 Use str join https docs python org 3 5 library stdtyp
  • 使用 Firebase 监听嵌套更改的正确方法是什么?

    背景 我正在尝试使用 Firebase Twilio 和 Node js 通过浏览器发送短信 我当前在 Firebase 中的数据结构如下所示 messages 15553485 FB GENERATED KEY 1 body hello
  • rSpec 和 Rails3 中的存根设计

    如何使用 rSpec 在 Rails 3 中存根 Devise 我有一个UsersController and a User模型 目前 这两者都与 Devise 相关 我正在编写控制器规格 但我真的很难实现作为 Devise 的期望sign
  • 如何为ListBox(值)索引或树视图节点设置相同的按钮事件?

    我有 C Win form 如下所示 我也有一些数组作为列表框的菜单 public string ArrayMain 1 Water 2 Air 3 Soil 4 Fire public string ArrayWater 1 Salty
  • 与最接近的值左连接,不重复

    我想在 MS SQL 中实现如下所示的效果 使用 2 个表并通过连接而不是迭代 从表 A 中 我希望每一行从表 B 中识别列表中的哪一个是它们最接近的值 并且当选择值时 该值不能重复使用 如果您以前做过类似的事情 请帮忙 先感谢您 SOre
  • 为 PHP CLI 安装 Xdebug

    我在用着XAMPP http www apachefriends org tr index html在 Mac OS 上 尝试使用PHPUnit http phpunit de的代码覆盖率检查 这需要XDebug http phpunit
  • EF 中的过滤器包含[重复]

    这个问题在这里已经有答案了 我有这个 LINQ 查询 它在 Include 中的筛选器上给出错误 当我在 Google 上搜索我的朋友时 我发现无法在 包含 中进行过滤 我已经找到了一些方法可以以另一种方式做到这一点 但我无法让它适用于我的
  • 如何找到第一个设置位的索引

    是否有按位解决方案来查找仅设置一个位的掩码中第一个设置位的索引 例如对于 8 则为 3 对于 16 gt 4 依此类推 请不要循环 我能想到的最佳解决方案是创建位到索引的映射 function firstBit x return Math
  • 如何为 HTML5 画布上下文设置 2 种字体?

    我试图让画布在绘制文本时使用两种字体 这是因为我的主要字体是 Comic Sans MS 这是一个儿童应用程序 由于我在 iPad 上找不到 Comic Sans 因此我尝试用 MarkerFelt Thin 来替代它 我尝试使用以下语句的
  • 如何在 powershell 提示符中使用波形符?

    所以我明白了 function global prompt Commands go here 在 powershell 中设置提示符 我可以用Get Location获取当前工作目录 我可以cd 并在我的主目录中 但是我可以让提示使用波浪号
  • 如何连接Kafka和Elasticsearch?

    我是Kafka的新手 我使用kafka通过logstash收集netflow 可以 并且我想将数据从kafka发送到elasticsearch 但是存在一些问题 我的问题是如何将 Kafka 与 Elasticsearch 连接起来 net