forked from akka/alpakka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReferenceSpec.scala
121 lines (97 loc) · 3.26 KB
/
ReferenceSpec.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.scaladsl
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.reference.scaladsl.Reference
import akka.stream.alpakka.reference.{Authentication, ReferenceReadMessage, ReferenceWriteMessage, SourceSettings}
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.testkit.TestKit
import akka.util.ByteString
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
import scala.collection.immutable
import scala.concurrent.Future
/**
* Append "Spec" to every Scala test suite.
*/
class ReferenceSpec extends WordSpec with BeforeAndAfterAll with ScalaFutures with Matchers {
implicit val sys = ActorSystem("ReferenceSpec")
implicit val mat = ActorMaterializer()
final val ClientId = "test-client-id"
"reference connector" should {
/**
* Type annotations not generally needed on local variables.
* However it allows to check if the types are really what we want.
*/
"compile settings" in {
val providedAuth: Authentication.Provided =
Authentication.Provided().withVerifier(c => true)
val noAuth: Authentication.None =
Authentication.None
val settings: SourceSettings = SourceSettings(ClientId)
settings.withAuthentication(providedAuth)
settings.withAuthentication(noAuth)
}
"compile source" in {
// #source
val settings: SourceSettings = SourceSettings(ClientId)
val source: Source[ReferenceReadMessage, Future[Done]] =
Reference.source(settings)
// #source
}
"compile flow" in {
// #flow
val flow: Flow[ReferenceWriteMessage, ReferenceWriteMessage, NotUsed] =
Reference.flow()
// #flow
implicit val ec = scala.concurrent.ExecutionContext.global
val flow2: Flow[ReferenceWriteMessage, ReferenceWriteMessage, NotUsed] =
Reference.flow()
}
"run source" in {
val source = Reference.source(SourceSettings(ClientId))
val msg = source.runWith(Sink.head).futureValue
msg.data should contain theSameElementsAs Seq(ByteString("one"))
}
"run flow" in {
val flow = Reference.flow()
val source = Source(
immutable.Seq(
ReferenceWriteMessage()
.withData(immutable.Seq(ByteString("one")))
.withMetrics(Map("rps" -> 20L, "rpm" -> 30L)),
ReferenceWriteMessage().withData(
immutable.Seq(
ByteString("two"),
ByteString("three"),
ByteString("four")
)
),
ReferenceWriteMessage().withData(
immutable.Seq(
ByteString("five"),
ByteString("six"),
ByteString("seven")
)
)
)
)
val result = source.via(flow).runWith(Sink.seq).futureValue
result.flatMap(_.data) should contain theSameElementsAs Seq(
"one",
"two",
"three",
"four",
"five",
"six",
"seven"
).map(ByteString.apply)
result.head.metrics.get("total") should contain(50L)
}
}
override def afterAll() =
TestKit.shutdownActorSystem(sys)
}