Skip to content

Commit 9655df9

Browse files
committed
first push
0 parents  commit 9655df9

28 files changed

+1223
-0
lines changed

Diff for: .gitignore

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.idea
2+
.vscode
3+
.bsp
4+
.metals
5+
metals.sbt
6+
.bloop
7+
target
8+
.scala-build

Diff for: .scalafmt.conf

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
version = "3.7.3"
2+
runner.dialect = scala3
3+
align.preset = most
4+
assumeStandardLibraryStripMargin = true
5+
align.stripMargin = true

Diff for: README.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# simple-messaging-service
2+
3+
A messaging service backed by RabbitMQ which supports delayed sends.
4+
5+
Current status is roughed in MVP.
6+
7+
General concept mappings are topics = exchanges, subscriptions = queues.
8+
9+
Currently, all reads are acking - to be improved. Messages can be read
10+
one-at-a-time over REST, or streamed via websocket connection.

Diff for: build.sbt

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Global / onChangedBuildSource := ReloadOnSourceChanges
2+
3+
ThisBuild / scalaVersion := "3.3.0"
4+
ThisBuild / organization := "works.scala"
5+
ThisBuild / scalacOptions ++= Seq()
6+
ThisBuild / testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")
7+
8+
lazy val root = project
9+
.in(file("."))
10+
.settings(
11+
name := "simple-messaging-service",
12+
libraryDependencies ++= Dependencies.server,
13+
fork := true
14+
)
15+
16+
addCommandAlias("fmt", "all root/scalafmtSbt root/scalafmtAll")
17+
addCommandAlias("fmtCheck", "all root/scalafmtSbtCheck root/scalafmtCheckAll")

Diff for: docker-compose.yaml

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
version: '3'
2+
services:
3+
rmq:
4+
image: rabbitmq:3.12-management
5+
ports:
6+
- "5672:5672"
7+
- "15672:15672"

Diff for: project/Dependencies.scala

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
import sbt._
2+
3+
object Dependencies {
4+
5+
object Versions {
6+
val zio = "2.0.15"
7+
val zioConfig = "3.0.7"
8+
val zioLogging = "2.1.12"
9+
val zioMock = "1.0.0-RC9"
10+
val zioTelemetry = "3.0.0-RC10"
11+
val zioMetricsConnector = "2.0.8"
12+
val tapir = "1.4.0"
13+
val tapirZioJson = "1.3.0"
14+
val tapirZioMetrics = "1.4.0"
15+
val logback = "1.4.7"
16+
val caliban = "2.2.1"
17+
val rmq = "5.18.0"
18+
val hop = "5.0.0"
19+
}
20+
21+
val server: Seq[ModuleID] = Seq(
22+
"dev.zio" %% "zio" % Versions.zio,
23+
"dev.zio" %% "zio-streams" % Versions.zio,
24+
"dev.zio" %% "zio-test" % Versions.zio,
25+
"dev.zio" %% "zio-test-sbt" % Versions.zio,
26+
"dev.zio" %% "zio-test-magnolia" % Versions.zio % Test,
27+
"dev.zio" %% "zio-mock" % Versions.zioMock % Test,
28+
"dev.zio" %% "zio-config" % Versions.zioConfig,
29+
"dev.zio" %% "zio-config-magnolia" % Versions.zioConfig,
30+
"dev.zio" %% "zio-config-typesafe" % Versions.zioConfig,
31+
"dev.zio" %% "zio-logging" % Versions.zioLogging,
32+
"dev.zio" %% "zio-logging-slf4j2" % Versions.zioLogging,
33+
"dev.zio" %% "zio-opentelemetry" % Versions.zioTelemetry,
34+
"dev.zio" %% "zio-metrics-connectors" % Versions.zioMetricsConnector,
35+
"ch.qos.logback" % "logback-classic" % Versions.logback,
36+
"com.softwaremill.sttp.tapir" %% "tapir-zio" % Versions.tapir,
37+
"com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % Versions.tapir,
38+
"com.softwaremill.sttp.tapir" %% "tapir-swagger-ui-bundle" % Versions.tapir,
39+
"com.softwaremill.sttp.tapir" %% "tapir-zio-metrics" % Versions.tapirZioMetrics,
40+
"com.softwaremill.sttp.tapir" %% "tapir-json-zio" % Versions.tapirZioJson,
41+
"com.github.ghostdogpr" %% "caliban" % Versions.caliban,
42+
"com.github.ghostdogpr" %% "caliban-zio-http" % Versions.caliban,
43+
"com.github.ghostdogpr" %% "caliban-tapir" % Versions.caliban,
44+
"com.github.ghostdogpr" %% "caliban-tracing" % Versions.caliban,
45+
"com.github.ghostdogpr" %% "caliban-tools" % Versions.caliban,
46+
"com.github.ghostdogpr" %% "caliban-client" % Versions.caliban,
47+
"com.rabbitmq" % "amqp-client" % Versions.rmq,
48+
"com.rabbitmq" % "http-client" % Versions.hop
49+
)
50+
51+
}

Diff for: project/build.properties

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.version=1.9.0

Diff for: project/plugins.sbt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")

Diff for: src/main/resources/logback.xml

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<configuration>
2+
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
3+
<encoder>
4+
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
5+
</encoder>
6+
</appender>
7+
8+
<root level="debug">
9+
<appender-ref ref="STDOUT"/>
10+
</root>
11+
12+
<logger name="io.netty" level="ERROR"/>
13+
14+
</configuration>

Diff for: src/main/scala/works/scala/sss/Main.scala

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import zio.*
2+
import works.scala.sss.api.controllers.*
3+
import caliban.parsing.adt.OperationType.Subscription
4+
import sttp.tapir.swagger.bundle.SwaggerInterpreter
5+
import zio.http.*
6+
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
7+
import works.scala.sss.api.services.*
8+
import works.scala.sss.rmq.RMQ
9+
import works.scala.sss.extensions.Extensions.*
10+
11+
object Main extends ZIOAppDefault:
12+
13+
val makeRoutes = for {
14+
topics <- ZIO.service[TopicController]
15+
subscriptions <- ZIO.service[SubscriptionController]
16+
publishing <- ZIO.service[PublishingController]
17+
messages <- ZIO.service[MessageController]
18+
} yield {
19+
val combined = List(topics, subscriptions, publishing, messages)
20+
.flatMap(_.routes)
21+
val doc = SwaggerInterpreter().fromServerEndpoints(
22+
combined,
23+
"Simple Scheduling Service",
24+
"1"
25+
)
26+
combined ++ doc
27+
}
28+
29+
val program = for {
30+
_ <- ZIO.serviceWithZIO[InitService](_.initRmq)
31+
_ <-
32+
ZIO.serviceWithZIO[DelayService](_.consume).resurrect.ignore.forever.fork
33+
routes <- makeRoutes
34+
msg <- ZIO.service[MessageController]
35+
_ <- ZIO.logInfo("Server started: http://localhost:9000/docs")
36+
_ <- Server.install(
37+
ZioHttpInterpreter()
38+
.toHttp(routes)
39+
.withDefaultErrorResponse ++ msg.socketApp
40+
)
41+
_ <- ZIO.never
42+
} yield ExitCode.success
43+
44+
override def run: ZIO[Any & (ZIOAppArgs & Scope), Any, Any] =
45+
program
46+
.provide(
47+
Server.Config.default.port(9000).ulayer,
48+
Server.live,
49+
TopicController.layer,
50+
SubscriptionController.layer,
51+
PublishingController.layer,
52+
TopicServiceImpl.layer,
53+
SubscriptionServiceImpl.layer,
54+
PublishingServiceImpl.layer,
55+
ZLayer(RMQ.client),
56+
ZLayer(RMQ.connectionFactory),
57+
ZLayer.scoped {
58+
RMQ.connection
59+
},
60+
RMQ.Config("localhost", 5672, "guest", "guest").ulayer,
61+
InitServiceImpl.layer,
62+
DelayServiceImpl.layer,
63+
ConsumerServiceImpl.layer,
64+
MessageController.layer
65+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package works.scala.sss.api.controllers
2+
3+
import caliban.GraphQL
4+
import sttp.tapir.*
5+
import sttp.tapir.json.zio.jsonBody
6+
import sttp.tapir.server.ServerEndpoint
7+
import works.scala.sss.api.models.ApiError
8+
import zio.*
9+
10+
trait BaseController:
11+
val routes: List[ServerEndpoint[Any, Task]] = List.empty
12+
val graphs: List[GraphQL[Any]] = List.empty
13+
14+
extension [T](task: Task[T])
15+
def handleErrors: Task[Either[ApiError, T]] =
16+
task.mapError(e => ApiError(e.getMessage)).either
17+
18+
val baseEndpoint: Endpoint[Unit, Unit, ApiError, Unit, Any] =
19+
endpoint
20+
.errorOut(jsonBody[ApiError])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package works.scala.sss.api.controllers
2+
3+
import com.rabbitmq.client.Connection
4+
import sttp.tapir.*
5+
import sttp.tapir.json.zio.jsonBody
6+
import sttp.tapir.server.ServerEndpoint
7+
import works.scala.sss.api.models.MessageConsumeResponse
8+
import works.scala.sss.api.services.ConsumerService
9+
import works.scala.sss.rmq.RMQ
10+
import zio.http.ChannelEvent.*
11+
import zio.http.*
12+
import zio.http.socket.*
13+
import zio.stream.*
14+
import zio.*
15+
import zio.Duration.*
16+
import zio.http.ChannelEvent.UserEvent.HandshakeComplete
17+
import zio.http.Path.Segment.Root
18+
import works.scala.sss.extensions.Extensions.*
19+
20+
import java.util.UUID
21+
import scala.language.postfixOps
22+
23+
object MessageController:
24+
val layer: ZLayer[ConsumerService & Connection, Nothing, MessageController] =
25+
ZLayer {
26+
for {
27+
svc <- ZIO.service[ConsumerService]
28+
conn <- ZIO.service[Connection]
29+
} yield MessageController(svc, conn)
30+
}
31+
32+
case class MessageController(
33+
consumerService: ConsumerService,
34+
rmqConnection: Connection
35+
) extends BaseController:
36+
37+
private val consumeEndpoint =
38+
baseEndpoint
39+
.tag("messages")
40+
.in("messages")
41+
42+
val consume =
43+
consumeEndpoint
44+
.name("consume")
45+
.description(
46+
"Consume and auto-ack a single message from the queue if non-empty"
47+
)
48+
.in(path[String]("subscription"))
49+
.get
50+
.out(jsonBody[MessageConsumeResponse])
51+
.serverLogic(in => consumerService.ackingConsume(in).handleErrors)
52+
53+
private def socketLogic(subscription: String) = Http
54+
.collectZIO[WebSocketChannelEvent] {
55+
consumerService.handleWs(subscription, UUID.randomUUID().toString)
56+
}
57+
58+
val socketApp = Http.collectZIO[Request] {
59+
case Method.GET -> !! / "stream" / subscription => // Don't put at "messages" because conflict with tapir
60+
socketLogic(subscription).toSocketApp.toResponse
61+
}
62+
63+
override val routes: List[ServerEndpoint[Any, Task]] = List(
64+
consume
65+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package works.scala.sss.api.controllers
2+
3+
import sttp.tapir.*
4+
import sttp.tapir.json.zio.jsonBody
5+
import works.scala.sss.api.models.{
6+
PublishMessageRequest,
7+
PublishMessageResponse
8+
}
9+
import works.scala.sss.api.services.PublishingService
10+
import zio.*
11+
import sttp.tapir.server.ServerEndpoint
12+
13+
object PublishingController:
14+
val layer: ZLayer[PublishingService, Nothing, PublishingController] = ZLayer {
15+
ZIO.service[PublishingService].map(PublishingController.apply)
16+
}
17+
18+
case class PublishingController(publishingService: PublishingService)
19+
extends BaseController:
20+
21+
private val publishEndpoint =
22+
baseEndpoint
23+
.in("publish")
24+
.tag("publish")
25+
26+
val publishMessage =
27+
publishEndpoint
28+
.name("publishMessage")
29+
.description("Publish a message to a topic")
30+
.post
31+
.in(path[String]("topic"))
32+
.in(jsonBody[PublishMessageRequest])
33+
.out(jsonBody[PublishMessageResponse])
34+
.serverLogic(in =>
35+
publishingService.publishMessage(in._1, in._2).handleErrors
36+
)
37+
38+
override val routes: List[ServerEndpoint[Any, Task]] = List(
39+
publishMessage
40+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package works.scala.sss.api.controllers
2+
3+
import sttp.tapir.*
4+
import sttp.tapir.json.zio.jsonBody
5+
import works.scala.sss.api.models.{
6+
CreateSubscriptionRequest,
7+
CreateSubscriptionResponse,
8+
DeleteSubscriptionResponse,
9+
GetSubscriptionResponse,
10+
GetSubscriptionsResponse
11+
}
12+
import works.scala.sss.api.services.SubscriptionService
13+
import zio.*
14+
import sttp.tapir.server.ServerEndpoint
15+
16+
object SubscriptionController:
17+
val layer: ZLayer[SubscriptionService, Nothing, SubscriptionController] =
18+
ZLayer {
19+
ZIO.service[SubscriptionService].map(SubscriptionController.apply)
20+
}
21+
22+
case class SubscriptionController(subscriptionService: SubscriptionService)
23+
extends BaseController:
24+
25+
private val subEndpoint =
26+
baseEndpoint
27+
.tag("subscriptions")
28+
.in("subscriptions")
29+
30+
val createSubscription =
31+
subEndpoint
32+
.name("createSubscription")
33+
.description("Create a new subscription")
34+
.post
35+
.in(jsonBody[CreateSubscriptionRequest])
36+
.out(jsonBody[CreateSubscriptionResponse])
37+
.serverLogic(in =>
38+
subscriptionService.createSubscription(in).handleErrors
39+
)
40+
41+
val getSubscription =
42+
subEndpoint
43+
.name("getSubscription")
44+
.description("Get information about a subscription")
45+
.in(path[String]("id"))
46+
.out(jsonBody[GetSubscriptionResponse])
47+
.serverLogic(id => subscriptionService.getSubscription(id).handleErrors)
48+
49+
val getSubscriptions =
50+
subEndpoint
51+
.name("getSubscriptions")
52+
.description("Get all subscriptions")
53+
.out(jsonBody[GetSubscriptionsResponse])
54+
.serverLogic(_ => subscriptionService.getSubscriptions().handleErrors)
55+
56+
val deleteSubscription =
57+
subEndpoint
58+
.name("deleteSubscription")
59+
.description("Delete a subscription")
60+
.in(path[String]("name"))
61+
.out(jsonBody[DeleteSubscriptionResponse])
62+
.serverLogic(name =>
63+
subscriptionService.deleteSubscription(name).handleErrors
64+
)
65+
66+
override val routes: List[ServerEndpoint[Any, Task]] = List(
67+
createSubscription,
68+
getSubscription,
69+
getSubscriptions,
70+
deleteSubscription
71+
)

0 commit comments

Comments
 (0)