Akka Extension(扩展)
Akka Extension是Akka提供的一套可插拔的、用来增强Akka能力的机制,akka-cluster
等很多内建功能也是基于它实现的。同时,Akka Extension还提供了某种程度上的依赖管理功能,Fusion也基于它实现了**akka-fusion**框架的模块化管理。
Akka Extension提供了两个基本组件:Extension
和ExtensionId
。每个Akka扩展在同一个ActorSystem
内保证只加载一次,你可以选择按需加载,也可以选择随ActorSystem
创建时即加载。有关这部分的内容参考接下来的 从配置加载 。
这样,在你的应用中只需要全局保证一个ActorSystem即可,其它的服务、资源都可以通过 Akka Extension 来维护。同时,你可以很自然的在自己的Akka Extension实例内部引用其它的Akka Extension(需要保证它们都使用同一个ActorSystem)。 这可能是在不使用IOC(如Spring、Guice等)情况下最好的进行依赖管理机制之一。
从配置加载
akka {
# 用于为Akka制作的第三方扩展库,需要随ActorSystem一起加载。
# 若最终用户需要在`application.conf`中配置扩展,应使用`extensions`属性配置。
library-extensions = ${?akka.library-extensions} ["akka.serialization.SerializationExtension"]
# 在此列出需要加载的多个自定义扩展,需要使用类的全限定名。
extensions = []
}
构建扩展
akka-fusion在提供了 FusionExtension
帮助trait来构建Akka Extension。
trait FusionExtension extends Extension {
val classicSystem: ExtendedActorSystem
val configuration: Configuration = Configuration(classicSystem.settings.config)
def typedSystem: ActorSystem[Nothing] = classicSystem.toTyped
}
FusionExtension
在默认Extension
基础之上添加了ActorSystem[T]
和akka.actor.ActorSystem
引用,提供了Configuration
(对Lightbend Config的增强包装)。
通过Akka Extension来管理资源
接下来为FusionJdbc
作为示例,说明FusionExtension
是怎样来管理我们的数据库访问资源的。FusionJdbc
管理了一个或多个数据库连接池,连接池通过 HikariCP 实现。
class FusionJdbc private (override val classicSystem: ExtendedActorSystem) extends FusionExtension {
val components = new JdbcComponents(classicSystem)
FusionCore(classicSystem).shutdowns.beforeActorSystemTerminate("StopFusionJdbc") { () =>
components.closeAsync()(classicSystem.dispatcher)
}
def component: HikariDataSource = components.component
}
object FusionJdbc extends FusionExtensionId[FusionJdbc] {
override def createExtension(system: ExtendedActorSystem): FusionJdbc = new FusionJdbc(system)
}
FusionJdbc
将由Akka保证在同一个ActorSystem中只被实例化一次,就像Spring框架里的@Service
注解、Guice框架的Singleton
注解一样,它们都是 单例 。
final private[jdbc] class JdbcComponents(system: ExtendedActorSystem)
extends Components[HikariDataSource](JdbcConstants.PATH_DEFAULT) {
override def configuration: Configuration = Configuration(system.settings.config)
override protected def componentClose(c: HikariDataSource): Future[Done] = Future.successful {
c.close()
Done
}
override protected def createComponent(id: String): HikariDataSource =
JdbcUtils.createHikariDataSource(configuration.getConfig(id))
}
JdbcComponents
继承了Components
,Components
提供了一个保存同一类型组件的多个实例的优秀方案。它基于 Lightbend Config 实现了可配置化,通过构造函数传入的配置路径(id)来决定引用哪一个配置,并保存id的实例的对应关系。
请关注FusionCore(system).shutdowns.beforeActorSystemTerminate
这行代码,它使用CoordinatedShutdown
来协调资源的关闭,它将在ActorSystem
终止前关闭所有数据库连接池。更多内容请参阅: FusionCore#shutdowns
Components
Components
提供代码实现如下:
abstract class Components[T](DEFAULT_ID: String) extends StrictLogging {
protected val components = mutable.Map.empty[String, T]
def configuration: Configuration
protected def createComponent(id: String): T
protected def componentClose(c: T): Future[Done]
def component: T = lookup(DEFAULT_ID)
final def lookup(id: String): T = synchronized(lookupComponent(id))
protected def lookupComponent(id: String): T = components.getOrElseUpdate(id, createComponent(id))
final def register(id: String, other: T, replaceExists: Boolean = false): T =
synchronized(registerComponent(id, other, replaceExists))
protected def registerComponent(id: String, other: T, replaceExists: Boolean): T = {
require(id != DEFAULT_ID, s"id不能为默认配置ID,$id == $DEFAULT_ID")
val isReplace = configuration.getOrElse(id + ".replace-exists", replaceExists)
components.get(id).foreach {
case c if isReplace =>
try {
Await.ready(componentClose(c), 30.seconds)
} catch {
case e: Throwable =>
logger.error(s"registerComponent replace exists component 30s timeout error: ${e.toString};id: $id", e)
}
components.remove(id)
case _ =>
throw new IllegalAccessException(s"id重复,$id")
}
components.put(id, other)
other
}
def closeAsync()(implicit ec: ExecutionContext): Future[Done] = synchronized {
Future.sequence(components.valuesIterator.map(componentClose).toList).map(_ => Done)
}
}
通过Akka Extension来管理服务(依赖)
修定你有3个服务:
- FileService:统一的文件服务,如提供用户头像链接
- UserService:用户服务
- LoginService:实现用户登录、注册等业务逻辑
你可以如下定义3个服务
/*
* Copyright 2019 helloscala.com
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package docs.extension.customservice
import akka.actor.typed.ActorSystem
import fusion.common.extension.{ TypedExtension, TypedExtensionId }
import helloscala.common.exception.HSUnauthorizedException
import helloscala.common.util.{ DigestUtils, StringUtils }
import scala.concurrent.Future
case class LoginDTO(account: String, password: String)
case class LoginBO(id: String, nickname: String)
case class UserBO(id: String, nickname: String, avatarId: String, avatarUrl: String)
case class UserDO(id: String, nickname: String, avatarId: String, password: String, salt: String)
class UserRepository {
def findByAccount(account: String): Future[UserDO] =
Future.successful(UserDO(StringUtils.randomString(24), account, StringUtils.randomString(24), "password", "salt"))
def findById(id: String): Future[UserDO] =
Future.successful(UserDO(id, "用户", StringUtils.randomString(24), "password", "salt"))
}
class FileService private (override val typedSystem: ActorSystem[Nothing]) extends TypedExtension {
def findUrlById(fileId: String): Future[String] = Future.successful {
s"http://localhost:9999/file/$fileId.png"
}
}
object FileService extends TypedExtensionId[FileService] {
override def createExtension(system: ActorSystem[_]): FileService = new FileService(system)
}
class UserService private (override val typedSystem: ActorSystem[Nothing]) extends TypedExtension {
import typedSystem.executionContext
private val fileService = FileService(typedSystem)
private val userRepository = new UserRepository()
def findBOById(id: String): Future[UserBO] = {
userRepository.findById(id).flatMap { user =>
fileService.findUrlById(user.avatarId).map { url =>
UserBO(user.id, user.nickname, user.avatarId, url)
}
}
}
def findByAccount(account: String): Future[UserDO] = {
userRepository.findByAccount(account)
}
}
object UserService extends TypedExtensionId[UserService] {
override def createExtension(system: ActorSystem[_]): UserService = new UserService(system)
}
class LoginService private (override val typedSystem: ActorSystem[Nothing]) extends TypedExtension {
import typedSystem.executionContext
private val userService = UserService(typedSystem)
def login(dto: LoginDTO): Future[LoginBO] = {
userService.findByAccount(dto.account).map {
case user if user.password == DigestUtils.sha256Hex(dto.password + user.salt) =>
LoginBO(user.id, user.nickname)
case _ =>
throw HSUnauthorizedException("密码不匹配")
}
}
}
object LoginService extends TypedExtensionId[LoginService] {
override def createExtension(system: ActorSystem[_]): LoginService = new LoginService(system)
}
通过以上代码,你看到了怎样使用Akka Extension来实现服务的依赖管理。所有的服务之间只有一个显示依赖:ActorSystem
。因为我们的框架是基于Akka的,所以我们认为显示依赖ActorSystem
并不是一个问题。