Akka sharding示例

这个示例来自Lightbend的 Lightbend Project Starter里的Cluster Sharding Scala。
是一个简单的分片集群用做记录温度的例子:

Device是Entity/Actor在每个分片上运行多个,用来记录温度并计算平均值。

Device接收消息时默认(初始)情况下执行 counting(Nil)创建一个空的List。

通过模式匹配RecordTemperature(id,temp)类型消息

消息在Device的伴生对象中定义,可以在外部调用Device.RecordTemperature()

将温度存入List 并改变自己的下一步行为 become(counting(temperature))
这样每当该actor收到消息就会递归地增加List

become()unbecome()方法是将运算行为压入/弹出行为栈。第一次become(counting(temperature))是将在栈顶端的初始receive方法counting(Nil)换成counting(temperature)
因为counting()是PartialFunction偏函数,偏函数也是对象,也有它的状态。temperatures可以保存起来。

    import akka.actor._

    object Device {
      case class RecordTemperature(deviceId: Int, temperature: Double)
    }
    class Device extends Actor with ActorLogging {
      import Device._

      override def receive = counting(Nil)

      def counting(values: List[Double]): Receive = {
        case RecordTemperature(id, temp) =>
          val temperatures = temp :: values
          log.info(s"Recording temperature $temp for device $id, 
    average is ${temperatures.sum / temperatures.size} after ${temperatures.size} readings");
          context.become(counting(temperatures))
      }
    }

Devices

    object Devices {
      // Update a random device
      case object UpdateDevice
    }

    class Devices extends Actor with ActorLogging {
      import Devices._

      private val extractEntityId: ShardRegion.ExtractEntityId = {
        case msg @ Device.RecordTemperature(id, _) => (id.toString, msg)
      }

      private val numberOfShards = 100

      private val extractShardId: ShardRegion.ExtractShardId = {
        case Device.RecordTemperature(id, _) => (id % numberOfShards).toString
        // Needed if you want to use 'remember entities':
        //case ShardRegion.StartEntity(id) => (id.toLong % numberOfShards).toString
      }

      val deviceRegion: ActorRef = ClusterSharding(context.system).start(
          typeName = "Device",
          entityProps = Props[Device],
          settings = ClusterShardingSettings(context.system),
          extractEntityId = extractEntityId,
          extractShardId = extractShardId)

      val random = new Random()
      val numberOfDevices = 50

      implicit val ec: ExecutionContext = context.dispatcher
      context.system.scheduler.schedule(10.seconds, 1.second, self, UpdateDevice)

      def receive = {
        case UpdateDevice =>
          val deviceId = random.nextInt(numberOfDevices)
          val temperature = 5 + 30 * random.nextDouble()
          val msg = Device.RecordTemperature(deviceId, temperature)
          log.info(s"Sending $msg");
          deviceRegion ! msg
      }
    }

在idea的scala插件中导入:




使用sbt构建完成后运行sample.sharding.ShardingApp

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Scala与Java的关系 Scala与Java的关系是非常紧密的!! 因为Scala是基于Java虚拟机,也就是...
    灯火gg阅读 3,482评论 1 24
  • 读《快学Scala 》一书的摘要 Scala 运行于JVM之上,拥有海量类库和工具,兼顾函数式编程和面向对象。 在...
    abel_cao阅读 1,291评论 0 8
  • 今天是10月3日,国庆假期第三天。我就在此做一份不完整的9月份回顾吧。这个月学的东西还是挺杂的,给我影响最大的事情...
    德老师奥阅读 359评论 4 5
  • 最近工作上比较忙,总是两天写一次简书,本想今天说对不起观众【虽然没几个】,但是我觉得是对不起自己,我在用简书这款软...
    游弋恶灵阅读 368评论 3 4
  • 早上,这边的太阳特好,暖洋洋地坐在会议室里,有点慵懒,想睡一觉!开会的记录转手就在手边,偷懒没有带笔,顺手借过笔想...
    灿灿_e560阅读 438评论 6 2