9-gin使用websocket

2023-05-16

[toc]

gin使用websocket

Gin 框架默认不支持 websocket,可以使用 github.com/gorilla/websocket 实现。

Talk is cheap. Show me the code,代码如下:

项目布局:

github.com
└── leffss
    └── ginWebsocket
        ├── go.mod
        ├── go.sum
        ├── main.go
        └── ws
            └── ws.go

具体原理就不讲了,可以看代码注释,比较详细了。

ws.go

package ws

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	uuid "github.com/satori/go.uuid"
)

// Manager 所有 websocket 信息
type Manager struct {
	Group map[string]map[string]*Client
	groupCount, clientCount uint
	Lock sync.Mutex
	Register, UnRegister chan *Client
	Message	chan *MessageData
	GroupMessage chan *GroupMessageData
	BroadCastMessage chan *BroadCastMessageData
}

// Client 单个 websocket 信息
type Client struct {
	Id, Group string
	Socket *websocket.Conn
	Message   chan []byte
}

// messageData 单个发送数据信息
type MessageData struct {
	Id, Group string
	Message    []byte
}

// groupMessageData 组广播数据信息
type GroupMessageData struct {
	Group string
	Message    []byte
}

// 广播发送数据信息
type BroadCastMessageData struct {
	Message    []byte
}

// 读信息,从 websocket 连接直接读取数据
func (c *Client) Read() {
	defer func() {
		WebsocketManager.UnRegister <- c
		log.Printf("client [%s] disconnect", c.Id)
		if err := c.Socket.Close(); err != nil {
			log.Printf("client [%s] disconnect err: %s", c.Id, err)
		}
	}()

	for {
		messageType, message, err := c.Socket.ReadMessage()
		if err != nil || messageType == websocket.CloseMessage {
			break
		}
		log.Printf("client [%s] receive message: %s", c.Id, string(message))
		c.Message <- message
	}
}

// 写信息,从 channel 变量 Send 中读取数据写入 websocket 连接
func (c *Client) Write() {
	defer func() {
		log.Printf("client [%s] disconnect", c.Id)
		if err := c.Socket.Close(); err != nil {
			log.Printf("client [%s] disconnect err: %s", c.Id, err)
		}
	}()

	for {
		select {
		case message, ok := <-c.Message:
			if !ok {
				_ = c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			log.Printf("client [%s] write message: %s", c.Id, string(message))
			err := c.Socket.WriteMessage(websocket.BinaryMessage, message)
			if err != nil {
				log.Printf("client [%s] writemessage err: %s", c.Id, err)
			}
		}
	}
}

// 启动 websocket 管理器
func (manager *Manager) Start() {
	log.Printf("websocket manage start")
	for {
		select {
		// 注册
		case client := <-manager.Register:
			log.Printf("client [%s] connect", client.Id)
			log.Printf("register client [%s] to group [%s]", client.Id, client.Group)

			manager.Lock.Lock()
			if manager.Group[client.Group] == nil {
				manager.Group[client.Group] = make(map[string]*Client)
				manager.groupCount += 1
			}
			manager.Group[client.Group][client.Id] = client
			manager.clientCount += 1
			manager.Lock.Unlock()

		// 注销
		case client := <-manager.UnRegister:
			log.Printf("unregister client [%s] from group [%s]", client.Id, client.Group)
			manager.Lock.Lock()
			if _, ok := manager.Group[client.Group]; ok {
				if _, ok := manager.Group[client.Group][client.Id]; ok {
					close(client.Message)
					delete(manager.Group[client.Group], client.Id)
					manager.clientCount -= 1
					if len(manager.Group[client.Group]) == 0 {
						//log.Printf("delete empty group [%s]", client.Group)
						delete(manager.Group, client.Group)
						manager.groupCount -= 1
					}
				}
			}
			manager.Lock.Unlock()

			// 发送广播数据到某个组的 channel 变量 Send 中
			//case data := <-manager.boardCast:
			//	if groupMap, ok := manager.wsGroup[data.GroupId]; ok {
			//		for _, conn := range groupMap {
			//			conn.Send <- data.Data
			//		}
			//	}
		}
	}
}

// 处理单个 client 发送数据
func (manager *Manager) SendService() {
	for {
		select {
		case data := <-manager.Message:
			if groupMap, ok := manager.Group[data.Group]; ok {
				if conn, ok := groupMap[data.Id]; ok {
					conn.Message <- data.Message
				}
			}
		}
	}
}

// 处理 group 广播数据
func (manager *Manager) SendGroupService() {
	for {
		select {
		// 发送广播数据到某个组的 channel 变量 Send 中
		case data := <-manager.GroupMessage:
			if groupMap, ok := manager.Group[data.Group]; ok {
				for _, conn := range groupMap {
					conn.Message <- data.Message
				}
			}
		}
	}
}

// 处理广播数据
func (manager *Manager) SendAllService() {
	for {
		select {
		case data := <-manager.BroadCastMessage:
			for _, v := range manager.Group {
				for _, conn := range v {
					conn.Message <- data.Message
				}
			}
		}
	}
}

// 向指定的 client 发送数据
func (manager *Manager) Send(id string, group string, message []byte) {
	data := &MessageData{
		Id: id,
		Group: group,
		Message:    message,
	}
	manager.Message <- data
}

// 向指定的 Group 广播
func (manager *Manager) SendGroup(group string, message []byte) {
	data := &GroupMessageData{
		Group: group,
		Message:    message,
	}
	manager.GroupMessage <- data
}

// 广播
func (manager *Manager) SendAll(message []byte) {
	data := &BroadCastMessageData{
		Message:    message,
	}
	manager.BroadCastMessage <- data
}

// 注册
func (manager *Manager) RegisterClient(client *Client) {
	manager.Register <- client
}

// 注销
func (manager *Manager) UnRegisterClient(client *Client) {
	manager.UnRegister <- client
}

// 当前组个数
func (manager *Manager) LenGroup() uint {
	return manager.groupCount
}

// 当前连接个数
func (manager *Manager) LenClient() uint {
	return manager.clientCount
}

// 获取 wsManager 管理器信息
func (manager *Manager) Info() map[string]interface{} {
	managerInfo := make(map[string]interface{})
	managerInfo["groupLen"] = manager.LenGroup()
	managerInfo["clientLen"] = manager.LenClient()
	managerInfo["chanRegisterLen"] = len(manager.Register)
	managerInfo["chanUnregisterLen"] = len(manager.UnRegister)
	managerInfo["chanMessageLen"] = len(manager.Message)
	managerInfo["chanGroupMessageLen"] = len(manager.GroupMessage)
	managerInfo["chanBroadCastMessageLen"] = len(manager.BroadCastMessage)
	return managerInfo
}

// 初始化 wsManager 管理器
var WebsocketManager = Manager{
	Group: make(map[string]map[string]*Client),
	Register:    make(chan *Client, 128),
	UnRegister:  make(chan *Client, 128),
	GroupMessage:   make(chan *GroupMessageData, 128),
	Message:   make(chan *MessageData, 128),
	BroadCastMessage: make(chan *BroadCastMessageData, 128),
	groupCount: 0,
	clientCount: 0,
}

// gin 处理 websocket handler
func (manager *Manager) WsClient(ctx *gin.Context) {
	upGrader := websocket.Upgrader{
		// cross origin domain
		CheckOrigin: func(r *http.Request) bool {
			return true
		},
		// 处理 Sec-WebSocket-Protocol Header
		Subprotocols: []string{ctx.GetHeader("Sec-WebSocket-Protocol")},
	}

	conn, err := upGrader.Upgrade(ctx.Writer, ctx.Request, nil)
	if err != nil {
		log.Printf("websocket connect error: %s", ctx.Param("channel"))
		return
	}

	client := &Client{
		Id:     uuid.NewV4().String(),
		Group:  ctx.Param("channel"),
		Socket: conn,
		Message:   make(chan []byte, 1024),
	}

	manager.RegisterClient(client)
	go client.Read()
	go client.Write()
	time.Sleep(time.Second * 15)
	// 测试单个 client 发送数据
	manager.Send(client.Id, client.Group, []byte("Send message ----" + time.Now().Format("2006-01-02 15:04:05")))
}

// 测试组广播
func TestSendGroup() {
	for {
		time.Sleep(time.Second * 20)
		WebsocketManager.SendGroup("leffss", []byte("SendGroup message ----" + time.Now().Format("2006-01-02 15:04:05")))
	}
}

// 测试广播
func TestSendAll() {
	for {
		time.Sleep(time.Second * 25)
		WebsocketManager.SendAll([]byte("SendAll message ----" + time.Now().Format("2006-01-02 15:04:05")))
		fmt.Println(WebsocketManager.Info())
	}
}

main.go

package main

import (
	"context"
	"log"
	"net/http"
	"os"
	"os/signal"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/leffss/ginWebsocket/ws"
)

func main() {
	go ws.WebsocketManager.Start()
	go ws.WebsocketManager.SendService()
	go ws.WebsocketManager.SendService()
	go ws.WebsocketManager.SendGroupService()
	go ws.WebsocketManager.SendGroupService()
	go ws.WebsocketManager.SendAllService()
	go ws.WebsocketManager.SendAllService()
	go ws.TestSendGroup()
	go ws.TestSendAll()

	router := gin.Default()
	router.GET("/", func(c *gin.Context) {
		c.String(http.StatusOK, "Welcome Gin Server")
	})

	wsGroup := router.Group("/ws")
	{
		wsGroup.GET("/:channel", ws.WebsocketManager.WsClient)
	}

	srv := &http.Server{
		Addr:    ":8080",
		Handler: router,
	}

	go func() {
		// 服务连接
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Fatalf("Server Start Error: %s\n", err)
		}
	}()

	// 等待中断信号以优雅地关闭服务器(设置 5 秒的超时时间)
	quit := make(chan os.Signal)
	signal.Notify(quit, os.Interrupt)
	<-quit

	ctx, cancel := context.WithTimeout(context.Background(), 5 * time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		log.Fatal("Server Shutdown Error:", err)
	}
	log.Println("Server Shutdown")
}

测试 websocket 代码 main.go

package main

import (
    "flag"
    "fmt"
    "net/url"
    "time"

    "github.com/gorilla/websocket"
)

var addr = flag.String("addr", "127.0.0.1:8080", "http service address")

func main() {
    u := url.URL{Scheme: "ws", Host: *addr, Path: "/ws/leffss"}
    var dialer *websocket.Dialer

    conn, _, err := dialer.Dial(u.String(), nil)
    if err != nil {
        fmt.Println(err)
        return
    }

    go timeWriter(conn)

    for {
        _, message, err := conn.ReadMessage()
        if err != nil {
            fmt.Println("read:", err)
            return
        }

        fmt.Printf("received: %s\n", message)
    }
}

func timeWriter(conn *websocket.Conn) {
    for {
        time.Sleep(time.Second * 5)
        conn.WriteMessage(websocket.TextMessage, []byte(time.Now().Format("2006-01-02 15:04:05")))
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

9-gin使用websocket 的相关文章

  • WebSocket 连接建立时出错:net::ERR_CONNECTION_CLOSED

    当我尝试建立一个wss与我的服务器的连接 与 wss mydomain 3000 的 WebSocket 连接失败 错误 连接建立 net ERR CONNECTION CLOSED 我目前有一个 apache2 虚拟主机配置设置来侦听端口
  • nginx + python + websocket

    我如何配置nginx 最新版本 他们说它支持websockets 来支持WebSockets 我如何使用 python 来运行 websockets 连接 这就是我想要的 客户端使用 JavaScript 创建 WebSocket webs
  • Spring 5 的反应式 WebSockets - 如何获取初始消息

    我遵循了该教程 特别是浏览器 WebSocket 客户端的部分 http www baeldung com spring 5 reactive websockets http www baeldung com spring 5 reacti
  • Websocket 在客户端返回 500,在服务器返回 101

    我们尝试使用 nginx ingress 控制器在 Kubernetes 集群上实现 WebSocket 入口 yaml apiVersion extensions v1beta1 kind Ingress metadata annotat
  • 如何使用 Scarlet 在 Android 上通过 WebSocket 进行连接?

    README md 中的代码 val scarletInstance Scarlet Builder webSocketFactory okHttpClient newWebSocketFactory GDAX URL addMessage
  • 通过nodejs服务器+socket.io从mp3文件同步流式传输音乐

    我的服务器上有一个 mp3 文件 我希望所有访问该网址的客户都能同步收听该音乐 That is 假设该文件播放 6 分钟 我在上午 10 00 开始播放这首歌 上午 10 03 发出的请求应从歌曲的第 3 分钟开始收听 我所有的客户都应该同
  • WebSocket 握手:意外响应代码:404

    正在编写我的第一个 websocket 程序并且正在得到 WebSocket 握手 意外响应代码 404 加载网页时出错 我使用的是 JDK 1 7 和 jboss 8 wildfly8 0 有人可以帮忙吗 window onload in
  • websocket握手问题

    我正在使用 python 实现一个简单的 websocket 服务器 我使用的握手来自 握手本身似乎有效 但是当我点击发送时 我收到一个 JavaScript 错误 未捕获的错误 INVALID STATE ERR DOM 异常 11 这是
  • ServerEndpoint 和 web.xml

    我有一些 Soap REST servlet 现在还有一个 WebSocket ServerEndpoint game public class WebSocketgame 我有下一个麻烦 如果 web xml 存在 WebSocket 不
  • 使用 Socket.IO 的 Python 客户端到 nodeJS 服务器

    我正在尝试使用 socket io 将值从我的树莓派 在 python 2 7 9 中 发送到我的 nodeJS 服务器 我的目标是通过 websocket 连接从我的 pi 连续发送许多值到我的节点服务器 本地 该服务器应该获取这些值并将
  • WebSocket 和负载平衡是瓶颈吗?

    当有一堆充当 WebSocket 无人机的系统和这些无人机前面的负载均衡器时 当 WebSocket 请求进入 LB 时 它会选择一个 WebSocket 无人机 并建立 WebSocket 我在 ELB 上使用 AWS ELB tcp S
  • webpack-dev-server 中的代理 websockets 连接

    是否可以在 webpack 开发服务器中代理 websocket 连接 我知道如何将常规 HTTP 请求代理到另一个后端 但它不适用于 websockets 大概是因为代理配置中的目标以 http 开头 webpack dev server
  • Flask-SocketIO 未使用 Gevent/Gevent-websocket

    我正在使用 Flask 和 Flask SocketIO 构建用于 websocket 通信的 Web 界面 数据 API 我想开始转向使用 Gevent Gevent websocket Gunicorn 以及最终使用 Nginx 进行负
  • 在 Apache 上设置 websocket?

    所以我正在对 websockets 进行一些研究 我有几个问题似乎找不到明确的答案 如何在 Linux 服务器上设置 Web 套接字 有 Apache 模块吗 我可以吗have使用第 3 方 PHP 代码或类似代码 除了浏览器兼容性之外 问
  • 如何通过 libwebsocket 发送异步数据?

    我正在将 Warmcat 的 libwebsocket C 库用于小型 Websocket 服务器 我已经启动并运行了这些示例 并且可以发送数据以响应从 websocket 接收数据 例如回显发送的反向字节 但是 我无法弄清楚如何在不使用
  • 如何终止 Websocket 连接?

    如何终止 Websocket 连接 我不是在谈论关闭两端的连接 而是在 中间 中断它 我需要测试重新连接时必须发生的一些应用程序逻辑 通过 SocketIO 处理 不 拔掉网络电缆不算数 因为我无法在单元测试中真正实现自动化 此外 我希望只
  • React Native Android 无法连接到 WebSocket

    尽管 Web 实现可以工作 但 android 模拟器以及我的设备无法连接到 WebSocket 在引发错误的地方收到以下事件错误代码 然后断开连接 connection error Event isTrusted false messag
  • 通过 PHP 连接到 socket.io(nodejs)

    我需要通过 php 连接到 websocket 发送数据并立即断开连接 无需等待套接字的响应 我用了大象io http elephant io 但更新库后不起作用 请告诉我如何通过 PHP 连接到 websocket 我也遇到了这个问题 学
  • WebSocket 和 Origin 标头字段

    以下引用自 RFC6455 WebSocket 协议 不打算处理来自任何网页的输入但 仅对于某些站点应验证 Origin 场是原点 他们期望 如果服务器不接受指示的来源 那么它应该用回复来响应 WebSocket 握手 包含 HTTP 40
  • 如何将中间件绑定到socket.io中的事件

    现在您可以将中间件绑定到io use middleware 但这仅在建立套接字连接时触发 有没有办法在将其传递给事件句柄之前拦截它 就像在expressjs中一样 换句话说 In 快递 js你可以做 app get middleware1

随机推荐

  • Oracle 索引详解(index)

    文章目录 1 概述2 索引管理2 1 创建索引2 2 删除索引2 3 修改索引2 4 查询索引 3 索引类型3 1 B Tree 平衡树索引3 2 bitmap 位图索引3 3 反向键索引3 4 基于函数索引 4 扩展4 1 走不走索引的情
  • Windbg调试(使用方法)

    一 Windbg版本信息 Windbg分32位和64位版本 xff0c 32位程序应使用32位Windbg调试 xff0c 64位程序应64位Windbg调试 若想使用64位的Windbg分析32位的程序 使用如下命令进行CPU模式的切换
  • Hi3861开发环境搭建 ||避坑指南|| [适用于几乎所有以Hi3861为主控的开发板]

    Hi3861开发环境搭建 避坑指南 适用于几乎所有以Hi3861为主控的开发板 前言 xff1a 这几天为了搭建Hi3861的开发环境 xff0c 看了不少官方文档和视频 xff0c 但是依然折腾了很久才配置好编译 上传都能正常的环境 xf
  • 论文分享——Bottom-Up and Top-Down Attention for Image Captioning and Visual Question Answering

    文章目录 文章简介1 背景介绍研究背景概念介绍问题描述IC与VQA领域的主要挑战 2 相关研究CNN 43 RNN体系架构Attention mechanismBottom Up and Top Down AttentionBottom U
  • spring mvc 源码分析之父子容器问题

    spring mvc 源码分析 spring mvc 源码分析之父子容器问题 spring mvc 源码分析前言为什么要弄两个ioc容器 xff1f 一个不可以吗存在两个容器 父容器是spring ioc容器自容器是springmvc io
  • C++11特性

    C 43 43 11已经出来很久了 xff0c 网上也早有很多优秀的C 43 43 11新特性的总结文章 xff0c 在编写本博客之前 xff0c 博主在工作和学习中学到的关于C 43 43 11方面的知识 xff0c 也得益于很多其他网友
  • CentOS 7 Squid缓存代理服务器搭建——筑梦之路

    简介 xff1a Squid 是 Linux Unix 平台下最为流行的高性能免费应用层代理服务器 xff0c 它具有权限管理灵活 性能高和效率快的特点 代理服务器可以提供文件缓存 复制和地址过滤等服务 xff0c 充分利用有限的出口带宽
  • python列表的基础操作

    python列表的操作 列表是python最为基础的数据结构 xff0c 极为重要 这话怎么理解呢 xff1f 是最常用的 xff0c 想不到其他的 xff0c 就用列表是其他数据结构的基础 xff0c 可以继承列表然后定义属于自己的数据类
  • C++正则表达式(regex_match、regex_search与regex_replace)

    前言 正则表达式是在字符串处理中常用和重要的工具 xff0c 主要用于字符串的匹配 在C 中正则表达式的使用非常方便 xff0c 但到了C 43 43 中让我有点懵逼了 xff0c 花了些时间查阅了很多资料 xff0c 下面主要会写到C 4
  • Java爬虫详解

    这是 Java 爬虫系列文章的第一篇 第一篇是关于 Java 爬虫入门内容 在该篇中我们以采集开源情报网站中的ip数据为例 需要提取的内容如下图所示 Statistics AbuseIPDB nbsp nbsp nbsp 我们需要提取图中圈
  • ubuntu安装过程

    feat 新增功能 fix 修复 bug docs 文档相关的改动 style 对代码的格式化改动 xff0c 代码逻辑并未产生任何变化 test 新增或修改测试用例 refactor 重构代码或其他优化举措 chore 项目工程方面的改动
  • SpringSecurityOAuth和Jwt

    一 SpringSecurityOAuth简介 grant type为授权模式password为密码模式 64 EnableAuthorizationServer表示定义认证服务器 64 EnableResourceServer表示定义资源
  • 无法使用ssh的可能

    1 SSH服务未启动 xff1a 请确保SSH服务在远程计算机上已经启动 在Ubuntu上 xff0c 可以通过运行以下命令来启动SSH服务 xff1a sudo service ssh start 2 防火墙阻止了SSH连接 xff1a
  • CentOS7.0 关闭防火墙

    systemctl stop firewalld service 停止firewall systemctl disable firewalld service 禁止firewall开机启动
  • 如何查看张量tensor,并将其转换为numpy数据

    在tensorflow 中一般数据都是用tensor来表示 xff0c 而在python 中一般是用numpy包 xff0c 然而有时候需要打印变量的数据 xff0c 可用以下方法来打印 xff1a 一 import tensorflow
  • 浅谈联邦学习Federated Learning

    最近人工智能 大数据领域的公众号疯狂给我推送 联邦学习 相关的文章 xff0c 使得本来并不好奇的我 xff0c 有了一丝丝揭开它神秘面纱的冲动 公众号的每篇推文写得都很好 xff0c 但同时也十分学术 xff0c 作为刚上路的我 xff0
  • 云计算中的容器技术及其实践案例

    第一章 xff1a 什么是容器技术 随着云计算和DevOps的普及 xff0c 容器技术在IT行业中越来越受到关注 容器是一种轻量级 可移植 可扩展的应用程序封装技术 xff0c 可以将应用程序及其所有依赖项打包到一个独立的可执行文件中 相
  • 小坑整理

    for range坑 问题描述 range循环的时候 xff0c 取赋值的地址 xff0c 然后保存到map中 xff0c 结果是什么样子 xff1f 这里先回想下变量是什么时候才申请内存 xff1f 只有赋值给新的变量时 xff0c 内存
  • go操作es

    TOC 前言 xff1a elasticsearch 是一个基于Lucene构建的开源的 分布式 restful接口的全文搜索引擎 es还是一个分布式的文档数据库 xff0c 其中每个字段均可被索引 xff0c 而且每个字段的数据均可被搜索
  • 9-gin使用websocket

    toc gin使用websocket Gin 框架默认不支持 websocket xff0c 可以使用 github com gorilla websocket 实现 Talk is cheap Show me the code xff0c