基于zookeeper的MySQL主主负载均衡的简单实现

2023-11-15

基于zookeeper的MySQL主主负载均衡的简单实现

1.先上原理图

2.说明

两个mysql采用主主同步的方式进行部署。

在安装mysql的服务器上安装客户端(目前是这么做,以后想在zookeeper扩展集成),客户端实时监控mysql应用的可用性,可用时想zookeepercreateNode,当网络不可用或者mysql应用不可用时,建立的znode消失。

在客户端,通过改造proxool数据库连接池的方式,在建立连接之前,从zookeeper中去取真实的数据库URL,如果有多个URL,即有多个服务时,采用随机算法去拿连接(以后准备扩展权重)。当连接不可用时,数据库连接池将重建连接,这时候又回去zookeeper拿连接,因为agent建立的临时znode消失了,就不能拿到已经失效的url了。

这个方案只是初步的实验和实现了,还有很多后续的问题,主要为了解决lvs+keepalived只能在同一个区域内的问题。

3.部分实现

  1).agent

  

/**
 * 数据库可用性检测
 * @author tomsnail
 * @date 2015年4月3日 上午10:11:51
 */
public class TestMySQL {

    public static boolean test(String url){
        
         Connection conn = null;
         Statement stmt = null;
         ResultSet rs  = null;
         String sql = ConfigHelp.getLocalConifg("jdbc_inventory.house-keeping-test-sql", "select 0");
            try {
                Class.forName(ConfigHelp.getLocalConifg("jdbc_inventory.driver-class", "com.mysql.jdbc.Driver"));// 动态加载mysql驱动
                conn = DriverManager.getConnection(url);
                stmt = conn.createStatement();
                rs = stmt.executeQuery(sql);
                while (rs.next()) {
                }
                return true;
            } catch (SQLException e) {
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if(rs!=null){
                        rs.close();
                    }
                    if(stmt!=null){
                        stmt.close();
                    }
                    if(conn!=null)
                        conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }

            }
        return false;
    }
}
/**
 * zookeeper客户端
 * @author tomsnail
 * @date 2015年4月3日 上午10:11:51
 */
public class TestServer {

    private static final Logger logger = LoggerFactory
            .getLogger(TestServer.class);

    private static ZooKeeper zk;
    
    private String path;

    //同步锁
    private Lock _lock = new ReentrantLock();
    
    // 用于等待 SyncConnected 事件触发后继续执行当前线程
    private CountDownLatch latch = new CountDownLatch(1);
    

    public TestServer() {
        zk = connectServer();
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        Thread.currentThread().sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //logger.info("check zk...");
                    _lock.lock();
                    if (zk != null) {
                        if (zk.getState().isAlive()
                                && zk.getState().isConnected()) {
                            //logger.info("zk is ok");
                            _lock.unlock();
                            continue;
                        }
                    }
                    close();
                    logger.info("reConnectServer ...");
                    zk = connectServer();
                    logger.info("reConnectServer ok");
                    _lock.unlock();
                }

            }

            private void close() {
                if(zk!=null){
                    try {
                        zk.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    zk = null;
                }
            }
        }).start();
    }



    // 连接 ZooKeeper 服务器
    private ZooKeeper connectServer() {
        ZooKeeper zk = null;
        try {

            zk = new ZooKeeper(ConfigHelp.ZK_CONNECTION_STRING,
                    ConfigHelp.ZK_SESSION_TIMEOUT, new Watcher() {
                        @Override
                        public void process(WatchedEvent event) {
                            if (event.getState() == Event.KeeperState.SyncConnected) {
                                latch.countDown(); // 唤醒当前正在执行的线程
                            }
                        }
                    });
            latch.await(); // 使当前线程处于等待状态
        } catch (Exception e) {
            logger.error("", e);
        }
        if (zk != null) {
            try {
                Stat stat = zk.exists(ConfigHelp.ZK_ROOT_PATH, false);
                if (stat == null) {
                    String path = zk.create(ConfigHelp.ZK_ROOT_PATH,
                            "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode
                    logger.info("create zookeeper node ({})", path);
                }
                stat = zk.exists(ConfigHelp.ZK_RMI_PATH, false);
                if (stat == null) {

                    String path = zk.create(ConfigHelp.ZK_RMI_PATH,
                            "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                            CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode
                    logger.info("create zookeeper node ({})", path);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return zk;
    }

    // 创建 ZNode
    public void createNode(String url) {
        _lock.lock();
        try {
            byte[] data = url.getBytes();
            path = zk.create(ConfigHelp.ZK_RMI_PATH + "/", data,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL); // 创建一个临时性且有序的 ZNode
            logger.info("create zookeeper node ({} => {})", path, url);
        } catch (Exception e) {
            logger.error("", e);
            e.printStackTrace();
        }
        _lock.unlock();
    }
    
    public void deleteNode(String url){
        _lock.lock();
        try {
            Stat stat = zk.exists(path, false);
            if(stat!=null){
                zk.delete(url, stat.getVersion());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        _lock.unlock();
    }
}
/**
 * 数据库检测测试主类
 * @author tomsnail
 * @date 2015年4月3日 上午10:11:51
 */
public class TestMain {
    
    private static TestServer testServer = new TestServer();

    public static void main(String[] args) {
        String url = ConfigHelp.getLocalConifg("jdbc_inventory.driver-url", "select 0");
        boolean isOK = false;
        while(true){
            if(TestMySQL.test(url)){
                if(isOK){
                    
                }else{
                    testServer.createNode(url);//建立znode
                }
                isOK = true;
            }else{
                isOK = false;
                testServer.deleteNode(url);//删除znode
            }
            
            try {
                Thread.currentThread().sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

  2).proxool

/**
 * zookeeper信息定义类
 * @author tomsnail
 * @date 2015年4月2日 下午6:49:13
 */
public class ZkInfoDefinition {
    
    public static final String PREFIX_ZK = "zookeeper";
    
    public static final String ZK_URL = "zkUrl";
    
    public static final String ZK_SESSION_TIMEOUT = "sessionTimeout";
    
    public static final String ZK_PATH = "zkPath";
    
    public static final String ZK_ENABLE = "zkEnable";

    public static String zkUrl="192.168.102.1:31315";
    
    public static int sessionTimeout = 5000;
    
    public static boolean isEnable = false;
    
    public static String zkPath = "/root/db";

    public String getZkUrl() {
        return zkUrl;
    }

    public void setZkUrl(String zkUrl) {
        this.zkUrl = zkUrl;
    }

    public int getSessionTimeout() {
        return sessionTimeout;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public String getZkPath() {
        return zkPath;
    }

    public void setZkPath(String zkPath) {
        this.zkPath = zkPath;
    }

    public ZkInfoDefinition(String zkUrl, int sessionTimeout, String zkPath) {
        super();
        this.zkUrl = zkUrl;
        this.sessionTimeout = sessionTimeout;
        this.zkPath = zkPath;
    }
    public ZkInfoDefinition(){
        
    }
}
/**
 * zookeeper客户端
 * @author tomsnail
 * @date 2015年4月3日 上午10:15:11
 */
public class ZkClient {

       private static final Logger logger = LoggerFactory.getLogger(ZkClient.class);
       
        // 用于等待 SyncConnected 事件触发后继续执行当前线程
        private CountDownLatch latch = new CountDownLatch(1);
     
        // 定义一个 volatile 成员变量,用于保存最新的 RMI 地址(考虑到该变量或许会被其它线程所修改,一旦修改后,该变量的值会影响到所有线程)
        private volatile List<String> dataList = new ArrayList<String>();
     
        private Lock _lock = new ReentrantLock();
        
        private static  ZooKeeper zk;
        
        private LBUrl lbUrl;
        
        
        public ZkClient(){
            this(new BasicLBUrl());
        }
        
        // 构造器
        public ZkClient(LBUrl lbUrl) {
            this.lbUrl = lbUrl;
            zk = connectServer(); // 连接 ZooKeeper 服务器并获取 ZooKeeper 对象
            watchNode();
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    while (true) {
                        try {
                            Thread.currentThread().sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        _lock.lock();
                        if (zk != null) {
                            if (zk.getState().isAlive()
                                    && zk.getState().isConnected()) {
                                _lock.unlock();
                                continue;
                            }
                        }
                        if(zk!=null){
                            try {
                                zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            zk = null;
                        }
                        zk = connectServer();
                        _lock.unlock();
                    }
                }
            }).start();
        }
     
        // 查找 URL 服务
        public String getUrl() {
            if (dataList!=null&&dataList.size()>0) {
               return this.lbUrl.getUrl(dataList);
            }
            return null;
        }
        
        public List<String> getUrls(){
            return dataList;
        }
     
        // 连接 ZooKeeper 服务器
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(ZkInfoDefinition.zkUrl, ZkInfoDefinition.sessionTimeout, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown(); // 唤醒当前正在执行的线程
                        }
                    }
                });
                latch.await(); // 使当前线程处于等待状态
            } catch (Exception e) {
                logger.error("", e);
            }
            return zk;
        }
     
        // 观察 /registry 节点下所有子节点是否有变化
        private void watchNode() {
            _lock.lock();
            if(zk!=null&&zk.getState().isAlive()&&zk.getState().isConnected()){
                
            }else{
                if(zk!=null){
                    try {
                        zk.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    zk = null;
                }
                zk = connectServer();
            }
            try {
                List<String> nodeList = zk.getChildren(ZkInfoDefinition.zkPath, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            watchNode(); // 若子节点有变化,则重新调用该方法(为了获取最新子节点中的数据)
                        }
                    }
                });
                List<String> dataList = new ArrayList<String>(); // 用于存放 /registry 所有子节点中的数据
                for (String node : nodeList) {
                    byte[] data = zk.getData(ZkInfoDefinition.zkPath + "/" + node, false, null); // 获取 /registry 的子节点中的数据
                    dataList.add(new String(data));
                   
                }
                logger.debug("node data: {}", dataList);
                this.dataList = dataList;
            } catch (Exception e) {
                logger.error("", e);
            }
            _lock.unlock();
        }
     
        public static void main(String[] args) {
            ZkClient client = new ZkClient();
            System.out.println(client.getUrl());
        }
}
/**
 * 从zookeeper获得URL连接操作类
 * @author tomsnail
 * @date 2015年4月2日 下午6:56:06
 */
public class ZkUrlOperation {
    
    private static final ZkUrlOperation instance = new ZkUrlOperation();

    private static ZkInfoDefinition zkInfoDefinition;
    
    private static ZkClient zkClient;
    
    private static final byte[] _lock = new byte[0];
    
    private  ZkUrlOperation(){
        
    }
    
    public static ZkUrlOperation getInstance(){
        return instance;
    }
    
    public  void addZkInfoDefinition(ZkInfoDefinition zkInfoDefinition){
        ZkUrlOperation.zkInfoDefinition = zkInfoDefinition;
    }
    
    public  void addZkInfoDefinition(String key,String value){
        if(ZkUrlOperation.zkInfoDefinition==null){
            ZkUrlOperation.zkInfoDefinition = new ZkInfoDefinition();
        }
        if(key.contains(ZkInfoDefinition.ZK_PATH)){
            ZkUrlOperation.zkInfoDefinition.setZkPath(value);
        }
        if(key.contains(ZkInfoDefinition.ZK_SESSION_TIMEOUT)){
            ZkUrlOperation.zkInfoDefinition.setSessionTimeout(Integer.valueOf(value));;
        }
        if(key.contains(ZkInfoDefinition.ZK_URL)){
            ZkUrlOperation.zkInfoDefinition.setZkUrl(value);;
        }
        if(key.contains(ZkInfoDefinition.ZK_ENABLE)){
            ZkUrlOperation.zkInfoDefinition.isEnable = Boolean.valueOf(value);
        }
    }
    
    
    public String getUrl(){
        synchronized (_lock) {
            if(zkInfoDefinition.isEnable){
                if(zkClient==null){
                    zkClient = new ZkClient();
                }
                
                String url = zkClient.getUrl();
                return url;
            }else{
                return "";
            }
            
        }
        
        
    }
    
    public boolean isAvailUrl(String url){
        synchronized (_lock) {
            if(zkInfoDefinition.isEnable){
                if(zkClient==null){
                    zkClient = new ZkClient();
                }
                List<String> urls = zkClient.getUrls();
                for(int i=0;i<urls.size();i++){
                    if(url.equals(urls.get(i))){
                        return true;
                    }
                }
                return false;
            }
            return false;
            
        }
        
    }
    
    
    
}

 

1
« 上一篇: 重学JAVA基础(三):动态代理
» 下一篇: 重学JAVA基础(四):线程的创建与执行
posted @  2015-04-03 10:20  TomSnail 阅读( 5012) 评论( 2编辑  收藏
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于zookeeper的MySQL主主负载均衡的简单实现 的相关文章

  • 内存卡永久删除的文件如何恢复?

    内存卡是和机械硬盘 U盘一个性质的数据存储工具 可以说是 同行 而作用更是不必多说 就是存储文件数据 谈谈今天的主题 万一真出现了这种情况 那存储我们电脑数据的内存卡永久删除的文件怎么恢复 内存卡永久删除的文件怎么恢复 内存卡永久删除的文件
  • gitee中git不能使用http协议提交项目

    git使用https协议提交项目的时候出现以下错误 error RPC failed curl 56 GnuTLS recv error 110 The TLS connection was non properly terminated

随机推荐

  • mixins详解

    实现一个日志功能 组件在挂载前打印 Component will mount 组件挂载后打印 Component did mount 不能忍受的写法 var AComponent React createClass componentWil
  • README_Albumentations

    一 文档 GitHub https github com albumentations team albumentations 官方文档 Albumentations Documentation 二 Installation pip ins
  • Amazon AWS —— 免费的午餐不好吃

    转自acgcss 众技术宅所周知 Amazon AWS 之前提供了 新用户凭有效信用卡免费试用一年 的活动 至今没有给出截止日期 虽说免费的午餐很诱人 而且由于信用卡的门槛 也避免了一部分的滥用 但是要安心吃好这顿饭 没有第一个吃螃蟹的人提
  • Python简要复习

    Python程序设计复习 Python基础知识 python的特点 兼具编译型和解释型特性 兼顾过程式 函数式和面向对象编程范式的通用编程语言 解释型语言无需像编译型需要一次性的编译成机器码 然后运行 而是由名叫解释器的程序动态的将源代码逐
  • 快手登录不上去 显示服务器繁忙,快手登录失败怎么回事

    大家好 我是时间财富网智能客服时间君 上述问题将由我为大家进行解答 快手登录失败的原因 1 可能是登录环境不太安全 2 可能是新手机的原因 3 可能是长期未登录或者是异地登录 4 可能是账号存在被盗风险或者已经被其他人登录了 建议修改密码
  • JAVA注解实现@WebServlet(一)

    JAVA注解实现 WebServlet 提示 需要些反射和文件操作 文章目录 JAVA注解实现 WebServlet 前言 一 创建注解RequestMapping 二 创建一个继承HttpServlet的类 三 创建过滤器 总结 前言 在
  • mysql invalid uuid_我为什么不建议开发中使用UUID作为MySQL的主键

    我是少侠露飞 学习塑造人生 技术改变世界 引言 我在之前一篇博客专门介绍了MySQL聚簇索引和非聚簇索引 附传送门 享学MySQL 系列 MySQL索引的数据结构 索引种类及聚簇索引和非聚簇索引 简单来说 就是我们设计表的时候 基本都会人为
  • 【linux kernel】linux中断管理—软中断

    linux中断管理 软中断 一 简介 软中断是linux预留给系统中对时间要求最为严苛和最重要的中断下半部使用的 并且 驱动中只有一些对时间极其敏感的模块使用了 例如 块设备和网络子系统 linux系统中定义了几种软中断类型 如下所示 in
  • 面试题:连续子数组的最大和与循环列表中的子数组最大和

    一 连续子数组的最大和 LeetCode 53 Maximum Subarray 题意 给定一个整数数组 nums 找到一个具有最大和的连续子数组 子数组最少包含一个元素 返回其最大和 定义dp i 为前i个数中的连续子数组的最大和 状态转
  • Jenkins+SonarQube 代码质量检测详解

    一 SonarQube 概述 1 SonarQube 简介 Sonar Qube是一个开源的代码分析平台 支持Java Python PHP JavaScript CSS等25种以上的语言 可以检测出重复代码 代码漏洞 代码规范和安全性漏洞
  • HTTP基础知识(用一万字帮助你入门)

    HTTP中文意思是超文本传输协议 它可以承载的内容有很多像html web Api css js等等 入门HTTP 一 初识 1 1背景知识 二 协议分析 2 1http的发展历程 2 2状态码 2 3缓存 2 4HTTP 2概述 2 5H
  • 利用Python实现黑客帝国代码雨,打造属于自己的黑客帝国

    导语 看安全类文章的时候 发现文章前面经常会加个代码雨的特效图 感觉拿来用python实现一下当成一个小案例还是不错的 让我们愉快地开始吧 开发工具 Python版本 3 6 4 相关模块 pygame模块 以及一些python自带的模块
  • 计算机打印错误,打印机错误正在打印处理方法,详细教您电脑打印机错误正在打印处理方法...

    打印机 是办公常见的打印设备 平时需要打印表格 订单什么的 但是有的时候打印东西会提示 正在打印打印错误 但是打印机连接完好 驱动也安装正确 这是怎么回事呢 下面 小编就教大家如何去解决打印机出现正在打印处理方法 如今在办公室中电脑和打印机
  • CVE-2021-3156 sudo堆溢出 漏洞分析

    漏洞简介 sudo 是 linux 系统管理指令 是允许系统管理员让普通用户执行一些或者全部的 root 命令的一个工具 它允许授权用户以 root 权限执行命令或者程序 sudo 的 sudoer 插件里面存在一个堆溢出漏洞 攻击者可以利
  • Java异步执行代码块,史上最简单的异步执行!!!

    声明 private static final ExecutorService executor Executors newCachedThreadPool new ThreadFactory int i 0 Override public
  • Git命令使用教程

    git文件提交大致流程 本地文件 git add 文件名 暂存区 git commit m 提交信息 本地仓库 git push 远程仓库地址 复制的 分支名称 要推送的分支名称 远程仓库 github git操作详细流程 1 先配置提交人
  • Unity导入google.protobuf失败,无法找到google命名空间

    问题 1 刚开始把protobuf的文件夹直接从其他项目里 unity2021 里复制到unity 2020 版本 当时报错protobuf dll的依赖项system memory版本不对 2 没有使用原来的protobuf文件了 使用v
  • Qt 第29课、主窗口中的状态栏

    1 主窗口中的状态栏 状态栏的概念和意义 状态栏是应用程序中输出简要信息的区域 状态栏一般位于主窗口的最底部 状态栏的消息类型 实时消息 如 当前程序状态 永久消息 如 程序版本号 机构名称 进度消息 如 进度条提示 百分比提示 在 Qt
  • 【Spring] Spring boot 报错 Unable to start ServletWebServerApplicationContext due to missing ServletWe

    1 概述 spring 报错如下 Error starting ApplicationContext To display the conditions report re run your application with debug e
  • 基于zookeeper的MySQL主主负载均衡的简单实现

    基于zookeeper的MySQL主主负载均衡的简单实现 1 先上原理图 2 说明 两个mysql采用主主同步的方式进行部署 在安装mysql的服务器上安装客户端 目前是这么做 以后想在zookeeper扩展集成 客户端实时监控mysql应