博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Scala使用Akka模拟RPC机制代码
阅读量:5966 次
发布时间:2019-06-19

本文共 4553 字,大约阅读时间需要 15 分钟。

 上代码:  另一个版本(自己加注释):

RemoteMessage.scala

trait RemoteMessage extends Serializable//Worker -> Mastercase class RegisterWorker(id: String, memory: Int, cores: Int) extends RemoteMessagecase class Heartbeat(id: String) extends RemoteMessage//Master -> Workercase class RegisteredWorker(masterUrl: String) extends RemoteMessage//Worker -> selfcase object SendHeartbeat// Master -> selfcase object CheckTimeOutWorker

WorkerInfo.scala

class WorkerInfo(val id: String, val memory: Int, val cores: Int) {  // 上一次心跳  var lastHeartbeatTime : Long = _}

 

Worker.scala

import java.util.UUIDimport akka.actor.{Actor, ActorSelection, ActorSystem, Props}import com.typesafe.config.ConfigFactoryimport scala.concurrent.duration._class Worker(val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor{  var master : ActorSelection = _  val workerId = UUID.randomUUID().toString  val HEART_INTERVAL = 10000  //  override def preStart(): Unit = {    //跟Master建立连接    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")    //向Master发送注册消息    master ! RegisterWorker(workerId, memory, cores)  }  override def receive: Receive = {    case RegisteredWorker(masterUrl) => {      println(masterUrl)      //启动定时器发送心跳      import context.dispatcher      //多长时间后执行 单位,多长时间执行一次 单位, 消息的接受者(直接给master发不好, 先给自己发送消息, 以后可以做下判断, 什么情况下再发送消息), 信息      context.system.scheduler.schedule(0 millis, HEART_INTERVAL millis, self, SendHeartbeat)    }    case SendHeartbeat => {      println("send heartbeat to master")      master ! Heartbeat(workerId)    }  }}object Worker {  def main(args: Array[String]) {    val host = args(0)    val port = args(1).toInt    val masterHost = args(2)    val masterPort = args(3).toInt    val memory = args(4).toInt    val cores = args(5).toInt    // 准备配置    val configStr =      s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$host"         |akka.remote.netty.tcp.port = "$port"       """.stripMargin    val config = ConfigFactory.parseString(configStr)    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的    val actorSystem = ActorSystem("WorkerSystem", config)    actorSystem.actorOf(Props(new Worker(masterHost, masterPort, memory, cores)), "Worker")    actorSystem.awaitTermination()  }}

 

Master.scala

import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.ConfigFactoryimport scala.collection.mutableimport scala.concurrent.duration._class Master(val host: String, val port: Int) extends Actor {  // workerId -> WorkerInfo  val idToWorker = new mutable.HashMap[String, WorkerInfo]()  // WorkerInfo  val workers = new mutable.HashSet[WorkerInfo]() //使用set删除快, 也可用linkList  //超时检查的间隔  val CHECK_INTERVAL = 15000//这个时间间隔一定要大于心跳的时间间隔.  override def preStart(): Unit = {    println("preStart invoked")    //导入隐式转换    import context.dispatcher //使用timer太low了, 可以使用akka的, 使用定时器, 要导入这个包    context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker)  }  // 用于接收消息  override def receive: Receive = {    case RegisterWorker(id, memory, cores) => {      //判断一下,是不是已经注册过      if(!idToWorker.contains(id)){        //把Worker的信息封装起来保存到内存当中        val workerInfo = new WorkerInfo(id, memory, cores)        idToWorker(id) = workerInfo        workers += workerInfo        sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")//通知worker注册      }    }    case Heartbeat(id) => {      if(idToWorker.contains(id)){        val workerInfo = idToWorker(id)        //报活        val currentTime = System.currentTimeMillis()        workerInfo.lastHeartbeatTime = currentTime      }    }    case CheckTimeOutWorker => {      val currentTime = System.currentTimeMillis()      val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)      for(w <- toRemove) {        workers -= w        idToWorker -= w.id      }      println(workers.size)    }  }}object Master {  def main(args: Array[String]) {    val host = args(0)    val port = args(1).toInt    // 准备配置    val configStr =      s"""         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"         |akka.remote.netty.tcp.hostname = "$host"         |akka.remote.netty.tcp.port = "$port"       """.stripMargin    val config = ConfigFactory.parseString(configStr)    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的    val actorSystem = ActorSystem("MasterSystem", config)    //创建Actor    val master = actorSystem.actorOf(Props(new Master(host, port)), "Master")    actorSystem.awaitTermination()  }}

 

转载地址:http://yevax.baihongyu.com/

你可能感兴趣的文章
python MySQLdb安装和使用
查看>>
Java小细节
查看>>
poj - 1860 Currency Exchange
查看>>
chgrp命令
查看>>
Java集合框架GS Collections具体解释
查看>>
洛谷 P2486 BZOJ 2243 [SDOI2011]染色
查看>>
数值积分中的辛普森方法及其误差估计
查看>>
Web service (一) 原理和项目开发实战
查看>>
跑带宽度多少合适_跑步机选购跑带要多宽,你的身体早就告诉你了
查看>>
深入理解Java的接口和抽象类
查看>>
Javascript异步数据的同步处理方法
查看>>
iis6 zencart1.39 伪静态规则
查看>>
SQL Server代理(3/12):代理警报和操作员
查看>>
Linux备份ifcfg-eth0文件导致的网络故障问题
查看>>
2018年尾总结——稳中成长
查看>>
JFreeChart开发_用JFreeChart增强JSP报表的用户体验
查看>>
度量时间差
查看>>
通过jsp请求Servlet来操作HBASE
查看>>
Shell编程基础
查看>>
Shell之Sed常用用法
查看>>