腾讯mini项目-【指标监控服务重构】2023-08-13

2023-11-17

今日已办

使用watermill框架替代当前的base_runner框架
a. 参考官方提供的sarama kafka Pub/Sub(https://github.com/ThreeDotsLabs/watermill-kafka/)实现kafka-go(https://github.com/segmentio/kafka-go)的Pub/Sub(sarama需要cgo,会导致一些额外的镜像依赖)
b. 参考https://watermill.io/docs/messages-router/实现不同topic(不同事件)走不同的Handler处理逻辑,相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/)

watermill-kafka

  1. 构造 publisher, subscriber qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com)
  2. 定义 router
  3. 获取符合正则表达式的主题
  4. 为每一个主题添加 middleware、handler
// Package consumer
// @Author xzx 2023/8/11 18:53:00
package consumer

import (
	"context"
	kc "github.com/Kevinello/kafka-client"
	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
	"github.com/ThreeDotsLabs/watermill/message/router/plugin"
	"github.com/qiulin/watermill-kafkago/pkg/kafkago"
	"go.uber.org/zap"
	"profile/internal/config"
	"profile/internal/connector"
	"profile/internal/log"
	"profile/internal/schema"
	"profile/internal/schema/performance"
	"strings"
	"time"
)

// ProfileContext
// @Description:
// @Author xzx 2023-08-11 22:21:41
type ProfileContext struct {
	Status int
	Ctx    context.Context

	Router *message.Router
	Event  schema.Event

	AppID         string // API 上报
	FetchScenario string // API 上报
}

// NewProfileContext
// @Description
// @Author xzx 2023-08-11 22:49:00
// @Return *ProfileContext
func NewProfileContext() *ProfileContext {
	profileCtx := &ProfileContext{
		Ctx: context.Background(),
	}
	profileCtx.init()
	return profileCtx
}

// init
// @Description 初始化
// @Author xzx 2023-08-11 22:22:01
func (profileCtx *ProfileContext) init() {
	logger := watermill.NewStdLogger(false, false)
	publisher, subscriber := newPubSub()
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		log.Logger.Fatal("creates a new Router with given configuration error", zap.Error(err))
	}

	router.AddPlugin(plugin.SignalsHandler)
	router.AddMiddleware(
		middleware.Retry{
			MaxRetries:      3,
			InitialInterval: time.Millisecond * 100,
			Logger:          logger,
		}.Middleware,
		middleware.Recoverer,
	)

	getTopics := kc.GetTopicReMatch(strings.Split(config.Profile.GetString("kafka.topicRE"), ","))
	topics, err := getTopics(config.Profile.GetString("kafka.bootstrap"))
	if err != nil {
		log.Logger.Fatal("get topics failed", zap.Error(err))
		return
	}

	for _, topic := range topics {
		var category string
		if strings.Contains(topic, performance.CategoryCrash) {
			category = performance.CategoryCrash
		} else if strings.Contains(topic, performance.CategoryLag) {
			category = performance.CategoryLag
		} else {
			continue
		}
		router.AddHandler("profile-consume", topic, subscriber, connector.GetTopic(category), publisher, profileCtx.WriteKafka).
			AddMiddleware(
				profileCtx.UnpackKafkaMessage,
				profileCtx.InitPerformanceEvent,
				profileCtx.AnalyzeEvent,
			)
	}
	profileCtx.Router = router
}

// Run
// @Description
// @Author xzx 2023-08-12 13:52:53
func (profileCtx *ProfileContext) Run() {
	// router.Run contains defer cancel()
	if err := profileCtx.Router.Run(profileCtx.Ctx); err != nil {
		log.Logger.Error("runs all plugins and handlers and starts subscribing to provided topics error", zap.Error(err))
	}
}

func (profileCtx *ProfileContext) Done() <-chan struct{} {
	return profileCtx.Ctx.Done()
}

func (profileCtx *ProfileContext) Err() error {
	return profileCtx.Ctx.Err()
}

func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {
	return profileCtx.Ctx.Deadline()
}

func (profileCtx *ProfileContext) Value(key any) any {
	return profileCtx.Ctx.Value(key)
}

// newPubSub
// @Description
// @Author xzx 2023-08-13 16:00:26
// @Return message.Publisher
// @Return message.Subscriber
func newPubSub() (message.Publisher, message.Subscriber) {
	logger := watermill.NewStdLogger(false, false)
	marshaler := kafkago.DefaultMarshaler{}
	publisher := kafkago.NewPublisher(kafkago.PublisherConfig{
		Brokers:     []string{config.Profile.GetString("kafka.bootstrap")},
		Async:       false,
		Marshaler:   marshaler,
		OTELEnabled: false,
		Ipv4Only:    true,
		Timeout:     100 * time.Second,
	}, logger)

	subscriber, err := kafkago.NewSubscriber(kafkago.SubscriberConfig{
		Brokers:       []string{config.Profile.GetString("kafka.bootstrap")},
		Unmarshaler:   marshaler,
		ConsumerGroup: config.Profile.GetString("kafka.group"),
		OTELEnabled:   false,
	}, logger)
	if err != nil {
		log.Logger.Fatal("Unable to create subscriber", zap.Error(err))
	}
	return publisher, subscriber
}

组内会议

已完成:

  1. 找到了基于 kafka-go 的 watermill 的 pub/sub 模型:https://github.com/qiulin/watermill-kafkago
  2. 将 baserunner 替换为 watermill 的 pub/sub 中可以大致还原原本的功能
  3. grafana 展示 trace 的SQL比较复杂,因为表结构的定义有出入,需要进行重命名,时间范围也需要重新修改SQL,而且组员反馈 grafana 的可视化并没有signoz 来得直观友好

待办:

  1. 调研 HyperScan golang implement 和 benchmark
  2. 优化 watermill-pub/sub 的逻辑

问题:

  1. watermill-pub/sub : 目前的做法有将4个阶段整合为一个 Handler 来处理,还有将其抽离为Middleware 的做法,原有 baserunner 的逻辑是所有 topic 的都走 4 个阶段的 Handler
  2. 是否可以停止 grafana 展示 trace 的工作

明日待办

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

腾讯mini项目-【指标监控服务重构】2023-08-13 的相关文章

  • golang - 省略 json 属性进行序列化的优雅方法

    我有一个用户结构 其中包含密码等敏感字段 type User struct UID string json uid binding required Password string json password binding require
  • Golang - 如何在特定时间执行函数

    我需要在一天中的特定时间运行一个函数 例如 0010 0610 1210 1810 我目前的方法使用自动收报机for range time Tick 21600 time Second 我以这些时间间隔之一 例如 1210 手动启动该程序
  • 为什么这里会出现僵局

    我想了解 golang 通道是如何工作的 我读了一本关于go语言的书 发现了下面的例子 package main import fmt Send the sequence 2 3 4 to returned channel func gen
  • Golang导入包错误

    go 5 2 在以下任一位置找不到包 github com googollee go socket io usr local go src github com googollee go socket io 来自 GOROOT Users
  • 为什么 Go 中无法将 [Size]byte 转换为字符串?

    我有一个大小字节数组 是我做完之后得到的md5 Sum data byte testing var pass string var b 16 byte b md5 Sum data pass string b 错误 cannot conve
  • 是否可以从 JS 显式调用导出的 Go WebAssembly 函数?

    是否可以调用 Go WebAssembly 函数 除了main 在 JavaScript 中 让我先展示一下我做了什么 我的Go函数定义如下 package main import fmt func main fmt Println it
  • 如何获取变量的内存大小?

    有谁知道如何获取变量的内存大小 int string struct等 并打印它 是否可以 var i int 1 I want to get something like this fmt Println Size of i is i Al
  • Go中的切片分配是否复制内存

    目的 我有一个大缓冲区 我想要一个指向缓冲区中不同位置的指针数组 切片 我在做什么 datPtrs make byte n for i 0 i
  • 在 docker build 中缓存“go get”

    我想将 golang 单元测试封装在 docker compose 脚本中 因为它依赖于多个外部服务 我的应用程序有很多依赖项 因此需要一段时间go get 如何以允许构建 docker 容器的方式缓存包 而无需每次要测试时下载所有依赖项
  • 在 Golang Server 中接受持久的 tcp 连接

    我正在尝试使用 Go 并且想创建一个 TCP 服务器 我可以通过 telnet 访问该服务器 发送命令并接收响应 const CONN HOST localhost CONN PORT 3333 CONN TYPE tcp func mai
  • 按顺序范围循环映射

    我正在寻找一种确定的方法来范围Go map为了 Go 规范 https golang org ref spec For statements陈述如下 映射的迭代顺序未指定 并且不保证从一次迭代到下一次迭代的顺序相同 如果在迭代过程中删除尚未
  • gcloud 部署应用程序找不到导入包 - golang

    我已经将应用程序的一个版本部署到 GAE 但现在部署新版本时遇到问题 当我尝试时gcloud app deploy version VERSION 我收到一堆错误 显示远程构建找不到我的导入包 Beginning deployment of
  • Golang delve,如何启动调试器并启动正在调试的应用程序?

    我正在尝试设置一个可以远程连接的无头深度调试器 我无法找到一种方法来启动调试服务器 而该服务器不会暂停我正在调试的应用程序 我一直在使用dlv attach headless true listen 2345 attach 32但这会暂停该
  • 为什么结构体不能转换为嵌入类型

    package main type Inner struct x int type Outer struct Inner func main x Inner 1 y Outer x cannot convert x type Inner t
  • 将绝对路径和相对路径组合起来得到新的绝对路径

    我正在编写一个程序 其中一个组件必须能够采用给定的路径 例如 help index html or help 和基于该位置的相对路径 例如 otherpage index html or sub dir of help or help2 h
  • 如何使用json传递opentracing数据

    我的 API 网关启动一个跟踪器和一个用于验证电子邮件的范围 然后它传递给user service用于验证 我想通过这个span详情至user service作为 json 对象并启动另一个span as a tracer start sp
  • 在 Alpine 中找不到运行时/cgo

    In an alpine edge我安装的容器通过 RUN apk add no cache musl dev go 我试着跑go get github com golang protobuf protoc gen go then 这会导致
  • golang从sdin扫描一行数字

    我正在尝试从标准输入读取输入 3 2 1
  • Google Cloud Kubernetes 上任务队列的替代方案

    我发现任务队列主要用于App Engine标准环境 我正在将现有服务从 App Engine 迁移到 Kubernetes 任务队列的一个好的替代方案是什么 推送队列是当前正在使用的队列 我在线阅读文档并浏览了此链接 何时使用 PubSub
  • 无需时间即可生成随机字符串?

    我知道如何使用 Runes 和播种 rand Init 在 go 中生成随机字符串time UnixNano 我的问题是 是否可以 使用 stdlib 在不使用当前时间戳 安全 的情况下播种 rand 此外 我问 因为仅仅依靠时间来为敏感操

随机推荐

  • React 开发一个移动端项目(1)

    技术栈 项目搭建 React 官方脚手架 create react app react hooks 状态管理 redux redux thunk UI 组件库 antd mobile ajax请求库 axios 路由 react route
  • 1)数据库系统概述

    文章目录 一 数据库概念 二 常见的数据模型 三 关系型数据库 1 关系术语 2 关系的特点 3 常见存在的关系问题 4 关系运算 四 数据库设计 1 设计步骤 2 设计实例 一 数据库概念 长期存放在计算机上有组织的可共享的数据集合 二
  • secure boot 是什么

    一 secure boot 是什么 secure boot 中文叫安全启动 是电脑行业成员共同开发的一种安全机制 用于确保电脑只能启动原装系统 如果电脑重装了其他系统 那么这个系统是启动不了的 说白了 就是垄断 所以安装完其他系统 必须关闭
  • flex伸缩布局看着一篇就够啦

    flex伸缩布局 flex弹性概念 弹性盒子是一种按行或者按列布局元素的一种布局方式 它是需要父子盒子嵌套使用的 作用在父盒子 容器 上的属性有 flex direction 改变轴方向 flex wrap 换行 flex flow 前两项
  • 项目重点问题

    面试大保健 https blog csdn net Mrerlou article details 114295888 ops request misc 257B 2522request 255Fid 2522 253A 252216786
  • Java Swing图书管理系统,界面漂亮功能全,完整源码、下载直接使用 -417

    今天为利用晚上的一小段时间为大家分享一个写的不错的窗体程序 图书管理系统 417 系统功能已经比较全面 而且界面非常漂亮 有较强的参考和使用价值 项目系统有完整源码 下载后即可以运行 希望大家可以喜欢 喜欢的帮忙点赞和关注 一起编程 一起进
  • Shell脚本运行中的停止方法

    Linux系统Shell中提交了一个脚本 但是需要停止这个进程 如何处理 方式1 killall file flume kafka 说明 killall是一个命令 不是kill all file flume kafka是脚本名 此方法简单粗
  • MyBatis-Plus 的基础增删改查

    目录 1 简介 2 准备工作 3 MyBatis Plus 实现增删改查 1 MyBatis Plus 简介 MyBatis Plus 简称 MP 是 MyBatis 的增强工具 在 MyBatis 的基础上只做增强不做改变 为简化开发 提
  • 探索APP进程的启动流程(二)完结篇

    首先回顾下冷启动的流程图 共有四个步骤 1 launcher进程通过binder请求ams启动Activity AMS进程查询内存中是否存在该进程 2 内存中无相应进程 ams通过socket发送创建进程命令和相关资料到zygote进程 3
  • [HackTheBox]WEB题目

    0x01 50 Points I know Mag1k 问题描述 Can you get to the profile page of the admin 访问分配的地址 是一个带注册的登入页面 尝试常规注入 无效 来到注册页面注册 再退出
  • Springboot 实现发送邮件功能,使用QQ邮箱

    引入依赖
  • 若依分离版 --- 设置多数据源(主mysql,从sqlserver)

    主库mysql 从库sqlserver 1 修改application druid yml配置 配至slave 修改validationQuery SELECT 1 要不然sqlserver会报错 下面是完整的配置 可直接使用 根据自己情况
  • 朴素贝叶斯与KNN算法

    朴素贝叶斯算法 数学基础 我们先举一个例子 投硬币是一个随机过程 我们不能预测任意一次投币结果是正面还是反面 我们只能谈论其下一次结果是正面或者反面的概率 如果容貌取得一些额外的数据 如硬币的精准成分 硬币的最初位置 投币的力量与方向 硬币
  • springmvc+mybatis+mysql+log4j.xml+logjdbc+maven聚合+nexus+dubbo demo骨架

    说明 该项目采用maven聚合工程 项目骨架是我们以前架构师搭建骨架 现在已经拆分出来供大家下载使用 可以扩展使用 里面用到技术有springmvc mybatis mysql log4j xml logjdbc maven nexus d
  • java项目整合linux上的redis

    在linux上部署redis 1 在linux上安装redis 先下载redis 下载网址为 https redis io 放到 usr local src目录下 使用命令解压tar xzvf redis tar gz 进入redis目录
  • OWASP Dependency-Check工具集成

    OWASP Dependency Check工具集成 SonarQube 插件不执行分析 而是读取现有的 Dependency Check 报告 先要通过Jenkins插件去执行扫描 然后sonar里面再分析报告 一 搭建本地NVD Mir
  • 解决新版edge浏览器首页被搜狗、haoqq等垃圾搜索引擎捆绑问题,并将启动首页设为edge自带新标签页

    最近想试一下edge浏览器 发现每次启动都是这个haoqq com 这是主页被恶意挟持了 经过不懈努力终于给他关闭了 edge真香 解决步骤 1 打开电脑管家 或火绒软件 找到浏览器保护 电脑管家是在工具箱里 火绒的话在防护中心 系统防护
  • Verilog:【1】时钟分频电路(clk_divider.sv)

    碎碎念 作为Basic Verilog的第一个学习笔记 打算用这种命名方式来对博客进行命名 应该有助于检索 简单阅览了部分工程的代码 发现里面有很多嵌套关系 因此决定先从基础模块开始 也是为了整个博客内容的流畅性 读者朋友有问题的话 也可以
  • java robot截图报错_robot framework笔记(介绍+基础关键字)

    robot framework库 Builtin 提供了一组通常需要的通用关键字 String 生成 修改和验证字符串的库 Screenshot 提供关键字捕捉桌面截图 XML 生成 修改和验证xml文件的库 DateTime 日期和时间转
  • 腾讯mini项目-【指标监控服务重构】2023-08-13

    今日已办 使用watermill框架替代当前的base runner框架 a 参考官方提供的sarama kafka Pub Sub https github com ThreeDotsLabs watermill kafka 实现kafk