Skip to content

Commit 002d489

Browse files
Backport PR #16968 to 8.16: Fix BufferedTokenizer to properly resume after a buffer full condition respecting the encoding of the input string (#16968) (#17021)
Backport PR #16968 to 8.16 branch, original message: ---- Permit to use effectively the tokenizer also in context where a line is bigger than a limit. Fixes an issues related to token size limit error, when the offending token was bigger than the input fragment in happened that the tokenzer wasn't unable to recover the token stream from the first delimiter after the offending token but messed things, loosing part of tokens. ## How solve the problem This is a second take to fix the processing of tokens from the tokenizer after a buffer full error. The first try #16482 was rollbacked to the encoding error #16694. The first try failed on returning the tokens in the same encoding of the input. This PR does a couple of things: - accumulates the tokens, so that after a full condition can resume with the next tokens after the offending one. - respect the encoding of the input string. Use `concat` method instead of `addAll`, which avoid to convert RubyString to String and back to RubyString. When return the head `StringBuilder` it enforce the encoding with the input charset. (cherry picked from commit 1c8cf54) Co-authored-by: Andrea Selva <[email protected]>
1 parent 32e6def commit 002d489

File tree

4 files changed

+426
-13
lines changed

4 files changed

+426
-13
lines changed

logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java

+88-13
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@
2323
import org.jruby.Ruby;
2424
import org.jruby.RubyArray;
2525
import org.jruby.RubyClass;
26+
import org.jruby.RubyEncoding;
2627
import org.jruby.RubyObject;
2728
import org.jruby.RubyString;
2829
import org.jruby.anno.JRubyClass;
2930
import org.jruby.anno.JRubyMethod;
3031
import org.jruby.runtime.ThreadContext;
3132
import org.jruby.runtime.builtin.IRubyObject;
33+
import org.jruby.util.ByteList;
3234
import org.logstash.RubyUtil;
3335

36+
import java.nio.charset.Charset;
37+
3438
@JRubyClass(name = "BufferedTokenizer")
3539
public class BufferedTokenizerExt extends RubyObject {
3640

@@ -40,10 +44,13 @@ public class BufferedTokenizerExt extends RubyObject {
4044
freeze(RubyUtil.RUBY.getCurrentContext());
4145

4246
private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
47+
private StringBuilder headToken = new StringBuilder();
4348
private RubyString delimiter = NEW_LINE;
4449
private int sizeLimit;
4550
private boolean hasSizeLimit;
4651
private int inputSize;
52+
private boolean bufferFullErrorNotified = false;
53+
private String encodingName;
4754

4855
public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
4956
super(runtime, metaClass);
@@ -80,23 +87,76 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
8087
@JRubyMethod
8188
@SuppressWarnings("rawtypes")
8289
public RubyArray extract(final ThreadContext context, IRubyObject data) {
90+
RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context);
91+
encodingName = encoding.getEncoding().getCharsetName();
8392
final RubyArray entities = data.convertToString().split(delimiter, -1);
93+
if (!bufferFullErrorNotified) {
94+
input.clear();
95+
input.concat(entities);
96+
} else {
97+
// after a full buffer signal
98+
if (input.isEmpty()) {
99+
// after a buffer full error, the remaining part of the line, till next delimiter,
100+
// has to be consumed, unless the input buffer doesn't still contain fragments of
101+
// subsequent tokens.
102+
entities.shift(context);
103+
input.concat(entities);
104+
} else {
105+
// merge last of the input with first of incoming data segment
106+
if (!entities.isEmpty()) {
107+
RubyString last = ((RubyString) input.pop(context));
108+
RubyString nextFirst = ((RubyString) entities.shift(context));
109+
entities.unshift(last.concat(nextFirst));
110+
input.concat(entities);
111+
}
112+
}
113+
}
114+
84115
if (hasSizeLimit) {
85-
final int entitiesSize = ((RubyString) entities.first()).size();
116+
if (bufferFullErrorNotified) {
117+
bufferFullErrorNotified = false;
118+
if (input.isEmpty()) {
119+
return RubyUtil.RUBY.newArray();
120+
}
121+
}
122+
final int entitiesSize = ((RubyString) input.first()).size();
86123
if (inputSize + entitiesSize > sizeLimit) {
87-
throw new IllegalStateException("input buffer full");
124+
bufferFullErrorNotified = true;
125+
headToken = new StringBuilder();
126+
String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize);
127+
inputSize = 0;
128+
input.shift(context); // consume the token fragment that generates the buffer full
129+
throw new IllegalStateException(errorMessage);
88130
}
89131
this.inputSize = inputSize + entitiesSize;
90132
}
91-
input.append(entities.shift(context));
92-
if (entities.isEmpty()) {
133+
134+
if (input.getLength() < 2) {
135+
// this is a specialization case which avoid adding and removing from input accumulator
136+
// when it contains just one element
137+
headToken.append(input.shift(context)); // remove head
93138
return RubyUtil.RUBY.newArray();
94139
}
95-
entities.unshift(input.join(context));
96-
input.clear();
97-
input.append(entities.pop(context));
98-
inputSize = ((RubyString) input.first()).size();
99-
return entities;
140+
141+
if (headToken.length() > 0) {
142+
// if there is a pending token part, merge it with the first token segment present
143+
// in the accumulator, and clean the pending token part.
144+
headToken.append(input.shift(context)); // append buffer to first element and
145+
// create new RubyString with the data specified encoding
146+
RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString());
147+
input.unshift(encodedHeadToken); // reinsert it into the array
148+
headToken = new StringBuilder();
149+
}
150+
headToken.append(input.pop(context)); // put the leftovers in headToken for later
151+
inputSize = headToken.length();
152+
return input;
153+
}
154+
155+
private RubyString toEncodedRubyString(ThreadContext context, String input) {
156+
// Depends on the encodingName being set by the extract method, could potentially raise if not set.
157+
RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName))));
158+
result.force_encoding(context, RubyUtil.RUBY.newString(encodingName));
159+
return result;
100160
}
101161

102162
/**
@@ -108,15 +168,30 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
108168
*/
109169
@JRubyMethod
110170
public IRubyObject flush(final ThreadContext context) {
111-
final IRubyObject buffer = input.join(context);
112-
input.clear();
171+
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
172+
headToken = new StringBuilder();
113173
inputSize = 0;
114-
return buffer;
174+
175+
// create new RubyString with the last data specified encoding, if exists
176+
RubyString encodedHeadToken;
177+
if (encodingName != null) {
178+
encodedHeadToken = toEncodedRubyString(context, buffer.toString());
179+
} else {
180+
// When used with TCP input it could be that on socket connection the flush method
181+
// is invoked while no invocation of extract, leaving the encoding name unassigned.
182+
// In such case also the headToken must be empty
183+
if (!buffer.toString().isEmpty()) {
184+
throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen");
185+
}
186+
encodedHeadToken = (RubyString) buffer;
187+
}
188+
189+
return encodedHeadToken;
115190
}
116191

117192
@JRubyMethod(name = "empty?")
118193
public IRubyObject isEmpty(final ThreadContext context) {
119-
return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0));
194+
return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0));
120195
}
121196

122197
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.common;
21+
22+
import org.jruby.RubyArray;
23+
import org.jruby.RubyEncoding;
24+
import org.jruby.RubyString;
25+
import org.jruby.runtime.ThreadContext;
26+
import org.jruby.runtime.builtin.IRubyObject;
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.logstash.RubyTestBase;
30+
import org.logstash.RubyUtil;
31+
32+
import java.util.List;
33+
34+
import static org.junit.Assert.assertEquals;
35+
import static org.junit.Assert.assertTrue;
36+
import static org.logstash.RubyUtil.RUBY;
37+
38+
@SuppressWarnings("unchecked")
39+
public final class BufferedTokenizerExtTest extends RubyTestBase {
40+
41+
private BufferedTokenizerExt sut;
42+
private ThreadContext context;
43+
44+
@Before
45+
public void setUp() {
46+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
47+
context = RUBY.getCurrentContext();
48+
IRubyObject[] args = {};
49+
sut.init(context, args);
50+
}
51+
52+
@Test
53+
public void shouldTokenizeASingleToken() {
54+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\n"));
55+
56+
assertEquals(List.of("foo"), tokens);
57+
}
58+
59+
@Test
60+
public void shouldMergeMultipleToken() {
61+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo"));
62+
assertTrue(tokens.isEmpty());
63+
64+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("bar\n"));
65+
assertEquals(List.of("foobar"), tokens);
66+
}
67+
68+
@Test
69+
public void shouldTokenizeMultipleToken() {
70+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n"));
71+
72+
assertEquals(List.of("foo", "bar"), tokens);
73+
}
74+
75+
@Test
76+
public void shouldIgnoreEmptyPayload() {
77+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
78+
assertTrue(tokens.isEmpty());
79+
80+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar"));
81+
assertEquals(List.of("foo"), tokens);
82+
}
83+
84+
@Test
85+
public void shouldTokenizeEmptyPayloadWithNewline() {
86+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n"));
87+
assertEquals(List.of(""), tokens);
88+
89+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n"));
90+
assertEquals(List.of("", "", ""), tokens);
91+
}
92+
93+
@Test
94+
public void shouldNotChangeEncodingOfTokensAfterPartitioning() {
95+
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
96+
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
97+
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, rubyInput);
98+
99+
// read the first token, the £ string
100+
IRubyObject firstToken = tokens.shift(context);
101+
assertEquals("£", firstToken.toString());
102+
103+
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
104+
RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
105+
assertEquals("ISO-8859-1", encoding.toString());
106+
}
107+
108+
@Test
109+
public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() {
110+
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character
111+
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
112+
sut.extract(context, rubyInput);
113+
IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41})
114+
.force_encoding(context, RUBY.newString("ISO8859-1"));
115+
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, capitalAInLatin1);
116+
assertTrue(tokens.isEmpty());
117+
118+
tokens = (RubyArray<RubyString>)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A}));
119+
120+
// read the first token, the £ string
121+
IRubyObject firstToken = tokens.shift(context);
122+
assertEquals("£A", firstToken.toString());
123+
124+
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
125+
RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding");
126+
assertEquals("ISO-8859-1", encoding.toString());
127+
}
128+
129+
@Test
130+
public void shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() {
131+
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A
132+
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
133+
RubyArray<RubyString> tokens = (RubyArray<RubyString>)sut.extract(context, rubyInput);
134+
135+
// read the first token, the £ string
136+
IRubyObject firstToken = tokens.shift(context);
137+
assertEquals("£", firstToken.toString());
138+
139+
// flush and check that the remaining A is still encoded in ISO8859-1
140+
IRubyObject lastToken = sut.flush(context);
141+
assertEquals("A", lastToken.toString());
142+
143+
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
144+
RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
145+
assertEquals("ISO-8859-1", encoding.toString());
146+
}
147+
148+
@Test
149+
public void givenDirectFlushInvocationUTF8EncodingIsApplied() {
150+
RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A
151+
IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1"));
152+
153+
// flush and check that the remaining A is still encoded in ISO8859-1
154+
IRubyObject lastToken = sut.flush(context);
155+
assertEquals("", lastToken.toString());
156+
157+
// verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion
158+
RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding");
159+
assertEquals("UTF-8", encoding.toString());
160+
}
161+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to Elasticsearch B.V. under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.common;
21+
22+
import org.jruby.RubyArray;
23+
import org.jruby.RubyString;
24+
import org.jruby.runtime.ThreadContext;
25+
import org.jruby.runtime.builtin.IRubyObject;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import org.logstash.RubyTestBase;
29+
import org.logstash.RubyUtil;
30+
31+
import java.util.List;
32+
33+
import static org.junit.Assert.assertEquals;
34+
import static org.junit.Assert.assertTrue;
35+
import static org.logstash.RubyUtil.RUBY;
36+
37+
@SuppressWarnings("unchecked")
38+
public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase {
39+
40+
private BufferedTokenizerExt sut;
41+
private ThreadContext context;
42+
43+
@Before
44+
public void setUp() {
45+
sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER);
46+
context = RUBY.getCurrentContext();
47+
IRubyObject[] args = {RubyUtil.RUBY.newString("||")};
48+
sut.init(context, args);
49+
}
50+
51+
@Test
52+
public void shouldTokenizeMultipleToken() {
53+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||"));
54+
55+
assertEquals(List.of("foo", "b|r"), tokens);
56+
}
57+
58+
@Test
59+
public void shouldIgnoreEmptyPayload() {
60+
RubyArray<RubyString> tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString(""));
61+
assertTrue(tokens.isEmpty());
62+
63+
tokens = (RubyArray<RubyString>) sut.extract(context, RubyUtil.RUBY.newString("foo||bar"));
64+
assertEquals(List.of("foo"), tokens);
65+
}
66+
}

0 commit comments

Comments
 (0)