按字段关联 ELK 中的消息

2024-04-29

相关:在ELK中合并日志和查询 https://stackoverflow.com/questions/28429607/combine-logs-and-query-in-elk

我们正在设置 ELK,并希望在 Kibana 4 中创建可视化。 这里的问题是我们想要在两种不同类型的消息之间建立关联。

简化一下:

  • 消息类型1字段:message_type、common_id_number、byte_count、 ...
  • 消息类型 2 字段:message_type、common_id_number、主机名、...

两条消息在elasticsearch 中共享相同的索引。

正如您所看到的,我们试图在不考虑 common_id_number 的情况下绘制图表,但似乎我们必须使用它。但我们还不知道怎么做。

有什么帮助吗?

EDIT

这些是ES模板中的相关字段定义:

      "URIHost" : {
        "type" : "string",
        "norms" : {
          "enabled" : false
        },
        "fields" : {
          "raw" : {
            "type" : "string",
            "index" : "not_analyzed",
            "ignore_above" : 256
          }
        }
      },
      "Type" : {
        "type" : "string",
        "norms" : {
          "enabled" : false
        },
        "fields" : {
          "raw" : {
            "type" : "string",
            "index" : "not_analyzed",
            "ignore_above" : 256
          }
        }
      },
      "SessionID" : {
        "type" : "long"
      },
      "Bytes" : {
        "type" : "long"
      },
      "BytesReceived" : {
        "type" : "long"
      },
      "BytesSent" : {
        "type" : "long"
      },

这是一个TRAFFIC类型,编辑后的文档:

{
  "_index": "logstash-2015.11.05",
  "_type": "paloalto",
  "_id": "AVDZqdBjpQiRid-uxPjE",
  "_score": null,
  "_source": {
    "@version": "1",
    "@timestamp": "2015-11-05T21:59:55.543Z",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_timestamp": "Nov  5 22:59:58",
    "Type": "TRAFFIC",
    "SessionID": 21713,
    "Bytes": 939,
    "BytesSent": 480,
    "BytesReceived": 459,
  },
  "fields": {
    "@timestamp": [
      1446760795543
    ]
  },
  "sort": [
    1446760795543
  ]
}

这是一个威胁类型的文档:

{
  "_index": "logstash-2015.11.05",
  "_type": "paloalto",
  "_id": "AVDZqVNIpQiRid-uxPjC",
  "_score": null,
  "_source": {
    "@version": "1",
    "@timestamp": "2015-11-05T21:59:23.440Z",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_timestamp": "Nov  5 22:59:26",
    "Type": "THREAT",
    "SessionID": 21713,
    "URIHost": "whatever.nevermind.com",
    "URIPath": "/connectiontest.html"
  },
  "fields": {
    "@timestamp": [
      1446760763440
    ]
  },
  "sort": [
    1446760763440
  ]
}

这是logstash“过滤器”配置:

filter {
    if [type] == "paloalto" {
        syslog_pri {
            remove_field => [ "syslog_facility", "syslog_severity" ]
        }

        grok {
            match => {
                "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{HOSTNAME:hostname} %{INT},%{YEAR}/%{MONTHNUM}/%{MONTHDAY} %{TIME},%{INT},%{WORD:Type},%{GREEDYDATA:log}"
            }
            remove_field => [ "message" ]
        }

        if [Type] == "THREAT" {
            csv {
                source => "log"
                columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "URL", "Threat_OR_ContentName", "reportid", "Category", "Severity", "Direction", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "contenttype", "pcap_id", "filedigest", "cloud", "url_idx", "user_agent", "filetype", "xff", "referer", "sender", "subject", "recipient" ]
                remove_field => [ "log" ]
            }
            mutate {
                convert => {
                    "SessionID" => "integer"
                    "SourcePort" => "integer"
                    "DestinationPort" => "integer"
                    "NATSourcePort" => "integer"
                    "NATDestinationPort" => "integer"
                }
                remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "reportid", "Severity", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
            }
            grok {
                match => {
                    "URL" => "%{URIHOST:URIHost}%{URIPATH:URIPath}(%{URIPARAM:URIParam})?"
                }
                remove_field => [ "URL" ]
            }
        }

        else if [Type] == "TRAFFIC" {
            csv {
                source => "log"
                columns => [ "Threat_OR_ContentType", "ConfigVersion", "GenerateTime", "SourceAddress", "DestinationAddress", "NATSourceIP", "NATDestinationIP", "Rule", "SourceUser", "DestinationUser", "Application", "VirtualSystem", "SourceZone", "DestinationZone", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "SessionID", "RepeatCount", "SourcePort", "DestinationPort", "NATSourcePort", "NATDestinationPort", "Flags", "IPProtocol", "Action", "Bytes", "BytesSent", "BytesReceived", "Packets", "StartTime", "ElapsedTimeInSecs", "Category", "Padding", "seqno", "actionflags", "SourceCountry", "DestinationCountry", "cpadding", "pkts_sent", "pkts_received", "session_end_reason" ]
                remove_field => [ "log" ]
            }
            mutate {
                convert => {
                    "SessionID" => "integer"
                    "SourcePort" => "integer"
                    "DestinationPort" => "integer"
                    "NATSourcePort" => "integer"
                    "NATDestinationPort" => "integer"
                    "Bytes" => "integer"
                    "BytesSent" => "integer"
                    "BytesReceived" => "integer"
                    "ElapsedTimeInSecs" => "integer"
                }
                remove_field => [ "ConfigVersion", "GenerateTime", "VirtualSystem", "InboundInterface", "OutboundInterface", "LogAction", "TimeLogged", "RepeatCount", "Flags", "Action", "Packets", "StartTime", "seqno", "actionflags", "cpadding", "pcap_id", "filedigest", "recipient" ]
            }
        }

        date {
            match => [ "syslog_timastamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
            timezone => "CET"
            remove_field => [ "syslog_timestamp" ]
        }
    }
}

我们试图做的是将 URIHost 项可视化为 X 轴,将字节、BytesSent 和 BytesReceived 总和可视化为 Y 轴。


我认为你可以使用aggregate filter https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html执行你的任务。这aggregate过滤器支持根据公共字段值将多个日志行聚合到一个事件中。在您的情况下,我们将使用的公共字段是SessionID field.

然后我们需要另一个字段来检测应聚合的第一个事件与第二个/最后一个事件。在你的情况下,这将是Type field.

您需要像这样更改当前配置:

filter {

    ... all other filters

    if [Type] == "THREAT" {
        ... all other filters

        aggregate {
            task_id => "%{SessionID}"
            code => "map['URIHost'] = event['URIHost']; map['URIPath'] = event['URIPath']"
        }
    }

    else if [Type] == "TRAFFIC" {
        ... all other filters

        aggregate {
            task_id => "%{SessionID}"
            code => "event['URIHost'] = map['URIHost']; event['URIPath'] = map['URIPath']"
            end_of_task => true
            timeout => 120
        }
    }
}

总体思路是,当Logstash遇到THREAT日志它会暂时存储URIHost and URIPath在内存中的事件映射中,然后当TRAFFIC日志进来后,URIHost and URIPath字段将添加到事件中。如果需要,您也可以复制其他字段。您还可以根据您期望的时间来调整超时(以秒为单位)TRAFFIC最后一个事件之后发生的事件THREAT event.

最后,您将获得包含来自两者的数据合并的文档THREAT and TRAFFIC日志行,您可以轻松创建显示每个字节数的可视化URIHost如您的屏幕截图所示。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

按字段关联 ELK 中的消息 的相关文章

  • Elasticsearch 对字符串排序未返回预期结果

    当对包含多个单词的字符串字段进行排序时 Elasticsearch 会拆分字符串值并使用最小值或最大值作为排序值 即 当对值为 老虎之眼 的字段进行升序排序时 排序值为 Eye 当按降序排序时 排序值为 Tiger 假设我的索引中有 老虎之
  • 使elasticsearch中的所有对象嵌套对象

    是否可以让elasticsearch中的所有嵌套对象自动映射到默认嵌套的类型 而不是对象 是的 您可以使用以下方法来做到这一点动态模板 https www elastic co guide en elasticsearch referenc
  • 将 Elasticsearch 结果导出到 CSV 文件

    我正在尝试将使用以下查询找到的结果导出到桌面上的 CSV 中 这是我第一次使用 Elasticsearch 和 cURL 所以我对如何做到这一点感到困惑 from elasticsearch import Elasticsearch es
  • Elasticsearch 崩溃后无法恢复

    磁盘空间不足 导致 Elasticsearch 分片崩溃 三个节点现在为红色 两个节点已恢复 它们的状态为黄色 ES 的 CPU 利用率为 150 内存利用率很高 正在尝试恢复它们 但似乎存在一些版本匹配冲突 我清理了磁盘空间并删除了分片的
  • 在 ElasticSearch 7+ 中,如何搜索所有文本字段?

    我想在 Elasticsearch 7 3 中存储的文档中搜索单词 我希望在以前版本的 Elasticsearch 上运行的一个示例是 query bool must match all oliver must not should fro
  • 在elasticsearch中过滤facet

    我有一个如下查询 query query query string query s q filter ids values list ids facets destination terms field destination en hot
  • match_none 有什么用?

    我浏览了docs https www elastic co guide en elasticsearch reference current query dsl match all query html query dsl match no
  • Python elasticsearch DSL 聚合/每个文档嵌套值的度量

    我试图找到 2 级嵌套中的最小值 每个文档单独的最小值 到目前为止 我能够进行聚合 计算搜索结果中所有嵌套值的最小值 但无需按文档进行分隔 我的示例架构 class MyExample DocType myexample id Intege
  • Logstash删除类型并保留_type

    我有一个logstash 客户端和服务器 客户端将带有logstash的udp输出的日志文件发送到服务器 服务器也运行logstash来获取这些日志 在服务器上 我有一个 json 过滤器 它会在实际日志的字段中提取 json 格式的消息
  • 如何提高elasticsearch中的索引类型?

    我以前是这样搜索的 curl XGET localhost 9200 users search 但 users 包含用户 a b c 如下所示 curl XGET localhost 9200 users a b c search user
  • 如何在弹性搜索中生成多个布尔查询的查询

    我想使用 spring 框架在 elasticsearch 中动态生成多个布尔运算的查询 我在elasticsearch中的数据就像 masterID
  • Elasticsearch:如何查询连接数?

    如何询问我的 Elasticsearch 服务器现在有多少个连接 这与插座数量相同吗 我也不知道如何获得这些数字 这与客户端的数量不同 对吧 因为每个客户端可能打开多个连接 找不到任何相关信息 但我确实发现您可以在 Elasticsearc
  • 如何编写Elasticsearch多个必须脚本查询?

    我想使用查询来比较多个字段 我有字段 1 到 4 我想搜索字段 1 大于字段 2 的数据 并且下面的查询工作正常 size 0 source field1 field2 field3 field4 sort query bool filte
  • Elasticsearch 单个字段的多个分析器

    我使用严格的预定义映射将不同类型的文档存储在单个索引中 它们都有一些字段 例如 body 但我希望在索引时对它们进行稍微不同的分析 例如 对特定文档使用不同的标记过滤器 并在搜索时以相同的方式处理 据我所知 分析器不能按文档指定 我还考虑使
  • 我们可以同时使用拼音标记和同义词吗?

    我正在尝试同时启用语音分析器和同义词 这似乎不起作用 它们一起使用有错吗 在下面的实现中 我希望使用同义词转换搜索查询 然后使用语音分析器来检索结果 但我的同义词在这里完全被忽略了 如果我在创建索引时删除语音分析器 那么同义词就可以正常工作
  • 如何修复从 React 对 Elasticsearch 进行 API 调用时的“混合内容:”错误

    我正在使用 firebase 的 elasticsearch 的 bitnami 版本 我发现它只能连接到http并不是https当我使用邮递员尝试时 我的create react app已部署到 firebase 我得到Mixed Con
  • 在 Elasticsearch 中删除文件后回收磁盘空间

    当我从 Elasticsearch 中删除文档时 为什么我的 总大小 保持不变 尽管由于没有以前存储的数据而明显小得多 我读过有关索引优化的内容 但我不确定这是什么或如何做到这一点 Thanks 我确信 SO 和 Google 上都有大量与
  • Elasticsearch:根据类型对不同字段进行排序

    我的索引中有两种类型 Event and City 我正在尝试按日期将它们全部排序 但是 每种类型的日期字段名称都不同 为了Event该值是在updated at领域和City日期是在update at其嵌套对象之一中的字段city eve
  • Elastic Search 启动错误 - “\Common 此时出现意外。”

    我已经下载并解压了elasticsearch 当我运行批处理文件 elasticsearch bat 时 出现以下错误 Common was unexpected at this time Solved 通过编辑 bat 文件的第 46 行
  • 为 Logstash 中的新字段设置 Elasticsearch Analyzer

    通过使用GROK filter 我们可以向Logstash添加新字段 但是 我想知道如何为该特定字段设置分析器 例如 我有一个新的 id 字段 其中有一个字段 例如a b 但是 Elasticsearch 附带的普通分析器会将其分解为a a

随机推荐

  • 如何更新 certifi 的根证书?

    我正在使用 certifi python 模块来验证 ssl 连接 我查看了 certifi python2 7 site packages certifi cacert pem 中包含的根证书 其中一些证书已过期 我如何更新这些证书 我尝
  • RegEx 从 CSS 背景样式中提取 URL

    我有一个这种形式的字符串 url http www example com imgs backgrounds bg80 jpg repeat scroll 10 0 transparent 这是来自某个元素的 CSS 样式 该元素目前在页面
  • 在 Clojure 中递归反转序列

    我想在 Clojure 中反转序列而不使用reverse函数 并递归地执行此操作 这是我想出的 defn reverse recursively coll loop r rest coll acc conj first coll if co
  • python中根据变量类型处理数据子集

    我将以下数据存储在 csv df sample csv 中 我将列名放在名为 cols list 的列表中 df 数据 样本 df data sample pd DataFrame new video BASE SHIVER PREFER
  • 如果未登录则重定向 html 页面

    我的网页上有简单的登录表单 它使用 javascript 来登录用户并且工作正常 问题是 用户直接在地址栏中输入登陆页面 URL 他无需登录即可直接访问该页面 如果他没有登录 我想将他重定向到登录页面 以下是loding和目标页面的链接 l
  • 当按下 flutter 中编写的按钮时,有没有办法运行 python 脚本?

    本质上 我想做的是 按下我在 Flutter 中编程的按钮 当按下该按钮时 Python 脚本应该开始在我的 Android 设备上运行 我想在 python 中使用 youtube dl 用于下载 Youtube 视频 库 但我想知道是否
  • Google Cloud Build 获取身份令牌

    在我的场景中 我想在 Google Cloud Build 期间触发基于 HTTP 端点的 Google Cloud Function HTTP 请求是使用 python 3 7 slim 容器的步骤完成的 基于this https clo
  • 如何在 Chart.js 中将 Y 轴值从数字更改为字符串?

    我在用Chart js http www chartjs org 我正在尝试更改 y 轴 请参见下面的屏幕截图 我尝试填写yLabels具有字符串数组的属性 但这没有用 任何帮助 将不胜感激 jQuery document ready fu
  • 如何在 Firebase 实时数据库上安排通知?

    我正在为我工 作的公司开发一个 flutter 通信应用程序 但我遇到了两个问题 这是我需要做的 1 向用户组或特定用户发送通知 并将这些通知保存在数据库或json文件中 该列表将作为 最新新闻 出现在我的应用程序的主屏幕上 问题是 当应用
  • 是否可以从一个存储库中拉取并推送到另一个存储库?

    我在 github 上有一个公开的存储库 在那里我有一个正在开发的开源应用程序 用于制作产品目录和小型 cms 内容 我还有一个私有存储库 未托管在 github 中 它是在 github 中托管的开源应用程序下开发的应用程序 由于我目前正
  • 如何使用Python优化大型数据集的API调用?

    客观的 将地址列表发送到 API 并提取某些信息 例如 指示地址是否位于洪水区域的标志 Solution 适用于小数据的 Python 脚本 Problem 我想针对大输入优化当前的解决方案 如何提高 API 调用的性能 如果我有 100
  • Numpy - 两个矩阵的行之间的协方差

    我需要计算两个不同矩阵的每一行之间的协方差 即第一个矩阵的第一行与第二个矩阵的第一行之间的协方差 依此类推 直到两个矩阵的最后一行 我可以在没有 NumPy 的情况下使用下面附加的代码来完成此操作 我的问题是 是否可以避免使用 for 循环
  • ApplicationDelegate类的要点

    在 Objective C 中将变量和方法签名放在 ApplicationDelegate h 中的要点是什么 通过这样做 所有这些方法和变量都可以被另一个视图控制器类看到吗 这是重点吗 另外 每个项目中是否只有一个应用程序委托类 应用程序
  • 如何让我的 DIV 出现在另一个 DIV 下方

    我有这样的情况 div div div Div A 是 B 和 C 的全屏宽度容器 Div B 是一个小矩形 例如 100 x 200 px Div C 是另一个小矩形 例如 100 x 200 像素 现在发生的情况是 B 和 C 出现在同
  • 计算一系列 csv 文件的行数

    我正在学习 R 教程 并怀疑我必须使用其中一个函数 但我不确定是哪一个 是的 我研究了它们 但在我更加熟悉 R 术语之前 它们非常令人困惑 在我的工作目录中有一个文件夹 specdata Specdata 包含数百个名为 001 csv 3
  • 使用 Xlib 捕获鼠标

    我想编写一个简单的 Xlib 程序来改变鼠标行为 举个例子 反转垂直移动 我在捕获事件时遇到问题 我想要代码 捕获控制器位置的变化 我向上移动鼠标 MotionEvent 计算新的光标位置 new x difference x 设置新的光标
  • Docker 应用程序更新后无法连接到数据库

    在我的公司 我有一个旧的 Symfony 应用程序在 Docker 容器中运行 该应用程序连接到一个 SQL 数据库 该数据库也在 Docker 容器内运行 该图像是使用 php 7 2 apache stretch 构建的 但该版本已不再
  • 如何在杂志/报纸等砖石布局中浮动元素?

    我正在尝试实现一种布局 其中项目将像报纸 杂志文章部分一样浮动 它类似于什么jQuery 的砌体 http masonry desandro com 做 但我试图仅使用 CSS3 来实现这一点 我想也许box显示属性可以做到这一点 尽管尝试
  • 如何在WebView中隐藏滚动条?

    我切换到WKWebView因为UIWeb视图Apple 不再建议使用 使用以下代码从 WebView 中的容器加载 HTML 文件 let webview myWKWebViewClass webview for Bundle main f
  • 按字段关联 ELK 中的消息

    相关 在ELK中合并日志和查询 https stackoverflow com questions 28429607 combine logs and query in elk 我们正在设置 ELK 并希望在 Kibana 4 中创建可视化