Skip to content

Commit 9577eb0

Browse files
filter_lua: Add chunk mode for processing multiple records
Documentation for fluent/fluent-bit#8478 Signed-off-by: Richard Treu <[email protected]>
1 parent a039f6f commit 9577eb0

File tree

1 file changed

+180
-0
lines changed

1 file changed

+180
-0
lines changed

pipeline/filters/lua.md

+180
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The plugin supports the following configuration parameters:
2121
| time\_as\_table | By default when the Lua script is invoked, the record timestamp is passed as a *floating number* which might lead to precision loss when it is converted back. If you desire timestamp precision, enabling this option will pass the timestamp as a Lua table with keys `sec` for seconds since epoch and `nsec` for nanoseconds. |
2222
| code | Inline LUA code instead of loading from a path via `script`. |
2323
| enable_flb_null| If enabled, null will be converted to flb_null in Lua. It is useful to prevent removing key/value since nil is a special value to remove key value from map in Lua. Default is false. |
24+
| chunk_mode | If enabled, a whole chunk will be sent to Lua script as a table of timestamps and records. It may be used for e.g. parallel execution inside Lua. Default is false. |
2425

2526
## Getting Started <a id="getting_started"></a>
2627

@@ -348,3 +349,182 @@ Configuration to get istio logs and apply response code filter to them.
348349
#### Output
349350

350351
In the output only the messages with response code 0 or greater than 399 are shown.
352+
353+
### Chunk mode
354+
355+
There is a `chunk_mode` for the Lua filter in Fluent Bit. This mode can be useful for cases like parallelization, particularly when utilizing Lua lanes.
356+
357+
#### Function Signature
358+
359+
The Lua functions associated with this mode accept only two arguments:
360+
361+
```
362+
function process_records(tag, records)
363+
```
364+
365+
#### Configuration
366+
367+
The configuration for the Lua filter using chunk mode looks like this:
368+
369+
```
370+
[FILTER]
371+
Name lua
372+
Match my_logs
373+
script lanes_example.lua
374+
call process_records
375+
chunk_mode On
376+
time_as_table On
377+
```
378+
#### Note
379+
380+
- This mode currently only supports `time_as_table` by default.
381+
- Records are always emitted; there is no return code to be set.
382+
383+
#### Return Table Format
384+
385+
The return table must maintain this format, i.e., a table of timestamp and record pairs.
386+
387+
| Timestamp | Record |
388+
|--------------------------|---------------------------------------|
389+
| { | { |
390+
| sec: <timestamp_sec>, | message: "your_dummy_log_message" |
391+
| nsec: <timestamp_nsec>| } |
392+
| } | |
393+
| | |
394+
| { | { |
395+
| sec: <timestamp_sec>, | message: "your_dummy_log_message" |
396+
| nsec: <timestamp_nsec>| } |
397+
| } | |
398+
399+
Please refer to the following examples to see how to build the return table.
400+
401+
#### Input Table Example
402+
403+
```
404+
function process_records(tag, records)
405+
if records and type(records) == "table" then
406+
for i, record_row in ipairs(records) do
407+
local timestamp = record_row.timestamp
408+
local record = record_row.record
409+
410+
print("Timestamp entry:", timestamp.sec, timestamp.nsec)
411+
print("Record entry:", record.message)
412+
end
413+
else
414+
print("Error: Invalid 'records' table or nil")
415+
end
416+
return records
417+
end
418+
```
419+
420+
421+
#### Parallelization Example
422+
423+
Ensure that you have Lua lanes installed (e.g. `luarocks install lanes`) and to set the path appropriately (`luarocks show lanes`) in your lua script.
424+
To inject multiple dummy messages at once, you can adjust the `Copies` parameter of the `dummy` input.
425+
Keep in mind that this example will create a new thread for every record in the chunk to keep the example simple.
426+
427+
```
428+
fluent-bit.conf:
429+
430+
[SERVICE]
431+
Flush 5
432+
Log_Level debug
433+
Daemon off
434+
HTTP_Server Off
435+
436+
[INPUT]
437+
Name dummy
438+
Tag my_logs
439+
Rate 1
440+
Copies 2
441+
Dummy {"message":"your_dummy_log_message"}
442+
443+
[FILTER]
444+
Name lua
445+
Match my_logs
446+
script lanes_example.lua
447+
call threads
448+
chunk_mode On
449+
time_as_table On
450+
451+
[OUTPUT]
452+
Name stdout
453+
Match my_logs
454+
```
455+
456+
```lua
457+
lanes_example.lua:
458+
459+
-- Specify path to Lua Lanes module
460+
-- Install via: 'luarocks install lanes'
461+
local lanes_path = "/usr/local/share/lua/5.1/lanes.lua"
462+
463+
-- Load Lanes lib
464+
local lanes = assert(loadfile(lanes_path))().configure()
465+
466+
-- Lua function that will be executed as separate threads
467+
local function process_log(timestamp, record)
468+
-- Add your CPU intensive code here
469+
print("Timestamp:", timestamp.sec, timestamp.nsec)
470+
print("Record:", record.message)
471+
472+
record.message = "Modified"
473+
return timestamp, record
474+
end
475+
476+
-- Entry function
477+
function threads(tag, records)
478+
print("LUA ")
479+
local thread_handles = {}
480+
local results = {}
481+
if records and type(records) == "table" then
482+
print("Number of incoming records:", #records)
483+
for i, log_event in ipairs(records) do
484+
local timestamp = log_event.timestamp
485+
local record = log_event.record
486+
487+
-- Use lanes.gen to create a new thread
488+
local thread = lanes.gen("*", process_log)(timestamp, record)
489+
490+
-- Store the thread handle
491+
table.insert(thread_handles, thread)
492+
end
493+
-- Wait for all threads to finish
494+
for _, thread in ipairs(thread_handles) do
495+
-- Get the result returned by each thread
496+
local modified_record = thread[2]
497+
local modified_timestamp = thread[1]
498+
local result = {timestamp = modified_timestamp, record = modified_record}
499+
table.insert(results, result)
500+
end
501+
print("All threads returned")
502+
else
503+
print("Error: Invalid or nil 'records' table.")
504+
end
505+
506+
return results
507+
end
508+
```
509+
510+
You should get a similar output:
511+
```
512+
...
513+
LUA
514+
Number of incoming records: 2
515+
Timestamp: 1707308482 97855348
516+
Record: your_dummy_log_message
517+
Timestamp: 1707308482 97664060
518+
Record: your_dummy_log_message
519+
All threads returned
520+
...
521+
[0] my_logs: [[1707308479.115073794, {}], {"message"=>"Modified"}]
522+
[1] my_logs: [[1707308479.147705065, {}], {"message"=>"Modified"}]
523+
[2] my_logs: [[1707308480.097053227, {}], {"message"=>"Modified"}]
524+
[3] my_logs: [[1707308480.097306893, {}], {"message"=>"Modified"}]
525+
[4] my_logs: [[1707308481.097325851, {}], {"message"=>"Modified"}]
526+
[5] my_logs: [[1707308481.097515912, {}], {"message"=>"Modified"}]
527+
[6] my_logs: [[1707308482.097664060, {}], {"message"=>"Modified"}]
528+
[7] my_logs: [[1707308482.097855348, {}], {"message"=>"Modified"}]
529+
...
530+
```

0 commit comments

Comments
 (0)