forked from akka/alpakka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReferenceTest.java
156 lines (126 loc) · 4.77 KB
/
ReferenceTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
/*
* Copyright (C) 2016-2018 Lightbend Inc. <http://www.lightbend.com>
*/
/*
* Start package with 'docs' prefix when testing APIs as a user.
* This surfaces any visibility issues that would otherwise stay hidden.
*/
package docs.javadsl;
import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.Pair;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.reference.Authentication;
import akka.stream.alpakka.reference.ReferenceReadMessage;
import akka.stream.alpakka.reference.ReferenceWriteMessage;
import akka.stream.alpakka.reference.SourceSettings;
import akka.stream.alpakka.reference.javadsl.Reference;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
import akka.util.ByteString;
import org.junit.*;
import java.util.*;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Append "Test" to every Java test suite.
 *
 * <p>Exercises the Java DSL of the reference connector: the settings and factory methods are
 * checked for compilation, and the source and flow are run against a shared actor system.
 */
public class ReferenceTest {

  static ActorSystem sys;
  static Materializer mat;

  static final String clientId = "test-client-id";

  /** Upper bound, in seconds, for waiting on a stream result so a stuck stream fails the test. */
  private static final long TIMEOUT_SECONDS = 5;

  /** Called before test suite. Creates the actor system and materializer shared by all tests. */
  @BeforeClass
  public static void setUpBeforeClass() {
    sys = ActorSystem.create("ReferenceTest");
    mat = ActorMaterializer.create(sys);
  }

  /** Called before every test. */
  @Before
  public void setUp() {}

  /** Checks that settings can be created and combined with both authentication variants. */
  @Test
  public void settingsCompilationTest() {
    final Authentication.Provided providedAuth =
        Authentication.createProvided().withVerifierPredicate(c -> true);

    final Authentication.None noAuth = Authentication.createNone();

    final SourceSettings settings = SourceSettings.create(clientId);

    // Return values are intentionally ignored: this test only has to compile.
    settings.withAuthentication(providedAuth);
    settings.withAuthentication(noAuth);
  }

  /** Checks that a source can be created from settings with the expected type signature. */
  @Test
  public void sourceCompilationTest() {
    // #source
    final SourceSettings settings = SourceSettings.create(clientId);

    final Source<ReferenceReadMessage, CompletionStage<Done>> source = Reference.source(settings);
    // #source
  }

  /** Checks that both flow factory methods compile with the expected type signature. */
  @Test
  public void flowCompilationTest() {
    // #flow
    final Flow<ReferenceWriteMessage, ReferenceWriteMessage, NotUsed> flow = Reference.flow();
    // #flow

    final ExecutorService ex = Executors.newCachedThreadPool();
    try {
      final Flow<ReferenceWriteMessage, ReferenceWriteMessage, NotUsed> flow2 =
          Reference.flowAsyncMapped(ex);
    } finally {
      // Release the pool so the test does not leave an executor behind.
      ex.shutdown();
    }
  }

  /**
   * Runs the source and verifies the first emitted message.
   *
   * @throws Exception if the stream fails or does not produce an element within the timeout
   */
  @Test
  public void testSource() throws Exception {
    final Source<ReferenceReadMessage, CompletionStage<Done>> source =
        Reference.source(SourceSettings.create(clientId));

    final CompletionStage<ReferenceReadMessage> stage = source.runWith(Sink.head(), mat);
    // Bounded wait: a source that never emits must fail the test instead of blocking the build.
    final ReferenceReadMessage msg =
        stage.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    Assert.assertEquals(Collections.singletonList(ByteString.fromString("one")), msg.getData());

    final Integer expected = 100;
    Assert.assertEquals(expected, msg.getBytesRead());

    Assert.assertNull(msg.getBytesReadFailure());
  }

  /**
   * Sends two messages through the flow and verifies the data and metrics of the result.
   *
   * @throws Exception if the stream fails or does not complete within the timeout
   */
  @Test
  public void testFlow() throws Exception {
    final Flow<ReferenceWriteMessage, ReferenceWriteMessage, NotUsed> flow = Reference.flow();

    // Plain HashMap instead of double-brace initialization, which would create an anonymous
    // subclass holding a reference to this test instance.
    final Map<String, Long> metrics = new HashMap<>();
    metrics.put("rps", 20L);
    metrics.put("rpm", 30L);

    final Source<ReferenceWriteMessage, NotUsed> source =
        Source.from(
            Arrays.asList(
                ReferenceWriteMessage.create()
                    .withData(Collections.singletonList(ByteString.fromString("one")))
                    .withMetrics(metrics),
                ReferenceWriteMessage.create()
                    .withData(
                        Arrays.asList(
                            ByteString.fromString("two"),
                            ByteString.fromString("three"),
                            ByteString.fromString("four")))));

    final CompletionStage<List<ReferenceWriteMessage>> stage =
        source.via(flow).runWith(Sink.seq(), mat);
    // Bounded wait: a flow that never completes must fail the test instead of blocking the build.
    final List<ReferenceWriteMessage> result =
        stage.toCompletableFuture().get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

    final List<ByteString> bytes =
        result.stream().flatMap(m -> m.getData().stream()).collect(Collectors.toList());

    Assert.assertEquals(
        Arrays.asList(
            ByteString.fromString("one"),
            ByteString.fromString("two"),
            ByteString.fromString("three"),
            ByteString.fromString("four")),
        bytes);

    // The first message is expected to carry a "total" metric of 50
    // (presumably the sum of the supplied "rps" and "rpm" values -- confirm against the flow).
    final Long total = result.get(0).getMetrics().get("total");
    Assert.assertNotNull("first message should carry a \"total\" metric", total);
    Assert.assertEquals(50L, total.longValue());
  }

  /** Called after every test. */
  @After
  public void tearDown() {}

  /** Called after test suite. Shuts down the shared actor system. */
  @AfterClass
  public static void tearDownAfterClass() {
    TestKit.shutdownActorSystem(sys);
  }
}