Source code analysis of seata, Alibaba's open-source one-stop distributed transaction framework (TC analysis under AT mode)

2023-11-06

1:seata Server analysis (TC source code analysis)

The seata TC server is always started through the startup script it ships with, so the entry point of seata-server can be found right in that script. Below is the final exec command excerpted from it; the main entry class io.seata.server.Server is clearly visible:

exec "$JAVACMD" $JAVA_OPTS -server -Xmx2048m -Xms2048m -Xmn1024m -Xss512k -XX:SurvivorRatio=10 -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=256m -XX:MaxDirectMemorySize=1024m -XX:-OmitStackTraceInFastThrow -XX:-UseAdaptiveSizePolicy -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath="$BASEDIR"/logs/java_heapdump.hprof -XX:+DisableExplicitGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=75 -Xloggc:"$BASEDIR"/logs/seata_gc.log -verbose:gc -Dio.netty.leakDetectionLevel=advanced \
  -classpath "$CLASSPATH" \
  -Dapp.name="seata-server" \
  -Dapp.pid="$$" \
  -Dapp.repo="$REPO" \
  -Dapp.home="$BASEDIR" \
  -Dbasedir="$BASEDIR" \
  io.seata.server.Server \
  "$@"

 

seata-server code structure

1.1:Server

seata TC is, at its core, a straightforward Netty-based server. The classic multi-reactor (boss/worker) model gives the server side solid performance, so it can sustain the frequent interactions with TM and RM clients.

public class Server {

    // minimum number of threads in the working pool
    private static final int MIN_SERVER_POOL_SIZE = 50;
    // maximum number of threads in the working pool
    private static final int MAX_SERVER_POOL_SIZE = 500;
    // maximum size of the queue for pending requests
    private static final int MAX_TASK_QUEUE_SIZE = 20000;
    // keep-alive time for idle threads: 500 (seconds, see TimeUnit.SECONDS below)
    private static final int KEEP_ALIVE_TIME = 500;
    // worker thread pool: messages read from a channel are handed off to these threads
    private static final ThreadPoolExecutor WORKING_THREADS = new ThreadPoolExecutor(MIN_SERVER_POOL_SIZE,
        MAX_SERVER_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(MAX_TASK_QUEUE_SIZE),
        new NamedThreadFactory("ServerHandlerThread", MAX_SERVER_POOL_SIZE), new ThreadPoolExecutor.CallerRunsPolicy());

    /**
     * The entry point of application.
     *
     * @param args the input arguments
     * @throws IOException the io exception
     */
    public static void main(String[] args) throws IOException {
        // get port first, use to logback.xml
        // resolve the listen port from the startup arguments
        int port = PortHelper.getPort(args);
        // expose the port as a system property (consumed by logback.xml)
        System.setProperty(ConfigurationKeys.SERVER_PORT, Integer.toString(port));

        // create the logger (managed by logback)
        final Logger logger = LoggerFactory.getLogger(Server.class);
        if (ContainerHelper.isRunningInContainer()) {
            logger.info("The server is running in container.");
        }

        //initialize the parameter parser
        //Note that the parameter parser should always be the first line to execute.
        //Because, here we need to parse the parameters needed for startup.
        // parse the startup arguments into a parameter object
        ParameterParser parameterParser = new ParameterParser(args);

        // initialize metrics
        MetricsManager.get().init();

        // set store.mode, i.e. how transaction session data is stored; three modes
        // are supported: file (local file, no HA), db and redis (both HA-capable)
        System.setProperty(ConfigurationKeys.STORE_MODE, parameterParser.getStoreMode());

        // initialize the NettyRemotingServer
        NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(WORKING_THREADS);
        // set the port the netty server listens on
        nettyRemotingServer.setListenPort(parameterParser.getPort());
        // initialize the id generator used by TC (the classic snowflake algorithm;
        // in an HA deployment make sure each TC node gets a distinct node id)
        UUIDGenerator.init(parameterParser.getServerNode());
        //log store mode : file, db, redis
        // initialize the session store
        SessionHolder.init(parameterParser.getStoreMode());

        DefaultCoordinator coordinator = new DefaultCoordinator(nettyRemotingServer);
        coordinator.init();
        // set the coordinator as the inbound handler of the netty server
        nettyRemotingServer.setHandler(coordinator);
        // register shutdown hooks to release resources on exit
        ShutdownHook.getInstance().addDisposable(coordinator);
        ShutdownHook.getInstance().addDisposable(nettyRemotingServer);

        // validate the host (127.0.0.1 and 0.0.0.0 are rejected here)
        // and use it as the ip part of the XID
        if (NetUtil.isValidIp(parameterParser.getHost(), false)) {
            XID.setIpAddress(parameterParser.getHost());
        } else {
            // fall back to the actual local address
            XID.setIpAddress(NetUtil.getLocalIp());
        }
        // set the port part of the XID
        XID.setPort(nettyRemotingServer.getListenPort());

        try {
            // initialize and start the server (blocks until the server channel closes)
            nettyRemotingServer.init();
        } catch (Throwable e) {
            logger.error("nettyServer init error:{}", e.getMessage(), e);
            System.exit(-1);
        }

        System.exit(0);
    }
}
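The UUIDGenerator initialized above produces the transaction ids that TC embeds in every XID (an XID has the form ip:port:transactionId, which is exactly why the XID ip and port are set during startup). Since it is a snowflake-style generator, every TC node in an HA cluster must be started with a distinct server node id, otherwise two nodes can hand out colliding ids. The sketch below illustrates the scheme; the 41/10/12 bit layout and the epoch are assumptions for illustration, not necessarily the exact split seata uses.

// A minimal snowflake-style id generator - illustrates the scheme behind
// UUIDGenerator; this is NOT seata's actual implementation.
public final class SnowflakeSketch {
    private static final long EPOCH = 1577836800000L; // assumed custom epoch (2020-01-01)
    private final long nodeId; // must be unique per TC node; 10 bits here
    private long lastTimestamp = -1L;
    private long sequence = 0L;

    public SnowflakeSketch(long nodeId) {
        if (nodeId < 0 || nodeId > 1023) {
            throw new IllegalArgumentException("nodeId must fit in 10 bits");
        }
        this.nodeId = nodeId;
    }

    public synchronized long nextId() {
        long now = System.currentTimeMillis();
        if (now == lastTimestamp) {
            sequence = (sequence + 1) & 0xFFF; // 12-bit sequence within one millisecond
            if (sequence == 0) {
                // sequence exhausted within this millisecond: spin until the next one
                while ((now = System.currentTimeMillis()) <= lastTimestamp) {
                    // busy-wait
                }
            }
        } else {
            sequence = 0L;
        }
        lastTimestamp = now;
        // 41 bits of timestamp | 10 bits of node id | 12 bits of sequence
        return ((now - EPOCH) << 22) | (nodeId << 12) | sequence;
    }
}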

1.2:NettyRemotingServer

Like the RmNettyRemotingClient and TmNettyRemotingClient described earlier (all of them ultimately extend the AbstractNettyRemoting abstract class), it is mainly responsible for registering processors and destroying channels, as shown below:

public class NettyRemotingServer extends AbstractNettyRemotingServer {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyRemotingServer.class);

    private TransactionMessageHandler transactionMessageHandler;

    private final AtomicBoolean initialized = new AtomicBoolean(false);

    @Override
    public void init() {
        // registry processor
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            // initialize the server
            super.init();
        }
    }

    /**
     * Instantiates a new Rpc remoting server.
     *
     * @param messageExecutor   the message executor
     */
    public NettyRemotingServer(ThreadPoolExecutor messageExecutor) {
        super(messageExecutor, new NettyServerConfig());
    }

    /**
     * Sets transactionMessageHandler.
     *
     * @param transactionMessageHandler the transactionMessageHandler
     */
    public void setHandler(TransactionMessageHandler transactionMessageHandler) {
        this.transactionMessageHandler = transactionMessageHandler;
    }

    public TransactionMessageHandler getHandler() {
        return transactionMessageHandler;
    }

    @Override
    public void destroyChannel(String serverAddress, Channel channel) {
        //destroy channel
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will destroy channel:{},address:{}", channel, serverAddress);
        }
        // close the channel
        channel.disconnect();
        channel.close();
    }

    private void registerProcessor() {
        // 1. register the request message processor
        ServerOnRequestProcessor onRequestProcessor =
            new ServerOnRequestProcessor(this, getHandler());
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS, onRequestProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE, onRequestProcessor, messageExecutor);
        // 2. register the response message processor
        ServerOnResponseProcessor onResponseProcessor =
            new ServerOnResponseProcessor(getHandler(), getFutures());
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT_RESULT, onResponseProcessor, messageExecutor);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK_RESULT, onResponseProcessor, messageExecutor);
        // 3. registry rm message processor
        RegRmProcessor regRmProcessor = new RegRmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_RM, regRmProcessor, messageExecutor);
        // 4. registry tm message processor
        RegTmProcessor regTmProcessor = new RegTmProcessor(this);
        super.registerProcessor(MessageType.TYPE_REG_CLT, regTmProcessor, null);
        // 5. registry heartbeat message processor
        ServerHeartbeatProcessor heartbeatMessageProcessor = new ServerHeartbeatProcessor(this);
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, heartbeatMessageProcessor, null);
    }

}

From the code above we can see that TC interacts with both TM and RM, and handles the corresponding requests and responses entirely through processors. Sending a response uses the same single-message path as the seata client described earlier, implemented on top of AbstractNettyRemoting; the difference is that the seata client batches outgoing requests (message merging), while the server sends its responses without any such merging step.
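The registration above builds what is effectively a dispatch table keyed by message type: when a frame is decoded, the server looks up the (processor, executor) pair registered for that type code and runs the processor on the given thread pool, or directly on the netty I/O thread when no executor was registered (as with TYPE_REG_CLT and TYPE_HEARTBEAT_MSG above). Below is a condensed sketch of that pattern, using simplified stand-ins for seata's RemotingProcessor and Pair types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

// Simplified stand-in for seata's RemotingProcessor.
interface Processor {
    void process(Object message) throws Exception;
}

// Dispatch-table pattern behind registerProcessor() / message processing.
final class ProcessorDispatcher {
    private static final class Entry {
        final Processor processor;
        final ExecutorService executor; // null means: run inline on the I/O thread
        Entry(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Entry> table = new ConcurrentHashMap<>();

    void register(int messageType, Processor processor, ExecutorService executor) {
        table.put(messageType, new Entry(processor, executor));
    }

    void dispatch(int messageType, Object message) throws Exception {
        Entry entry = table.get(messageType);
        if (entry == null) {
            return; // unknown message type: log and drop
        }
        if (entry.executor != null) {
            // heavy business messages are handed to the worker pool
            entry.executor.execute(() -> {
                try {
                    entry.processor.process(message);
                } catch (Exception e) {
                    // log the processing failure
                }
            });
        } else {
            // cheap messages (registration, heartbeat) are handled inline
            entry.processor.process(message);
        }
    }
}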

1.3:NettyServerBootstrap

It is responsible for the server-side Netty configuration and initialization:

public class NettyServerBootstrap implements RemotingBootstrap {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyServerBootstrap.class);
    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;
    // pipeline handlers
    private ChannelHandler[] channelHandlers;
    private int listenPort;
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    public NettyServerBootstrap(NettyServerConfig nettyServerConfig) {

        this.nettyServerConfig = nettyServerConfig;
        // use epoll for I/O multiplexing when available
        if (NettyServerConfig.enableEpoll()) {

            this.eventLoopGroupBoss = new EpollEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new EpollEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                    nettyServerConfig.getServerWorkerThreads()));
        } else {
            this.eventLoopGroupBoss = new NioEventLoopGroup(nettyServerConfig.getBossThreadSize(),
                new NamedThreadFactory(nettyServerConfig.getBossThreadPrefix(), nettyServerConfig.getBossThreadSize()));
            this.eventLoopGroupWorker = new NioEventLoopGroup(nettyServerConfig.getServerWorkerThreads(),
                new NamedThreadFactory(nettyServerConfig.getWorkerThreadPrefix(),
                    nettyServerConfig.getServerWorkerThreads()));
        }

        // init listenPort in constructor so that getListenPort() will always get the exact port
        // set the listen port (default 8091)
        setListenPort(nettyServerConfig.getDefaultListenPort());
    }

    /**
     * Sets channel handlers.
     *
     * @param handlers the handlers
     */
    protected void setChannelHandlers(final ChannelHandler... handlers) {
        if (handlers != null) {
            channelHandlers = handlers;
        }
    }

    /**
     * Add channel pipeline last.
     *
     * @param channel  the channel
     * @param handlers the handlers
     */
    private void addChannelPipelineLast(Channel channel, ChannelHandler... handlers) {
        if (channel != null && handlers != null) {
            channel.pipeline().addLast(handlers);
        }
    }

    /**
     * Sets listen port.
     *
     * @param listenPort the listen port
     */
    public void setListenPort(int listenPort) {

        if (listenPort <= 0) {
            throw new IllegalArgumentException("listen port: " + listenPort + " is invalid!");
        }
        this.listenPort = listenPort;
    }

    /**
     * Gets listen port.
     *
     * @return the listen port
     */
    public int getListenPort() {
        return listenPort;
    }

    @Override
    public void start() {
        // configure and start the netty server
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
            .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)
            .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())
            .option(ChannelOption.SO_REUSEADDR, true)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childOption(ChannelOption.TCP_NODELAY, true)
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())
            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
                new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),
                    nettyServerConfig.getWriteBufferHighWaterMark()))
            .localAddress(new InetSocketAddress(listenPort))
                // set up the channel pipeline
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) {
                    // idle-state detection plus the seata v1 wire-protocol codec,
                    // followed by any handlers set via setChannelHandlers()
                    ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
                        .addLast(new ProtocolV1Decoder())
                        .addLast(new ProtocolV1Encoder());
                    if (channelHandlers != null) {
                        addChannelPipelineLast(ch, channelHandlers);
                    }
                }
            });

        try {
            // bind the listen port, publish this TC instance to the registry center,
            // then block until the server channel is closed
            ChannelFuture future = this.serverBootstrap.bind(listenPort).sync();
            LOGGER.info("Server started, listen port: {}", listenPort);
            RegistryFactory.getInstance().register(new InetSocketAddress(XID.getIpAddress(), XID.getPort()));
            initialized.set(true);
            future.channel().closeFuture().sync();
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
}
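Seen end to end, this is the multi-reactor layering mentioned at the beginning: the boss event loop only accepts connections, the worker event loops only perform network I/O and run the pipeline (decode/encode), and the actual transaction processing is pushed onto the WORKING_THREADS pool created in Server. Stripped of the seata specifics, the skeleton looks as follows (a runnable echo server; the inline handler is a placeholder for seata's codec and processors):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

// Multi-reactor skeleton: one boss loop accepts, N worker loops do the I/O.
public final class ReactorSkeleton {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);   // accept reactor
        EventLoopGroup workers = new NioEventLoopGroup(); // I/O reactors (defaults to 2 * cores)
        try {
            ChannelFuture future = new ServerBootstrap()
                .group(boss, workers)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                // echo back; seata instead decodes the frame and
                                // hands it off to the WORKING_THREADS pool
                                ctx.writeAndFlush(msg);
                            }
                        });
                    }
                })
                .bind(8091)
                .sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }
}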