golang获取postgres或clickhouse连接

2023-05-16

package edgedb

import (
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"gorm.io/driver/clickhouse"
	"gorm.io/driver/postgres"
	"gorm.io/gorm"
)

var (
	pgConn *gorm.DB
	ckConn *gorm.DB
)

type Config struct {
	// 遥感数据存储,(默认: postgresql)
	// 支持 postgresql/clickhouse,可多选通过 "," 连接多个选择
	DeviceTelemetryRecordBackend string `yaml:"" env:"DEVICE_TELEMETRY_RECORD_BACKEND" envDefault:"postgresql"`

	Postgresql struct {
		Host       string `yaml:"host" env:"POSTGRESQL_SERVER_HOST" envDefault:"postgresql"`
		Port       int    `yaml:"port" env:"POSTGRESQL_SERVER_PORT" envDefault:"5432"`
		EdgedbUser string `yaml:"user" env:"POSTGRESQL_SERVER_EDGEDB_USER" envDefault:"postgres"`
		EdgedbPass string `yaml:"pass" env:"POSTGRESQL_SERVER_EDGEDB_PASS" envDefault:"password"`
		EdgedbName string `yaml:"name" env:"POSTGRESQL_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		PoolSize   int    `yaml:"pool_size" env:"POSTGRESQL_SERVER_POOLSIZE" envDefault:"500"` // 设置连接池的最大连接数
	}

	Clickhouse struct {
		Enabled      bool   `yaml:"host" env:"CLICKHOUSE_SERVER_ENABLED" envDefault:"false"`
		Host         string `yaml:"host" env:"CLICKHOUSE_SERVER_HOST" envDefault:"clickhouse"`
		Port         int    `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"9000"`
		User         string `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:"default"`
		Pass         string `yaml:"port" env:"CLICKHOUSE_SERVER_PORT" envDefault:""`
		EdgedbUser   string `yaml:"user" env:"CLICKHOUSE_SERVER_EDGEDB_USER" envDefault:"root"`
		EdgedbPass   string `yaml:"pass" env:"CLICKHOUSE_SERVER_EDGEDB_PASS" envDefault:"[yz^_^edge]"`
		EdgedbName   string `yaml:"name" env:"CLICKHOUSE_SERVER_EDGEDB_NAME" envDefault:"edgedb"`
		ReadTimeout  int    `yaml:"read_timeout" env:"READ_TIMEOUT" envDefault:"10"`
		WriteTimeout int    `yaml:"write_timeout" env:"WRITE_TIMEOUT" envDefault:"10"`
	}
}

// SetupConn 初始化连接,配置连接池
// 注意:不要关闭数据库连接客户端 sql.DB,否则连接池无法生效,如无特殊需求,只需判断有无异常即可。通过 GetXXConn 函数向连接池申请连接
func SetupConn(cfg *Config) (*sql.DB, error) {
	// 加载配置
	if cfg == nil {
		err := errors.New("Config needed")
		return nil, err
	}

	// 初始化 Postgres 连接
	db, err := _setupPostgresConn(cfg)
	if err != nil {
		return nil, err
	}
	defer db.Close()

	// 如系统配置 Clickhouse 作为遥测扩展存储,则额外增加 CK 连接的初始化
	telemetryDatabaseBackends := strings.Split(cfg.DeviceTelemetryRecordBackend, ",")
	sort.Strings(telemetryDatabaseBackends)
	index := sort.SearchStrings(telemetryDatabaseBackends, "clickhouse")
	if index < len(telemetryDatabaseBackends) && telemetryDatabaseBackends[index] == "clickhouse" {
		// 初始化 Clickhouse 连接
		if _, err := _setupClickhouseConn(cfg); err != nil {
			return nil, err
		}
	}

	return db, nil
}

func _setupPostgresConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		if config, err := configs.LoadConfig(); err != nil {
			return nil, err
		} else {
			cfg = config
		}
	}

	dsn := fmt.Sprintf(
		"host=%s port=%d user=%s password=%s dbname=%s sslmode=disable TimeZone=Asia/Shanghai",
		cfg.Postgresql.Host,
		cfg.Postgresql.Port,
		cfg.Postgresql.EdgedbUser,
		cfg.Postgresql.EdgedbPass,
		cfg.Postgresql.EdgedbName,
	)
	conn, err := gorm.Open(postgres.New(postgres.Config{
		DSN:                  dsn,
		PreferSimpleProtocol: true, // disables implicit prepared statement usage
	}), &gorm.Config{})
	if err != nil {
		return nil, err
	}

	pgDB, err := conn.DB()
	if err != nil {
		return nil, err
	}

	// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
	// 设置可空闲的最大连接数,随时等待调用
	pgDB.SetMaxIdleConns(10)

	// SetMaxOpenConns sets the maximum number of open connections to the database.
	// 设置连接池的最大连接数,不配置默认就是不限制 (当前500,依赖 postgres max_connections)
	pgDB.SetMaxOpenConns(cfg.Postgresql.PoolSize)

	// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
	// 连接的最长存活期,超过这个时间,连接将会重置,不再被复用,不配置默认就是永不过期
	pgDB.SetConnMaxLifetime(time.Second * 600)

	// Global
	pgConn = conn

	return pgDB, nil
}

// 初始化clickhouse的连接,配置连接池
func _setupClickhouseConn(cfg *Config) (*sql.DB, error) {
	if cfg == nil {
		if config, err := configs.LoadConfig(); err != nil {
			return nil, err
		} else {
			cfg = config
		}
	}

	dsn := fmt.Sprintf(
		"tcp://%s:%d?database=%s&username=%s&password=%s&read_timeout=%d&write_timeout=%d",
		cfg.Clickhouse.Host,
		cfg.Clickhouse.Port,
		cfg.Clickhouse.EdgedbName,
		cfg.Clickhouse.EdgedbUser,
		cfg.Clickhouse.EdgedbPass,
		cfg.Clickhouse.ReadTimeout,
		cfg.Clickhouse.WriteTimeout,
	)
	conn, err := gorm.Open(clickhouse.New(clickhouse.Config{
		DSN:                       dsn,
		DisableDatetimePrecision:  true,     // disable datetime64 precision, not supported before clickhouse 20.4
		DontSupportRenameColumn:   true,     // rename column not supported before clickhouse 20.4
		SkipInitializeWithVersion: false,    // smart configure based on used version
		DefaultGranularity:        3,        // 1 granule = 8192 rows
		DefaultCompression:        "LZ4",    // default compression algorithm. LZ4 is lossless
		DefaultIndexType:          "minmax", // index stores extremes of the expression
		DefaultTableEngineOpts:    "ENGINE=MergeTree() ORDER BY tuple()",
	}), &gorm.Config{})
	if err != nil {
		return nil, err
	}
	ckDB, err := conn.DB()
	if err != nil {
		return nil, err
	}

	// SetMaxIdleConns sets the maximum number of connections in the idle connection pool.
	// 设置可空闲的最大连接数,随时等待调用
	ckDB.SetMaxIdleConns(1)

	// SetMaxOpenConns sets the maximum number of open connections to the database.
	// 设置连接池的最大连接数,不配置,默认为 0,就是不限制
	ckDB.SetMaxOpenConns(10)

	// SetConnMaxLifetime sets the maximum amount of time a connection may be reused.
	// 连接的最长存活期,超过这个时间,连接将会重置,不再被复用,不配置默认就是永不过期
	ckDB.SetConnMaxLifetime(time.Second * 600)

	ckConn = conn
	return ckDB, nil
}

// GetDBConn 从连接池获取数据库连接
func GetDBConn() (*gorm.DB, error) {
	var pgDB *sql.DB

	if pgConn == nil {
		if db, err := _setupPostgresConn(nil); err != nil {
			return nil, err
		} else {
			pgDB = db
		}
	} else if db, err := pgConn.DB(); err != nil {
		// 尝试获取连接池失败,重新建立连接
		if db, err := _setupPostgresConn(nil); err != nil {
			return nil, err
		} else {
			pgDB = db
		}
	} else {
		pgDB = db
	}

	if err := pgDB.Ping(); err != nil {
		// 尝试 PING 失败,重新建立连接
		_ = pgDB.Close()

		if db, err := _setupPostgresConn(nil); err != nil {
			return nil, err
		} else {
			pgDB = db
		}
	}

	return pgConn, nil
}

// GetDBConn 从连接池获取数据库连接
func GetClickhouseConn() (*gorm.DB, error) {
	var ckDB *sql.DB

	if ckConn == nil {
		if db, err := _setupClickhouseConn(nil); err != nil {
			return nil, err
		} else {
			ckDB = db
		}
	} else if db, err := ckConn.DB(); err != nil {
		if db, err := _setupClickhouseConn(nil); err != nil {
			return nil, err
		} else {
			ckDB = db
		}
	} else {
		ckDB = db
	}

	if err := ckDB.Ping(); err != nil {
		_ = ckDB.Close()

		if db, err := _setupClickhouseConn(nil); err != nil {
			return nil, err
		} else {
			ckDB = db
		}
	}

	return ckConn, nil
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

golang获取postgres或clickhouse连接 的相关文章

  • 权重实现随机抽奖

    一般抽奖是怎么实现的 在实习期间学会了一种通用的写法 在这里记录一下 最近在学Golang语法基础 这里就用Golang来写 package main import fmt time math rand func main r rand N
  • beego+goAdmin+mysql+docker+natapp作为微信小程序地服务器“伪部署”

    写在前面的话 1 为什么我要叫伪部署 答 因为我把它们放在服务器运行 都是开发模式 生产模式实在不会弄 所以就这样了 2 系统环境 答 腾讯云服务器 系统为 ubuntu 版本不记得 应该是比较高的 3 前提假设 答 假设你的服务器已经安装
  • go 进阶 go-zero相关: 七. 拦截器与熔断拦截器

    目录 一 拦截器的基础使用 1 服务端拦截器 2 客户端拦截器 二 拦截器底层底层执行原理 三 go zero默认添加的拦截器 客户端 1 熔断器拦截器 BreakerInterceptor 服务端 一 拦截器的基础使用 在go zero
  • Golang三剑客之Pflag、Viper、Cobra

    如何构建应用框架 想知道如何构建应用框架 首先你要明白 一个应用框架包含哪些部分 在我看来 一个应用框架需要包含以下 3 个部分 命令行参数解析 主要用来解析命令行参数 这些命令行参数可以影响命令的运行效果 配置文件解析 一个大型应用 通常
  • 在 clickhouse 中枢轴

    我想在 clickhouse 中进行数据透视 我的数据格式为 rule name result string 1 result 1 string 2 result 2 string 3 result 3 string 4 result 4
  • 【go语言开发】Minio基本使用,包括环境搭建,接口封装和代码测试

    本文主要介绍go语言使用Minio对象存储 首先介绍搭建minio 创建bucket等 然后讲解封装minio客户端接口 包括但不限于 上传文件 下载 获取对象url 最后测试开发的接口 文章目录 前言 Minio docker安装mini
  • Clickhouse 不返回列标题

    我正在尝试从 clickhouse 获取一些关系数据并在 pandas 中使用 它有效 但 pd read sql query 返回数据帧 其中列名是第一行的值 相反 我希望看到关系表中命名的列名称 我用 Postgres 做了同样的尝试
  • go-zero开发入门-API网关开发示例

    开发一个 API 网关 代理 https blog csdn net Aquester article details 134856271 中的 RPC 服务 网关完整源代码 file main go package main import
  • go-zero开发入门之网关往rpc服务传递数据1

    go zero 的网关往 rpc 服务传递数据时 可以使用 headers 但需要注意前缀规则 否则会发现数据传递不过去 或者对方取不到数据 go zero 的网关对服务的调用使用了第三方库 grpcurl 入口函数为 InvokeRPC
  • 【golang】go执行shell命令行的方法( exec.Command )

    所需包 import os exec cmd 的用法 cmd exec Command ls lah ls是命令 后面是参数 e cmd Run 多个参数的要分开传入 如 ip link show bond0 cmd
  • 协程-单线程内的异步执行

    1 仿协程实例 不同事件依次顺序执行 coding utf 8 import time def calculate 1 step event name for index in range step print This is s even
  • go语言实现文件夹上传前后端代码案例

    go语言实现文件夹上传前后端代码案例 前端用于上传的测试界面 如果上传的文件夹有子文件要遍历子文件夹创建出子文件夹再进行拷贝 需要获取文件名和对应的路径 将文件的相对路径和文件对象添加到FormData中 这几行代码很关键 for let
  • 【go语言】error错误机制及自定义错误返回类型

    简介 Go 语言通过内置的 error 接口来处理错误 该接口定义如下 type error interface Error string 这意味着任何实现了 Error 方法的类型都可以作为错误类型 在 Go 中 通常使用 errors
  • Golang拼接字符串性能对比

    g o l a n g golang g o l an g
  • go cannot find package “github.com/gorilla/websocket“解读

    Go无法找到包 github com gorilla websocket 的解决方案 在Go开发过程中 我们经常会依赖第三方库来简化开发工作 而使用 go get 命令安装这些库时 有时候我们可能会遇到类似于以下错误的情况 plaintex
  • Go、Docker、云原生学习笔记全攻略:从零开始,一步步走向精通!(2024版)

    第一章 Go语言学习宝典 一 介绍 01 Go 语言的前生今世 二 开发环境搭建 01 Go 语言开发环境搭建 三 初识GO语言 01 Go 多版本管理工具 02 第一个 Go 程序 hello world 与 main 函数 03 Go
  • 如何在clickhouse中根据日期和时间段选择数据

    我想通过两者过滤一些数据yyyymmdd 日期 和hhmmss 时间 但是clickhouse不支持time类型 所以我选择datetime将它们结合起来 但如何做这样的事情 这是代码dolphindb 它支持second类型来表示hhmm
  • 返回 clickhouse 数组作为列

    Clickhouse 是否可以将包含一对数组的结果转换为列 形成这个结果 f1 f2 f3 a 1 2 3 x y z b 4 5 6 x y z to f1 x y z a 1 2 3 b 4 5 6 这个想法是不必为每行重复标题值 就我
  • 如何对多行的一列值求和?

    我有这个表 我想添加几行的 change 列的值 或者更准确地说 从 ne 值为零的行到 ne 值为零的下一行 不是第二个本身 任何答案将不胜感激 rn date ne change 0 2008 12 07 0 10330848398 1
  • Pandas:如何将数据框插入 Clickhouse

    我正在尝试将 Pandas 数据框插入 Clickhouse 这是我的代码 import pandas import sqlalchemy as sa uri clickhouse default localhost default ch

随机推荐