我正在尝试编写简单的 Storm + Camel 项目。
我的 Storm 拓扑分析推文,一个 Bolt 应该将推文文本发送到 apache 骆驼路由,而该路由又使用 websocket 通知某些 Web 应用程序。
由于尝试使用一次构建 CamelContext 时从 Bolt 收到 NotSerializedExceptions,我无法使其工作。
我已经尝试过的:
- 在 Bolt 的构造函数中传递 CamelContext - 导致 NotSerializedException
-
在stormconf中传递CamelContext,并在bolt的prepare(...)方法中使用它来访问它。结果是 :
14484 [main] 错误 org.apache.storm.zookeeper.server.NIOServerCnxnFactory - 线程 Thread[main,5,main] 死亡
java.lang.IllegalArgumentException:拓扑conf不是json可序列化的
在 backtype.storm.testing$submit_local_topology.invoke(testing.clj:262) ~[storm-core-0.9.4.jar:0.9.4]
在 backtype.storm.LocalCluster$_submitTopology.invoke(LocalCluster.clj:43) ~[storm-core-0.9.4.jar:0.9.4]
在 backtype.storm.LocalCluster.submitTopology(来源未知)~[storm-core-0.9.4.jar:0.9.4]
骆驼路线:
public class MyRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("direct:main")
.to("websocket:localhost:8085/main?sendToAll=true");
}
}
风暴拓扑:
Tweet Spout 使用 twitter4j stremaing API 传播推文。
public class TwitterStreamTopology {
public static void main(String[] args) {
CamelContext producerTemplate = new RouteStarter().buildRoute();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("tweetSpout", new TweetSpout(keywords), 1);
builder.setBolt("websocket", new WebSocketBolt()).shuffleGrouping("tweetSpout");
Config conf = new Config();
conf.put("producerTemplate", producerTemplate.createProducerTemplate());
conf.setDebug(true);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mytopology", conf, builder.createTopology());
Utils.sleep(20000);
cluster.shutdown();
}
}
WebsocketBolt:
public class WebSocketBolt extends BaseBasicBolt {
private ProducerTemplate producerTemplate;
@Override
public void execute(Tuple input, BasicOutputCollector basicOutputCollector) {
Status s = (Status) input.getValueByField("tweet");
producerTemplate.sendBody("direct:main", s.getText());
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
this.producerTemplate = (ProducerTemplate) stormConf.get("producerTemplate");
}
}
有没有办法很好地做到这一点?
或者我应该让骆驼路由被http访问,并在boltprepare(...)方法中创建一些HttpClient?这看起来仍然有点矫枉过正,必须有一种方法让它变得更容易。
感谢您的帮助!