在 Apache Storm Bolt 中使用 Apache Camel ProducerTemplate

2023-12-20

我正在尝试编写简单的 Storm + Camel 项目。 我的 Storm 拓扑分析推文,一个 Bolt 应该将推文文本发送到 apache 骆驼路由,而该路由又使用 websocket 通知某些 Web 应用程序。

由于尝试使用一次构建 CamelContext 时从 Bolt 收到 NotSerializedExceptions,我无法使其工作。

我已经尝试过的:

  • 在 Bolt 的构造函数中传递 CamelContext - 导致 NotSerializedException
  • 在stormconf中传递CamelContext,并在bolt的prepare(...)方法中使用它来访问它。结果是 :

    14484 [main] 错误 org.apache.storm.zookeeper.server.NIOServerCnxnFactory - 线程 Thread[main,5,main] 死亡 java.lang.IllegalArgumentException:拓扑conf不是json可序列化的 在 backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4] 在 backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4] 在 backtype.storm.LocalCluster.submitTopology(来源未知)~[storm-core-0.9.4.jar:0.9.4]

骆驼路线:

public class MyRouteBuilder extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        from("direct:main")
                .to("websocket:localhost:8085/main?sendToAll=true");
    }
}

风暴拓扑: Tweet Spout 使用 twitter4j stremaing API 传播推文。

public class TwitterStreamTopology {

    public static void main(String[] args) {
        CamelContext producerTemplate = new RouteStarter().buildRoute();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
        builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
        Config conf = new Config();
        conf.put("producerTemplate", producerTemplate.createProducerTemplate());
        conf.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("mytopology", conf, builder.createTopology());

        Utils.sleep(20000);
        cluster.shutdown();
    }
}

WebsocketBolt:

public class WebSocketBolt extends BaseBasicBolt {
    private ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
    }
}

有没有办法很好地做到这一点?

或者我应该让骆驼路由被http访问,并在boltprepare(...)方法中创建一些HttpClient?这看起来仍然有点矫枉过正,必须有一种方法让它变得更容易。

感谢您的帮助!


问题的根本原因是您将 ProducerTemplate 添加到 Storm 配置中,并且它抛出异常,因为它不可序列化。如果那是你自己的类,你可以更改代码以使其工作,但由于那是一个 Camel 类,我会推荐一种不同的方法。

  1. WebSocketBolt:将 ProducerTemplate 私有成员更改为瞬态:private transient ProducerTemplate producerTemplate;这样它就不会尝试被序列化(与将其放入conf中时遇到的问题相同)。
  2. WebSocketBolt:在准备方法中而不是在拓扑中初始化 ProducerTemplate。

像这样的东西:

public class WebSocketBolt extends BaseBasicBolt {
    private transient ProducerTemplate producerTemplate;

    @Override
    public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
        Status s = (Status) input.getValueByField("tweet");
        producerTemplate.sendBody("direct:main", s.getText());
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        super.prepare(stormConf, context);
        CamelContext producerTemplate = new RouteStarter().buildRoute();
        this.producerTemplate = producerTemplate.createProducerTemplate();
    }
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在 Apache Storm Bolt 中使用 Apache Camel ProducerTemplate 的相关文章

随机推荐