专注Java教育14年 全国咨询/投诉热线:400-8080-105
动力节点LOGO图
始于2009,口口相传的Java黄埔军校
首页 学习攻略 Java学习 Akka Java集群示例

Akka Java集群示例

更新时间:2022-10-18 11:14:27 来源:动力节点 浏览1312次

介绍

这是一个 Java、Maven、Akka 项目,演示了如何设置一个基本的 Akka 集群。

这个项目是一系列项目中的一个,它从一个简单的 Akka Cluster 项目开始,逐步构建到事件溯源和命令查询职责分离的示例。

该项目系列由以下 GitHub 存储库组成:

akka-java-cluster (这个项目)

akka-java-集群感知

akka-java-cluster-singleton

akka-java-集群分片

akka-java-cluster-persistence

akka-java-cluster-persistence-query

每个项目都可以独立于其他项目进行克隆、构建和运行。

关于 Akka 聚类

根据Akka 文档,“ Akka Cluster 提供了一种容错分散的基于点对点的集群成员服务,没有单点故障或单点瓶颈。它使用 gossip 协议和自动故障检测器来做到这一点。

Akka 集群允许构建分布式应用程序,其中一个应用程序或服务跨越多个节点。"

Akka 文档中的上述段落包含了许多最初可能难以理解的概念。考虑一些仅在两句话中就被抛弃的术语,例如“容错”、“去中心化”、“点对点”和“无单点故障”。最后一句几乎是随便说的“一个应用程序或服务跨越多个节点”。等待; 什么?应用程序或服务如何跨越多个节点?

答案是 Akka 提供了一个抽象层,该层由参与者在参与者系统中相互交互组成。Akka 是一个演员模型的实现。演员模型” (维基百科)将“演员”视为并发计算的通用原语。为了响应它接收到的消息,演员可以:做出本地决策,创建更多演员,发送更多消息,并确定如何响应收到的下一条消息。参与者可以修改自己的私有状态,但只能通过消息相互影响(避免需要任何锁)。

Akka Actor 通过异步消息相互通信。Akka Actor 系统在 Java 虚拟机上运行,​​并且使用 Akka 集群,单个 Actor 系统可以在逻辑上跨越多个网络 JVM。这个网络化的参与者系统抽象层使参与者可以跨节点集群透明地与每个参与者进行通信。一种思考方式是,从演员的角度来看,他们生活在一个演员系统中,演员系统在一个或多个节点上运行的事实在很大程度上隐藏在抽象层中。

ClusterListenerActor 演员

Akka Actor 是用 Java 或 Scala 实现的。您可以将参与者创建为 Java 或 Scala 类。有两种实现actor的方法,无类型和有类型。这个 Akka Java 集群示例项目系列中使用了无类型的 actor。

对于那些有兴趣深入了解 Actor 如何工作以及如何实现的细节的人来说,关于Actors的 Akka 文档部分 是一个很好的起点。

我们将要查看的第一个 Actor 名为 ClusterListenerActor。该参与者设置为接收有关集群事件的消息。当节点加入和离开集群时,此参与者会收到有关这些事件的消息。然后将这些接收到的消息写入记录器。

ClusterListenerActor 提供了集群活动的简单视图。以下是日志输出的示例:

03:20:29.569 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - MemberUp(Member(address = akka.tcp://cluster@127.0.0.1:2553, status = Up)) sent to Member(address = akka.tcp://cluster@127.0.0.1:2551, status = Up)
03:20:29.570 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - 1 (LEADER) (OLDEST) Member(address = akka.tcp://cluster@127.0.0.1:2551, status = Up)
03:20:29.570 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - 2 Member(address = akka.tcp://cluster@127.0.0.1:2552, status = Up)
03:20:29.570 INFO  cluster-akka.actor.default-dispatcher-4 akka.tcp://cluster@127.0.0.1:2551/user/clusterListener - 3 Member(address = akka.tcp://cluster@127.0.0.1:2553, status = Joining)

让我们从完整的 ClusterListenerActor 源文件开始。请注意,此 actor 是作为扩展基于 Akka 的类的单个 Java 类实现的。

package cluster;
import akka.actor.AbstractLoggingActor;
import akka.actor.Cancellable;
import akka.actor.Props;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.cluster.ClusterEvent.CurrentClusterState;
import akka.cluster.Member;
import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
class ClusterListenerActor extends AbstractLoggingActor {
    private final Cluster cluster = Cluster.get(context().system());
    private Cancellable showClusterStateCancelable;
    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(ShowClusterState.class, this::showClusterState)
                .matchAny(this::logClusterEvent)
                .build();
    }
    private void showClusterState(ShowClusterState showClusterState) {
        log().info("{} sent to {}", showClusterState, cluster.selfMember());
        logClusterMembers(cluster.state());
        showClusterStateCancelable = null;
    }
    private void logClusterEvent(Object clusterEventMessage) {
        log().info("{} sent to {}", clusterEventMessage, cluster.selfMember());
        logClusterMembers();
    }
    @Override
    public void preStart() {
        log().debug("Start");
        cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
                ClusterEvent.ClusterDomainEvent.class);
    }
    @Override
    public void postStop() {
        log().debug("Stop");
        cluster.unsubscribe(self());
    }
    static Props props() {
        return Props.create(ClusterListenerActor.class);
    }
    private void logClusterMembers() {
        logClusterMembers(cluster.state());
        if (showClusterStateCancelable == null) {
            showClusterStateCancelable = context().system().scheduler().scheduleOnce(
                    Duration.ofSeconds(15),
                    self(),
                    new ShowClusterState(),
                    context().system().dispatcher(),
                    null);
        }
    }
    private void logClusterMembers(CurrentClusterState currentClusterState) {
        Optional<Member> old = StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
                .reduce((older, member) -> older.isOlderThan(member) ? older : member);
        Member oldest = old.orElse(cluster.selfMember());
        StreamSupport.stream(currentClusterState.getMembers().spliterator(), false)
                .forEach(new Consumer<Member>() {
                    int m = 0;
                    @Override
                    public void accept(Member member) {
                        log().info("{} {}{}{}", ++m, leader(member), oldest(member), member);
                    }
                    private String leader(Member member) {
                        return member.address().equals(currentClusterState.getLeader()) ? "(LEADER) " : "";
                    }
                    private String oldest(Member member) {
                        return oldest.equals(member) ? "(OLDEST) " : "";
                    }
                });
    }
    private static class ShowClusterState {
        @Override
        public String toString() {
            return ShowClusterState.class.getSimpleName();
        }
    }
}

这个类是一个简单的actor实现的例子。然而,这个actor的独特之处在于它订阅了Akka系统来接收集群事件消息。

@Override
public void preStart() {
    log().debug("Start");
    cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
            ClusterEvent.ClusterDomainEvent.class);
}

Actor 被设置为接收集群事件消息。当这些消息到达时,参与者调用编写的方法来记录事件并记录集群的当前状态。

@Override
public Receive createReceive() {
    return receiveBuilder()
            .match(ShowClusterState.class, this::showClusterState)
            .matchAny(this::logClusterEvent)
            .build();
}

随着集群中的每个节点启动,ClusterListenerActor 的一个实例也会启动。然后,参与者记录每个节点中发生的集群事件。您可以再次从每个节点的角度检查来自每个集群节点的日志,以查看集群事件并查看集群节点的状态。

这个怎么运作

在这个项目中,我们将从一个基于 Akka、Java 和 Maven 的示例的基本模板开始,其中包含运行 Akka 集群的代码和配置。Maven POM 文件使用两个插件,一个用于使用mvn:exec命令运行代码,另一个插件构建一个自包含 JAR 文件用于使用java -jar命令运行代码。

当项目代码被执行时,动作在Runner类main方法中开始。

public static void main(String[] args) {
    if (args.length == 0) {
        startupClusterNodes(Arrays.asList("2551", "2552", "0"));
    } else {
        startupClusterNodes(Arrays.asList(args));
    }
}

该main方法调用startupClusterNodes传递给它一个端口列表的方法。如果未提供参数,则使用默认的三个端口集。

private static void startupClusterNodes(List<String> ports) {
    System.out.printf("Start cluster on port(s) %s%n", ports);
    ports.forEach(port -> {
        ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));
        AkkaManagement.get(actorSystem).start();
        actorSystem.actorOf(ClusterListenerActor.props(), "clusterListener");
        addCoordinatedShutdownTask(actorSystem, CoordinatedShutdown.PhaseClusterShutdown());
        actorSystem.log().info("Akka node {}", actorSystem.provider().getDefaultAddress());
    });
}

这些startupClusterNodes方法循环通过端口列表。为每个端口创建一个参与者系统。

ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));

创建一个演员系统时会发生很多事情。许多决定如何运行actor系统的细节是通过配置设置定义的。这个项目包括一个application.conf配置文件,它位于src/main/resources目录中。最关键的配置设置之一定义了参与者系统主机和端口。当参与者系统在集群中运行时,配置还定义了每个节点将如何定位和加入集群。在这个项目中,节点使用所谓的种子节点加入集群。

cluster {
  seed-nodes = [
    "akka.tcp://cluster@127.0.0.1:2551",
    "akka.tcp://cluster@127.0.0.1:2552"]
}

让我们来看看这个项目的集群启动场景。在此示例中,一个 JVM 启动时没有运行时参数。当不带参数调用Runner类main方法时,默认是在端口 2551、2552 和端口 0 上创建三个参与者系统(零端口会导致随机选择非零端口号)。

由于每个参与者系统都是在特定端口上创建的,因此它会查看种子节点配置设置。如果参与者系统的端口是种子节点之一,它知道它将与其他种子节点联系以形成集群。如果参与者系统的端口不是种子节点之一,它将尝试联系种子节点之一。非种子节点需要向其中一个种子节点宣布自己并要求加入集群。

下面是使用默认端口 2551、2552 和 0 的示例启动场景。在端口 2551 上创建了一个参与者系统;查看配置它知道它是一个种子节点。端口 2551 上的种子节点参与者系统尝试联系端口 2552 上的参与者系统,即另一个种子节点。当创建端口 2552 上的参与者系统时,它会经历相同的过程,在这种情况下,2552 会尝试与 2551 联系并加入。当在随机端口(例如端口 24242)上创建第三个参与者系统时,它会从配置中知道它不是种子节点,在这种情况下,它会尝试与种子参与者系统之一进行通信,宣布自己并加入集群。

您可能已经注意到,在上面的示例中,三个参与者系统是在单个 JVM 中创建的。虽然每个 JVM 运行多个参与者系统是可以接受的,但更常见的情况是每个 JVM 运行一个参与者系统。

让我们看一个稍微现实一点的例子。使用提供的akka脚本启动一个三节点集群。

./akka cluster start 3

每个节点都在单独的 JVM 中运行。在这里,我们有三个参与者系统,它们在三个 JVM 中独立启动。这三个参与者系统遵循与之前相同的启动场景,结果它们形成了一个集群。

当然,最常见的场景是每个参与者系统都是在不同的 JVM 中创建的,每个 JVM 都运行在不同的服务器、虚拟服务器或容器上。同样,相同的启动过程发生在各个参与者系统通过网络找到彼此并形成集群的地方​​。

让我们回到创建actor系统的那一行代码。

ActorSystem actorSystem = ActorSystem.create("cluster", setupClusterNodeConfig(port));

从这个简短的描述中,您可以看到在actor系统抽象层中发生了很多事情,而这个启动过程的总结只是冰山一角,这是抽象层应该做的,它们隐藏了复杂性。

一旦多个参与者系统加入一个集群,从在这个虚拟参与者系统中运行的参与者的角度来看,它们会形成一个单一的虚拟参与者系统。当然,单个参与者实例物理上驻留在特定 JVM 内的特定集群节点中,但在接收和发送参与者消息时,节点边界是透明的并且几乎消失了。正是这种透明性是构建“一个应用程序或服务跨越多个节点”的基础。

此外,通过添加更多节点来扩展集群的灵活性是消除单点瓶颈的机制。当集群中的现有节点无法处理当前负载时,可以添加更多节点来扩展容量。失败也是如此。一个或多个节点的丢失并不意味着整个集群出现故障。可以替换故障节点,并且可以将在故障节点上运行的参与者重新定位到其他节点。

希望本概述能够阐明 Akka 如何提供“无单点故障或单点瓶颈”以及“ Akka 集群如何允许构建分布式应用程序,其中一个应用程序或服务跨越多个节点。 ”

安装

git clone https://github.com/mckeeh3/akka-java-cluster.git
cd akka-java-cluster
mvn clean package

Maven 命令构建项目并创建一个自包含的可运行 JAR。

运行集群(Mac、Linux)

该项目包含一组脚本,可用于启动和停止单个集群节点或启动和停止节点集群。

提供主脚本./akka以运行节点集群或启动和停止单个节点。用于./akka node start [1-9] | stop启动和停止单个节点以及./akka cluster start [1-9] | stop启动和停止节点集群。clusterand start 选项将node在端口 2551 到 2559 上启动 Akka 节点。stdinand输出都使用文件命名约定stderr发送到目录中的文件。/tmp/tmp/-N.log

在端口 2551 上启动节点 1,在端口 2552 上启动节点 2。

./akka node start 1
./akka node start 2

在端口 2553 上停止节点 3。

./akka node stop 3

在端口 2551、2552、2553 和 2554 上启动一个由四个节点组成的集群。

./akka cluster start 4

停止所有当前正在运行的集群节点。

./akka cluster stop

您可以使用该./akka cluster start [1-9]脚本启动多个节点,然后使用./akka node start [1-9]and./akka node stop [1-9] 启动和停止单个节点。

使用该./akka node tail [1-9]命令tail -f创建节点 1 到 9 的日志文件。

该命令使用Akka 管理 扩展 Cluster Http Management./akka cluster status以 JSON 格式显示当前正在运行的集群的状态 。

运行集群(Windows,命令行)

以下 Maven 命令在端口 2551、2552 和 radmonly 选择的端口上运行带有 3 个 Akka 演员系统的signle JVM。

mvn exec:java

使用 CTRL-C 停止。

要在特定端口上运行,请使用以下-D选项传入命令行参数。

mvn exec:java -Dexec.args="2551"

默认无参数等价于以下内容。

mvn exec:java -Dexec.args="2551 2552 0"

运行测试的一种常见方法是在多个命令窗口中启动单个 JVM。这模拟了运行多节点 Akka 集群。例如,在 4 个命令窗口中运行以下 4 个命令。

mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log
mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log
mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log

这将运行一个 4 节点 Akka 集群,在端口 2551 和 2552 上启动 2 个节点,这些节点是配置的集群种子节点和application.conf文件。以及随机选择的端口号上的 2 个节点。可选重定向> /tmp/$(basename $PWD)-4.log是基于项目目录名称将日志输出推送到文件名的示例。

为方便起见,在 Linux 命令 shell 中定义以下别名。

alias p1='cd ~/akka-java/akka-java-cluster'
alias p2='cd ~/akka-java/akka-java-cluster-aware'
alias p3='cd ~/akka-java/akka-java-cluster-singleton'
alias p4='cd ~/akka-java/akka-java-cluster-sharding'
alias p5='cd ~/akka-java/akka-java-cluster-persistence'
alias p6='cd ~/akka-java/akka-java-cluster-persistence-query'
alias m1='clear ; mvn exec:java -Dexec.args="2551" > /tmp/$(basename $PWD)-1.log'
alias m2='clear ; mvn exec:java -Dexec.args="2552" > /tmp/$(basename $PWD)-2.log'
alias m3='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-3.log'
alias m4='clear ; mvn exec:java -Dexec.args="0" > /tmp/$(basename $PWD)-4.log'

p1-6 别名命令是 cd 到六个项目目录之一的快捷方式。m1-4 别名命令使用适当的端口启动和 Akka 节点。Stdout 也被重定向到 /tmp 目录。

提交申请后,顾问老师会电话与您沟通安排学习

免费课程推荐 >>
技术文档推荐 >>