博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用
阅读量:7085 次
发布时间:2019-06-28

本文共 7070 字,大约阅读时间需要 23 分钟。

前言

在中,我们已经部署好了一个Spark的开发环境。 在中,我们已经写好了一个Spark的应用。 本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。

akka是什么?

akka的作用

akka的名字是action kernel的回文。根据官方定义:akka用于resilient elastic distributed real-time transaction processing。 个人理解是: resilient:是指对需求和安全性等方面(来自于外部的)的一种适应力(弹性)。 elastic:是指对资源利用方面的弹性。 因此,akka是一个满足需求弹性、资源分配弹性的分布式实时事务处理系统。 akka只是一个类库,一个工具,并没有提供一个平台。

akka的运行模式和用例

  • akka有两种运行模式:
    • As a library: 一个使用于web应用,把akka作为一个普通的jar包放到classpath或者WEB-INF/lib
    • As an application: 也称为micro system。
  • akka的用例 akka的用例很多,可以参照.

本文中的用例

在本文中,一个Spark + akka的环境里,akka被用于as an application模式下。 我们会创建一个akka工程,含有两个应用:

  • akka host application 建立一个actor system, 定义了所有的任务(actors)。等待客户端的请求。 部分actor使用了spark的云计算功能。 这是一个spark的应用。
  • akka client application 调用host application上特定的actor。

我们看出,这里我们把akka作为一个任务处理器,并通过spark来完成任务。

项目结构和文件说明

说明

这个工程包含了两个应用。 一个Consumer应用:CusomerApp:实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。 一个Producer应用:ProducerApp:实现了向Kafka集群发消息的功能。

文件结构

AkkaSampleApp    # 项目目录|-- build.bat    # build文件    |-- src    |-- main        |-- resources            |-- application.conf   # Akka Server应用的配置文件            |-- client.conf        # Akka Client应用的配置文件        |-- scala            |-- ClientActor.scala       # Akka Client的Actor:提供了一种调用Server Actor的方式。            |-- ClientApp.scala         # Akka Client应用            |-- ProductionReaper.scala  # Akka Shutdown pattern的实现者            |-- Reaper.scala            # Akka Shutdown pattern的Reaper抽象类            |-- ServerActor.scala       # Akka Server的Actor,提供一个求1到n的MapReduce计算。使用了Spark。            |-- ServerApp.scala         # Akka Server应用

构建工程目录

可以运行:

mkdir AkkaSampleAppmkdir -p /AkkaSampleApp/src/main/resourcesmkdir -p /AkkaSampleApp/src/main/scala

代码

build.sbt

name := "akka-sample-app" version := "1.0" scalaVersion := "2.11.8"scalacOptions += "-feature"scalacOptions += "-deprecation" scalacOptions += "-language:postfixOps" libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-actor" % "2.4.10", "com.typesafe.akka" %% "akka-remote" % "2.4.10", "org.apache.spark" %% "spark-core" % "2.0.0" ) resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"

application.conf

akka {  #loglevel = "DEBUG"  actor {    provider = "akka.remote.RemoteActorRefProvider"  }  remote {    enabled-transports = ["akka.remote.netty.tcp"]    netty.tcp {      hostname = "127.0.0.1"      port = 2552 } #log-sent-messages = on #log-received-messages = on } }

cient.conf

akka {  #loglevel = "DEBUG"  actor {    provider = "akka.remote.RemoteActorRefProvider"  }  remote {    enabled-transports = ["akka.remote.netty.tcp"]    netty.tcp {      hostname = "127.0.0.1"      port = 0 } #log-sent-messages = on #log-received-messages = on } }

注:port = 0表示这个端口号会自动生成一个。

ClientActor.scala

import akka.actor._import akka.event.Loggingclass ClientActor(serverPath: String) extends Actor { val log = Logging(context.system, this) val serverActor = context.actorSelection(serverPath) def receive = { case msg: String => log.info(s"ClientActor received message '$msg'") serverActor ! 10000L } }

ClientApp.scala

import com.typesafe.config.ConfigFactoryimport akka.actor._import akka.remote.RemoteScope import akka.util._ import java.util.concurrent.TimeUnit import scala.concurrent._ import scala.concurrent.duration._ object ClientApp { def main(args: Array[String]): Unit = { val system = ActorSystem("LocalSystem", ConfigFactory.load("client")) // get the remote actor via the server actor system's address val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552") val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress)))) // invoke the remote actor via a client actor. // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor" // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor") buildReaper(system, actor) // tell actor ! 10000L waitShutdown(system, actor) } private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = { import Reaper._ val reaper = system.actorOf(Props(classOf[ProductionReaper])) // Watch the action reaper ! WatchMe(actor) } private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = { // trigger the shutdown operation in ProductionReaper system.stop(actor) // wait to shutdown Await.result(system.whenTerminated, 60.seconds) } }

ProductionReaper.scala

当所有的Actor停止后,终止Actor System。

class ProductionReaper extends Reaper { // Shutdown def allSoulsReaped(): Unit = { context.system.terminate() } }

Reaper.scala

import akka.actor.{
Actor, ActorRef, Terminated}import scala.collection.mutable.ArrayBuffer object Reaper { // Used by others to register an Actor for watching case class WatchMe(ref: ActorRef) } abstract class Reaper extends Actor { import Reaper._ // Keep track of what we're watching val watched = ArrayBuffer.empty[ActorRef] // Derivations need to implement this method. It's the // hook that's called when everything's dead def allSoulsReaped(): Unit // Watch and check for termination final def receive = { case WatchMe(ref) => context.watch(ref) watched += ref case Terminated(ref) => watched -= ref if (watched.isEmpty) allSoulsReaped() } }

ServerActor.scala

提供一个求1到n平方和的MapReduce计算。

import akka.actor.Actorimport akka.actor.Propsimport akka.event.Logging import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf class ServerActor extends Actor { val log = Logging(context.system, this) def receive = { case n: Long => squareSum(n) } private def squareSum(n: Long): Long = { val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) val squareSum = sc.parallelize(1L until n).map { i => i * i }.reduce(_ + _) log.info(s"============== The square sum of $n is $squareSum. ==============") squareSum } }

ServerApp.scala

import scala.concurrent.duration._import com.typesafe.config.ConfigFactoryimport akka.actor.ActorSystem import akka.actor.Props object ServerApp { def main(args: Array[String]): Unit = { val system = ActorSystem("ServerActorSystem") val actor = system.actorOf(Props[ServerActor], name = "serverActor") } }

构建工程

进入目录AkkaSampleApp。运行:

sbt package

第一次运行时间会比较长。

测试应用

启动Spark服务

  • 启动spark集群master server
$SPARK_HOME/sbin/start-master.sh

master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

  • 启动spark集群slave server
$SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

启动Akka Server应用

运行:

$SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar

如果出现java.lang.NoClassDefFoundError错误, 请参照, 确保akka的包在Spark中设置好了。 注:可以使用Ctrl+C来中断这个Server应用。

启动Akka Client应用

新启动一个终端,运行:

java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp# or# $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar

然后:看看Server应用是否开始处理了。

总结

Server应用需要Spark的技术,因此,是在Spark环境中运行。 Clinet应用,可以是一个普通的Java应用。

下面请看

至此,我们已经写好了一个spark集群+akka+scala的应用。下一步请看:

参照

转载于:https://www.cnblogs.com/liuys635/p/11052854.html

你可能感兴趣的文章
查看 MySQL 用户权限
查看>>
统一预付费ECS资源到期日
查看>>
Nature:美国军方资助科研项目,AI植入大脑治疗心理疾病
查看>>
使用Iterator遍历Sheet(POI)验证及解释结果有序性
查看>>
HttpContext.Current.Cache 过期时间
查看>>
提问的智慧
查看>>
AIX平台上11.2 Grid Infrastructure RDBMS进程的user是grid用户?
查看>>
MySQL 存储过程常用SQL语句收集
查看>>
Java——基于java自身包实现消息系统间的通信(TCP/IP+NIO)
查看>>
理解dockerfile是如何工作的?
查看>>
Win10 UWP开发中的重复性静态UI绘制小技巧 1
查看>>
UWP入门(四)--设置控件样式
查看>>
驱动程序调试方法之printk——自制proc文件(二)
查看>>
win8双屏敲代码
查看>>
广域网成为“云计算”发展的瓶颈所在
查看>>
[WCF安全系列]绑定、安全模式与客户端凭证类型:NetNamedPipeBinding、NetTcpBinding与NetMsmqBinding...
查看>>
《Linux From Scratch》第三部分:构建LFS系统 第六章:安装基本的系统软件- 6.42. Perl-5.20.2...
查看>>
VC中分割文件路径的分割类
查看>>
2017年最佳开源网络监控工具
查看>>
关于CLR内存管理一些深层次的讨论[上篇]
查看>>