大数据016——Kafka

2023-05-16

1. Kafka 简介

Kafka 是一个高吞吐量、低延迟分布式的消息队列系统。kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。Kafka 也是一个高度可扩展的消息系统,它在LinkedIn 的中央数据管道总扮演着十分重要的角色。

1.1 Kafka 的主要设计目标

Kafka 作为一种分布式的、基于发布/订阅的消息系统,其主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久能力,即使对TB级别以上的数据也能保证常数时间的访问的性能;
  • 高吞吐率,即使在非常廉价的商用机上也能做到单机支持每秒100K条消息的传输;
  • 支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个分区内的消息顺序传输;
  • 支持离线数据处理和实时数据处理;
  • 支持在线水平处理。

1.2 消息队列的特点

解耦、冗余、扩展性、灵活性和峰值处理能力、可恢复性、顺序保证、缓冲、异步通信。

2. Kafka 的架构

2.1 Kafka 的基本组成

在 Kafka 集群中生产者将消息发送给以Topic命名的消息队列Queue中,消费者订阅发往以某个Topic命名的消息队列Queue中的消息。其中Kafka集群由若干个Broker组成,Topic由若干个Partition组成,每个Partition里面的消息通过Offset来获取。

  • Broker:一台Kafka服务器就是一个Broker,一个集群有多个Broker组成,一个Broker可以容纳多个Topic,Broker 于Broker之间没有Master和Standby的概念,它们之间的地位基本是平等的;
  • Topic:每条发送到Kafka集群的消息都属于某个主题,这个主题就称为Topic。物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存在一个或多个Broker上,但是用户只需制定消息的主题Topic即可生产或消费数据而不需要关心数据存放在何处;
  • Partition:为了实现可扩展性,一个非常大的Topic可以被分为多个Partition,从而分布在多台Broker上。Partition中的每条消息都会被分配一个自增Id(offset)。Kafka只保证一个Partition中的顺序将消息发送给消费者,但不保证单个Topic中的多个Partition之间的顺序;
  • Offset:消息在Topic的Partition中的位置,同一个Partition中的消息随着消息的写入,其对应的Offset也自增;
  • Replica:副本;Topic的Partition含有N个Replica,N为副本因子。其中一个Replica为Leader,其他都为Follower,Leader处理Partition可以向一个Topic发布一些消息;
  • Consumer:消息消费者,即向指定的Topic获取消息,根据指定Topic的分区索引及其对应分区上的消息偏移量来过去消息;
  • ConsumerGroup:消费者组;每个消费者属于一个消费者组,如果所有的消费者都具有相同的消费者组,那么消息将会在该消费者之间进行负载均衡。也就是说一个Partition中的消息只会被相同ConsumerGroup中的某个Consumer消费,每个ConsumerGroup消息消费是互相独立的;
  • Zookeeper:存放Kafka集群相关元数据的组件。在Zookeeper集群中会保存Topic的状态信息、Broker的状态信息、消费者的消费信息。

3. Kafka 的使用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过 kafka 以统一接口服务的方式开放给各种 consumer,例如hadoop、Hbase、Solr 等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如 spark streaming 和 storm。

4. Kafka 集群部署

这里准备了Zookeeper和Kafka集群的三台服务器节点:node01、node02、node03;

1)、kafka 是一个分布式消息队列,需要依赖 ZooKeeper,请先安装好 ZK集群;

2)、下载压缩包(官网地址:http://kafka.apache.org/downloads.html),上传至服务器并解压:

tar zxvf kafka_2.10-0.9.0.1.tgz -C /home/

3)、修改配置文件,根目录config/server.properties:

broker.id=0;
zookeeper.connect=node01,node02,node03

核心配置参数说明:

  • broker.id: broker 集群中唯一标识 id,0、1、2、3 依次增长(broker即 Kafka 集群中的一台服务器)

    注:当前 Kafka 集群共三台节点,分别为:node1、node2、node3。对应的 broker.id 分别为 0、1、2。

  • zookeeper.connect: ZK集群地址列表。

4)、将当前 node01 服务器上的 Kafka 目录同步到其他 node02、node03 服务器上。

5)、启动 Kafka 集群

  • 事先启动ZK集群;

  • 分别在三台服务器上执行以下命令启动(在kafka安装根目录下):

    bin/kafka-server-start.sh config/server.properties
    

6)、测试

  • 创建 topic:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic mytest

​ 参数说明:
​ --replication-factor :指定每个分区的复制因子个数,默认 1 个;
​ --partitions :指定当前创建的 kafka 分区数量,默认为 1 个;
​ --topic :指定新建 topic 的名称;

  • 查看 topic 列表:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
  • 查看某个topic 描述:
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic mytest

如:

[root@node01 kafka_2.10-0.9.0.1]# bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --describe --topic mytest
Topic:mytest	PartitionCount:3	ReplicationFactor:2	Configs:
	Topic: mytest	Partition: 0	Leader: 2	Replicas: 2,0	Isr: 2,0
	Topic: mytest	Partition: 1	Leader: 1	Replicas: 0,1	Isr: 1
	Topic: mytest	Partition: 2	Leader: 1	Replicas: 1,2	Isr: 1,2
  • 创建生产者:
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic mytest
  • 创建消费者:
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic mytest

生产者终端命令行键入字符,消费者终端将打印收到的消息。

5. Kafka 的 Java API

  • 创建消息生产者:
/**
 * 向kafka中生产数据
 *
 * @author root
 */
public class MyProducer extends Thread {

    private String topic; //发送给Kafka的数据,topic
    private Producer<Integer, String> producerForKafka;

    public MyProducer(String topic) {

        this.topic = topic;

        Properties conf = new Properties();
        conf.put("metadata.broker.list", "node01:9092,node02:9092,node03:9092");
        conf.put("serializer.class", StringEncoder.class.getName());
        conf.put("acks",1);

        producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf));
    }


    @Override
    public void run() {
        int counter = 0;
        while (true) {
            counter++;
            String value = "shsxt" + counter;

            KeyedMessage<Integer, String> message = new KeyedMessage<>(topic, value);

            producerForKafka.send(message);
            System.out.println(value + " - -- -- --- -- - -- - -");

            //hash partitioner 当有key时,则默认通过key 取hash后 ,对partition_number 取余数
//			producerForKafka.send(new KeyedMessage<Integer, String>(topic,22,userLog));
//            每2条数据暂停2秒
            if (0 == counter % 2) {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) {

        new MyProducer("testkafka").start();

    }
}

  • 创建消息消费者
public class MyConsumer extends Thread {
	private final ConsumerConnector consumer;
	private final String topic;

	public MyConsumer(String topic) {
		consumer = Consumer
				.createJavaConsumerConnector(createConsumerConfig());

		this.topic = topic;
	}

	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", "node01:2181,node02:node03:2181");
		props.put("group.id", "group01");
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("auto.commit.interval.ms", "100");
        props.put("auto.offset.reset","smallest");
//        props.put("auto.commit.enable","false"); // 关闭自动提交,开启手动提交

		return new ConsumerConfig(props);

	}


// push消费方式,服务端推送过来。主动方式是pull
	public void run() {

		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        //mytopic2
		topicCountMap.put(topic, 1); // 描述读取哪个topic,需要几个线程读



		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> list = consumerMap.get(topic);  // 每个线程对应于一个KafkaStream

        KafkaStream stream = list.get(0);


		ConsumerIterator<byte[], byte[]> it = stream.iterator();
        System.out.println("start................");
        while (it.hasNext()){

            String data = new String(it.next().message());

            System.out.println("开始处理数据 ...:"+ data);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

   //         consumer.commitOffsets();//手动提交
        }
			
	}


	public static void main(String[] args) {
		MyConsumer consumerThread = new MyConsumer("testkafka");
		consumerThread.start();
	}
}

代码优化

上述代码存在些问题:

  • 若设置自动提交的延时过大如:props.put("auto.commit.interval.ms", "6000");会出现提交不及时而重复消费消息的后果;
  • 若处理消息的时间过长,会出现提交数据后为完成数据处理的消息丢失现象;

处理上述现象可通过设置手动提交解决:

props.put("auto.commit.enable","false"); // 关闭自动提交,开启手动提交
consumer.commitOffsets(); //手动提交

6. Flume & Kafka

6.1 Flume 与 Kafka整合

  • Flume 配置文件 fk.conf 内容如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = node01
a1.sources.r1.port = 41414
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = testflume
a1.sinks.k1.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
a1.sinks.k1.channel = c1

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 10000
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 启动 Kafka 集群

    bin/kafka-server-start.sh config/server.properties

  • 启动 Flume 集群(注意fk.conf文件存放位置)

bin/flume-ng agent -n a1 -c conf -f conf/fk.conf -Dflume.root.logger=DEBUG,console
  • 创建 topic(也可以不去创建,系统默认创建副本数为1、分区数为1的topic)
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --create --replication-factor 2 --partitions 3 --topic testflume
  • 启动消费者
bin/kafka-console-consumer.sh --zookeeper node01:2181,node02:2181,node03:2181 --from-beginning --topic testflume
  • Flume 中 source 类型为 AVRO 类型,此时通过 Java 发送 rpc 请求,测试数据是否传入 Kafka。其中,Java 发送 Rpc 请求 Flume 代码示例如下:
public class RpcClientDemo {
	
	public static void main(String[] args) throws InterruptedException {
		MyRpcClientFacade client = new MyRpcClientFacade();
		// Initialize client with the remote Flume agent's host and port
		client.init("node01", 41414);

		// Send 10 events to the remote Flume agent. That agent should be
		// configured to listen with an AvroSource.
		for (int i =0; i < 300; i++) {
            int number = new Random().nextInt(3);
            String sampleData ;

            if(number == 0){
                sampleData  = "Hello Flume! ERROR   " + i;
            }else if(number==1){
                sampleData  = "Hello Flume! INFO   " + i;
            }else {
                sampleData  = "Hello Flume! WARNING   " + i;
            }

			client.sendDataToFlume(sampleData);
			System.out.println("发送数据:" + sampleData);
			Thread.sleep(500);
		}

		client.cleanUp();
	}
}

class MyRpcClientFacade {
	private RpcClient client;
	private String hostname;
	private int port;

	public void init(String hostname, int port) {
		// Setup the RPC connection
		this.hostname = hostname;
		this.port = port;
		this.client = RpcClientFactory.getDefaultInstance(hostname, port);
		// Use the following method to create a thrift client (instead of the
		// above line):
		// this.client = RpcClientFactory.getThriftInstance(hostname, port);
	}

	public void sendDataToFlume(String data) {
		// Create a Flume Event object that encapsulates the sample data
		Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

		// Send the event
		try {
			client.append(event);
		} catch (EventDeliveryException e) {
			// clean up and recreate the client
			client.close();
			client = null;
			client = RpcClientFactory.getDefaultInstance(hostname, port);
			// Use the following method to create a thrift client (instead of
			// the above line):
			// this.client = RpcClientFactory.getThriftInstance(hostname, port);
		}
	}

	public void cleanUp() {
		// Close the RPC connection
		client.close();
	}
}
  • 运行上述代码,即可完成Flume于Kafka整合的简单流程。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据016——Kafka 的相关文章

  • 我的2011——毕业之年的总结与彷徨

    题记 眼看2011即将成为过去 xff0c 难得在这最后的时刻 xff0c 抽点时间 xff0c 倒上一杯热茶 xff0c 回忆这一年的浮浮沉沉 这一年 xff0c 我和所有毕业生一样 xff0c 离开了呆了四年的大学校园 呆腻了校园的生活
  • centos安装anaconda教程

    1 更新yum 命令 xff1a sudo yum update 2 安装anaconda 2 1 查看anaconda对应python版本 我选的3 8版 Old package lists Anaconda documentation
  • Android布局 -- Navigation实现底部导航栏

    底部导航栏加页卡的切换 xff0c 很多App采用这种布局设计 xff0c 在以前的开发中 xff0c 需要自定义底部导航栏以及使用FragmentTransaction来管理Fragment的切换 xff0c 代码量较大 xff0c 而使
  • ViewModelProviders is deprecated

    原有的创建ViewModel的方法 xff1a viewModel 61 ViewModelProviders of this get ViewModel class 提示ViewModelProviders过时 改为 xff1a view
  • Android Fragment退出 返回上一个Fragment与直接退出

    例如应用底部有两个导航按钮A与B xff0c 刚进入的时候显示为第一个AFragment xff0c 点击B切换到BFragment 如果需求是在BFragment点击返回键回到AFragment xff0c 需要配置 app defaul
  • Android基础 -- 子线程可以修改UI吗?

    子线程可以修改UI吗 xff1f 为什么会产生这样的问题 xff0c 可能是因为在开发过程中遇到了 34 Only the original thread that created a view hierarchy can touch it
  • leetcode 417. 太平洋大西洋水流问题

    https leetcode cn com problems pacific atlantic water flow 思路是从海洋开始逆流 如果可以逆流到 就标记为1 然后检查两个海洋都可以逆流到的区域 DFS public List lt
  • Android模拟器检测常用方法

    在Android开发过程中 xff0c 防作弊一直是老生常谈的问题 xff0c 而模拟器的检测往往是防作弊中的重要一环 xff0c 接下来有关于模拟器的检测方法 xff0c 和大家进行一个简单的分享 1 传统的检测方法 传统的检测方法主要是
  • RecyclerView 隐藏部分分割线

    在项目中遇到复杂点的RecyclerView xff0c 可能会有隐藏部分分割线的需求 xff0c 例如item1和item3之间的分割线隐藏 xff0c item4和item5之间的分割线隐藏等 在看了文档里的ItemDecoration
  • 浅谈去中心化应用

    1 中心化应用 现在我们所使用的应用基本上都是中心化的应用 xff0c 什么是中心化应用呢 xff0c 举个栗子 xff0c 我们在天猫买东西的时候 xff0c 需要先付款给支付宝 xff0c 然后卖家发货 xff0c 我们确认收货之后 x
  • Java二分搜索树及其添加删除遍历

    对于树这种结构 xff0c 相信大家一定耳熟能详 xff0c 二叉树 二分搜索树 AVL树 红黑树 线段树 Trie等等 xff0c 但是对于树的应用以及编写一棵解决特定问题的树 xff0c 不少同学都会觉得不是一件简单的事情 xff0c
  • 游戏平台SDK设计和开发之旅——XSDK功能点梳理

    做游戏开发或者相关工作的同学 xff0c 可能都知道 xff0c 在游戏上线之前 xff0c 需要将游戏分发到各大渠道平台 xff0c 比如九游 xff0c 百度 xff0c 360 xff0c 华为等等 其中和技术相关的事情 xff0c
  • 谈谈 GitHub 开放私有仓库一事的影响

    GitHub 此次宣布免费开放私有仓库 xff0c 在我看来有以下几点影响 xff1a 缓和与同类产品间的竞争压力小部分个人项目由开源转闭源微软在技术社区中的企业形象进一步强化为未来的企业服务预热 下面根据以上几点 xff0c 我来简单谈下
  • 每天坚持刷 LeetCode 的人,究竟会变得有多强... 学习技巧都藏在这几个公众号里面了......

    信息爆炸时代 xff0c 与其每天被各种看过就忘的内容占据时间 xff0c 不如看点真正对你有价值的信息 xff0c 下面小编为你推荐几个高价值的公众号 xff0c 它们提供的信息能真正提高你生活的质量 人工智能爱好者社区 专注人工智能 机
  • 超酷炫!智能无人机中文教程重磅上线!

    前 言 对于大多数无人机爱好者来说 xff0c 能自己从头开始组装一台无人机 xff0c 之后加入 AI 算法 xff0c 能够航拍 xff0c 可以目标跟踪 xff0c 是心中的梦想 并且 xff0c 亲自从零开始完成复杂系统 xff0c
  • B 站硬件大佬又在 GitHub 上开源了一款神器...

    公众号关注 GitHubDaily 设为 星标 xff0c 每天带你逛 GitHub xff01 转自量子位 这次 xff0c 野生钢铁侠稚晖君带着他的硬核项目又来了 上次自制纯手工打造 AI 小电视 xff0c 播放量就超过 300 万
  • 用 C 语言来刷 LeetCode,网友直呼:那是真的牛批...

    公众号关注 GitHubDaily 设为 星标 xff0c 每天带你逛 GitHub xff01 大家好 xff0c 我是小 G 如果你是计算机科班出身 xff0c 那么 C 语言 xff0c 估计是你在初入编程时 xff0c 最早接触的编
  • 【pytorch torchvision源码解读系列—3】Inception V3

    框架中有一个非常重要且好用的包 xff1a torchvision xff0c 顾名思义这个包主要是关于计算机视觉cv的 这个包主要由3个子包组成 xff0c 分别是 xff1a torchvision datasets torchvisi
  • 【pytorch torchvision源码解读系列—5】DenseNet

    pytorch框架中有一个非常重要且好用的包 xff1a torchvision xff0c 顾名思义这个包主要是关于计算机视觉cv的 这个包主要由3个子包组成 xff0c 分别是 xff1a torchvision datasets to
  • Eclipse使用JDBC方式连接SQLServer2016

    Eclipse使用JDBC方式连接SQLServer2016 今天下午在查找很多JDBC连接SQL时发现大多数都是2012甚至更久以前的版本 xff0c 所以就此把步骤记录下来 xff0c 以免自己下次使用又忘记了 在连接的时候 xff0c

随机推荐

  • 魔改《自动化学报》Latex模板

    想用latex写一个中文文档 xff0c 看上了 自动化学报 的模板 xff0c 感觉不错 xff0c 下载下来在本地的tex live上编译 xff0c 报了一大串错 xff1b 上传到overleaf xff0c 还是报错 xff1b
  • TX2安装jetpack

    目前官网支持的下载为JetPack L4T 3 2 1 linux x64 b23和JetPack L4T 3 3 linux x64 b39 首先使用具有Ubuntu16 04的host主机 xff08 我使用的是个人笔记本 xff0c
  • TF-IDF算法

    TF IDF算法 TF IDF term frequency inverse document frequency 是一种用于信息检索与数据挖掘的常用加权技术 xff0c 常用于挖掘文章中的关键词 xff0c 而且算法简单高效 xff0c
  • 大数据009——MapReduce

    分布式离线计算框架MapReduce MapReduce是一种编程模型 Hadoop MapReduce采用Master slave 结构 只要按照其编程规范 xff0c 只需要编写少量的业务逻辑代码即可实现一个强大的海量数据并发处理程序
  • MapReduce实例——wordcount(单词统计)

    1 MR实例开发整体流程 最简单的MapReduce应用程序至少包含 3 个部分 xff1a 一个 Map 函数 一个 Reduce 函数和一个 main 函数 在运行一个mapreduce计算任务时候 xff0c 任务过程被分为两个阶段
  • MapReduce实例——好友推荐

    1 实例介绍 好友推荐算法在实际的社交环境中应用较多 xff0c 比如qq软件中的 你可能认识的好友 或者是Facebook中的好友推介 好友推荐功能简单的说是这样一个需求 xff0c 预测某两个人是否认识 xff0c 并推荐为好友 xff
  • Hadoop源码分析——JobClient

    1 MapReduce作业处理过程概述 当用户使用Hadoop的Mapreduce计算模型来进行处理问题时 xff0c 用户只需要定义所需的Mapper和Reduce处理函数 xff0c 还有可能包括的Combiner Comparator
  • 大数据010——Hive

    1 Hive 概述 Hive 是建立在 Hadoop 上的数据仓库基础构架 它提供了一系列的工具 xff0c 可以用来进行数据提取转化加载 xff08 ETL xff09 xff0c 这是一种可以存储 查询和分析存储在 Hadoop 中的大
  • 大数据011——Sqoop

    1 Sqoop 概述 Sqoop是Hadoop和关系数据库服务器之间传送数据的一种工具 它是用来从关系数据库如 xff1a MySQL xff0c Oracle到Hadoop的HDFS xff0c 并从Hadoop的文件系统导出数据到关系数
  • 大数据012——HBase

    1 HBase 简介 HBase Hadoop Database xff0c 是一个高可靠性 高性能 面向列 可伸缩 实时读写的分布式数据库 xff1b 在Hadoop生态圈中 xff0c 它是其中一部分且利用Hadoop HDFS作为其文
  • Hadoop源码分析——MapReduce输入和输出

    Hadoop中的MapReduce库支持集中不同的格式的输入数据 例如 xff0c 文本模式的输入数据的每一行被视为一个key value键值对 key是文件的偏移量 xff0c value是那一行的内容 另一种常见的格式是以key进行排序
  • 大数据013——Flume

    1 Flume 简介 Flume是由Cloudera软件公司提供的一个高可用的 xff0c 高可靠的 xff0c 分布式的海量日志采集 聚合和传输的系统 xff0c 后与2009年被捐赠了apache软件基金会 xff0c 为hadoop相
  • Hadoop源码分析——计算模型MapReduce

    MapReduce 是一个计算模型 xff0c 也是一个处理和生成超大数据集的算法模型的相关实现 用户首先创建一个Map函数处理一个基于key value pair的数据集合 xff0c 输出中间的基于 key value pair 的数据
  • 从SDLC到DevSecOps的转变

    OSSTMM 根据开源安全测试方法手册OSSTMM Open Source Security Testing Methodology Manual 的表述 安全测试包括但不限于以下几种做法 漏洞扫描 安全扫描 渗透测试 风险评估 安全审核
  • 大数据014——Storm 简介及入门案例

    分布式实时数据处理框架 Storm 1 Storm简介与核心概念 1 1 Storm 简介 全称为 Apache Storm xff0c 是一个分布式实时大数据处理系统 它是一个流数据框架 xff0c 具有最高的获取率 它比较简单 xff0
  • Hive与HBase整合详解

    参考之前小节的大数据010 Hive与大数据012 HBase成功搭建Hive和HBase的环境 xff0c 并进行了相应的测试 xff0c 并且在大数据011 Sqoop中实现Hive HBase与MySQL之间的相互转换 xff1b 本
  • 大数据015——Elasticsearch基础

    1 Elasticsearch 简介 Elasticsearch是一个基于Lucene的实时的分布式搜索和分析 引擎 设计用于云计算中 xff0c 能够达到实时搜索 xff0c 稳定 xff0c 可靠 xff0c 快速 xff0c 安装使用
  • 大数据015——Elasticsearch深入

    1 Elasticsearch 核心概念 1 1 cluster 代表一个集群 xff0c 集群中有多个节点 xff0c 其中有一个为主节点 xff0c 这个主节点是可以通过选举产生的 xff0c 主从节点是对于集群内部来说的 es的一个重
  • 大数据014——Storm 集群及入门案例

    分布式实时数据处理框架 Storm 1 Storm 集群 1 1 Storm 版本变更 版本编写语言重要特性HA 高可用0 9 xjava 43 clojule改进与Kafka HDFS HBase的集成不支持 xff0c storm集群只
  • 大数据016——Kafka

    1 Kafka 简介 Kafka 是一个高吞吐量 低延迟分布式的消息队列系统 kafka 每秒可以处理几十万条消息 xff0c 它的延迟最低只有几毫秒 Kafka 也是一个高度可扩展的消息系统 xff0c 它在LinkedIn 的中央数据管