Як зробити простий додаток за допомогою кластера Akka

Якщо ви читали мою попередню історію про Scalachain, ви, мабуть, помітили, що це далеко не розподілена система. У ньому відсутні всі функції для належної роботи з іншими вузлами. Додайте до цього, що блокчейн, складений одним вузлом, марний. З цієї причини я вирішив, що настав час попрацювати над цим питанням.

Оскільки Scalachain працює на базі Akka, чому б не використати шанс зіграти з кластером Akka? Я створив простий проект, щоб трохи поспілкуватися з кластером Акка, і в цій історії я збираюся поділитися своїми знаннями. Ми збираємося створити кластер із трьох вузлів, використовуючи маршрутизатори, що знають про кластери, щоб збалансувати навантаження між ними. Все буде працювати в контейнері Docker, і ми будемо використовувати docker-compose для легкого розгортання.

Добре, котимось! ?

Короткий вступ до кластеру Akka

Кластер Akka надає велику підтримку у створенні розподілених додатків. Найкращий варіант використання - це коли у вас є вузол, який потрібно повторити N разів у розподіленому середовищі. Це означає, що всі N-вузли є одноранговими з однаковим кодом. Кластер Akka надає вам нестандартне відкриття членів того самого кластера. Використовуючи кластерні маршрутизатори, можна збалансувати повідомлення між акторами в різних вузлах. Також можна вибрати політику балансування, зробивши баланс навантаження шматочком пирога!

Насправді ви можете вибрати між двома типами маршрутизаторів:

Груповий маршрутизатор - Актори, яким потрібно надсилати повідомлення, які називаються маршрутами, визначаються за допомогою шляху до актора. Маршрутизатори діляться маршрутами, створеними в кластері. У цьому прикладі ми використаємо груповий маршрутизатор.

Пул-маршрутизатор - маршрути створюються та розгортаються маршрутизатором, тому вони є його дітьми в ієрархії акторів. Маршрутизатори не розділяються між маршрутизаторами. Це ідеально підходить для сценарію первинної репліки, де кожен маршрутизатор є основним, а його маршрути - репліками.

Це лише вершина айсберга, тому я запрошую вас прочитати офіційну документацію для отримання додаткової інформації.

Кластер для математичних обчислень

Давайте уявимо сценарій використання. Припустимо розробити систему для виконання математичних обчислень за запитом. Система розгортається в режимі онлайн, тому їй потрібен REST API для отримання запитів на обчислення. Внутрішній процесор обробляє ці запити, виконуючи обчислення і повертаючи результат.

Зараз процесор може обчислити лише число Фібоначчі. Ми вирішили використовувати кластер вузлів для розподілу навантаження між вузлами та підвищення продуктивності. Кластер Akka буде обробляти кластерну динаміку та балансування навантаження між вузлами. Добре, звучить непогано!

Ієрархія актора

Перш за все: нам потрібно визначити ієрархію акторів. Систему можна розділити на три функціональні частини: бізнес-логіку , управління кластером і сам вузол . Також є сервер, але він не є актором, і над цим ми працюватимемо пізніше.

Бізнес-логіка

Додаток повинен робити математичні обчислення. Ми можемо визначити простого Processorактора для управління всіма обчислювальними завданнями. Кожне обчислення, яке ми підтримуємо, може бути реалізоване в конкретному акторі, який буде дочірнім для цього Processor. Таким чином, додаток є модульним і простішим для розширення та обслуговування. Зараз єдиною дитиною Processorбуде ProcessorFibonacciактор. Припускаю, ви здогадуєтесь, яке його завдання. Цього має бути достатньо для початку.

Кластерне управління

Для управління кластером нам потрібен файл ClusterManager. Звучить просто, правда? Цей актор обробляє все, що пов’язано з кластером, як повернення його членів на запит. Було б корисно реєструвати, що відбувається всередині кластера, тому ми визначаємо ClusterListenerактора. Це дочірній матеріал ClusterManagerі підписується на події кластера, реєструючи їх.

Вузол

NodeАктор є коренем нашої ієрархії. Це точка входу нашої системи, яка взаємодіє з API. The Processorта ClusterManagerє його дітьми, разом з ProcessorRouterактором. Це балансир навантаження системи, розподіляючи навантаження між Processors. Ми налаштуємо його як маршрутизатор, відомий кластеру, тому кожен ProcessorRouterможе надсилати повідомлення до Processors на кожному вузлі.

Впровадження актора

Час реалізувати наших акторів! Першими ми реалізуємо актори, пов’язані з бізнес-логікою системи. Потім ми переходимо до акторів для управління кластером і Nodeдо кінцевого актора ( ).

Процесор Фібоначчі

Цей актор виконує обчислення числа Фібоначчі. Він отримує Computeповідомлення, що містить номер для обчислення та посилання актора на відповідь. Посилання є важливим, оскільки можуть бути різні суб’єкти запиту. Пам’ятайте, що ми працюємо в розподіленому середовищі!

Після отримання Computeповідомлення fibonacciфункція обчислює результат. Ми обгортаємо його ProcessorResponseоб’єктом, щоб надати інформацію про вузол, який виконував обчислення. Це буде корисно пізніше, щоб побачити, як діє політика кругового розбору.

Потім результат надсилається актору, якому ми повинні відповісти. Простенька.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Процесор

ProcessorАктор управляє конкретним суб-процесори, як один Фібоначчі. Він повинен створити екземпляр підпроцесорів і переслати їм запити. Зараз у нас є тільки один допоміжний процесор, тому Processorотримує один вид повідомлення: ComputeFibonacci. Це повідомлення містить число Фібоначчі для обчислення. Після отримання номер для обчислення надсилається на номер FibonacciProcessorразом із посиланням на sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Ми хотіли б реєструвати корисну інформацію про те, що відбувається в кластері. Це може допомогти нам налагодити систему, якщо нам потрібно. Це мета ClusterListenerактора. Перед початком роботи він підписується на повідомлення про події кластера. Актор реагує на повідомлення , як MemberUp, UnreachableMemberабо MemberRemoved, увійшовши в відповідна подія. Після ClusterListenerзупинки він відписується від подій кластера.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

Актор, відповідальний за управління кластером, є ClusterManager. Він створює ClusterListenerактора та надає список членів кластеру за запитом. Його можна розширити, щоб додати більше функціональних можливостей, але зараз цього достатньо.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcessorRouter

Балансування навантаження серед процесорів здійснюється за допомогою ProcessorRouter. Він створюється Nodeактором, але цього разу вся необхідна інформація надається в конфігурації системи.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Давайте розберемо відповідну частину application.confфайлу.

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

Перше, що потрібно вказати шлях до актора маршрутизатора, тобто /node/processorRouter. Усередині цієї властивості ми можемо налаштувати поведінку маршрутизатора:

  • router: це політика балансування навантаження повідомлень. Я вибрав round-robin-group, але є багато інших.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?