How to insert data into Druid via Tranquility

2024-03-04

By following the tutorial at http://druid.io/docs/latest/tutorials/tutorial-loading-streaming-data.html, I was able to insert data into Druid through the Kafka console.

Kafka console

The spec file looks as follows:

examples/indexing/wikipedia.spec

[
  {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [{
        "type" : "count",
        "name" : "count"
      }, {
        "type" : "doubleSum",
        "name" : "added",
        "fieldName" : "added"
      }, {
        "type" : "doubleSum",
        "name" : "deleted",
        "fieldName" : "deleted"
      }, {
        "type" : "doubleSum",
        "name" : "delta",
        "fieldName" : "delta"
      }],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE"
      }
    },
    "ioConfig" : {
      "type" : "realtime",
      "firehose": {
        "type": "kafka-0.8",
        "consumerProps": {
          "zookeeper.connect": "localhost:2181",
          "zookeeper.connection.timeout.ms" : "15000",
          "zookeeper.session.timeout.ms" : "15000",
          "zookeeper.sync.time.ms" : "5000",
          "group.id": "druid-example",
          "fetch.message.max.bytes" : "1048586",
          "auto.offset.reset": "largest",
          "auto.commit.enable": "false"
        },
        "feed": "wikipedia"
      },
      "plumber": {
        "type": "realtime"
      }
    },
    "tuningConfig": {
      "type" : "realtime",
      "maxRowsInMemory": 500000,
      "intermediatePersistPeriod": "PT10m",
      "windowPeriod": "PT10m",
      "basePersistDirectory": "\/tmp\/realtime\/basePersist",
      "rejectionPolicy": {
        "type": "messageTime"
      }
    }
  }
]

I start the realtime node via

java -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -Ddruid.realtime.specFile=examples/indexing/wikipedia.spec -classpath config/_common:config/realtime:lib/* io.druid.cli.Main server realtime

In the Kafka console, I paste and enter the following:

{"timestamp": "2013-08-10T01:02:33Z", "page": "Good Bye", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

I then run a query by creating select.json and executing curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json

select.json

 {
   "queryType": "select",
   "dataSource": "wikipedia",
   "dimensions":[],
   "metrics":[],
   "granularity": "all",
   "intervals": [
     "2000-01-01/2020-01-02"
   ],

   "filter" : {"type":"and",
        "fields" : [
                { "type": "selector", "dimension": "user", "value": "catty" }
        ]
   },

   "pagingSpec":{"pagingIdentifiers": {}, "threshold":500}
 }

I am able to get the following result:

[ {
  "timestamp" : "2013-08-10T01:02:33.000Z",
  "result" : {
    "pagingIdentifiers" : {
      "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z" : 0
    },
    "events" : [ {
      "segmentId" : "wikipedia_2013-08-10T00:00:00.000Z_2013-08-11T00:00:00.000Z_2013-08-10T00:00:00.000Z",
      "offset" : 0,
      "event" : {
        "timestamp" : "2013-08-10T01:02:33.000Z",
        "continent" : "North America",
        "robot" : "false",
        "country" : "United States",
        "city" : "San Francisco",
        "newPage" : "true",
        "unpatrolled" : "true",
        "namespace" : "article",
        "anonymous" : "false",
        "language" : "en",
        "page" : "Good Bye",
        "region" : "Bay Area",
        "user" : "catty",
        "deleted" : 200.0,
        "added" : 57.0,
        "count" : 1,
        "delta" : -143.0
      }
    } ]
  }
} ]

So it seems I have set up Druid correctly.

Now, I would like to insert data via an HTTP endpoint. According to How realtime data input to Druid? (https://stackoverflow.com/questions/31630369/how-realtime-data-input-to-druid), the recommended way seems to be to use Tranquility.

Tranquility

I start the indexing service via

java -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -classpath config/_common:config/overlord:lib/*: io.druid.cli.Main server overlord

conf/server.json looks like this:

{
   "dataSources" : [
      {
         "spec" : {
            "dataSchema" : {
                "dataSource" : "wikipedia",
                "parser" : {
                    "type" : "string",
                    "parseSpec" : {
                      "format" : "json",
                      "timestampSpec" : {
                        "column" : "timestamp",
                        "format" : "auto"
                      },
                      "dimensionsSpec" : {
                        "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
                        "dimensionExclusions" : [],
                        "spatialDimensions" : []
                      }
                    }
                },
                "metricsSpec" : [{
                    "type" : "count",
                    "name" : "count"
                }, {
                    "type" : "doubleSum",
                    "name" : "added",
                    "fieldName" : "added"
                }, {
                    "type" : "doubleSum",
                    "name" : "deleted",
                    "fieldName" : "deleted"
                }, {
                    "type" : "doubleSum",
                    "name" : "delta",
                    "fieldName" : "delta"
                }],
                "granularitySpec" : {
                    "type" : "uniform",
                    "segmentGranularity" : "DAY",
                    "queryGranularity" : "NONE"
                }
            },
            "tuningConfig" : {
               "windowPeriod" : "PT10M",
               "type" : "realtime",
               "intermediatePersistPeriod" : "PT10M",
               "maxRowsInMemory" : "100000"
            }
         },
         "properties" : {
            "task.partitions" : "1",
            "task.replicants" : "1"
         }
      }
   ],
   "properties" : {
      "zookeeper.connect" : "localhost",
      "http.port" : "8200",
      "http.threads" : "8"
   }
}

Then, I start the Tranquility server using

bin/tranquility server -configFile conf/server.json

I perform a POST to http://xx.xxx.xxx.xxx:8200/v1/post/wikipedia, with Content-Type set to application/json:

{"timestamp": "2013-08-10T01:02:33Z", "page": "Selamat Pagi", "language" : "en", "user" : "catty", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}

I get the following response:

{"result":{"received":1,"sent":0}}

So it seems Tranquility has received our data, but failed to send it to Druid!

I tried running curl -X POST 'http://localhost:8084/druid/v2/?pretty' -H 'content-type: application/json' -d @select.json again, but did not get back the data I inserted via Tranquility.

Any idea why? Thanks.


This typically happens when the data you send falls outside the window period. If you are inserting data manually, provide the exact current timestamp (UTC), in milliseconds. Otherwise, if you use a script to generate the data, this is easy to do. Make sure the timestamp is the current time in UTC.
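As a minimal sketch of what this answer suggests, the event can be rebuilt with the current UTC time just before posting, so that it falls inside the PT10M windowPeriod configured above (the host and port are the placeholders from the question, and the event body is trimmed for brevity):

# current UTC timestamp, so the event lands inside the window period
NOW=$(date -u +%Y-%m-%dT%H:%M:%SZ)
curl -X POST 'http://xx.xxx.xxx.xxx:8200/v1/post/wikipedia' \
  -H 'Content-Type: application/json' \
  -d "{\"timestamp\": \"$NOW\", \"page\": \"Selamat Pagi\", \"language\": \"en\", \"user\": \"catty\", \"added\": 57, \"deleted\": 200, \"delta\": -143}"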
