C# 操作Kafka

2023-11-10

1.C# 连接Kafka知识分享


前些天公司的Boss突然下达一个命令,消息中间件要用Kafka,既然领导都决定了用就用呗。那就网上百度一下去Kafka如何安装啊,Kafka用代码如何连接操作。在安装和使用过过程中遇到了一些坎坷的事情,最总还是解决了。

我所在部门使用C#编程语言,所以连接Kafka用C#语言去实现,可能朋友们会说那不是很简单吗?百度一下网上一大堆。百度是一大堆但未必是你想要的,网上找了好多篇都是基于Java语言编写的,C#的也有,但是没Java资料丰富。

https://gitee.com/autumn_2/MQExtend.Core.git 基于MQ提供的Sdk。二次封装后支持对ActiveMQ、Kafak相关操作(本人也是一个小白,写的东西也是半桶水。但希望对大家有帮助)

2.Confluent kafka

在选择Kafka类库之前看了https://blog.csdn.net/xinlingjun2007/article/details/80295332 这篇博客,所以就选择了Confluent kafka 类库了

2.1 消息发送

kafka 中的auto.create.topics.enable默认为false的,所以在发送消息给Topics之前,确保Topics在Kafka里要存在。这个需要注意一下。

public void PushMessage()
{
	var config = new ProducerConfig()
	{
		BootstrapServers = "localhost:9092",
		Acks = Acks.Leader
	};
	//message<key,value> 这个key目前没用,做消息指定分区投放有用的;我们直接用null
	using(var producer = new ProducerBuilder<Null, string>(config).Build())
	{
		producer.Produce("TopicName", new Message<Null, string>()
		{
			Value = "需要发送的消息内容"
		}, (result) =>
		{
			WriteLog(!result.Error.IsError ? $"Delivered message to {result.TopicPartitionOffset}" : $"Delivery Error: {result.Error.Reason}");
		});
		Console.WriteLine("消息发送成功");
	}
}

2.2 消息消费

session.timeout.ms

如果consumer在这段时间内没有发送心跳信息,则它会被认为挂掉了。默认3秒。

auto.offset.reset

消费者在读取一个没有偏移量的分区或者偏移量无效的情况下,如何处理。默认值是latest。

earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

none:各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

enable .auto.commit

默认值true,表明消费者是否自动提交偏移。为了尽量避免重复数据和数据丢失,可以改为false,自行控制何时提交

/// <summary>
/// 消息订阅
/// </summary>
/// <param name="subscribe"></param>
public void Subscribe(string queueName, Action<IMessageContent> action)
{
	var config = new ConsumerConfig()
	{
		BootstrapServers = "",
		GroupId = "gms_20200327_group",
		AutoOffsetReset =  AutoOffsetReset.Earliest,
		EnableAutoCommit = false
	};
	//如果Kafka配置了安全认证(我这里只是写案例,本地配置了sasl安全认证)就加这块代码
	if (!string.IsNullOrEmpty(this.UserName) && !string.IsNullOrEmpty(this.Password))
	{
		config.SecurityProtocol = SecurityProtocol.SaslPlaintext;
		config.SaslMechanism = SaslMechanism.Plain;
		config.SaslUsername = this.UserName;
		config.SaslPassword = this.Password;
	}

	using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
	{
		//订阅topicName
		consumer.Subscribe(queueName);

		CancellationTokenSource cts = new CancellationTokenSource();
		Console.CancelKeyPress += (sender, e) =>
		{
			//prevent the process from terminating.
			e.Cancel = true;
			cts.Cancel();
		};

		//是否消费成功
		bool isOK = false;
		//result
		ConsumeResult<Ignore, string> consumeResult = null;
		try
		{
			while (true)
			{
				isOK = false;
				try
				{
					//consumer.Assign(new TopicPartitionOffset(queueName, 0, Offset.Beginning));
					consumeResult = consumer.Consume(cts.Token);
					if (consumeResult.IsPartitionEOF)
					{
						WriteLog($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
						continue;
					}
					//接收到的消息记录Log
					WriteLog($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Value}");
					//消息消费
					action?.Invoke(new KafkaMessageContent(consumeResult.Value, consumeResult.Key?.ToString()));
					//消费成功
					isOK = true;
					//提交方法向Kafka集群发送一个“提交偏移量”请求,并同步等待响应。
					//与消费者能够消费消息的速度相比,这是非常慢的。
					//一个高性能的应用程序通常会相对不频繁地提交偏移量,并且在失败的情况下被设计来处理重复的消息
					consumer.Commit(consumeResult);
					//消费成功Log记录
					WriteLog($"Consumed message '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
				}
				catch (ConsumeException e)
				{
					isOK = false;
					WriteError($"Error occured: {e.Error.Reason}");
				}
				catch (Exception ex)
				{
					isOK = false;
					WriteError($"Error occured: {ex.StackTrace}");
				}

				//消费失败后置处理
				if (!isOK && consumeResult != null)
				{
					//消费失败代码逻辑处理
					ErrorHandler(consumer, consumeResult);
				}
			}
		}
		catch (OperationCanceledException e)
		{
			WriteException(e);
			// Ensure the consumer leaves the group cleanly and final offsets are committed.
			consumer.Close();
		}
	}
}

注意:这里有一点点需要注意下consumer.Commit(consumeResult)。我先列举一个例子如果一个分区里面有10条消息

1

2

3(消费失败)

4

5

6

7

8

9

10

 

  1. 如果3这条消息消费失败,那么就会被catch捕获,代码没执行到consumer.Commit() 提交偏移量这段代码。
  2. 因为异常被catch了,所以消费者继续poll消息,获取到4这条消息。4这条消息成功了,就会执行consumer.Commit() 提交偏移量。(注意:这里就提交了最新的偏移量了),换句话说如果3这条消费失败,不去做一些额外的处理3这条消息就消费不到了(注意:不人工干预是消费不到的)。
  3. 我这边处理逻辑是将消费失败的消息将其转发到DLQ队列,就是创建一个Topics的同时在创建一个DLQ.Topics。这个DLQ.Topics专门用来存放 Topics消费失败的消息。处理代码如下:
/// <summary>
/// 消费异常处理
/// </summary>
/// <param name="consumer">消费者</param>
/// <param name="consumeResult">消息</param>
private void ErrorHandler(IConsumer<Ignore, string> consumer, ConsumeResult<Ignore, string> consumeResult)
{
	if (consumeResult != null && consumer != null)
	{
		string queueName = consumeResult.Topic;
		WriteLog($"Consumed '{queueName}' message fail '{consumeResult.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
		//消费失败,并且需要不需要转发到DLQ队列中,所以我们这里需要把(Offset-1)
		if (!this.SubscribeConfig.TransformToDLQ || queueName.StartsWith("DLQ.", StringComparison.OrdinalIgnoreCase))
		{
			//偏移量往回拉一位,尝试6次操作。如果执行失败,确保消息不遗漏直接停止消费。
			OffsetBack(consumer, consumeResult);
			return;
		}

		//需要转发到DLQ队列中
		string transformTopics = "DLQ." + queueName;
		WriteLog($"消息开始转发到{transformTopics}队列");
		KafkaProducerConfig config = new KafkaProducerConfig(ServerConfig.BrokerUri,
			ServerConfig.UserName, ServerConfig.Password, transformTopics);
		try
		{
			//将消息转发到死信队列
			using (IProducerChannel producer = new KafkaProducer(config))
			{
				producer.Producer(consumeResult.Value?.ToString());
			}
			//提交偏移量
			consumer.Commit(consumeResult);
			WriteLog($"消息转发到{transformTopics}队列成功");
		}
		catch (Exception ex)
		{
			WriteError($"消息转发到{transformTopics}队列失败。Error occured: {ex.StackTrace}");
			//偏移量往回拉一位,尝试6次操作。如果执行失败,确保消息不遗漏直接停止消费。
			OffsetBack(consumer, consumeResult);
		}
	}
}

/// <summary>
/// 把Offset偏移量往回拉一位
/// </summary>
/// <param name="consumer"></param>
/// <param name="consumeResult"></param>
/// <param name="tryTimes">默认执行6次</param>
private void OffsetBack(IConsumer<Ignore, string> consumer, ConsumeResult<Ignore, string> consumeResult, int tryTimes = 6)
{
	int count = tryTimes;
	string queueName = consumeResult.Topic;
	while (count > 0)
	{
		WriteLog($"消息消费失败,执行偏移量Offset-1操作");
		try
		{
			//消费失败,重置一下最新偏移量
			consumer.Assign(new TopicPartitionOffset(queueName, consumeResult.Partition, consumeResult.Offset));
			WriteLog($"偏移量重置成功{consumeResult.Offset}");
			count--;
			return;
		}
		catch (Exception ex)
		{
			WriteLog($"消息消费失败,执行偏移量Offset-1操作失败。Error occured: {ex.StackTrace}");
			//尝试重置偏移量次数到了最大次数,直接抛出异常。停止消费
			if (count == 0)
			{
				WriteError($"消息消费失败,执行偏移量Offset-1操作失败次数已达到${tryTimes},消费者停止消费");
				//抛出这个异常,会引发Subscribe()到catch代码块。catch会停止消费
				throw new OperationCanceledException($"消息消费失败,执行偏移量Offset-1操作失败次数已达到${tryTimes},消费者停止消费");
			}
			//停止3s在重新重置偏移量
			Thread.Sleep(3000);
		}
	}
}
  • 以上的消费异常处理只是本人的观点,可能有更好的处理方案,输入在下方一起交交流。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

C# 操作Kafka 的相关文章

随机推荐

  • PHP 23种设计模式

    设计模式的目的 重用性 相同功能的代码 不用多次编写 可读性 编程规范性 便于其他程序员的阅读和理解 可扩展性 当需要增加新的功能时 非常的方便 称为可维护 可靠性 当我们增加新的功能后 对原来的功能没有影响 使程序呈现高内聚 低耦合的特性
  • redis 3.0的集群部署

    文章转载自 http hot66hot iteye com blog 2050676 转载请注明出处 http hot66hot iteye com admin blogs 2050676 最近研究redis cluster 正好搭建了一个
  • 使用ChatGPT的gpt-3.5-turbo模型, PHP接入代码

  • CAN 总线通信 简要概括

    CAN 通信总线协议 简要概括 CAN 介绍 CAN 类型 高速 CAN 低速 CAN CAN 总线系统结构 CAN bus通信帧 CAN 基本数据帧结构 CAN 硬件设计 CAN 介绍 CAN Controller Area Networ
  • 2022年Python面试题汇总【面试官爱问】

    2022年Python面试题汇总 常问 1 请你讲讲python获取输入的方式 以及python如何打开文件 2 Python数据处理的常用函数 3 请你说说python传参传引用 4 请你说说python和java的区别 5 Python
  • IPv6知识概述 - IPv6地址

    IPv6知识概述 IPv6地址 IPv6地址表示 根据RFC4291 IPv6地址有3中格式 首选格式 压缩表示和内嵌IPv4地址的IPv6地址表示 首选格式 IPv6的地址长度是128位 bit 将这128位的地址按每16位划分为一个段
  • wincc怎么做一个弹出画面_Wincc如何利用单个弹出窗口画面的模板,来实现调用多组画面参数?...

    点击上方蓝色字 小叔聊自控 在后台回复关键字 画面脚本 即可获得本次视频中的所有文件及项目包 以下视频中包含本期所有内容 大家好 我是小叔 今天我们来了解一下Wincc如何利用单个弹出窗口的画面模板 来实现调用多组画面参数的方法 怎么来理解
  • Unity版本更新之后IOS审核提示机型适配变少

    之前用2020 3 1提交IOS白包 后来更新版本后使用2020 3 10打包的 提示适配 机器变少了 We identified one or more issues with a recent submission for App St
  • 问题集锦~

    1 Wireshark抓包过程遇到的一点小问题 在使用wireshark进行抓包时 发现目标为本机时 无法抓包 这是由于wireshark并不会抓取本机loop的流量 只会抓取流经网卡的流量 如果需要使用wireshark抓取本机的数据包
  • Q_DECLARE_PRIVATE与Q_DECLARE_PUBLIC

    Q DECLARE PRIVATE与Q DECLARE PUBLIC 这两个宏在Qt的源码中随处可见 重要性不言而喻 在 部落格的 Inside Qt Series 系列文章中 他用了3篇文章来讲这个问题 因为 QObject 本身比较复杂
  • java.lang.ClassNotFoundException: org.springframework.web.context.ContextLoaderListener问题

    解决方案 1 spring web的jar包缺失 2 刷新工程 因为工程是部署在服务器下的 可能没被检测到
  • Spring Web MVC框架(六) 异常处理

    Spring Web MVC对异常处理有着完善的支持 我们可以捕获控制器中抛出的任何异常 然后按照异常类型将异常信息映射到某个视图文件 向用户显示对应的信息 ExceptionHandler 最简单的办法就是使用 ExceptionHand
  • PyTorch中nn.Module类中__call__方法介绍

    在PyTorch源码的torch nn modules module py文件中 有一条 call 语句和一条forward语句 如下 call Callable Any call impl forward Callable Any for
  • java 反射lib下的jar_JAVA通过反射调用外部的jar包

    把外包jar的信息写在配置文件中 这样如果外部jar改变了 只需要修改properties相应的配置即可 config properties文件内容如下 jarUrl E MessageSend jar className org line
  • 为Android安装BusyBox —— 完整的bash shell

    http www cnblogs com xiaowenji archive 2011 03 12 1982309 html 大家是否有过这样的经历 在命令行里输入adb shell 然后使用命令操作你的手机或模拟器 但是那些命令都是常见L
  • 星星之火-35:为什么傅里叶分析需要引入负频率以及负频率的物理意义是什么?

    1 傅里叶分析的量化模型 下图是通过傅里叶分析从时域信号中获取谐波分量的幅度特征的基本模型 是不是似曾相识 是的 这个模型就是从高频已调信号中解调出基带信号的模型 该模型是利用函数正交性原理 通过指定频率的复指数信号 从时域信号中获取指定频
  • Python 终端进度条评测 包含一个作者自定义的轻量案例源码

    Python 终端进度条对比评测 效果展示 TPDM 代码部分 输出包体 129M RICH 代码部分 输出包体 68M 当前案例 代码部分 包体大小 总结 你可以根据你的实际情况来做出选择 效果展示 TPDM 可以看到效果是蛮好的 其次
  • MQ线上平滑迁移方案

    一 迁移的问题点 1 多生产者 多消费者切换排期跨度较大 场景一 多个生产者 一个消费者 如何保证多个生产者不同排期切换平滑稳定过渡 不漏消费 不重复消费 场景二 一个生产者 多个消费者 如何保证多个消费者不同排期切换平滑稳定过渡 不漏消费
  • Linux——Docker网络通信

    文档中使用的镜像不同 自行选择镜像 Docker Docker提供了映射容器端口到宿主机和容器互联机制来为容器提供网络服务 一 Dockerhost单主机网络 Docker网络从覆盖范围可分为单个host上的容器和跨多个host的网络 DO
  • C# 操作Kafka

    1 C 连接Kafka知识分享 前些天公司的Boss突然下达一个命令 消息中间件要用Kafka 既然领导都决定了用就用呗 那就网上百度一下去Kafka如何安装啊 Kafka用代码如何连接操作 在安装和使用过过程中遇到了一些坎坷的事情 最总还