如何创建检测连接丢失和自动重新连接的 tcp-inbound-gateway?

2024-04-20

我正在尝试配置一组 spring 集成组件来使用来自 TCP 套接字的数据。基本协议是,打开连接后,系统会提示我输入用户名,然后输入密码,然后如果身份验证成功,数据就会在可用时流式传输给我。每 30 秒就会向我发送一条 ping 消息,以便我可以在没有数据传输的安静时段验证连接是否处于活动状态。

根据 spring-integration 文档,我设置了一个 TCP 网关。http://docs.spring.io/spring-integration/reference/html/ip.html#tcp-gateways http://docs.spring.io/spring-integration/reference/html/ip.html#tcp-gateways

<bean id="authInterceptorFactory"
    class="org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
    <property name="interceptors">
        <array>
           <bean class="com.socketfetching.AuthConnectionInterceptorFactory">
               <constructor-arg value="Login Username:"/>
               <constructor-arg value="${socket.username}"/>
               <constructor-arg value="Password:"/>
               <constructor-arg value="${socket.password}"/>
           </bean>
        </array>
    </property>
</bean>

<bean id="lfSeserializer" class="org.springframework.integration.ip.tcp.serializer.ByteArrayLfSerializer"/>

<ip:tcp-connection-factory id="connectionFactory"
  type="client"
  host="${socket.url}"
  port="${socket.port}"
  single-use="false"
  so-keep-alive="true"
  interceptor-factory-chain="authInterceptorFactory"
  deserializer="lfSeserializer"
  serializer="lfSeserializer"
/>

<int:channel id="socketInitChannel"/>

<ip:tcp-inbound-gateway id="inGateway"
    request-channel="clientBytes2StringChannel"
    reply-channel="socketInitChannel"
    connection-factory="connectionFactory"
    reply-timeout="10000"
    retry-interval="5000"
    auto-startup="true"
    client-mode="true"/>

InterceptorFactory 处理打开连接时发生的握手,并将预期的提示和我想要的响应作为参数。这种握手工作完美,我的应用程序正在从服务器接收其定期 ping。

client-mode=true 导致网关在启动时立即打开连接并等待用户名提示。

我的问题是连接丢失后的恢复。如果我终止网络连接,显然 ping 会停止发送,我希望我的网关能够检测到这一点并开始尝试定期重新连接。当我的网络连接恢复时,网关应该成功重新连接。

我认为重试间隔可能会处理这个问题,但它似乎没有任何效果。文档建议我使用 TaskScheduler 来实现此目的...但我不太确定如何将它与来自服务器的 ping 消息集成。

有什么建议吗?

编辑: 我找到了一个可行的解决方案,尽管我不确定它是否理想。我的网关上的重试间隔意味着每 5 秒将测试一次连接是否有效,并在需要时重新创建。它通过在我的 AuthConnectionInterceptor 上调用 isOpen() 来完成此操作。因此,我能够重写此方法来检查当前时间与通过拦截器的最后一条消息之间的增量。如果时间间隔太长,我会手动终止连接并触发重新连接。

这些课程的完整源代码如下...... 拦截器工厂: 包 com.socketfetching;

import org.apache.log4j.Logger;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptor;
import org.springframework.integration.ip.tcp.connection.TcpConnectionInterceptorFactory;

/**
 * AuthConnectionInterceptorFactory
 * Created by: Seth Kelly
 * Date: 10/3/13
 */
public class AuthConnectionInterceptorFactory implements TcpConnectionInterceptorFactory {
    private static Logger logger = Logger.getLogger(AuthConnectionInterceptorFactory.class);

    private String usernamePrompt;
    private String username;
    private String passwordPrompt;
    private String password;

    public AuthConnectionInterceptorFactory(String usernamePrompt, String username, String passwordPrompt, String password) {
        this.usernamePrompt = usernamePrompt;
        this.username = username;
        this.passwordPrompt = passwordPrompt;
        this.password = password;
    }

    @Override
    public TcpConnectionInterceptor getInterceptor() {
        return new AuthConnectionInterceptor(usernamePrompt, username, passwordPrompt, password);
    }
}

拦截器:

package com.socketfetching;

import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.ip.tcp.connection.AbstractTcpConnectionInterceptor;
import org.springframework.integration.support.MessageBuilder;

/**
 * AuthConnectionInterceptor
 * Created by: Seth Kelly
 * Date: 10/3/13
 *
 * Handles username/password authentication when opening a new TCP connection.
 */
public class AuthConnectionInterceptor extends AbstractTcpConnectionInterceptor {
    private static Logger logger = Logger.getLogger(AuthConnectionInterceptor.class);

    private String usernamePrompt;
    private String username;
    private String passwordPrompt;
    private String password;

    private Boolean usernameSent = false;
    private Boolean passwordSent = false;

    private static final String PING_PREFIX = "Ping";

    private DateTime lastMsgReceived;
    private Integer inactivityTimeout = 35000;

    public AuthConnectionInterceptor(String usernamePrompt, String username, String passwordPrompt, String password) {

        this.usernamePrompt = usernamePrompt;
        this.username = username;
        this.passwordPrompt = passwordPrompt;
        this.password = password;
    }

    @Override
    public boolean onMessage(Message<?> message) {
        lastMsgReceived = new DateTime();
        Boolean forwardMessage = true;

        if(!this.isServer()) {
            String payload = new String((byte[])message.getPayload());

            if(!usernameSent) {
                if(payload.equals(usernamePrompt))  {
                    try {
                        logger.debug("Sending username=" + username + "to authenticate socket.");
                        super.send(MessageBuilder.withPayload(username).build());
                        usernameSent = true;
                        forwardMessage = false;

                    } catch (Exception e) {
                        throw new MessagingException("Negotiation error", e);
                    }
                }
                else {
                    throw new MessagingException("Negotiation error.  expected message=" + usernamePrompt +
                            " actual message=" + payload);
                }
            }
            else if(!passwordSent) {
                if(payload.equals(passwordPrompt))  {
                    try {
                        logger.debug("Sending password to authenticate socket.");
                        super.send(MessageBuilder.withPayload(password).build());
                        passwordSent = true;
                        forwardMessage = false;

                    } catch (Exception e) {
                        throw new MessagingException("Negotiation error", e);
                    }
                }
                else {
                    throw new MessagingException("Negotiation error.  expected message=" + passwordPrompt +
                            " actual message=" + payload);
                }
            }
            else if(payload.startsWith(PING_PREFIX)) {
                //Just record that we received the ping.
                forwardMessage = false;
            }
        }

        if(forwardMessage)
            return super.onMessage(message);
        else
            return true;
    }

    @Override
    public boolean isOpen() {
        DateTime now = new DateTime();
        if((lastMsgReceived == null) ||
                ((now.getMillis() - lastMsgReceived.getMillis()) < inactivityTimeout)) {
            return super.isOpen();
        }
        else
        {
            if(super.isOpen()) {
                super.close();
            }
            return false;
        }
    }


    public Integer getInactivityTimeout() {
        return inactivityTimeout;
    }

    public void setInactivityTimeout(Integer inactivityTimeout) {
        this.inactivityTimeout = inactivityTimeout;
    }
}

None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何创建检测连接丢失和自动重新连接的 tcp-inbound-gateway? 的相关文章

随机推荐

  • .NET MailMessage 类注入安全吗?

    我怀疑是否邮件留言 http msdn microsoft com en us library system net mail mailmessage aspx类受到保护电子邮件注入 http en wikipedia org wiki E
  • JS:反转数组,但仅反转原始数组 --> 错误:运行时没有输出

    我有以下问题 反转数组 编写一个接受数组并就地反转该数组的函数 该行为应该模仿本机 reverse 数组方法的行为 但是 您的反向函数应该接受要作为参数进行操作的数组 而不是作为该数组上的方法来调用 不要在您自己的实现中使用本机 rever
  • angularjs - 单击具有实际网址的链接时刷新

    我使用 RouteProvider 为我的 url 定义控制器和模板 当我单击与实际位置具有相同 URL 的链接时 没有任何反应 我想要reload 如果用户单击此类链接 即使位置未更改 也会调用该方法 换句话说 如果我将位置设置为相同的值
  • 通过nodejs服务器+socket.io从mp3文件同步流式传输音乐

    我的服务器上有一个 mp3 文件 我希望所有访问该网址的客户都能同步收听该音乐 That is 假设该文件播放 6 分钟 我在上午 10 00 开始播放这首歌 上午 10 03 发出的请求应从歌曲的第 3 分钟开始收听 我所有的客户都应该同
  • 带有属性占位符的 Spring Cloud AWS SQS SendTo 注释

    这个问题 https github com spring cloud spring cloud aws issues 65建议 SendTo 注释支持属性占位符 但我无法让它工作 这是我想要做的一些简化的代码片段 比尝试用文字解释更容易 我
  • Visual Composer 无法加载并给出 TypeError: _.template(...).trim is not a function

    我的视觉作曲家插件不起作用 它卡在加载页面上 它给出了一个错误 TypeError template trim 不是函数 错误出现在这行代码上 这个 controls template 模板 数据 vc template options t
  • 如何检测位图中的红色像素

    android中的getPixels 是上下左右读取像素 还是左右上下读取像素 基本上是按行或列读取 如果我想知道图片中的红色值较高的位置 我可以这样做吗 我假设它是按列读取的 Bitmap thumbnail Bitmap data ge
  • set_time_limit(0) 和“最大执行时间” PHP

    我有一个脚本 我已经设置了 set time limit 0 但仍然得到 Fatal error Maximum execution time of 90 seconds exceeded in home Feed php on line
  • Ajax.ActionLink 与 Html.ActionLink + Jquery.Ajax 调用

    我可以通过 Ajax ActionLink Getcustomers GetCustomers Customer 调用 asp net mvc 控制器 我可以使用 Html ActionLink 和 jquery ajax 调用执行相同的操
  • 使 PlaySound 非阻塞

    我一直在测试声音 我注意到PlaySound正在阻塞 即它会等到声音播放完毕才返回 include
  • 是否可以将 Promise 包装在生成器内?

    我正在尝试使用生成器创建一个承诺包装器 以便我可以执行以下操作 var asyncResult PromiseWrapper ajax 到目前为止 我一直在尝试 function PromiseWrapper promise return
  • 使用 angularJS 触发点击最近的 div

    嗨 我有以下代码 angular module myApp controller myController function scope scope clickedInput function setTimeout function ang
  • Linux用户空间线程、内核线程、轻量级进程

    我对所有这些实体以及它们在 Linux 中如何互连感到有点困惑 Unix 内部原理 一书指出lightweight process LWP 是内核支持的用户线程 并且该内核看不到进程内的线程 对于 Linux 来说仍然如此吗 据我了解 用户
  • 为什么调用成员函数不会调用该对象的 ODR-USE?

    Here in cppref http en cppreference com w cpp language initialization says 如果非内联变量 C 17 起 的初始化推迟到主 线程函数的第一条语句之后进行 它发生在第一
  • Matlab:在类中导入函数

    我在 Matlab 中有一个类文件 我使用包规范创建了 i 目录结构 MyPkg F1 F2 F3 fun m myc m 我的班级是myc并将其插入包装中MyPkg 一个功能fun保存在子包中F3在主要的一处 我想使用函数fun在我的课堂
  • 应用程序可以在 WIFI 上运行,但不能在 3G 上运行

    有两项活动 Main and Detail活动 Main Activity基本上是一个GridView Detail Activity基本上显示了单击项目的详细信息 我正在传递所选项目的 id pid 来自Main to the Detai
  • 如何删除neo4j中的所有索引?

    我想使用 cypher 批量删除所有存在的索引 可以吗 我正在使用 neo4j 3 4 7 DROP INDEX ON Label attributename 如果我在稍后阶段创建相同的索引 它会替换现有索引吗 删除所有索引和约束的快速方法
  • Polly 断路器已处理和未处理的异常

    我想使用 Polly 来实现断路器模式 在文档中 有一个半开状态描述 https github com App vNext Polly wiki Circuit Breaker half open 上面写着 如果收到已处理的异常 则会重新引
  • 更改 Laravel 5.5 中的基本 URL

    我正在尝试更改基本 URLhttp myscreenclass com to http myscreenclass com home 我更改了 env 文件并更新了 config app php 文件 我尝试用多种不同的方式解决这个问题 但
  • 如何创建检测连接丢失和自动重新连接的 tcp-inbound-gateway?

    我正在尝试配置一组 spring 集成组件来使用来自 TCP 套接字的数据 基本协议是 打开连接后 系统会提示我输入用户名 然后输入密码 然后如果身份验证成功 数据就会在可用时流式传输给我 每 30 秒就会向我发送一条 ping 消息 以便