AKKA入门

2023-05-16

1 Guardian.java

package com.example.demo;

import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.pubsub.Topic;

public class Guardian extends AbstractBehavior<Void> {

    private static ActorRef<Topic.Command<Message>> topic = null;
    private static ActorRef<Message> subscriberActor = null;


    private Guardian(ActorContext<Void> context) {
        super(context);
    }


    public static Behavior<Void> create() {
        return Behaviors.setup(context -> {
            context.spawn(MemberEventLogger.create(), "listener");

            topic = context.spawn(Topic.create(Message.class, "my-topic"), "MyTopic");
            subscriberActor = context.spawn(SubscriberActor.create(),"Subscribe");

            topic.tell(Topic.subscribe(subscriberActor));

            return Behaviors.empty();
        });
    }

    public static void publish(Message message) {
        topic.tell(Topic.publish(message));
        return;
    }

    @Override
    public Receive createReceive() {
        return newReceiveBuilder()
                .build();
    }

}

2 MemberEventLogger.java

package com.example.demo;

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;

public class MemberEventLogger extends AbstractBehavior<ClusterEvent.MemberEvent>  {

    private MemberEventLogger(ActorContext<ClusterEvent.MemberEvent> context) {
        super(context);
    }
    public static Behavior<ClusterEvent.MemberEvent> create() {
        return Behaviors.setup(context -> {
            Cluster cluster = Cluster.get(context.getSystem());

            context.getLog().info("Started [{}], cluster.selfAddress = {})",
                    context.getSystem(),
                    cluster.selfMember().address());

            cluster.subscriptions().tell(new Subscribe<>(context.getSelf(), ClusterEvent.MemberEvent.class));

            return Behaviors.receiveMessage(event -> {
                context.getLog().info("MemberEvent: {}", event);
                return Behaviors.same();
            });
        });
    }

    @Override
    public Receive createReceive() {
        return newReceiveBuilder()
                .build();
    }
}

3 Message.java

package com.example.demo;

public class Message implements MySerializable {
    private String text;

    public Message(){};

    public Message(String text) {
        this.text = text;
    }

    public String getText() {
        return text;
    }
}

4 MySerializable.java

package com.example.demo;

public interface MySerializable {
}

5 SubscriberActor.java

package com.example.demo;

import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class SubscriberActor extends AbstractBehavior<Message> {

    private SubscriberActor(ActorContext<Message> context) {
        super(context);
    }

    public static Behavior<Message> create() {
        return Behaviors.setup(context -> new SubscriberActor(context));
    }

    @Override
    public Receive<Message> createReceive() {
        return newReceiveBuilder()
                .onMessage(Message.class, this::onMsg)
                .build();
    }

    private Behavior<Message> onMsg(Message msg) {
        log.info("ip: {} 收到消息:{}", getContext().getSystem().address().toString(), msg.getText());
        return this;
    }

}

6 HelloWorld.java

package com.example.demo;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Adapter;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.pubsub.Topic;
import akka.cluster.ClusterEvent;
import akka.cluster.typed.Cluster;
import akka.cluster.typed.Subscribe;
import akka.management.cluster.bootstrap.ClusterBootstrap;
import akka.management.scaladsl.AkkaManagement;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;

@Slf4j
@Service
public class HelloWorld {

    private ActorSystem system = null;
    private static String remoteAddress = "";

    @PostConstruct
    public void start() throws InterruptedException {
        Config config = ConfigFactory.load("application.conf");
        system = ActorSystem.create(Guardian.create(), "Appka",config);
        remoteAddress = system.address().toString();

        AkkaManagement.get(system).start();
        ClusterBootstrap.get(system).start();

        Thread.sleep(20000);
        Guardian.publish(new Message(remoteAddress));

    }
}

7 application.conf

akka {
  loglevel = "DEBUG"
  actor.provider = cluster

  coordinated-shutdown.exit-jvm = on

  actor.serialization-bindings {
      "com.example.demo.MySerializable" = jackson-json
  }

  cluster {
    shutdown-after-unsuccessful-join-seed-nodes = 60s
    downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
  }
}

#management-config
akka.management {
  cluster.bootstrap {
    contact-point-discovery {
      # For the kubernetes API this value is substributed into the %s in pod-label-selector
      service-name = "appka"

      # pick the discovery method you'd like to use:
      discovery-method = kubernetes-api

      required-contact-point-nr = 2
      required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
    }
  }
}
#management-config

#akka.management {
#  health-checks {
#    readiness-checks {
#      example-ready = "akka.cluster.bootstrap.demo.DemoHealthCheck"
#    }
#  }
#}

8 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>demo</description>
    <properties>
        <java.version>1.8</java.version>
        <akka.management.version>1.2.0</akka.management.version>
        <scala.binary.version>2.13</scala.binary.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-bom_${scala.binary.version}</artifactId>
                <version>2.7.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <!--akka management-->
        <dependency>
            <groupId>com.lightbend.akka.management</groupId>
            <artifactId>akka-management-cluster-http_${scala.binary.version}</artifactId>
            <version>${akka.management.version}</version>
        </dependency>
        <dependency>
            <groupId>com.lightbend.akka.management</groupId>
            <artifactId>akka-management-cluster-bootstrap_${scala.binary.version}</artifactId>
            <version>${akka.management.version}</version>
        </dependency>
        <dependency>
            <groupId>com.lightbend.akka.discovery</groupId>
            <artifactId>akka-discovery-kubernetes-api_${scala.binary.version}</artifactId>
            <version>${akka.management.version}</version>
        </dependency>

        <!--akka-->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-typed_${scala.binary.version}</artifactId>
        </dependency>

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-serialization-jackson_${scala.binary.version}</artifactId>
        </dependency>


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>RELEASE</version>
            <scope>compile</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

AKKA入门 的相关文章

  • 现代 Akka DI 与 Guice

    这里是 Java 8 Guice 4 0 和 Akka 2 3 9 我试图弄清楚如何用 JSR330 风格注释我的演员类 Inject注释 然后通过 Guice 将它们全部连接起来 但实际上我读过的每一篇文章 下面的一些例子 要么使用 Sc
  • Akka actor 查找或依赖注入

    我刚刚开始使用 Akka 我无法决定是否应该使用依赖注入 如蛋糕模式 或 actor 查找来将 actor 彼此解耦 首选方法是什么 您应该更喜欢向彼此介绍 Actor 这意味着在消息中发送 ActorRef 或与消息一起发送 或者将其传递
  • Akka 在 Actor 之外进行日志记录

    我有一个 Akka Actor 打电话给MyObject foo MyObject不是演员 如何设置登录 使用 Actor 就很简单 因为我可以混合 Actor Logging 在 MyObject 中 我无权访问 context syst
  • Scala 与 Akka 中的相互身份验证

    我将使用 Akka 在 Scala 中创建一个 TLS 会话 并在客户端和服务器之间进行相互身份验证 我创建了两个 CA 证书 它们必须信任来自另一部分的相应证书 您能给我一个如何实现这一点的例子吗 谢谢 我创建了一个 github 项目
  • Akka 远程 `system.shutdown()` 导致 EndpointDisociatedException?

    我有简单的客户端和服务器设置 在发送所有异步请求并在服务器确认后的某个时刻 客户端调用system shutdown 这导致服务器akka remote EndpointDisassociatedException和一堆日志错误 如下所示
  • 当 akka actor 在测试线程之外抛出异常时,scalatest 失败

    我曾经遇到过这样的情况 在我测试一个 Actor 时 Actor 意外抛出异常 由于错误 但测试仍然通过 现在 大多数情况下 Actor 中的异常意味着无论测试正在验证什么 都不会正确显示 因此测试失败 但在极少数情况下 情况并非如此 异常
  • 我应该使用 Java 中的哪个线程池?

    有大量的任务 每个任务都属于一个组 要求是每组任务应该像在单线程中执行一样串行执行 并且在多核 或多CPU 环境中吞吐量应该最大化 注意 组的数量也与任务数量成正比 最简单的解决方案是使用 ThreadPoolExecutor 并同步 或锁
  • 用于 REST 轮询的 Akka

    我正在尝试将大型 Scala Akka PlayMini 应用程序与外部 REST API 连接起来 这个想法是定期轮询 基本上每 1 到 10 分钟 根 URL 然后爬取子级 URL 以提取数据 然后将其发送到消息队列 我想出了两种方法来
  • Akka/Java getContext().become 带参数?

    在 Akka Scala 中 可以将参数传递给自定义接收函数 因此可以通过 params 传递整个 actor 状态 而无需使用可变变量 context become myCustomReceive param1 param2 但在 Jav
  • Akka设计原则

    在开发一个相当大的 Akka 应用程序时 我在使用普通方法和非 Akka 类时遇到了一个非常简单的结构 但在使用 Akka 时实际上很难确定 这就是为什么我来这里问你什么建议是解决此问题的最佳方法 所以问题是这样的 我有一个父角色 我们称他
  • akka java 编程覆盖配置

    我能找到的几个关于此问题的主题都是针对 Scala 而不是 Java 而且没有一个涉及远程参与者 我有一个基本配置文件 SERVER CONFIG FILE Include akka common TheSystem akka actor
  • 节点如何知道哪些节点已经看到集群当前状态?

    我正在阅读 akka 文档 并在理解他们的实现方式时遇到了一些麻烦Gossip 文档在这里 http doc akka io docs akka 2 4 common cluster html Gossip Protocol 让我困惑的部分
  • 为 testProbe 提供 ActorPath

    我有一些代码使用 ActorPath 而不是 ActorRef 向演员发送消息 system actorSelection user myActor a message 我需要确保这条消息确实被发送了 因此 我需要在测试中创建一个位于此 a
  • 如何检测scala执行上下文耗尽?

    我的 Playframework 应用程序有时没有响应 我想在运行时检测到这一点 记录有关当前在耗尽的执行上下文上运行的内容的信息 实现这一目标的最佳策略是什么 我考虑过将小型可运行对象发布到执行上下文 如果它们没有及时执行 我会记录一条警
  • 为什么 Actor.receive 是偏函数?

    Why is Actor receive部分功能 我总是可以使用带有匹配表达式的正则函数来代替它 It is a PartialFunction捕获消息被处理或未处理的可能性Actor 未处理的消息将 不让演员失败MatchError 产卵
  • Akka 流如何不断实现?

    我在用阿卡流 http doc akka io docs akka stream and http experimental 1 0 scala stream index html在 Scala 中进行轮询AWS SQS https aws
  • akka 远程处理中出现“最大允许大小 128000 字节,编码类 scala 的实际大小”错误

    我想使用 Akka Remoting 在参与者之间通过网络交换消息 但是对于大型字符串消息 我收到以下错误 akka remote OversizedPayloadException Discarding oversized payload
  • 我应该使用 akka.http.scaladsl.util.FastFuture 而不是 scala.concurrent.Future 吗?

    我应该使用 akka http scaladsl util FastFuture 而不是 scala concurrent Future 吗 评论说 Provides alternative implementations of the b
  • 在 Akka 中配置嵌套 Router

    我有一些嵌套的路由器 应创建它FromConfig 我想要的是这样的 test akka actor deployment worker router round robin nr of instances 5 slave router b
  • Akka 与现有 java 项目集成的示例

    如果我已经有现有的javaWeb 应用程序使用spring and servlet容器 将 Akka 集成到其中的正确方法是什么 就像我将会有Actor1 and Actor2互相沟通的 开始使用这些演员的切入点是什么 例如 1 把它放在那

随机推荐

  • yii2.0 basic版使用CRUD生成器的时候提示错误及页面显示错误

    在使用model生成器的时候不用加命名空间 xff0c 写好表名就可以直接生成了 但是在用CRUD的时候填写完controllers和model class之后点击生成会弹出错误提示 xff1a Class 39 TelBook 39 sp
  • linux日志对应内容

    var log messages 包括整体系统信息 xff0c 其中也包含系统启动期间的日志 此外 xff0c mail xff0c cron xff0c daemon xff0c kern和auth等内容也记录在var log messa
  • 常用证书操作函数

    现有的证书大都采用X 509规范 xff0c 主要同以下信息组成 xff1a 版本号 证书序列号 有效期 拥有者信息 颁发者信息 其他扩展信息 拥有者的公钥 CA对以上信息的签名 OpenSSL实现了对X 509数字证书的所有操作 包括签发
  • MongoDB 匹配查询和比较操作符

    一 匹配查询 1 查询所有 span class token operator gt span db accounts find span class token punctuation span span class token punc
  • 我的2014——典型程序员的一年,不想再重来

    兴冲冲地拿起 xff0c 信誓旦旦的搁在一边 xff0c 以为很快就会回来 xff0c 却一晃而过 xff0c 不再回来 xff1b 我不想再重复过去 xff0c 决定去做 xff0c 写下来 题记 已经记不起我2014的年初是否有过规划
  • 我的2016——程序员年到三十,工作第四年

    看到CSDN 我的2016 主题征文活动 已经是1月6号 xff0c 而截止时间是1月8号 xff0c 对比去年的总结是在闲等活动开始 xff0c 今年在时间上真的是天差地别 但是 xff0c 一年到头 xff0c 还是需要花些时间来回顾这
  • mac下 ndk_build: command not found

    参考 http blog csdn net greenbird811 article details 7543305 在mac下调用ndk build c代码文件提示错误 fix 1 启动终端Terminal 2 进入当前用户的home目录
  • 公司分配IP地址,求主机号码的最小值和最大值。

    问题描述如下 xff1a 姐 xff1a 注意减去2的实际意义 xff1a 网络地址后的第一个主机地址是本网段的网络地址192 168 0 0 xff0c 最 后一个主机地址是本网段的广播地址192 168 255 255
  • Erlang入门

    64 author sunxu 64 copyright C 2023 lt COMPANY gt 64 doc 64 end Created 16 2月 2023 22 16 module test author 34 sunxu 34
  • IPv4地址、IPv6地址和Mac地址的位数

    xff08 1 xff09 IPv4的地址是32位 xff0c 用点分十进制表示 xff0c 每八位划分 xff0c 也就是四个0 255的十进制数 xff0c 这是很常见的 xff08 2 xff09 IPv6的地址是128位 xff0c
  • 用C#连接数据库的方法

    连接SQL Server数据库的方法 xff1a 1 在程序中引用System Data SqlClient命名空间 2 编写连接字符串 xff0c 格式为 xff1a Data Source 61 服务器名称 Initial Catalo
  • gcc 不支持 //注释的解决

    这段时间用slickedit写代码 xff08 windows平台下 xff0c 装了Cygwin xff09 xff0c 编译器用的gcc xff0c 但是有个问题就是用 34 34 写注释的时候 xff0c 编译的时候有错 xff1a
  • python实现按照文件名称进行文件分类

    问题 xff1a 大量名称中带有数字的图片 视频 xff0c 根据名称中数字按照一定的等差数列来排序 xff0c 并且放入指定对应的文件夹中 span class token keyword import span os span clas
  • 【深度学习】Yolov3详解笔记及Pytorch代码

    Yolov3详解笔记及Pytorch代码 预测部分网络结构backbone xff1a Darknet 53output预测结果的解码 训练部分计算loss所需参数pred是什么target是什么loss的计算过程 预测部分 网络结构 DB
  • 【深度学习】各种卷积的理解笔记(2D,3D,1x1,可分离卷积)

    卷积 1 2D卷积单通道版本多通道版本 2 3D卷积3 1x1卷积作用应用 4 卷积算法5 可分离卷积空间可分离卷积深度可分离卷积 1 2D卷积 卷积的目的是从输入中提取有用的特征 在图像处理中 xff0c 卷积可以选择多种不同的滤波器 每
  • 【深度学习】(2+1)D模型框架结构笔记

    xff08 2 43 1 xff09 D 模型框架结构笔记 SpatioTemporalConv模块结构SpatioTemporalResBlock模块结构SpatioTemporalResLayer模块结构2Plus1DNet Spati
  • 【机器学习】LR回归(逻辑回归)和softmax回归

    LR回归 xff08 逻辑回归 xff09 和softmax回归 1 LR回归Logistic回归的函数形式Logistic回归的损失函数Logistic回归的梯度下降法Logistic回归防止过拟合Multinomial Logistic
  • 【深度学习】时间注意力模块与空间注意力模块

    注意力模块 通道 xff08 时间 xff09 注意力模块空间注意力模块 通道 xff08 时间 xff09 注意力模块 为了汇总空间特征 xff0c 作者采用了全局平均池化和最大池化两种方式来分别利用不同的信息 输入是一个 H W C 的
  • 【机器学习】机器学习与统计分布的关系

    这里写目录标题 1 常见的统计学分布1 xff09 离散分布a 伯努利分布b 二项分布c 泊松分布 2 xff09 连续分布a 正态分布 xff08 高斯分布 xff09 b 均匀分布 为什么我们喜欢用 sigmoid 这类 S 型非线性变
  • AKKA入门

    1 Guardian java package com example demo import akka actor typed javadsl ActorContext import akka actor typed ActorRef i