Skip to content

Commit

Permalink
Polish in FluxWriter and MustacheView
Browse files Browse the repository at this point in the history
  • Loading branch information
rstoyanchev authored and Dave Syer committed May 21, 2019
1 parent 8b75b8f commit a26158c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;

/**
* A {@link Writer} that can write a {@link Flux} (or {@link Publisher}) to a data buffer.
Expand All @@ -37,47 +37,46 @@
*/
class FluxWriter extends Writer {

private final Supplier<DataBuffer> factory;
private final DataBufferFactory factory;

private final Charset charset;

private List<String> current = new ArrayList<>();

private List<Object> accumulated = new ArrayList<>();

FluxWriter(Supplier<DataBuffer> factory) {
this(factory, Charset.defaultCharset());
}

FluxWriter(Supplier<DataBuffer> factory, Charset charset) {
FluxWriter(DataBufferFactory factory, Charset charset) {
this.factory = factory;
this.charset = charset;
}

public Publisher<? extends Publisher<? extends DataBuffer>> getBuffers() {
@SuppressWarnings("unchecked")
public Flux<? extends Publisher<? extends DataBuffer>> getBuffers() {
Flux<String> buffers = Flux.empty();
if (!this.current.isEmpty()) {
this.accumulated.add(new ArrayList<>(this.current));
this.current.clear();
}
List<String> chunks = new ArrayList<>();
for (Object thing : this.accumulated) {
if (thing instanceof Publisher) {
@SuppressWarnings("unchecked")
Publisher<String> publisher = (Publisher<String>) thing;
buffers = buffers.concatWith(publisher);
buffers = concatValues(chunks, buffers);
buffers = buffers.concatWith((Publisher<String>) thing);
}
else {
@SuppressWarnings("unchecked")
List<String> list = (List<String>) thing;
buffers = buffers.concatWithValues(list.toArray(new String[0]));
chunks.add((String) thing);
}
}
return buffers.map((string) -> Mono.just(buffer().write(string, this.charset)));
buffers = concatValues(chunks, buffers);
return buffers.map((string) -> Mono.fromCallable(() ->
this.factory.allocateBuffer().write(string, this.charset)));
}

private Flux<String> concatValues(List<String> chunks, Flux<String> buffers) {
if (!chunks.isEmpty()) {
buffers = buffers.concatWithValues(chunks.toArray(new String[0]));
chunks.clear();
}
return buffers;
}

@Override
public void write(char[] cbuf, int off, int len) throws IOException {
this.current.add(new String(cbuf, off, len));
this.accumulated.add(new String(cbuf, off, len));
}

@Override
Expand All @@ -92,23 +91,8 @@ public void release() {
// TODO: maybe implement this and call it on error
}

private DataBuffer buffer() {
return this.factory.get();
}

public void write(Object thing) {
if (thing instanceof Publisher) {
if (!this.current.isEmpty()) {
this.accumulated.add(new ArrayList<>(this.current));
this.current.clear();
}
this.accumulated.add(thing);
}
else {
if (thing instanceof String) {
this.current.add((String) thing);
}
}
this.accumulated.add(thing);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import org.springframework.core.io.Resource;
import org.springframework.http.MediaType;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.web.reactive.result.view.AbstractUrlBasedView;
import org.springframework.web.reactive.result.view.View;
import org.springframework.web.server.ServerWebExchange;
Expand Down Expand Up @@ -101,8 +102,8 @@ protected Mono<Void> renderInternal(Map<String, Object> model, MediaType content
}
boolean sse = MediaType.TEXT_EVENT_STREAM.isCompatibleWith(contentType);
Charset charset = getCharset(contentType).orElse(getDefaultCharset());
FluxWriter writer = new FluxWriter(
() -> exchange.getResponse().bufferFactory().allocateBuffer(), charset);
ServerHttpResponse response = exchange.getResponse();
FluxWriter writer = new FluxWriter(response.bufferFactory(), charset);
Mono<Template> rendered;
if (!this.cache || !templates.containsKey(resource)) {
rendered = Mono.fromCallable(() -> compile(resource))
Expand All @@ -119,11 +120,10 @@ protected Mono<Void> renderInternal(Map<String, Object> model, MediaType content
else {
map = model;
}
rendered = rendered.doOnSuccess((template) -> template.execute(map, writer));
return rendered
.thenEmpty(Mono.defer(() -> exchange.getResponse()
.writeAndFlushWith(Flux.from(writer.getBuffers()))))
.doOnTerminate(() -> close(writer));
return rendered.flatMap((template) -> {
template.execute(map, writer);
return response.writeAndFlushWith(writer.getBuffers());
}).doOnTerminate(() -> close(writer));
}

private void close(FluxWriter writer) {
Expand Down Expand Up @@ -199,8 +199,7 @@ public void execute(Fragment frag, Writer out) throws IOException {
if (out instanceof FluxWriter) {
FluxWriter fluxWriter = (FluxWriter) out;
fluxWriter.flush();
fluxWriter.write(Flux.from(this.publisher)
.map((value) -> frag.execute(value)));
fluxWriter.write(Flux.from(this.publisher).map(frag::execute));
}
}
catch (IOException ex) {
Expand Down

0 comments on commit a26158c

Please sign in to comment.