客户端代码:
package main
import (
"fmt"
"github.com/Shopify/sarama"
)
//kafka 示例代码
func main() {
//配置
config:= sarama.NewConfig()
//等待服务器所有副本都保存成功后的响应,即数据成功发送到kafka后返回的响应信息
config.Producer.RequiredAcks = sarama.WaitForAll
//随机向partition发送消息
config.Producer.Partitioner = sarama.NewRandomPartitioner
//是否等待成功和失败后的响应,只有上面的RequireAcks设置不是NoReponse这里才有用.
config.Producer.Return.Successes = true
fmt.Println("start make producer")
//使用配置,新建一个异步生产者
//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
client, err := sarama.NewSyncProducer([]string{"192.168.18.131:9092"}, config)
if err != nil{
fmt.Println("product close, err :", err)
return
}
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test, my message is good")
pid, offset, err := client.SendMessage(msg)
if err != nil{
fmt.Println("send message failed,", err)
return
}
// 创建消息
defer client.Close()
fmt.Printf("pid: %v offset: %v\n", pid, offset)
}
执行报错:
panic: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)
这个错误搞了好久....... 后来以下步骤解决:
1,在本机 cmd 打开黑窗口
执行 : telnet kafka机器ip 9092
发现无法连接: 报错为连接失败........ 但是虚拟机端口开放了 9092, 为什么连接不上?
2,登录虚拟机执行命令
/etc/init.d/iptables status : 查看开放端口,9092 端口在列表最后,
删除后,重新执行
比如要删除INPUT里序号为2的规则,执行:
# iptables -D INPUT 2
/sbin/iptables -I INPUT -p tcp --dport 9092 -j ACCEPT
/etc/rc.d/init.d/iptables save #保存配置
/etc/rc.d/init.d/iptables restart #重启服务
以上执行完成后发现还是不行.....................
重启虚拟机, 启动 zookeeper 和 kafka
执行 : telnet kafka机器ip 9092 。成功了,
但是执行示例代码: 又报错:
send message failed, dial tcp 0.0.0.0:9092: connectex: No connection could be made because the target machine actively refused it
修改 kafka配置文件 将 0.0.0.0 修改为 :虚拟机ip 地址 即可
测试消费消息 执行一下命令:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic nginx_log --from-beginning