Akka Cluster简介与核心条件搭建。Akka Cluster简介与基本条件搭建。

  akka集群是高容错、去中心化、不在单点故障以及不有单点瓶颈的集群。它以gossip协议通信及拥有故障自动检测功能。

  akka集群是高容错、去中心化、不在单点故障以及不在单点瓶颈的集群。它应用gossip协议通信与有着故障自动检测功能。

Gossip收敛
  集众多被各个一个节点被其他节点监督(默认的卓绝要命数目为5)。集众多被的节点互相监督着,某节点所监督的状态呢着吃其他监察着。通过gossip协议,节点向任何节点传递温馨所呈现节点的风靡状态(Up、Joining等等),同时节点吧在收取来自外节点的信息,这些消息包括什么样节点和这些节点对应的状态,并这些节点加入到祥和的seen表里去,表示友好曾经见了这些节点的新颖状态了,当有的节点都拿任何节点“看见”了后,我们得以说”Gossip收敛”完成了。

Gossip收敛
  集众多被各个一个节点被其他节点监督(默认的顶深数额为5)。集众多被的节点互相监督着,某节点所监督的状态呢在为别监察着。通过gossip协议,节点向任何节点传递温馨所见节点的风靡状态(Up、Joining等等),同时节点吧在吸纳来自其他节点的信息,这些信息包括哪些节点和这些节点对应之状态,并这些节点加入到好的seen表里去,表示自己都见了这些节点的新型状态了,当有的节点都拿其他节点“看见”了后,我们可以说”Gossip收敛”完成了。

  根据以上陈述,当集众多中某个节点不可达(unreachable)时,gossip收敛不克不辱使命。那些不可达的节点需要变成可直达状态(reachable)或者down状态,收敛才能够开展。
  akka集群不设有leader选举,但是有leader节点,但是leader节点可以变换,leader负责执行leader action,当每次收敛完成后,leader需要做三项事:

  根据以上陈述,当集众多中某个节点不可达(unreachable)时,gossip收敛不克不负众望。那些不可达的节点需要变成可达到状态(reachable)或者down状态,收敛才会开展。
  akka集群不在leader选举,但是是leader节点,但是leader节点可以换,leader负责执行leader action,当每次收敛完成后,leader需要做三件事:

  • 将处于joining状态节点变更为Up状态, 即joining->up
  • leaving->exiting
  • exiting->removed
  • 将处于joining状态节点变更为Up状态, 即joining->up
  • leaving->exiting
  • exiting->removed

failure Detector

  集众多中,一个节点被外节点监督(默认最可怜数量也5),任何一个节点被探测到不行及时,那么是信息将受通过gossip协议传播到其他节点去,其他节点也用以此节点标为不可达。同时故障检测机制为会见以节点打不足及标记为可高达,同时扩散给任何节点。
有关裁判一个节点是否可直达的道是运历史数据遭到老是心跳时间间隔的平均值与心跳次数也全方差去构建一个刚好无限分布,F是这个分布的密度分布函数,利用以下公式:

phi = -log10(1 - F(timeSinceLastHeartbeat))

  phi反应了手上网的上下情况,当akka.cluster.failure-detector.threshold阈值配置不就,并无是等某个心跳检测超时时,才会将节点标记为不可达。其值默认为18,想使抱重新强之灵敏度,需要把阈值设置降低。

图片 1

failure Detector

  集众多被,一个节点被其他节点监督(默认最要命数目也5),任何一个节点被探测到不足及时,那么是消息将受通过gossip协议传播及外节点去,其他节点吧用这节点标为不可达。同时故障检测机制吗会以节点打不足及标记为可达到,同时扩散给任何节点。
至于裁判一个节点是否只是达成之方是动历史数据被老是心跳时间距离的平均值与心跳次数为都方差去构建一个恰好无限分布,F是这分布之密度分布函数,利用以下公式:

phi = -log10(1 - F(timeSinceLastHeartbeat))

  phi反应了手上网络的上下情况,当akka.cluster.failure-detector.threshold阈值配置不立,并无是等某个心跳检测超时时,才见面管节点标记为不可达。其值默认为18,想如果收获重新胜之灵敏度,需要将阈值设置降低。

图片 2

实践

  编程方式构建集群
  akka.tcp://myCluster@127.0.0.1:2551节点:

application.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = []
  }
}

package nathan

import akka.actor.{Actor, ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object Main extends App {
  val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
  Cluster(actorSystem).join(Address(protocol = "akka.tcp",system = "myCluster",host = "127.0.0.1",port = 2551))
}

  上述代码Cluster(actorSystem).join(address)举凡盖address为根基创建集群,集群的名号也”myCluster”,其中富含”akka.tcp://myCluster@127.0.0.1:2551″的节点。集群的名目也夫首先单投入的节点的名决定,其他后在的节点的称应当与那保持一致。当这单节点集群创建了后,这个单节点就改成seedNode,也就是说,其他节点通过向种子节点发出Join指令,就得进入集群。

  akka.tcp://myCluster@127.0.0.1:2552节点
application.conf

akka {
  actor {
    provider = cluster
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

package nathan

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object Main extends App {
  val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
  Cluster(actorSystem).joinSeedNodes(List(Address(protocol = "akka.tcp",system = "myCluster1",host = "127.0.0.1",port = 2551)))
}

  Cluster(actorSystem).joinSeedNodes(List(address))代码作用为某个种子节点发出Join命令以投入集群。这里填写的米节点越多越好,这样消息在集结众多中扩散可以重新快。

实践

  编程方式构建集群
  akka.tcp://myCluster@127.0.0.1:2551节点:

application.conf:

akka {
  actor {
    provider = cluster
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }
  cluster {
    seed-nodes = []
  }
}

package nathan

import akka.actor.{Actor, ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object Main extends App {
  val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
  Cluster(actorSystem).join(Address(protocol = "akka.tcp",system = "myCluster",host = "127.0.0.1",port = 2551))
}

  上述代码Cluster(actorSystem).join(address)是以address为根基创建集群,集群的称呼也”myCluster”,其中含有”akka.tcp://myCluster@127.0.0.1:2551″的节点。集群的称号也其首先独在的节点的讳决定,其他后加入的节点的号应当与那个保持一致。当以此单节点集群创建了后,这个单节点就变成seedNode,也就是说,其他节点通过奔米节点发出Join指令,就好进入集群。

  akka.tcp://myCluster@127.0.0.1:2552节点
application.conf

akka {
  actor {
    provider = cluster
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

package nathan

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

object Main extends App {
  val actorSystem = ActorSystem("myCluster", ConfigFactory.load())
  Cluster(actorSystem).joinSeedNodes(List(Address(protocol = "akka.tcp",system = "myCluster1",host = "127.0.0.1",port = 2551)))
}

  Cluster(actorSystem).joinSeedNodes(List(address))代码作用为某种子节点发出Join命令以进入集群。这里填写的子节点越多越好,这样消息在集结众多中扩散可以又快。

监听集群节点状态

  集群时间发生如下几种有如下几种:MemberJoinedMemberWeaklyUpMemberUpMemberLeftMemberExitedMemberRemovedLeaderChangedRoleLeaderChangedUnreachableMemberReachableMember等等。

class ListenClusterActor extends Actor {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)
  override def receive: Receive = {
    case MemberJoined(member) =>
      println("join:" + member)
    case MemberUp(member) =>
      println("up:" + member)
    case MemberExited(member) =>
      println("exited:" + member)
    case MemberRemoved(member,previousStatus) =>
      println("removed:" + member+" before status:"+previousStatus)
    case UnreachableMember(member) =>
      println("unreachable:" + member)
  }
}

当其他节点加入集群时跟离时,打印如下:

join:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Joining)
up:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Up)
exited:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Exiting)
removed:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Removed) before status:Exiting

监听集群节点状态

  集群时间发生如下几种植起如下几种植:MemberJoinedMemberWeaklyUpMemberUpMemberLeftMemberExitedMemberRemovedLeaderChangedRoleLeaderChangedUnreachableMemberReachableMember等等。

class ListenClusterActor extends Actor {
  val cluster = Cluster(context.system)
  override def preStart(): Unit = {
    cluster.subscribe(self, InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
  }
  override def postStop(): Unit = cluster.unsubscribe(self)
  override def receive: Receive = {
    case MemberJoined(member) =>
      println("join:" + member)
    case MemberUp(member) =>
      println("up:" + member)
    case MemberExited(member) =>
      println("exited:" + member)
    case MemberRemoved(member,previousStatus) =>
      println("removed:" + member+" before status:"+previousStatus)
    case UnreachableMember(member) =>
      println("unreachable:" + member)
  }
}

当其他节点加入集群时跟去时,打印如下:

join:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Joining)
up:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Up)
exited:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Exiting)
removed:Member(address = akka.tcp://myCluster@127.0.0.1:2552, status = Removed) before status:Exiting

相关文章

发表评论

电子邮件地址不会被公开。 必填项已用*标注