Akka Message Distribution

Akka 并行消息处理测试

Akka 消息分发

使用 Akka 进行消息并行分发处理,
待处理消息 [A,B,C,D,E,F] 共6个
消息处理并行度 3
创建 一个 JobManagerActor 和 三个 JobExecutorActor, JobExecutorActor执行完分配的作业后向JobManagerActor请求下一个消息进行处理,直到所有的消息处理结束

测试代码

说明:
消息类: ConfMessage、ResJobMessage、ReqJobMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import akka.actor.{Actor, ActorLogging, ActorSystem, PoisonPill, Props}
import scala.collection.mutable.Queue

final case class ConfMessage(conf: String)
final case class ResJobMessage(jobConf: String)
case object ReqJobMessage

final case class JobExecutorActor(executorId: String) extends Actor with ActorLogging {
override def preStart(): Unit = {
log.info(s"JobExecutorActor[$executorId] preStart")
}
override def postStop(): Unit = {
log.info(s"JobExecutorActor[$executorId] postStop")
}
override def receive: Receive = {
case ResJobMessage(jobConf) =>
log.info(s"JobExecutorActor[$executorId] receive ResJobMessage[$jobConf] from JobManagerActor[${sender().path}]")
val time = scala.util.Random.nextInt(1000)
Thread.sleep(time)
log.info(s"JobExecutor finished job[$jobConf] in $time sss")
sender() ! ReqJobMessage
log.info(s"JobExecutorActor[$executorId] send ReqJobMessage[$ReqJobMessage] to JobManagerActor[${sender().path}]")
}
}

object JobExecutorActor {
def props(executorId: String):Props = Props(new JobExecutorActor(executorId))
}

final class JobManagerActor extends Actor with ActorLogging {
var maxExecutorNum = 0
var jobConfQueue = new Queue[String]

var totalJobNum = 0
var currentJobNum = 0
override def preStart(): Unit = {
log.info(s"JobManagerActor preStart")
}
override def postStop(): Unit = {
log.info(s"JobManagerActor postStop")
}

override def receive: Receive = {
case ConfMessage(conf) =>
log.info(s"${self.path}%10s receive ConfUrlMessage[$conf] from Main ActorSystem[${sender().path}]")

// get config by confUrlMessage
jobConfQueue ++= conf.split(",")
totalJobNum = jobConfQueue.size
maxExecutorNum = 3

for(executorID <- 0 until Math.min(maxExecutorNum,totalJobNum)){
val executorActor = context.actorOf(JobExecutorActor.props("JobExecutorActor-" + executorID.toString), "JobExecutorActor-" + executorID.toString)
context.watch(executorActor)
val jobConf = jobConfQueue.dequeue()
executorActor ! ResJobMessage(jobConf)
currentJobNum += 1
log.info(s"JobManagerActor send ResJobMessage[$jobConf] to JobExecutorActor[${executorActor.path}]")
}

case ReqJobMessage =>
log.info(s"JobManagerActor receive RequestJobMessage[$ReqJobMessage] from JobExecutorActor[${sender().path}]")
if(jobConfQueue.nonEmpty)
sender() ! ResJobMessage(jobConfQueue.dequeue())
else if(jobConfQueue.isEmpty && currentJobNum>0) {
sender() ! PoisonPill
context.unwatch(sender())
currentJobNum -= 1
log.info(s"JobManagerActor stop JobExecutorActor[${sender().path}] for all job has been send to JobExecutorActor currentJobNum-$currentJobNum")
if(currentJobNum == 0){
context.system.terminate()
log.info(s"JobManagerActor terminated for all job have been finished...")
}
}
}
}

object JobManagerActor {
def props: Props = Props(new JobManagerActor)
}
final object JobStart{
def main(args: Array[String]): Unit = {
//val confUrl: String = args(0)
val conf: String = "A,B,C,D,E,F"
val actorSystem:ActorSystem = ActorSystem("SparkETL")
val jobManagerActor = actorSystem.actorOf(JobManagerActor.props, "JobManagerActor")
jobManagerActor ! ConfMessage(conf)
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
pcliu@vmpcliu:~/tmp$ spark-submit --class JobStart /app/IntellijIdea/SparkETL/SparkETL4/target/scala-2.11/SparkDataImport-2.11.8-2.3.0-assembly-3.0.jar 
19/08/14 17:05:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[INFO] [08/14/2019 17:05:52.219] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor] JobManagerActor preStart
[INFO] [08/14/2019 17:05:52.228] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor] akka://SparkETL/user/JobManagerActor%10s receive ConfUrlMessage[A,B,C,D,E,F] from Main ActorSystem[akka://SparkETL/deadLetters]
[INFO] [08/14/2019 17:05:52.239] [SparkETL-akka.actor.default-dispatcher-2] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutorActor[JobExecutorActor-0] preStart
[INFO] [08/14/2019 17:05:52.254] [SparkETL-akka.actor.default-dispatcher-2] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutorActor[JobExecutorActor-0] receive ResJobMessage[A] from JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.257] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor] JobManagerActor send ResJobMessage[A] to JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-0]
[INFO] [08/14/2019 17:05:52.259] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutorActor[JobExecutorActor-1] preStart
[INFO] [08/14/2019 17:05:52.261] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutorActor[JobExecutorActor-1] receive ResJobMessage[B] from JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.263] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor] JobManagerActor send ResJobMessage[B] to JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-1]
[INFO] [08/14/2019 17:05:52.264] [SparkETL-akka.actor.default-dispatcher-3] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutorActor[JobExecutorActor-2] preStart
[INFO] [08/14/2019 17:05:52.265] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor] JobManagerActor send ResJobMessage[C] to JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-2]
[INFO] [08/14/2019 17:05:52.268] [SparkETL-akka.actor.default-dispatcher-3] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutorActor[JobExecutorActor-2] receive ResJobMessage[C] from JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.431] [SparkETL-akka.actor.default-dispatcher-2] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutor finished job[A] in 171 sss
[INFO] [08/14/2019 17:05:52.432] [SparkETL-akka.actor.default-dispatcher-2] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutorActor[JobExecutorActor-0] send ReqJobMessage[ReqJobMessage] to JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.436] [SparkETL-akka.actor.default-dispatcher-2] [akka://SparkETL/user/JobManagerActor] JobManagerActor receive RequestJobMessage[ReqJobMessage] from JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-0]
[INFO] [08/14/2019 17:05:52.437] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutorActor[JobExecutorActor-0] receive ResJobMessage[D] from JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.849] [SparkETL-akka.actor.default-dispatcher-3] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutor finished job[C] in 574 sss
[INFO] [08/14/2019 17:05:52.849] [SparkETL-akka.actor.default-dispatcher-3] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutorActor[JobExecutorActor-2] send ReqJobMessage[ReqJobMessage] to JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.851] [SparkETL-akka.actor.default-dispatcher-7] [akka://SparkETL/user/JobManagerActor] JobManagerActor receive RequestJobMessage[ReqJobMessage] from JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-2]
[INFO] [08/14/2019 17:05:52.851] [SparkETL-akka.actor.default-dispatcher-7] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutorActor[JobExecutorActor-2] receive ResJobMessage[E] from JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.854] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutor finished job[B] in 590 sss
[INFO] [08/14/2019 17:05:52.854] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutorActor[JobExecutorActor-1] send ReqJobMessage[ReqJobMessage] to JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.855] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor] JobManagerActor receive RequestJobMessage[ReqJobMessage] from JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-1]
[INFO] [08/14/2019 17:05:52.856] [SparkETL-akka.actor.default-dispatcher-6] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutorActor[JobExecutorActor-1] receive ResJobMessage[F] from JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.874] [SparkETL-akka.actor.default-dispatcher-7] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutor finished job[E] in 23 sss
[INFO] [08/14/2019 17:05:52.875] [SparkETL-akka.actor.default-dispatcher-7] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutorActor[JobExecutorActor-2] send ReqJobMessage[ReqJobMessage] to JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.875] [SparkETL-akka.actor.default-dispatcher-7] [akka://SparkETL/user/JobManagerActor] JobManagerActor receive RequestJobMessage[ReqJobMessage] from JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-2]
[INFO] [08/14/2019 17:05:52.892] [SparkETL-akka.actor.default-dispatcher-7] [akka://SparkETL/user/JobManagerActor] JobManagerActor stop JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] for all job has been send to JobExecutorActor currentJobNum-2
[INFO] [08/14/2019 17:05:52.903] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-2] JobExecutorActor[JobExecutorActor-2] postStop
[INFO] [08/14/2019 17:05:52.949] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutor finished job[D] in 511 sss
[INFO] [08/14/2019 17:05:52.950] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutorActor[JobExecutorActor-0] send ReqJobMessage[ReqJobMessage] to JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:52.950] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor] JobManagerActor receive RequestJobMessage[ReqJobMessage] from JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-0]
[INFO] [08/14/2019 17:05:52.950] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor] JobManagerActor stop JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] for all job has been send to JobExecutorActor currentJobNum-1
[INFO] [08/14/2019 17:05:52.960] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-0] JobExecutorActor[JobExecutorActor-0] postStop
[INFO] [08/14/2019 17:05:53.030] [SparkETL-akka.actor.default-dispatcher-6] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutor finished job[F] in 173 sss
[INFO] [08/14/2019 17:05:53.030] [SparkETL-akka.actor.default-dispatcher-6] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutorActor[JobExecutorActor-1] send ReqJobMessage[ReqJobMessage] to JobManagerActor[akka://SparkETL/user/JobManagerActor]
[INFO] [08/14/2019 17:05:53.030] [SparkETL-akka.actor.default-dispatcher-6] [akka://SparkETL/user/JobManagerActor] JobManagerActor receive RequestJobMessage[ReqJobMessage] from JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-1]
[INFO] [08/14/2019 17:05:53.031] [SparkETL-akka.actor.default-dispatcher-6] [akka://SparkETL/user/JobManagerActor] JobManagerActor stop JobExecutorActor[akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] for all job has been send to JobExecutorActor currentJobNum-0
[INFO] [08/14/2019 17:05:53.031] [SparkETL-akka.actor.default-dispatcher-5] [akka://SparkETL/user/JobManagerActor/JobExecutorActor-1] JobExecutorActor[JobExecutorActor-1] postStop
[INFO] [08/14/2019 17:05:53.031] [SparkETL-akka.actor.default-dispatcher-6] [akka://SparkETL/user/JobManagerActor] JobManagerActor terminated for all job have been finished...
[INFO] [08/14/2019 17:05:53.039] [SparkETL-akka.actor.default-dispatcher-4] [akka://SparkETL/user/JobManagerActor] JobManagerActor postStop
19/08/14 17:05:53 INFO ShutdownHookManager: Shutdown hook called
19/08/14 17:05:53 INFO ShutdownHookManager: Deleting directory /tmp/spark-c42b9741-c6f8-4b45-bc97-0b6c807b71f2
pcl