Skip to content

Commit

Permalink
Add dep conf events (#66)
Browse files Browse the repository at this point in the history
  • Loading branch information
KineticCookie authored Oct 9, 2020
1 parent d110e68 commit dbe4b8a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 6 deletions.
5 changes: 4 additions & 1 deletion src/main/scala/io/hydrosphere/serving/manager/App.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import io.hydrosphere.serving.manager.config.ManagerConfiguration
import io.hydrosphere.serving.manager.domain.application.ApplicationEvents
import io.hydrosphere.serving.manager.domain.application.migrations.ApplicationMigrationTool
import io.hydrosphere.serving.manager.domain.clouddriver.CloudDriver
import io.hydrosphere.serving.manager.domain.deploy_config.DeploymentConfigurationEvents
import io.hydrosphere.serving.manager.domain.image.ImageRepository
import io.hydrosphere.serving.manager.domain.model_version.ModelVersionEvents
import io.hydrosphere.serving.manager.domain.monitoring.MetricSpecEvents
Expand Down Expand Up @@ -83,6 +84,7 @@ object App {
modelPubSub <- Resource.liftF(ModelVersionEvents.makeTopic)
servablePubSub <- Resource.liftF(ServableEvents.makeTopic)
monitoringPubSub <- Resource.liftF(MetricSpecEvents.makeTopic)
depPubSub <- Resource.liftF(DeploymentConfigurationEvents.makeTopic)
core <- {
implicit val rng = rngF
implicit val cd = cloudDriver
Expand All @@ -91,6 +93,7 @@ object App {
implicit val (modelPub, modelSub) = modelPubSub
implicit val (servablePub, servableSub) = servablePubSub
implicit val (metricPub, metricSub) = monitoringPubSub
implicit val (depPub, depSUb) = depPubSub
implicit val hsRepo = new DBDeploymentConfigurationRepository()
implicit val modelRepo = DBModelRepository.make()
implicit val modelVersionRepo = DBModelVersionRepository.make()
Expand Down Expand Up @@ -122,7 +125,7 @@ object App {
appController = new ApplicationController[F](core.appService)
hsController = new HostSelectorController[F]
servableController = new ServableController[F](core.servableService, cloudDriver)
sseController = new SSEController[F](appPubSub._2, modelPubSub._2, servablePubSub._2, monitoringPubSub._2)
sseController = new SSEController[F](appPubSub._2, modelPubSub._2, servablePubSub._2, monitoringPubSub._2, depPubSub._2)
monitoringController = new MonitoringController[F](core.monitoringService, core.repos.monitoringRepository)
depConfController = new DeploymentConfigController[F](core.deploymentConfigService)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import io.hydrosphere.serving.manager.api.http.controller.application.Applicatio
import io.hydrosphere.serving.manager.api.http.controller.servable.ServableView
import io.hydrosphere.serving.manager.discovery._
import io.hydrosphere.serving.manager.domain.application.ApplicationEvents
import io.hydrosphere.serving.manager.domain.deploy_config
import io.hydrosphere.serving.manager.domain.deploy_config.DeploymentConfigurationEvents
import io.hydrosphere.serving.manager.domain.model_version.ModelVersionEvents
import io.hydrosphere.serving.manager.domain.monitoring.MetricSpecEvents
import io.hydrosphere.serving.manager.domain.servable.ServableEvents
Expand All @@ -27,7 +29,8 @@ class SSEController[F[_]](
applicationSubscriber: ApplicationEvents.Subscriber[F],
modelSubscriber: ModelVersionEvents.Subscriber[F],
servableSubscriber: ServableEvents.Subscriber[F],
metricSpecSubscriber: MetricSpecEvents.Subscriber[F]
metricSpecSubscriber: MetricSpecEvents.Subscriber[F],
depSubscriber: DeploymentConfigurationEvents.Subscriber[F]
)(
implicit F: ConcurrentEffect[F],
cs: ContextShift[F],
Expand All @@ -53,7 +56,10 @@ class SSEController[F[_]](
val msSSE = metricSpecSubscriber.subscribe
.flatMap(x => fs2.Stream.emits(SSEController.fromMetricSpecDiscovery(x)))

val joined = appsSSE merge modelSSE merge servableSSE merge msSSE
val depSSE = depSubscriber.subscribe
.flatMap(x => fs2.Stream.emits(SSEController.fromDepConfDiscovery(x)))

val joined = appsSSE merge modelSSE merge servableSSE merge msSSE merge depSSE

Source.fromGraph(joined.toSource)
.keepAlive(5.seconds, () => ServerSentEvent.heartbeat)
Expand All @@ -71,6 +77,26 @@ class SSEController[F[_]](
}

object SSEController extends CompleteJsonProtocol {
def fromDepConfDiscovery(x: DeploymentConfigurationEvents.Event): List[ServerSentEvent] = {
x match {
case DiscoveryEvent.Initial => Nil
case DiscoveryEvent.ItemUpdate(items) =>
items.map { s =>
ServerSentEvent(
data = s.toJson.compactPrint,
`type` = "DeploymentConfigurationUpdate"
)
}
case DiscoveryEvent.ItemRemove(items) =>
items.map { s =>
ServerSentEvent(
data = s,
`type` = "DeploymentConfigurationRemove"
)
}
}
}

def fromServableDiscovery(x: ServableEvents.Event): List[ServerSentEvent] = {
x match {
case DiscoveryEvent.Initial => Nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.hydrosphere.serving.manager.domain

import io.hydrosphere.serving.manager.discovery.DiscoveryService
import io.hydrosphere.serving.manager.infrastructure.protocol.CompleteJsonProtocol._
import io.hydrosphere.serving.manager.infrastructure.protocol.PlayJsonAdapter._
import skuber.Pod.{Affinity, Toleration}
Expand Down Expand Up @@ -68,4 +69,5 @@ package object deploy_config {
implicit val format: RootJsonFormat[DeploymentConfiguration] = jsonFormat5(DeploymentConfiguration.apply)
}

object DeploymentConfigurationEvents extends DiscoveryService[DeploymentConfiguration, String]
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import spray.json._

import scala.reflect.runtime.universe.TypeTag

class DBDeploymentConfigurationRepository[F[_]]()(implicit F: Bracket[F, Throwable], tx: Transactor[F]) extends DeploymentConfigurationRepository[F] {
class DBDeploymentConfigurationRepository[F[_]]()(implicit F: Bracket[F, Throwable], tx: Transactor[F], pub: DeploymentConfigurationEvents.Publisher[F]) extends DeploymentConfigurationRepository[F] {
override def create(entity: DeploymentConfiguration): F[DeploymentConfiguration] = {
insertQ(entity).run.transact(tx).as(entity)
insertQ(entity).run.transact(tx).as(entity).flatTap(pub.update)
}

override def get(name: String): F[Option[DeploymentConfiguration]] = {
Expand All @@ -28,7 +28,7 @@ class DBDeploymentConfigurationRepository[F[_]]()(implicit F: Bracket[F, Throwab
}

override def delete(name: String): F[Int] = {
deleteQ(name).run.transact(tx)
deleteQ(name).run.transact(tx).flatTap(_ => pub.remove(name))
}
}

Expand Down

0 comments on commit dbe4b8a

Please sign in to comment.