Akka|Introduction

基本概念

所有内容基于中文 Wiki Akka 中文指南

Akka 是一个用于在 JVM 平台上构建高并发、分布式和可容错的事件驱动应用程序的运行时工具包

Akka 让开发者们专注于满足业务需求,而不是编写初级代码来提供可靠的应用行为、容错性和高性能

提供的功能

Akka 提供了:

  • 不使用原子或锁之类的低级并发构造的多线程行为,甚至可以避免你考虑内存可见性问题: Volatile(?)
  • 系统及其组件之间的透明远程通信,使我们不需要再编写和维护困难的网络通信代码
  • 一个集群的、高可用的体系结构,具有弹性、可按需扩展性,使你能够提供真正的反应式系统

通过学习 Akka 以及如何使用其核心的 Actor 模型,我们将能够熟练的使用大量的工具集,这些工具可以在统一的编程模型中解决困难的分布式/并行系统问题,在统一的编程模型中,所有东西都紧密且高效地结合在一起

快速上手

部署

从官网下载压缩包,解压项目,可以通过 Gradle 或者是 Maven 来进行构建

官方说是只要一个命令就能运行,但是由于这个 demo 项目并没有版本控制,目前的代码版本需要 JDK17,因此还配置了 JDK17 的环境来编写代码

一个小插曲:配置完JAVA_HOME 以及对应的环境变量之后,输入 java-version 还是显示用的11,查询经验帖得知:需要删除环境变量中 path 对应的 ...\Oracle\Java\javapath 缓存

之后一个命令直接跑起来这个 demo 项目

1
$ mvn compile exec:exec

可以看到如下的日志信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[INFO] --- exec-maven-plugin:3.0.0:exec (default-cli) @ app ---
[2024-05-28 14:05:58,241] [INFO] [akka.event.slf4j.Slf4jLogger] [hello-akka.actor.default-dispatcher-5] [] - Slf4jLogger started
[2024-05-28 14:05:58,374] [INFO] [com.example.HelloWorld] [hello-akka.actor.default-dispatcher-5] [akka://hello/user/greeter] - Hello World!
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorld] [hello-akka.actor.default-dispatcher-5] [akka://hello/user/greeter] - Hello Akka!
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorldBot] [hello-akka.actor.default-dispatcher-6] [akka://hello/user/World] - Greeting 1 for Actor[akka://hello/user/greeter#-1692900445]
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorldBot] [hello-akka.actor.default-dispatcher-5] [akka://hello/user/Akka] - Greeting 1 for Actor[akka://hello/user/greeter#-1692900445]
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorld] [hello-akka.actor.default-dispatcher-5] [akka://hello/user/greeter] - Hello World!
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorld] [hello-akka.actor.default-dispatcher-5] [akka://hello/user/greeter] - Hello Akka!
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorldBot] [hello-akka.actor.default-dispatcher-5] [akka://hello/user/Akka] - Greeting 2 for Actor[akka://hello/user/greeter#-1692900445]
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorldBot] [hello-akka.actor.default-dispatcher-4] [akka://hello/user/World] - Greeting 2 for Actor[akka://hello/user/greeter#-1692900445]
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorld] [hello-akka.actor.default-dispatcher-6] [akka://hello/user/greeter] - Hello Akka!
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorld] [hello-akka.actor.default-dispatcher-6] [akka://hello/user/greeter] - Hello World!
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorldBot] [hello-akka.actor.default-dispatcher-4] [akka://hello/user/Akka] - Greeting 3 for Actor[akka://hello/user/greeter#-1692900445]
[2024-05-28 14:05:58,390] [INFO] [com.example.HelloWorldBot] [hello-akka.actor.default-dispatcher-6] [akka://hello/user/World] - Greeting 3 for Actor[akka://hello/user/greeter#-1692900445]
[2024-05-28 14:06:01,379] [INFO] [akka.actor.CoordinatedShutdown] [hello-akka.actor.default-dispatcher-5] [CoordinatedShutdown(akka://hello)] - Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
[INFO] ------------------------------------------------------------------------

三个Actor

在这个最简单的 Demo 示例中包含了三个 Actor

  • Greeter 发送和接收消息
  • GreeterBot 接收和发送消息,并统计已经发送的消息的个数
  • GreeterMain 监管总任务的主Actor

总体的流程大致如下:一开始先是 Greeter 发送消息,之后 Bot 收到消息,触发Greeted 逻辑

Greeter 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//泛型表示行为类处理HelloWorld.Greet类型的消息
public class HelloWorld extends AbstractBehavior<HelloWorld.Greet> {

//通过 record 关键字创造两种消息,一种是第一次的打招呼,另一次是收到后的回复
public static record Greet(String whom, ActorRef<Greeted> replyTo) {}
public static record Greeted(String whom, ActorRef<Greet> from) {}

//初始化以及上下文的配置
public static Behavior<Greet> create() {
return Behaviors.setup(HelloWorld::new);
}

private HelloWorld(ActorContext<Greet> context) {
super(context);
}

//创造消息接收器
@Override
public Receive<Greet> createReceive() {
//onMessage()设定了只有当收到 Greet 类型的消息时,才会触发 onGreet()
return newReceiveBuilder().onMessage(Greet.class, this::onGreet).build();
}
//实际处理 Greet 处理消息
private Behavior<Greet> onGreet(Greet command) {
//父类的上下文日志记录器来记录日志
getContext().getLog().info("Hello {}!", command.whom);
//收到消息之后回复发送者一个 Greeted 消息,其中 whom 参数设定原先发消息的那个 Actor,from 参数设定自己
command.replyTo.tell(new Greeted(command.whom, getContext().getSelf()));
return this;
}
}

可以发现其核心业务在于 newReceiveBuilder().onMessage() 配置只要有对应类型的消息收到,就触发对应的逻辑

GreeterBot 分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//泛型表示行为类处理HelloWorld.Greet类型的消息
public class HelloWorldBot extends AbstractBehavior<HelloWorld.Greeted> {

public static Behavior<HelloWorld.Greeted> create(int max) {
return Behaviors.setup(context -> new HelloWorldBot(context, max));
}

private final int max;
private int greetingCounter;

//参数初始化
private HelloWorldBot(ActorContext<HelloWorld.Greeted> context, int max) {
super(context);
this.max = max;
}

//创建消息接收器,只有收到 Greeted 类型的数据,才会触发 onGreeted 逻辑
@Override
public Receive<HelloWorld.Greeted> createReceive() {
return newReceiveBuilder().onMessage(HelloWorld.Greeted.class, this::onGreeted).build();
}
//处理消息
private Behavior<HelloWorld.Greeted> onGreeted(HelloWorld.Greeted message) {
greetingCounter++;
getContext().getLog().info("Greeting {} for {}", greetingCounter, message.from());
if (greetingCounter == max) {
return Behaviors.stopped();
} else {
//如果没有到达发送消息的数量上限就再进行一轮,从 Greet 开始
message.from().tell(new HelloWorld.Greet(message.whom(), getContext().getSelf()));
//返回当前实例表示事件行为没有发生变化
return this;
}
}
}

每当HelloWorldBot接收到 HelloWorld.Greeted 消息时,会触发 onGreeted 方法,根据处理逻辑记录问候次数并决定是否停止Actor。这种设计使得HelloWorldBot能够异步、高效地处理消息。

GreeterMain 分析

这里才是整个最佳实践的入口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class HelloWorldMain extends AbstractBehavior<HelloWorldMain.SayHello> {


public static void main(String[] args) throws Exception {

final ActorSystem<SayHello> system =
ActorSystem.create(HelloWorldMain.create(), "hello");

system.tell(new HelloWorldMain.SayHello("World"));
system.tell(new HelloWorldMain.SayHello("Akka"));


Thread.sleep(3000);
system.terminate();
}


//定义消息静态记录类
public static record SayHello(String name) {}

public static Behavior<SayHello> create() {
return Behaviors.setup(HelloWorldMain::new);
}

private final ActorRef<HelloWorld.Greet> greeter;

private HelloWorldMain(ActorContext<SayHello> context) {
super(context);
//构造函数这里调用 Greeter Actor 的启动函数初始化一个 Greeter 类型的引用
greeter = context.spawn(HelloWorld.create(), "greeter");
}

@Override
public Receive<SayHello> createReceive() {
return newReceiveBuilder().onMessage(SayHello.class, this::onSayHello).build();
}

private Behavior<SayHello> onSayHello(SayHello command) {
//每一次收到 sayHello 的消息(也就是主程序的控制指令)时就会先创建一个 bot 之后发送 greet 消息
// 以及 bot 的引用给 Greet Actor 来作为这个最佳实践的起步
ActorRef<HelloWorld.Greeted> replyTo =
getContext().spawn(HelloWorldBot.create(3), command.name);
greeter.tell(new HelloWorld.Greet(command.name, replyTo));
return this;
}
}

整体流程梳理

整体的流程架构其实如下:

  1. 主程序向 HelloWorldMain 发送 SayHello("World")
  2. HelloWorldMain 创建 HelloWorldBot("World"),并向 HelloWorld 发送 Greet("World", replyTo)replyToHelloWorldBot 的引用)。
  3. HelloWorld 接收到 Greet("World", replyTo),记录日志 “Hello World!”,并向 HelloWorldBot 发送 Greeted("World", from)fromHelloWorld 的引用)。
  4. HelloWorldBot 接收到 Greeted("World", from),记录日志 “Greeting 1 for from”,并向 HelloWorld 发送新的 Greet("World", replyTo)
  5. 重复步骤 3 和 4 两次,直到 HelloWorldBot 问候计数达到 3,然后 HelloWorldBot 停止自己。
  6. 主程序向 HelloWorldMain 发送 SayHello("Akka"),类似的步骤再次发生,直到 HelloWorldBot 问候计数达到 3 并停止自己。

当然结果我们也看到了其实是名为 Hello 以及 Akka 的两个 Bot 几乎是同一时间被创建了,这里就是Akka 异步的体现:

  • HelloWorldMainHelloWorld 发送 Greet 消息后,不会等待 HelloWorld 的回复,而是可以继续发送更多的消息或执行其他操作。
  • 同样地,HelloWorld 处理 Greet 消息并发送 Greeted 消息后,也不会等待 HelloWorldBot 的回复,而是可以处理其他消息。
  • HelloWorldBot 处理 Greeted 消息后,会决定是否发送新的 Greet 消息,但不会等待 HelloWorld 的回复,而是可以继续处理后续的 Greeted 消息。

同时,HelloWorldMain 在处理 SayHello("World") 消息时,会创建一个 HelloWorldBot 并向 HelloWorld 发送 Greet("World", replyTo) 消息。在处理 SayHello("Akka") 消息时,会再次创建一个新的 HelloWorldBot 并向 HelloWorld 发送 Greet("Akka", replyTo) 消息。由于消息处理是异步非阻塞的HelloWorldMain 可以在创建第一个 HelloWorldBot 后立即创建第二个 HelloWorldBot,无需等待第一个 HelloWorldBot 完成其处理。

采用工厂方法Ref的方式传递而不是直接new对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class HelloWorld extends AbstractBehavior<HelloWorld.Greet> {

public static Behavior<Greet> create() {
return Behaviors.setup(HelloWorld::new);
}

private HelloWorld(ActorContext<Greet> context) {
super(context);
}

// Actor implementation here...
}

public class MainApp {
public static void main(String[] args) {
ActorSystem<HelloWorld.Greet> system = ActorSystem.create(HelloWorld.create(), "helloWorldSystem");
ActorRef<HelloWorld.Greet> greeter = system;
greeter.tell(new HelloWorld.Greet("World", system));
}
}

观察上面几个 Actor 的代码我们可以注意到,几乎都遵循着不存在 new 对象的情况

一般是使用 Behaviors.setup 来创建 Behavior,然后返回 ActorRef

这主要是:

  • 便于 Akka 框架的生命周期管理
  • 实现地址的透明性,这里的透明是指 Actor 的位置(本地或远程)对发送消息的一方是透明的。使用 ActorRef 可以将消息发送给本地 Actor 或分布在集群中的远程 Actor,而无需了解 Actor 的具体实现或位置
  • Actor 模型中,Actor 之间通过消息传递进行通信,避免共享内存和状态,从而简化并发编程并提高系统的健壮性

Mediator模型

我们以用户订阅股票价格数据变动为例来作为应用场景

消息实体类

这里定义用于 Akka Actor 之间通信的消息实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//消息中分为两类
public class Messages {
//股票价格更新消息实体
public static class StockPriceUpdate {
//初始化后就无法再被修改
public final String symbol;
public final double price;

public StockPriceUpdate(String symbol, double price) {
this.symbol = symbol;
this.price = price;
}
}
//股票价格订阅消息实体
public static class Subscribe {
public final String symbol;

public Subscribe(String symbol) {
this.symbol = symbol;
}
}
}

其中,StockPriceUpdate 是股票价格更新实体类,在系统中传递股票价格更新的消息。当股票价格更新时,这个类的实例可以被创建并发送到订阅者,以通知他们最新的价格。

Subscribe 是用来订阅股票价格更新的消息。系统中的客户端希望订阅指定股票的价格更新时,这个类的实例可以被创建并发送到发布-订阅系统,以注册订阅请求

订阅者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class StockSubscriber extends AbstractActor {
private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
private final ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
private final String symbol;

public StockSubscriber(String symbol) {
this.symbol = symbol;
}

//订阅注册到Mediator
@Override
public void preStart() {
mediator.tell(new DistributedPubSubMediator.Subscribe(symbol, getSelf()), getSelf());
}

//消费消息逻辑
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Messages.StockPriceUpdate.class, msg -> {
log.info("Received stock price update: {} - {}", msg.symbol, msg.price);
})
.build();
}
}

这里的一个核心操作就是 mediator.tell(new DistributedPubSubMediator.Subscribe(symbol, getSelf()), getSelf());

表示向 Mediator 注册自己订阅者的身份,之后收到消息会由 Mediator 来转发

发布者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class StockPublisher extends AbstractActor {
private final ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
private final String symbol;

public StockPublisher(String symbol) {
this.symbol = symbol;
}

//定期发布股票价格更新
@Override
public void preStart() {
getContext().getSystem().scheduler().schedule(
Duration.create(1, TimeUnit.SECONDS),
Duration.create(1, TimeUnit.SECONDS),
() -> mediator.tell(new DistributedPubSubMediator.Publish(symbol, new Messages.StockPriceUpdate(symbol, Math.random() * 100)), getSelf()),
getContext().dispatcher()
);
}

@Override
public Receive createReceive() {
return receiveBuilder()
.build();
}
}

这里的 preStart 是一个定时任务,核心还是在 mediator.tell(Publish) 来实现向 Mediator 注册发布者身份


Akka|Introduction
http://example.com/2024/05/28/Akka-Introduction/
作者
Noctis64
发布于
2024年5月28日
许可协议