diff --git a/Dockerfile.debug b/Dockerfile.debug index c58220408..35ba5fbae 100644 --- a/Dockerfile.debug +++ b/Dockerfile.debug @@ -29,6 +29,7 @@ RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build && rm -rf src # Build the actual binary COPY src ./src +COPY resources ./resources RUN cargo build # final stage diff --git a/Dockerfile.kafka b/Dockerfile.kafka index 108f74c75..dfe123751 100644 --- a/Dockerfile.kafka +++ b/Dockerfile.kafka @@ -42,6 +42,7 @@ RUN mkdir src && echo "fn main() {}" > src/main.rs && \ # Copy the actual source code COPY src ./src +COPY resources ./resources # Build the actual binary with kafka feature RUN cargo build --release --features kafka diff --git a/resources/formats.json b/resources/formats.json new file mode 100644 index 000000000..659ff1950 --- /dev/null +++ b/resources/formats.json @@ -0,0 +1,600 @@ +[ + { + "name": "access_log", + "regex": [ + { + "pattern": "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?) (?<c_ip>[^ ]+) (?<cs_username>[^ ]+) (?<cs_method>[A-Z]+)(?<cs_uri_stem>[^ \\?]+)(?:\\?(?<cs_uri_query>[^ ]*))? (?:-1|\\d+) (?<sc_status>\\d+) \\d+\\s*(?<body>.*)", + "fields": ["timestamp", "c_ip", "cs_username", "cs_method", "cs_uri_stem", "cs_uri_query", "sc_status", "body"] + }, + { + "pattern": "^(?P<c_ip>[\\w\\.:\\-]+)\\s+[\\w\\.\\-]+\\s+(?:-|(?P<cs_username>\\S+))\\s+\\[(?P<timestamp>[^\\]]+)\\] \"(?:\\-|(?P<cs_method>\\w+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? 
(?P<cs_version>[\\w/\\.]+))\" (?P<sc_status>\\d+) (?P<sc_bytes>\\d+|-)(?: \"(?:-|(?P<cs_referer>[^\"]*))\" \"(?:-|(?P<cs_user_agent>[^\"]+))\")?\\s*(?P<body>.*)", + "fields": ["c_ip", "cs_username", "timestamp", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_bytes", "cs_referer", "cs_user_agent", "body"] + }, + { + "pattern": "^(?P<cs_host>[\\w\\-\\.]*)(?::\\d+)?\\s+(?P<c_ip>[\\w\\.:\\-]+)\\s+[\\w\\.\\-]+\\s+(?:-|(?P<cs_username>\\S+))\\s+\\[(?P<timestamp>[^\\]]+)\\] \"(?:\\-|(?P<cs_method>\\w+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+))\" (?P<sc_status>\\d+) (?P<sc_bytes>\\d+|-)(?: \"(?:-|(?P<cs_referer>[^\"]+))\" \"(?P<cs_user_agent>[^\"]+)\")?\\s*(?P<body>.*)", + "fields": ["cs_host", "c_ip", "cs_username", "timestamp", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_bytes", "cs_referer", "cs_user_agent", "body"] + }, + { + "pattern": "^(?P<c_ip>[\\w\\.:\\-]+)\\s+[\\w\\.\\-]+\\s+(?P<cs_username>\\S+)\\s+\"(?:\\-|(?P<cs_method>\\w+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+))\" (?P<sc_status>\\d+) (?P<sc_bytes>\\d+|-)(?: \"(?P<cs_referer>[^\"]+)\" \"(?P<cs_user_agent>[^\"]+)\")?\\s*(?P<body>.*)", + "fields": ["c_ip", "cs_username", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_bytes", "cs_referer", "cs_user_agent", "body"] + } + ] + }, + { + "name": "alb_log", + "regex": [ + { + "pattern": "^(?P<type>(http)|(https)|(h2)|(ws)|(wss)) (?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?P<elb>[^ ]+) (?P<client_ip>[\\w\\.:]+):(?P<client_port>\\d+) (?P<target_ip>[\\w\\.:]+):(?P<target_port>\\d+) (?P<request_processing_time>(-1)|(\\d+(\\.\\d+))?) (?P<target_processing_time>(-1)|(\\d+(\\.\\d+))?) (?P<response_processing_time>(-1)|(\\d+(\\.\\d+))?) 
(?P<elb_status_code>\\d+|-) (?P<target_status_code>\\d+|-) (?P<received_bytes>\\d+) (?P<sent_bytes>\\d+) \"(?:\\-|(?P<cs_method>\\w+|-) (?P<cs_uri_whole>(?P<cs_uri_stem>(?:(?P<cs_uri_scheme>https|http)?://)?(?:(?P<cs_uri_hostname>[^:]+):(?P<cs_uri_port>\\d+)?)?(?P<cs_uri_path>[^ \\?]+)?)(?:\\?(?P<cs_uri_query>[^ ]*))?) (?P<cs_version>[\\w/\\.]+|-)\\s*)\" \"(?P<user_agent>[^\"]+)\" (?P<ssl_cipher>[\\w-]+) (?P<ssl_protocol>[\\w\\.-]+) (?P<target_group_arn>[^ ]+) \"(?P<trace_id>[^ ]+)\" (?P<domain_name>[^ ]+) (?P<chosen_cert_arn>[^ ]+) ?(?P<matched_rule_priority>(-1)|\\b([0-9]|[1-8][0-9]|9[0-9]|[1-8][0-9]{2}|9[0-8][0-9]|99[0-9]|[1-8][0-9]{3}|9[0-8][0-9]{2}|99[0-8][0-9]|999[0-9]|[1-4][0-9]{4}|50000)\\b)?", + "fields": ["type", "timestamp", "elb", "client_ip", "client_port", "target_ip", "target_port", "request_processing_time", "target_processing_time", "response_processing_time", "elb_status_code", "target_status_code", "received_bytes", "sent_bytes", "cs_method", "cs_uri_whole", "cs_uri_stem", "cs_uri_scheme", "cs_uri_hostname", "cs_uri_port", "cs_uri_path", "cs_uri_query", "cs_version", "user_agent", "ssl_cipher", "ssl_protocol", "target_group_arn", "trace_id", "domain_name", "chosen_cert_arn", "matched_rule_priority"] + } + ] + }, + { + "name": "block_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\S{3,8} \\w{3}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2} \\w+ \\d{4})\\s*(?P<body>.*)", + "fields": ["timestamp", "body"] + }, + { + "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3,6})?(?:Z|[-+]\\d{2}:?\\d{2})?)\\]\\s*(?P<body>.*)", + "fields": ["timestamp", "body"] + } + ] + }, + { + "name": "candlepin_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) \\[(req=(?P<req>[0-9a-f-]+)|=), org=(?P<org>\\w*)\\] (?P<alert_level>\\w+) (?P<module>[\\w.]+) - (?P<body>.*)", + "fields": ["timestamp", "req", "org", "alert_level", "module", "body"] + }, + { + "pattern": 
"^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}[+-]\\d{4}) (?P<body>.*)", + "fields": ["timestamp", "body"] + } + ] + }, + { + "name": "choose_repo_log", + "regex": [ + { + "pattern": "^\\[(?P<level>\\w+):[^\\]]+] [^:]+:\\d+ (?P<timestamp>\\d{4}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}:\\d{2}(?:[\\.,]\\d{3})?):(?P<body>.*)", + "fields": ["level", "timestamp", "body"] + } + ] + }, + { + "name": "cloudvm_ram_log", + "regex": [ + { + "pattern": "^========== Start of cloudvm ram size dump at (?P<timestamp>[^=]+) ==========(?P<body>.*)", + "fields": ["timestamp", "body"] + } + ] + }, + { + "name": "cups_log", + "regex": [ + { + "pattern": "^(?P<level>[IEW]) \\[(?P<timestamp>\\d{2}/\\S{3,8}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [+-]\\d{2,4})\\] (?P<section>\\w+): (?P<body>.*)", + "fields": ["level", "timestamp", "section", "body"] + }, + { + "pattern": "^(?P<level>[IEW]) \\[(?P<timestamp>\\d{2}/\\S{3,8}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [+-]\\d{2,4})\\](?P<body>.*)", + "fields": ["level", "timestamp", "body"] + } + ] + }, + { + "name": "dpkg_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?) (?:(?:(?P<action>startup|status|configure|install|upgrade|trigproc|remove|purge)(?: (?P<status>config-files|failed-config|half-configured|half-installed|installed|not-installed|post-inst-failed|removal-failed|triggers-awaited|triggers-pending|unpacked))? (?P<package>[^ ]+) (?P<installed_version>[^ ]+)(?: (?P<available_version>[^ ]+))?)|update-alternatives: (?P<body>.*))", + "fields": ["timestamp", "action", "status", "package", "installed_version", "available_version", "body"] + } + ] + }, + { + "name": "elb_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?P<elb>[^ ]+) (?P<client_ip>[\\w\\.:]+):(?P<client_port>\\d+) (?P<backend_ip>[\\w\\.:]+):(?P<backend_port>\\d+) (?P<request_processing_time>\\d+(\\.\\d+)?) (?P<backend_processing_time>\\d+(\\.\\d+)?) 
(?P<response_processing_time>\\d+(\\.\\d+)?) (?P<elb_status_code>\\d+|-) (?P<backend_status_code>\\d+|-) (?P<received_bytes>\\d+) (?P<sent_bytes>\\d+) \"(?:\\-|(?P<cs_method>\\w+|-) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+|-)\\s*)\" \"(?P<user_agent>[^\"]+)\" (?P<ssl_cipher>[\\w-]+) (?P<ssl_protocol>[\\w\\.-]+)(?P<body>.*)", + "fields": ["timestamp", "elb", "client_ip", "client_port", "backend_ip", "backend_port", "request_processing_time", "backend_processing_time", "response_processing_time", "elb_status_code", "backend_status_code", "received_bytes", "sent_bytes", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "user_agent", "ssl_cipher", "ssl_protocol", "body"] + } + ] + }, + { + "name": "engine_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\s+(?P<level>\\w+)\\s+\\[(?P<logger>[^\\]]+)\\]\\s+\\((?P<tid>[^\\)]+)\\)\\s+(?P<body>.*)", + "fields": ["timestamp", "level", "logger", "tid", "body"] + } + ] + }, + { + "name": "env_logger_log", + "regex": [ + { + "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[^ ]+) (?P<level>\\w+) (?P<module>[^\\]]+)\\]\\s+(?P<body>.*)", + "fields": ["timestamp", "level", "module", "body"] + } + ] + }, + { + "name": "error_log", + "regex": [ + { + "pattern": "^(?P<level>\\w) \\[(?P<timestamp>[^\\]]+)\\] (?P<body>.*)", + "fields": ["level", "timestamp", "body"] + }, + { + "pattern": "^\\[(?P<timestamp>[^\\]]+)\\] \\[(?:(?P<module>[^:]+):)?(?P<level>\\w+)\\](?: \\[pid (?P<pid>\\d+)(:tid (?P<tid>\\d+))?\\])?(?: \\[client (?P<c_ip>[\\w\\.:\\-]+):(?P<c_port>\\d+)\\])? 
(?P<body>.*)", + "fields": ["timestamp", "module", "level", "pid", "tid", "c_ip", "c_port", "body"] + } + ] + }, + { + "name": "esx_syslog_log", + "regex": [ + { + "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?Z))\\s+(?P<level>\\w+\\((?P<syslog_pri>\\d+)\\))(?:\\[\\+\\]|\\+)?(?:(?: (?P<log_syslog_tag>(?P<log_procname>(?:[^\\[:]+|[^:]+))(?:\\[(?P<log_pid>\\d+)\\])?):\\s*(?:\\w+ \\[(?P<logger>[^ ]+)(?: op[iI][dD]=(?P<opid>[^ \\]]+))?\\]\\s*)?(?P<body>.*))$|:?(?:(?: ---)? last message repeated \\d+ times?(?: ---)?))", + "fields": ["timestamp", "level", "syslog_pri", "log_syslog_tag", "log_procname", "log_pid", "logger", "opid", "body"] + }, + { + "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?Z))\\s+(?P<level>\\w+\\((?P<syslog_pri>\\d+)\\))(?:\\[\\+\\]|\\+)?(?:(?: (?P<log_syslog_tag>(?:host-(?P<log_pid>\\d+))?)\\s+(?P<body>.*))$|:?(?:(?: ---)? 
last message repeated \\d+ times?(?: ---)?))", + "fields": ["timestamp", "level", "syslog_pri", "log_syslog_tag", "log_pid", "body"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2})\\s+(?P<level>\\w+\\((?P<syslog_pri>\\d+)\\))\\s+(?P<log_procname>[^\\[]+)\\[(?P<log_pid>\\d+)\\]:\\s(?P<new_time>\\d{2}:\\d{2}:\\d{2}\\.\\d+)\\s+(?P<body>.*)", + "fields": ["timestamp", "level", "syslog_pri", "log_procname", "log_pid", "new_time", "body"] + } + ] + }, + { + "name": "haproxy_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: Proxy (?P<frontend_name>[^ ]+) started.", + "fields": ["timestamp", "logging_host", "process_name", "pid", "frontend_name"] + }, + { + "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: Stopping frontend (?P<frontend_name>[^ ]+) in (?P<stopping_timeout>\\d+) ms.", + "fields": ["timestamp", "logging_host", "process_name", "pid", "frontend_name", "stopping_timeout"] + }, + { + "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: Proxy (?P<frontend_name>[^ ]+) stopped \\(FE: (?P<frontend_connections>\\d+) conns, BE: (?P<backend_connections>\\d+) conns\\).", + "fields": ["timestamp", "logging_host", "process_name", "pid", "frontend_name", "frontend_connections", "backend_connections"] + }, + { + "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: (?P<client_ip>[^:]+):(?P<client_port>\\d+) \\[(?P<accept_date>\\d{2}\\/\\w{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}.\\d{3})\\] (?P<frontend_name>[^ ]+) (?P<backend_name>[^ ]+)\\/(?P<server_name>[^ ]+) (?P<tw>\\d+)\\/(?P<tc>\\d+)\\/(?P<tt>\\d+) (?P<bytes_read>\\d+) (?P<termination_state>..) 
(?P<actconn>\\d+)\\/(?P<feconn>\\d+)\\/(?P<beconn>\\d+)\\/(?P<srv_conn>\\d+)\\/(?P<retries>\\d+) (?P<srv_queue>\\d+)\\/(?P<backend_queue>\\d+)", + "fields": ["timestamp", "logging_host", "process_name", "pid", "client_ip", "client_port", "accept_date", "frontend_name", "backend_name", "server_name", "tw", "tc", "tt", "bytes_read", "termination_state", "actconn", "feconn", "beconn", "srv_conn", "retries", "srv_queue", "backend_queue"] + }, + { + "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: (?P<client_ip>[^:]+):(?P<client_port>\\d+) \\[(?P<accept_date>\\d{2}\\/\\w{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}.\\d{3})\\] (?P<frontend_name>[^ ]+)(?P<ssl>~)? (?P<backend_name>[^ ]+)\\/(?P<server_name>[^ ]+) (?P<tq>-?\\d+)\\/(?P<tw>-?\\d+)\\/(?P<tc>-?\\d+)\\/(?P<tr>-?\\d+)\\/(?P<tt>\\d+) (?P<status_code>\\d{3}|-1) (?P<bytes_read>\\d+) (?P<captured_request_cookie>.*) (?P<captured_response_cookie>.*) (?P<termination_state>....) 
(?P<actconn>\\d+)\\/(?P<feconn>\\d+)\\/(?P<beconn>\\d+)\\/(?P<srv_conn>\\d+)\\/(?P<retries>\\d+) (?P<srv_queue>\\d+)\\/(?P<backend_queue>\\d+) (?:\\{(?P<captured_request_headers>.*)\\} \\{(?P<captured_response_headers>.*)\\} )?\"(?P<http_method>[A-Z<>]+)(?: (?P<http_url>.*?))?(?: (?P<http_version>HTTP\\/\\d+.\\d+))?\"?", + "fields": ["timestamp", "logging_host", "process_name", "pid", "client_ip", "client_port", "accept_date", "frontend_name", "ssl", "backend_name", "server_name", "tq", "tw", "tc", "tr", "tt", "status_code", "bytes_read", "captured_request_cookie", "captured_response_cookie", "termination_state", "actconn", "feconn", "beconn", "srv_conn", "retries", "srv_queue", "backend_queue", "captured_request_headers", "captured_response_headers", "http_method", "http_url", "http_version"] + }, + { + "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: (?P<client_ip>[^:]+):(?P<client_port>\\d+) \\[(?P<accept_date>\\d{2}\\/\\w{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}.\\d{3})\\] (?P<backend_name>[^ ]+)\\/(?P<server_name>[^ ]+): (?P<ssl_error>.+)", + "fields": ["timestamp", "logging_host", "process_name", "pid", "client_ip", "client_port", "accept_date", "backend_name", "server_name", "ssl_error"] + } + ] + }, + { + "name": "katello_log", + "regex": [ + { + "pattern": "^\\[\\s?(?P<alert_level>\\w+)\\s(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\\s(?P<module>\\w+)\\]\\s+(?P<message>.*)", + "fields": ["alert_level", "timestamp", "module", "message"] + } + ] + }, + { + "name": "lnav_debug_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(-|\\+)\\d{2}:\\d{2}) (?P<level>\\w) (?P<thread>\\w+) (?P<srcfile>[^:]+):(?P<srcline>\\d+) (?P<body>.*)", + "fields": ["timestamp", "level", "thread", "srcfile", "srcline", "body"] + } + ] + }, + { + "name": "nextflow_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\w{3}-\\d{2} 
\\d{2}:\\d{2}:\\d{2}\\.\\d{3}) \\[(?P<thread>[^\\]]+)\\] (?P<level>[^ ]+)\\s+(?P<module>[^ ]+) - (?P<body>.*)", + "fields": ["timestamp", "thread", "level", "module", "body"] + } + ] + }, + { + "name": "openam_log", + "regex": [ + { + "pattern": "^\"(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\"\\s+(?P<data>[^ \"]+|\"(?:[^\"]*|\"\")*\")\\s+(?P<loginid>[^ \"]+|\"(?:[^\"]*|\"\")*\")\\s+(?P<contextid>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<ipaddr>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<level>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<domain>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<loggedby>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<messageid>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<modulename>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<nameid>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<hostname>[^ \"]+|\"(?:[^\"]|\"\")*\")(?P<body>.*)", + "fields": ["timestamp", "data", "loginid", "contextid", "ipaddr", "level", "domain", "loggedby", "messageid", "modulename", "nameid", "hostname", "body"] + } + ] + }, + { + "name": "openamdb_log", + "regex": [ + { + "pattern": "^(?P<module>[\\w]+):(?P<timestamp>\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}:\\d{3} [AP]M \\w+): Thread\\[(?P<thread>[^,]+,\\d+,[^,]+)\\]\\n?(?:\\*+|(?P<body>.*))", + "fields": ["module", "timestamp", "thread", "body"] + } + ] + }, + { + "name": "openstack_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) (?P<pid>\\d+) (?P<level>\\w+) (?P<logger>\\S+) \\[(?P<tid>[^\\]]+)\\] (?P<body>.*)", + "fields": ["timestamp", "pid", "level", "logger", "tid", "body"] + }, + { + "pattern": "^(?P<level>\\w+) (?P<logger>\\S+) \\[(?P<tid>[^\\]]+)\\] (?P<body>.*)", + "fields": ["level", "logger", "tid", "body"] + }, + { + "pattern": "^[(](?P<logger>[^)]+)[)]: (?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<level>\\w+)(?P<body>.*)", + "fields": ["logger", "timestamp", "level", "body"] + }, + { + "pattern": "^[(](?P<logger>[^)]+)[)]: (?P<timestamp>\\d{4}-\\d{2}-\\d{2} 
\\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<level>\\w+) [(](?P<user>[^)]+)[)] (?P<body>.*)", + "fields": ["logger", "timestamp", "level", "user", "body"] + } + ] + }, + { + "name": "page_log", + "regex": [ + { + "pattern": "^(?P<printer>[\\w_\\-\\.]+) (?P<username>[\\w\\.\\-]+) (?P<job_id>\\d+) \\[(?P<timestamp>[^\\]]+)\\] (?P<page_number>total|\\d+) (?P<num_copies>\\d+) (?P<job_billing>[^ ]+) (?P<job_originating_hostname>[\\w\\.:\\-]+)", + "fields": ["printer", "username", "job_id", "timestamp", "page_number", "num_copies", "job_billing", "job_originating_hostname"] + }, + { + "pattern": "^(?P<printer>[\\w_\\-\\.]+) (?P<username>[\\w\\.\\-]+) (?P<job_id>\\d+) \\[(?P<timestamp>[^\\]]+)\\] (?P<page_number>total|\\d+) (?P<num_copies>\\d+) (?P<job_billing>[^ ]+) (?P<job_originating_hostname>[\\w\\.:\\-]+) (?P<job_name>.+) (?P<media>[^ ]+) (?P<sides>.+)(?P<body>.*)", + "fields": ["printer", "username", "job_id", "timestamp", "page_number", "num_copies", "job_billing", "job_originating_hostname", "job_name", "media", "sides", "body"] + } + ] + }, + { + "name": "procstate_log", + "regex": [ + { + "pattern": "^========== Start of system state dump at (?P<timestamp>[^=]+) ==========(?P<body>.*)", + "fields": ["timestamp", "body"] + } + ] + }, + { + "name": "proxifier_log", + "regex": [ + { + "pattern": "\\[(?P<timestamp>\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2})\\]\\s+(?P<app_name>[^ ]+(?: \\*64)?)(?:\\s+(?:-|(?P<app_pid>\\d+)))\\s+(?P<target_host>[^:]+):(?P<target_port>\\d+)\\s+(?P<body>(?:open|close).*)", + "fields": ["timestamp", "app_name", "app_pid", "target_host", "target_port", "body"] + }, + { + "pattern": "\\[(?P<timestamp>\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2})\\]\\s+(?P<app_name>[^ ]+(?: \\*64)?)(?:\\s+(?:-|(?P<app_pid>\\d+)))\\s+(?P<target_host>[^:]+):(?P<target_port>\\d+)\\s+(?P<level>error) : (?P<body>.*)", + "fields": ["timestamp", "app_name", "app_pid", "target_host", "target_port", "level", "body"] + } + ] + }, + { + "name": "rails_log", + "regex": [ + { + "pattern": 
"^(?P<level_char>[A-Z]),\\s\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2}(?:T| )\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{6})?) #(?P<pid>\\d+)\\]\\s+(?P<level>\\w+) --\\s(?P<module>[^:]+)?:\\s(?:\\[(?P<reqid>\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12})\\]\\s)?(?P<body>.*)", + "fields": ["level_char", "timestamp", "pid", "level", "module", "reqid", "body"] + } + ] + }, + { + "name": "redis_log", + "regex": [ + { + "pattern": "^\\[(?P<pid>\\d+)\\]\\s+(?P<timestamp>\\d{1,2} [a-zA-Z]{3} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})\\s+(?P<level>[\\.\\-\\*\\#])\\s+(?P<body>.*)", + "fields": ["pid", "timestamp", "level", "body"] + }, + { + "pattern": "^(?P<pid>\\d+):(?P<role>[XCSM])\\s+(?P<timestamp>\\d{1,2} [a-zA-Z]{3} \\d{4} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})\\s+(?P<level>[\\.\\*\\#\\-])\\s+(?P<body>.*)", + "fields": ["pid", "role", "timestamp", "level", "body"] + }, + { + "pattern": "^(?P<pid>\\d+):(?P<role>signal-handler) \\((?P<timestamp>\\d+)\\) (?P<body>.*)", + "fields": ["pid", "role", "timestamp", "body"] + } + ] + }, + { + "name": "s3_log", + "regex": [ + { + "pattern": "^(?P<owner>\\S+)\\s+(?P<bucket>\\S+)\\s+\\[(?P<timestamp>[^\\]]+)\\]\\s+(?P<c_ip>[\\w*.:-]+)\\s+(?P<cs_userid>\\S+)\\s+(?P<req_id>\\S+)\\s+(?P<op>\\S+)\\s+(?P<cs_key>\\S+)\\s+\"(?P<cs_method>\\S+)\\s+(?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))?\\s+(?P<cs_version>\\S+)\"\\s+(?P<sc_status>\\d+|-)\\s+(?P<sc_error_code>\\S+)\\s+(?P<sc_bytes>\\d+|-)\\s+(?P<obj_size>\\d+|-)\\s+(?P<total_time>\\d+|-)\\s+(?P<turn_around_time>\\d+|-)\\s+\"(?P<cs_referer>.*?)\"\\s+\"(?P<cs_user_agent>.*?)\"", + "fields": ["owner", "bucket", "timestamp", "c_ip", "cs_userid", "req_id", "op", "cs_key", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_error_code", "sc_bytes", "obj_size", "total_time", "turn_around_time", "cs_referer", "cs_user_agent"] + }, + { + "pattern": 
"^(?P<owner>\\S+)\\s+(?P<bucket>\\S+)\\s+\\[(?P<timestamp>[^\\]]+)\\]\\s+(?P<c_ip>[\\w*.:-]+)\\s+(?P<cs_userid>\\S+)\\s+(?P<req_id>\\S+)\\s+(?P<op>\\S+)\\s+(?P<cs_key>\\S+)\\s+\"(?P<cs_method>\\S+)\\s+(?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))?\\s+(?P<cs_version>\\S+)\"\\s+(?P<sc_status>\\d+|-)\\s+(?P<sc_error_code>\\S+)\\s+(?P<sc_bytes>\\d+|-)\\s+(?P<obj_size>\\d+|-)\\s+(?P<total_time>\\d+|-)\\s+(?P<turn_around_time>\\d+|-)\\s+\"(?P<cs_referer>.*?)\"\\s+\"(?P<cs_user_agent>.*?)\"\\s+(?P<version_id>\\S+)\\s+(?P<host_id>\\S+)\\s+(?P<sig_version>\\S+)\\s+(?P<cipher_suite>\\S+)\\s+(?P<auth_type>\\S+)\\s+(?P<cs_host>\\S+)\\s+(?P<tls_version>\\S+)", + "fields": ["owner", "bucket", "timestamp", "c_ip", "cs_userid", "req_id", "op", "cs_key", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_error_code", "sc_bytes", "obj_size", "total_time", "turn_around_time", "cs_referer", "cs_user_agent", "version_id", "host_id", "sig_version", "cipher_suite", "auth_type", "cs_host", "tls_version"] + } + ] + }, + { + "name": "simple_rs_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{9}[^\\s]+)\\s+(?P<level>\\w+)\\s+\\[(?P<module>\\w+)\\]\\s+(?P<body>.*)", + "fields": ["timestamp", "level", "module", "body"] + } + ] + }, + { + "name": "snaplogic_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?) (?:(?:(?P<level>\\w{4,}) (?P<logger>[^ ]+) (?P<facility>[^ ]+) (?P<msgid>[^ ]+) (?P<pipe_rid>-|\\d+)(?:\\.(?P<comp_rid>[^ ]+))? (?P<resource_name>[^ ]+) (?P<invoker>[^ ]+))|(?:(?:stdout|stderr): ))(?P<body>.*)", + "fields": ["timestamp", "level", "logger", "facility", "msgid", "pipe_rid", "comp_rid", "resource_name", "invoker", "body"] + } + ] + }, + { + "name": "sssd_log", + "regex": [ + { + "pattern": "^\\((?P<timestamp>\\S{3,8} \\S{3,8} ( \\d|\\d{2}) \\d{2}:\\d{2}:\\d{2}(?:(?:\\.|:)\\d{6})? 
\\d{4})\\) \\[(?P<service>\\w+)\\] \\[(?P<function>\\w+)\\] \\((?P<debug_level>0x[0-9a-fA-F]{4})\\): (?P<body>.*)", + "fields": ["timestamp", "service", "function", "debug_level", "body"] + }, + { + "pattern": "^\\((?P<timestamp>\\S{3,8} \\S{3,8} ( \\d|\\d{2}) \\d{2}:\\d{2}:\\d{2}(?:(?:\\.|:)\\d{6})? \\d{4})\\) \\[(?P<service>\\w+)(?P<module>\\[.*?\\])\\] \\[(?P<function>\\w+)\\] \\((?P<debug_level>0x[0-9a-fA-F]{4})\\): (?P<body>.*)", + "fields": ["timestamp", "service", "module", "function", "debug_level", "body"] + }, + { + "pattern": "^\\((?P<timestamp>\\d{4}-\\d{2}-\\d{2} [ 0-9]{2}:\\d{2}:\\d{2}(?:(?:\\.|:)\\d{6})?)\\): \\[(?P<service>\\w+)(?P<module>\\[.*?\\])?\\] \\[(?P<function>\\w+)\\] \\((?P<debug_level>0x[0-9a-fA-F]{4})\\): (?P<body>.*)", + "fields": ["timestamp", "service", "module", "function", "debug_level", "body"] + } + ] + }, + { + "name": "strace_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{2}:\\d{2}:\\d{2}\\.\\d{6}|\\d+\\.\\d{6}) (?P<syscall>\\w+)\\((?P<body>.*)\\)\\s+=\\s+(?P<rc>[-\\w]+)(?: (?P<errno>\\w+) \\([^\\)]+\\))?(?: <(?P<duration>\\d+\\.\\d+)>)?", + "fields": ["timestamp", "syscall", "body", "rc", "errno", "duration"] + } + ] + }, + { + "name": "sudo_log", + "regex": [ + { + "pattern": "^(?P<login>\\S+)\\s*: (?:(?P<error_msg>[^;]+);)?\\s*TTY=(?P<tty>[^;]+)\\s+;\\s*PWD=(?P<pwd>[^;]+)\\s+;\\s*USER=(?P<user>[^;]+)\\s+;\\s*COMMAND=(?P<command>.*)", + "fields": ["login", "error_msg", "tty", "pwd", "user", "command"] + } + ] + }, + { + "name": "syslog_log", + "regex": [ + { + "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3,6})?(?:Z|(?:\\+|-)\\d{2}:\\d{2})))(?: (?P<log_hostname>[a-zA-Z0-9:][^ ]+[a-zA-Z0-9]))?(?: \\[CLOUDINIT\\])?(?:(?: syslogd [\\d\\.]+|(?: (?P<log_syslog_tag>(?P<log_procname>(?:[^\\[:]+|[^ :]+))(?:\\[(?P<log_pid>\\d+)\\](?: \\([^\\)]+\\))?)?))):\\s*(?P<body>.*)$|:?(?:(?: ---)? 
last message repeated \\d+ times?(?: ---)?))", + "fields": ["timestamp", "log_hostname", "log_syslog_tag", "log_procname", "log_pid", "body"] + }, + { + "pattern": "^<(?P<log_pri>\\d+)>(?P<syslog_version>\\d+) (?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{6})?(?:[^ ]+)?) (?P<log_hostname>[^ ]+|-) (?P<log_syslog_tag>(?P<log_procname>[^ ]+|-) (?P<log_pid>[^ ]+|-) (?P<log_msgid>[^ ]+|-)) (?P<log_struct>\\[(?:[^\\]\"]|\"(?:\\.|[^\"])+\")*\\]|-|)\\s+(?P<body>.*)", + "fields": ["log_pri", "syslog_version", "timestamp", "log_hostname", "log_syslog_tag", "log_procname", "log_pid", "log_msgid", "log_struct", "body"] + } + ] + }, + { + "name": "tcf_log", + "regex": [ + { + "pattern": "^TCF (?P<timestamp>\\d{2}:\\d{2}.\\d{3,6}): (?:Server-Properties: (?:.*)|channel server|\\w+: (?P<dir>--->|<---) (?P<type>\\w)(?: (?P<token>\\w+))?(?: (?P<service>\\w+))?(?: (?P<name>\\w+))?(?: (?P<msg>.*))?(?: <eom>))(?P<body>.*)", + "fields": ["timestamp", "dir", "type", "token", "service", "name", "msg", "body"] + } + ] + }, + { + "name": "tcsh_history", + "regex": [ + { + "pattern": "^#(?P<timestamp>\\+\\d+)\\n?(?P<body>.*)?", + "fields": ["timestamp", "body"] + } + ] + }, + { + "name": "uwsgi_log", + "regex": [ + { + "pattern": "^\\[pid: (?P<s_pid>\\d+)\\|app: (?P<s_app>[\\-\\d]+)\\|req: (?P<s_req>[\\-\\d]+)/(?P<s_worker_reqs>\\d+)\\] (?P<c_ip>[^ ]+) \\((?P<cs_username>[^\\)]*)\\) \\{(?P<cs_vars>\\d+) vars in (?P<cs_bytes>\\d+) bytes\\} \\[(?P<timestamp>[^\\]]+)\\] (?P<cs_method>[A-Z]+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? 
=> generated (?P<sc_bytes>\\d+) bytes in (?P<s_runtime>\\d+) (?P<rt_unit>\\w+) \\((?P<cs_version>[^ ]+) (?P<sc_status>\\d+)\\) (?P<sc_headers>\\d+) headers in (?P<sc_header_bytes>\\d+) bytes \\((?P<s_switches>\\d+) switches on core (?P<s_core>\\d+)\\)(?P<body>.*)", + "fields": ["s_pid", "s_app", "s_req", "s_worker_reqs", "c_ip", "cs_username", "cs_vars", "cs_bytes", "timestamp", "cs_method", "cs_uri_stem", "cs_uri_query", "sc_bytes", "s_runtime", "rt_unit", "cs_version", "sc_status", "sc_headers", "sc_header_bytes", "s_switches", "s_core", "body"] + } + ] + }, + { + "name": "vmk_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z) cpu(?P<cpu>\\d+):(?P<world_id>\\d+)(?: opID=(?P<opid>[^\\)]+))?\\)((?:(?P<level>WARNING|ALERT)|(?P<subsystem>[^:]+)): )?(?P<body>.*)", + "fields": ["timestamp", "cpu", "world_id", "opid", "level", "subsystem", "body"] + }, + { + "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?Z))\\s+(?P<level>\\w+)\\((?P<syslog_pri>\\d+)\\)(?:\\[\\+\\]|\\+)? (?:vmkernel|vmkwarning):\\s* (?:cpu(?P<cpu>\\d+):(?P<world_id>\\d+)(?: opID=(?P<opid>[^\\)]+))?\\))?((?:(?:WARNING|ALERT)|(?P<subsystem>[^:]+)): )?(?P<body>.*)", + "fields": ["timestamp", "level", "syslog_pri", "cpu", "world_id", "opid", "subsystem", "body"] + } + ] + }, + { + "name": "vmw_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) (?P<level>\\w+)(?:\\(\\d+\\)+)? (?P<prc>[\\w\\-]+)\\[(?P<tid>\\w+)\\]:? 
\\[(?:opI(?:D|d)=(?P<opid>[^\\]]+))\\]\\s*(?P<body>.*)", + "fields": ["timestamp", "level", "prc", "tid", "opid", "body"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) (?:- last log rotation time, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?\\s*(ESX KMX Agent started.|(?:- time the service was last started(?: \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?, )?Section for (?:[^,]+), pid=(?P<tid>\\w+).*)", + "fields": ["timestamp", "tid"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) (?P<level>\\w+)(?:\\(\\d+\\)+) (?P<prc>[\\w\\-]+)\\[(?P<tid>\\w+)\\]: (?:Logs rotated. \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?(?:- last log rotation time, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?\\s*(ESX KMX Agent started.|(?:- time the service was last started(?: \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z)?, )?Section for (?:[^,]+), pid=(?:\\w+).*)", + "fields": ["timestamp", "level", "prc", "tid"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) \\[(?P<tid>\\w+) (?P<level>\\w+) '(?P<comp>[^']+)'(?: opID=(?P<opid>[^ \\]]+))?(?: user=(?P<user>[^ \\]]+))?\\](?P<body>.*)(?:\\n.*)?", + "fields": ["timestamp", "tid", "level", "comp", "opid", "user", "body"] + }, + { + "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) (?P<tid>\\w+) (?P<level>\\w+) '(?P<comp>[^']+)'(?: opID=(?P<opid>[^ \\]]+))?(?: user=(?P<user>[^ \\]]+))?\\](?P<body>.*)(?:\\n.*)?", + "fields": ["timestamp", "tid", "level", "comp", "opid", "user", "body"] + }, + { + "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<tid>[\\w\\-]+)\\s+(?P<level>\\w+)\\s+(?P<comp>[^\\]]+)\\]\\s+(?P<body>.*)", + "fields": ["timestamp", "tid", "level", 
"comp", "body"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}(T| )\\d{2}:\\d{2}:\\d{2}(?:.|,)\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) \\[(?P<prc>[^\\[]+)\\[(?P<tid>\\w+)\\]:\\s+(?P<body>.*)\\]", + "fields": ["timestamp", "prc", "tid", "body"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}(T| )\\d{2}:\\d{2}:\\d{2}(?:.|,)\\d{3}(?:Z|[-+]\\d{2}:\\d{2})?) (?P<level>\\w+) (?P<prc>[^\\[]+)\\[(?P<tid>\\d+)\\]\\s+\\[(?P<file>[^ ]+) (?P<line>\\d+)\\]\\s+(?P<body>.*)", + "fields": ["timestamp", "level", "prc", "tid", "file", "line", "body"] + }, + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?(?:Z|[-+]\\d{2}:\\d{2})) (?P<prc>[^:]+):\\s+(?P<tid>\\d+):\\s+(?P<comp>[^:]+):(?P<line>\\d+)?\\s+(?P<level>\\w+):?\\s+(?P<body>.*)(?:\\n.*)?", + "fields": ["timestamp", "prc", "tid", "comp", "line", "level", "body"] + }, + { + "pattern": "^(?P<prc>[^:]+):(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\[(?P<tid>\\w+)\\](?P<file>[^:]+):(?P<line>\\d+) \\[(?P<level>[a-zA-Z]+)\\]\\s+(?P<body>.*)", + "fields": ["prc", "timestamp", "tid", "file", "line", "level", "body"] + }, + { + "pattern": "^(?P<prc>[^:]+): (?P<tid>\\d+): (?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<file>[^:]+):(?P<line>\\d+) (?P<level>[a-zA-Z]+)\\s+(?P<body>.*)", + "fields": ["prc", "tid", "timestamp", "file", "line", "level", "body"] + } + ] + }, + { + "name": "vmw_py_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{1,3})?(?: (?:AM|PM) UTC)?) 
\\[(?P<pid>\\d+)\\](?P<level>ERROR|WARNING|INFO|DEBUG):(?P<module>[\\w\\-\\.]+):(?P<body>.*$)", + "fields": ["timestamp", "pid", "level", "module", "body"] + } + ] + }, + { + "name": "vmw_vc_svc_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,3}Z)\\s+(?P<level>\\w+)\\s+(?P<module>\\w+)\\s\\[(?P<srcfile>[^:]+):(?P<srcline>\\d+)\\](\\s+\\[opID=(?P<opid>[^\\]]+)\\])?\\s+(?P<body>.*)", + "fields": ["timestamp", "level", "module", "srcfile", "srcline", "opid", "body"] + } + ] + }, + { + "name": "vpostgres_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\S+) (?P<session_id>[^ ]*) (?P<transaction_id>[^ ]*) (?P<db_name>[^ ]*) (?P<user>[^ ]*) (?P<remote_pair>[^ ]*) (?P<pid>[^ ]+) (?P<num_line>\\d+)(?P<level>[^:]+):\\s+(?P<body>.*)", + "fields": ["timestamp", "session_id", "transaction_id", "db_name", "user", "remote_pair", "pid", "num_line", "level", "body"] + } + ] + }, + { + "name": "web_robot_log", + "regex": [ + { + "fields": ["ip", "timestamp", "method", "resource", "response", "bytes", "referrer", "request", "request-id", "useragent"] + } + ] + }, + { + "name": "xmlrpc_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2} [+-]?\\d{2}:\\d{2}) (?P<pid>\\d+) (?P<client_ip>\\S+): (?P<module>\\w+)/(?P<function>.*)(?P<arguments>\\(.*?\\))?(?P<body>.*)", + "fields": ["timestamp", "pid", "client_ip", "module", "function", "arguments", "body"] + } + ] + }, + { + "name": "zookeeper_log", + "regex": [ + { + "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) \\[myid:(?P<myid>\\d+)?\\] - (?P<level>\\w+)\\s+\\[(?P<thread>.*):(?P<logger>[\\w\\.\\$]+)@(?P<line_number>\\d+)\\] - (?P<body>.*)", + "fields": ["timestamp", "myid", "level", "thread", "logger", "line_number", "body"] + }, + { + "pattern": 
"^(<(?<pri>\\d+)>)?(?<version>\\d)\\s+(?<timestamp>[^\\s]+)\\s+(?<hostname>[^\\s]+)\\s+(?<appname>[^\\s]+)\\s+(?<procid>[^\\s]+)\\s+(?<msgid>[^\\s]+)?\\s*(\\[(?<structureddata>[^\\]]*)\\])?\\s*(?<message>.*)", + "fields": ["pri", "version", "timestamp", "hostname", "appname", "procid", "msgid", "structureddata", "message"] + } + ] + }, + { + "name": "kubernetes_log", + "regex": [ + { + "pattern": "^(?<severity>[IWEF])(?<month>\\d{2})(?<day>\\d{2})\\s+(?<time>\\d{2}:\\d{2}:\\d{2}\\.\\d{6})\\s+(?<pid>\\d+)\\s+(?<source_file>[^:]+):(?<line_number>\\d+)]\\s+(?<Smessage>.*)$", + "fields": ["severity", "month", "day", "time", "pid", "source_file", "line_number", "message"] + } + ] + }, + { + "name": "postgresql_log", + "regex": [ + { + "pattern": "^(?<timestamp>\\d{4}-\\d{1,2}-\\d{1,2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\w+) \\[(?<process_id>\\d+)\\] (?<log_level>\\w+): (?<sql_statement>.*)$", + "fields": ["timestamp", "process_id", "log_level", "sql_statement"] + } + ] + }, + { + "name": "java", + "regex": [ + { + "pattern": "^(?<timestamp_fb>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s+(?<stream>\\w+)\\s+(?<log_type>\\w+)\\s+(?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2},\\d{3})\\s+(?<log_level>\\w+)\\s+\\[(?<thread>[^\\]]+)\\]\\s+(?<logger>[^\\s]+)\\s+-\\s+(?<client_ip>[^\\s]+)\\s+(?<http_method>\\w+)\\s+(?<url>\/\/[^\\s]+)\\s+(?<http_version>HTTP\/\\d+\\.\\d+)\\s+(?<status_code>\\d+)$", + "fields": ["timestamp_fb", "stream", "log_type", "timestamp", "log_level", "thread", "logger", "client_ip", "http_method", "http_version", "status_code"] + } + ] + }, + { + "name": "nginx_access", + "regex": [ + { + "pattern": "(?<remote_addr>[^ ]*) - (?<remote_user>[^ ]*) \\[(?<timestamp>[^\\]]*)\\] \"(?<method>\\S+)(?: +(?<request>[^\"]*?)(?: +\\S*)?)?\" (?<status>[^ ]*) (?<body_bytes_sent>[^ ]*) \"(?<http_referer>[^\"]*)\" \"(?<http_user_agent>[^\"]*)\" (?<request_length>[^ ]*) (?<request_time>[^ ]*) \\[(?<proxy_upstream_name>[^ ]*)\\] 
\\[(?<proxy_alternative_upstream_name>[^ ]*)\\] (?<upstream_addr>[^,]*),?(?:[^,]*),?(?:[^ ]*) (?<upstream_response_length>[^,]*),?(?:[^,]*),?(?:[^ ]*) (?<upstream_response_time>[^,]*),?(?:[^,]*),?(?:[^ ]*) (?<upstream_status>[^,]*),?(?:[^,]*),?(?:[^ ]*) (?<req_id>[^ ]*)", + "fields": ["remote_addr", "remote_user", "timestamp", "method", "request", "status", "body_bytes_sent", "http_referer", "http_user_agent", "request_length", "request_time", "proxy_upstream_name", "proxy_alternative_upstream_name", "upstream_addr", "upstream_response_length", "upstream_response_time", "upstream_status", "req_id"] + } + ] + }, + { + "name": "postgres", + "regex": [ + { + "pattern": "^(?<timestamp_fb>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s+(?<stream>\\w+)\\s+(?<log_type>\\w+)\\s+(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} GMT)\\s+\\[(?<process_id>\\d+)\\]\\s+(?<log_level>\\w+):\\s+(?<message>.+)$", + "fields": ["timestamp_fb", "stream", "log_type", "timestamp", "process_id", "log_level", "message"] + } + ] + }, + { + "name": "zookeeper", + "regex": [ + { + "pattern": "^(?<timestamp_fb>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s+(?<stream>\\w+)\\s+(?<log_type>\\w+)\\s+(?<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\s+\\[(?<myid>[^\\]]*)\\]\\s+-\\s+(?<log_level>[A-Z]+)\\s+\\[(?<thread>[^\\]]+)\\]\\s+-\\s+(?<message>.+)$", + "fields": ["timestamp_fb", "stream", "log_type", "timestamp", "myid", "log_level", "thread", "message"] + } + ] + }, + { + "name": "csi", + "regex": [ + { + "pattern": "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d+Z)\\s+(?<stream>\\w+)\\s+(?<log_type>\\w+)\\s+(?<severity>[IWEF])(?<month>\\d{2})(?<day>\\d{2})\\s+(?<time>\\d{2}:\\d{2}:\\d{2}\\.\\d{6})\\s+(?<pid>\\d+)\\s+(?<source_file>[^:]+):(?<line_number>\\d+)\\]\\s+(?<message>.*)$", + "fields": ["timestamp", "stream", "log_type", "severity", "month", "day", "time", "pid", "source_file", "line_number", "message"] + } + ] + } +] + diff 
--git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs new file mode 100644 index 000000000..2a420e10e --- /dev/null +++ b/src/event/format/known_schema.rs @@ -0,0 +1,497 @@ +/* + * Parseable Server (C) 2022 - 2025 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +use std::collections::{HashMap, HashSet}; + +use once_cell::sync::Lazy; +use regex::Regex; +use serde::{Deserialize, Deserializer}; +use serde_json::{Map, Value}; +use tracing::error; + +/// Predefined JSON with known textual logging formats +const FORMATS_JSON: &str = include_str!("../../../resources/formats.json"); + +/// Global instance of EventProcessor containing predefined schema definitions +pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> = + Lazy::new(|| EventProcessor::new(FORMATS_JSON)); + +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Event is not in the expected text/JSON format for {0}")] + Unacceptable(String), + #[error("Unknown log format: {0}")] + Unknown(String), +} + +/// Deserializes a string pattern into a compiled Regex +/// NOTE: we only warn if the pattern doesn't compile +pub fn deserialize_regex<'de, D>(deserializer: D) -> Result<Option<Regex>, D::Error> +where + D: Deserializer<'de>, +{ + let pattern = String::deserialize(deserializer)?; + + let regex = Regex::new(&pattern) + 
.inspect_err(|err| error!("Error compiling regex pattern: {err}; Pattern: {pattern}")) + .ok(); + + Ok(regex) +} + +/// Configuration for a single pattern within a log format +#[derive(Debug, Default, Deserialize)] +struct Pattern { + /// Regular expression pattern used to match and capture fields from log strings + #[serde(deserialize_with = "deserialize_regex", default)] + pattern: Option<Regex>, + // Maps field names to regex capture groups + fields: HashSet<String>, +} + +/// Defines a schema for extracting structured data from logs using regular expressions +#[derive(Debug, Default)] +pub struct SchemaDefinition { + patterns: Vec<Pattern>, +} + +impl SchemaDefinition { + /// Extracts structured data from a log event string using a defined regex pattern + /// + /// This function checks if the given object already contains all expected fields + /// or attempts to extract them from a log event string if a pattern is available. + /// + /// # Arguments + /// * `obj` - The JSON object to check or extract fields into + /// * `extract_log` - Optional field name containing the raw log text + /// + /// # Returns + /// * `Some` - If all expected fields are already present in the object OR if extraction was successful + /// Contains fields present in capture group + /// * `None` - If extraction failed or no pattern was available and fields were missing + pub fn check_or_extract( + &self, + obj: &mut Map<String, Value>, + extract_log: Option<&str>, + ) -> Option<HashSet<String>> { + if let Some(pattern) = self + .patterns + .iter() + .find(|pattern| pattern.fields.iter().all(|field| obj.contains_key(field))) + { + return Some(pattern.fields.clone()); + } + + let event = extract_log + .and_then(|field| obj.get(field)) + .and_then(|s| s.as_str())?; + + for format in self.patterns.iter() { + let Some(pattern) = format.pattern.as_ref() else { + continue; + }; + let Some(captures) = pattern.captures(event) else { + continue; + }; + let mut extracted_fields = Map::new(); + + // 
With named capture groups, you can iterate over the field names + for field_name in format.fields.iter() { + if let Some(value) = captures.name(field_name) { + extracted_fields.insert( + field_name.to_owned(), + Value::String(value.as_str().to_string()), + ); + } + } + + obj.extend(extracted_fields); + + return Some(format.fields.clone()); + } + + None + } +} + +/// Configuration structure loaded from JSON for defining log formats +#[derive(Debug, Deserialize)] +struct Format { + name: String, + regex: Vec<Pattern>, +} + +/// Manages a collection of schema definitions for various log formats +#[derive(Debug)] +pub struct EventProcessor { + /// Map of format names to their corresponding schema definitions + pub schema_definitions: HashMap<String, SchemaDefinition>, +} + +impl EventProcessor { + /// Parses given formats from JSON text and stores them in-memory + fn new(json_text: &str) -> Self { + let mut processor = EventProcessor { + schema_definitions: HashMap::new(), + }; + + let formats: Vec<Format> = + serde_json::from_str(json_text).expect("Known formats are stored as JSON text"); + + for format in formats { + for regex in format.regex { + let schema = processor + .schema_definitions + .entry(format.name.clone()) + .or_default(); + + schema.patterns.push(regex); + } + } + + processor + } + + /// Extracts fields from logs embedded within a JSON string + /// + /// # Arguments + /// * `json` - JSON value containing log entries + /// * `log_source` - Name of the log format to use for extraction + /// * `extract_log` - Optional field name containing the raw log text + /// + /// # Returns + /// * `Ok` - The original JSON will now contain extracted fields + /// * `Err(Unacceptable)` - JSON provided is not acceptable for the known format + pub fn extract_from_inline_log( + &self, + json: &mut Value, + log_source: &str, + extract_log: Option<&str>, + ) -> Result<HashSet<String>, Error> { + let Some(schema) = self.schema_definitions.get(log_source) else { + return 
Err(Error::Unknown(log_source.to_owned())); + }; + + let mut fields = HashSet::new(); + match json { + Value::Array(list) => { + for event in list { + let Value::Object(event) = event else { + continue; + }; + if let Some(known_fields) = schema.check_or_extract(event, extract_log) { + fields.extend(known_fields); + } else { + return Err(Error::Unacceptable(log_source.to_owned())); + } + } + } + Value::Object(event) => { + if let Some(known_fields) = schema.check_or_extract(event, extract_log) { + return Ok(known_fields); + } else { + return Err(Error::Unacceptable(log_source.to_owned())); + } + } + _ => unreachable!("We don't accept events of the form: {json}"), + } + + Ok(fields) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + const TEST_CONFIG: &str = r#" + [ + { + "name": "apache_access", + "regex": [ + { + "pattern": "^(?P<ip>[\\d.]+) - - \\[(?P<timestamp>[^\\]]+)\\] \"(?P<method>\\w+) (?P<path>[^\\s]+) HTTP/[\\d.]+\" (?P<status>\\d+) (?P<bytes>\\d+)", + "fields": ["ip", "timestamp", "method", "path", "status", "bytes"] + } + ] + }, + { + "name": "custom_app_log", + "regex": [ + { + "pattern": "\\[(?P<level>\\w+)\\] \\[(?P<timestamp>[^\\]]+)\\] (?P<message>.*)", + "fields": ["level", "timestamp", "message"] + } + ] + } + ] + "#; + + #[test] + fn test_apache_log_extraction() { + let processor = EventProcessor::new(TEST_CONFIG); + let schema = processor.schema_definitions.get("apache_access").unwrap(); + + // Create a mutable object for check_or_extract to modify + let mut obj = Map::new(); + let log_field = "raw_log"; + obj.insert(log_field.to_string(), Value::String( + "192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] \"GET /index.html HTTP/1.1\" 200 2326".to_string() + )); + + // Use check_or_extract instead of extract + let result = schema.check_or_extract(&mut obj, Some(log_field)); + assert!(result.is_some(), "Failed to extract fields from valid log"); + + // Verify extracted fields were added to the object + 
assert_eq!(obj.get("ip").unwrap().as_str().unwrap(), "192.168.1.1"); + assert_eq!( + obj.get("timestamp").unwrap().as_str().unwrap(), + "10/Oct/2023:13:55:36 +0000" + ); + assert_eq!(obj.get("method").unwrap().as_str().unwrap(), "GET"); + assert_eq!(obj.get("path").unwrap().as_str().unwrap(), "/index.html"); + assert_eq!(obj.get("status").unwrap().as_str().unwrap(), "200"); + assert_eq!(obj.get("bytes").unwrap().as_str().unwrap(), "2326"); + } + + #[test] + fn test_custom_log_extraction() { + let processor = EventProcessor::new(TEST_CONFIG); + let schema = processor.schema_definitions.get("custom_app_log").unwrap(); + + // Create a mutable object for check_or_extract to modify + let mut obj = Map::new(); + let log_field = "raw_log"; + obj.insert( + log_field.to_string(), + Value::String( + "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database".to_string(), + ), + ); + + // Use check_or_extract instead of extract + let result = schema.check_or_extract(&mut obj, Some(log_field)); + assert!(result.is_some(), "Failed to extract fields from valid log"); + + // Verify extracted fields were added to the object + assert_eq!(obj.get("level").unwrap().as_str().unwrap(), "ERROR"); + assert_eq!( + obj.get("timestamp").unwrap().as_str().unwrap(), + "2023-10-10T13:55:36Z" + ); + assert_eq!( + obj.get("message").unwrap().as_str().unwrap(), + "Failed to connect to database" + ); + } + + #[test] + fn test_fields_already_exist() { + let processor = EventProcessor::new(TEST_CONFIG); + let schema = processor.schema_definitions.get("custom_app_log").unwrap(); + + // Create an object that already has all required fields + let mut obj = Map::new(); + obj.insert("level".to_string(), Value::String("ERROR".to_string())); + obj.insert( + "timestamp".to_string(), + Value::String("2023-10-10T13:55:36Z".to_string()), + ); + obj.insert( + "message".to_string(), + Value::String("Database error".to_string()), + ); + + // check_or_extract should return true without modifying anything + let 
result = schema.check_or_extract(&mut obj, None); + assert!( + result.is_some(), + "Should return true when fields already exist" + ); + + // Verify the original values weren't changed + assert_eq!( + obj.get("message").unwrap().as_str().unwrap(), + "Database error" + ); + } + + #[test] + fn test_no_match() { + let processor = EventProcessor::new(TEST_CONFIG); + let schema = processor.schema_definitions.get("apache_access").unwrap(); + + // Create an object with non-matching log text + let mut obj = Map::new(); + let log_field = "raw_log"; + obj.insert( + log_field.to_string(), + Value::String("This is not an Apache log line".to_string()), + ); + + // check_or_extract should return false + let result = schema.check_or_extract(&mut obj, Some(log_field)); + assert!( + result.is_none(), + "Should not extract fields from invalid log format" + ); + + // Verify no fields were added + assert!(!obj.contains_key("ip")); + assert!(!obj.contains_key("method")); + } + + #[test] + fn test_no_pattern_missing_fields() { + // Create a schema definition with no pattern + let schema = SchemaDefinition { + patterns: vec![Pattern { + pattern: None, + fields: HashSet::from_iter(["field1".to_string(), "field2".to_string()]), + }], + }; + + // Create an object missing the required fields + let mut obj = Map::new(); + obj.insert( + "other_field".to_string(), + Value::String("value".to_string()), + ); + + // check_or_extract should return false + let result = schema.check_or_extract(&mut obj, Some("log")); + assert!( + result.is_none(), + "Should return false when no pattern and missing fields" + ); + } + + #[test] + fn test_extract_from_inline_log_object() { + let processor = EventProcessor::new(TEST_CONFIG); + + let mut json_value = json!({ + "id": "12345", + "raw_log": "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database" + }); + + // Updated to handle check_or_extract + let result = if let Value::Object(ref mut obj) = json_value { + let schema = 
processor.schema_definitions.get("custom_app_log").unwrap(); + schema.check_or_extract(obj, Some("raw_log")); + json_value + } else { + json_value + }; + + let obj = result.as_object().unwrap(); + assert!(obj.contains_key("level")); + assert!(obj.contains_key("timestamp")); + assert!(obj.contains_key("message")); + assert_eq!(obj.get("level").unwrap().as_str().unwrap(), "ERROR"); + } + + #[test] + fn test_extract_from_inline_log_array() { + let processor = EventProcessor::new(TEST_CONFIG); + + let mut json_value = json!([ + { + "id": "12345", + "raw_log": "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database" + }, + { + "id": "12346", + "raw_log": "[INFO] [2023-10-10T13:55:40Z] Application started" + } + ]); + + // Updated to handle check_or_extract for array + if let Value::Array(ref mut array) = json_value { + for item in array { + if let Value::Object(ref mut obj) = item { + let schema = processor.schema_definitions.get("custom_app_log").unwrap(); + schema.check_or_extract(obj, Some("raw_log")); + } + } + } + + let array = json_value.as_array().unwrap(); + assert_eq!(array.len(), 2); + + let first = array[0].as_object().unwrap(); + assert_eq!(first.get("level").unwrap().as_str().unwrap(), "ERROR"); + assert_eq!( + first.get("message").unwrap().as_str().unwrap(), + "Failed to connect to database" + ); + + let second = array[1].as_object().unwrap(); + assert_eq!(second.get("level").unwrap().as_str().unwrap(), "INFO"); + assert_eq!( + second.get("message").unwrap().as_str().unwrap(), + "Application started" + ); + } + + #[test] + fn test_unknown_log_format() { + let processor = EventProcessor::new(TEST_CONFIG); + let mut json_value = json!({ + "id": "12345", + "raw_log": "Some log message" + }); + + // Try to extract with a non-existent format + if let Value::Object(ref mut obj) = json_value { + if let Some(schema) = processor.schema_definitions.get("nonexistent_format") { + schema.check_or_extract(obj, Some("raw_log")); + } + } + + // Should return 
original JSON without modification + let obj = json_value.as_object().unwrap(); + assert_eq!(obj.len(), 2); + assert!(obj.contains_key("id")); + assert!(obj.contains_key("raw_log")); + assert!(!obj.contains_key("level")); + } + + #[test] + fn test_missing_log_field() { + let processor = EventProcessor::new(TEST_CONFIG); + let schema = processor.schema_definitions.get("custom_app_log").unwrap(); + + // Create an object that doesn't have the log field + let mut obj = Map::new(); + obj.insert("id".to_string(), Value::String("12345".to_string())); + + // check_or_extract should return false + let result = schema.check_or_extract(&mut obj, Some("raw_log")); + assert!( + result.is_none(), + "Should return false when log field is missing" + ); + + // Verify no fields were added + assert!(!obj.contains_key("level")); + assert!(!obj.contains_key("timestamp")); + } +} diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 830409d3d..67080b137 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -39,6 +39,7 @@ use crate::{ use super::{Event, DEFAULT_TIMESTAMP_KEY}; pub mod json; +pub mod known_schema; static TIME_FIELD_NAME_PARTS: [&str; 11] = [ "time", @@ -74,18 +75,21 @@ pub enum LogSource { #[default] // Json object or array Json, + // Custom Log Sources e.g. 
"syslog" + #[serde(untagged)] Custom(String), } impl From<&str> for LogSource { fn from(s: &str) -> Self { - match s { + match s.to_lowercase().as_str() { "kinesis" => LogSource::Kinesis, "otel-logs" => LogSource::OtelLogs, "otel-metrics" => LogSource::OtelMetrics, "otel-traces" => LogSource::OtelTraces, "pmeta" => LogSource::Pmeta, - _ => LogSource::Json, + "" | "json" => LogSource::Json, + custom => LogSource::Custom(custom.to_owned()), } } } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index b8e056cac..a979b86f8 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -28,8 +28,9 @@ use serde_json::Value; use crate::event; use crate::event::error::EventError; +use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; -use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; +use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::SchemaVersion; use crate::option::Mode; use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; @@ -48,7 +49,10 @@ use super::users::filters::FiltersError; // Handler for POST /api/v1/ingest // ingests events by extracting stream name from header // creates if stream does not exist -pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpResponse, PostError> { +pub async fn ingest( + req: HttpRequest, + Json(mut json): Json<Value>, +) -> Result<HttpResponse, PostError> { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -65,6 +69,11 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes .and_then(|h| h.to_str().ok()) .map_or(LogSource::default(), LogSource::from); + let extract_log = req + .headers() + .get(EXTRACT_LOG_KEY) + .and_then(|h| h.to_str().ok()); + if matches!( log_source, LogSource::OtelLogs | 
LogSource::OtelMetrics | LogSource::OtelTraces @@ -72,9 +81,19 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes return Err(PostError::OtelNotSupported); } + let fields = match &log_source { + LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { + return Err(PostError::OtelNotSupported) + } + LogSource::Custom(src) => { + KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)? + } + _ => HashSet::new(), + }; + + let log_source_entry = LogSourceEntry::new(log_source.clone(), fields); let p_custom_fields = get_custom_fields_from_header(req); - let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new()); PARSEABLE .create_stream_if_not_exists( &stream_name, @@ -242,7 +261,7 @@ pub async fn handle_otel_traces_ingestion( pub async fn post_event( req: HttpRequest, stream_name: Path<String>, - Json(json): Json<Value>, + Json(mut json): Json<Value>, ) -> Result<HttpResponse, PostError> { let stream_name = stream_name.into_inner(); @@ -273,11 +292,19 @@ pub async fn post_event( .and_then(|h| h.to_str().ok()) .map_or(LogSource::default(), LogSource::from); - if matches!( - log_source, - LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces - ) { - return Err(PostError::OtelNotSupported); + let extract_log = req + .headers() + .get(EXTRACT_LOG_KEY) + .and_then(|h| h.to_str().ok()); + + match &log_source { + LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { + return Err(PostError::OtelNotSupported) + } + LogSource::Custom(src) => { + KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?; + } + _ => {} } let p_custom_fields = get_custom_fields_from_header(req); @@ -347,6 +374,8 @@ pub enum PostError { IngestionNotAllowed, #[error("Missing field for time partition in json: {0}")] MissingTimePartition(String), + #[error("{0}")] + KnownFormat(#[from] known_schema::Error), } impl actix_web::ResponseError for PostError { @@ -373,6 +402,7 
@@ impl actix_web::ResponseError for PostError { PostError::IncorrectLogSource(_) => StatusCode::BAD_REQUEST, PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST, PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST, + PostError::KnownFormat(_) => StatusCode::BAD_REQUEST, } } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index a3a2096ae..6d095d7b0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -572,7 +572,9 @@ pub mod error { #[cfg(test)] mod tests { - use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders; + use crate::{ + event::format::LogSource, handlers::http::modal::utils::logstream_utils::PutStreamHeaders, + }; use actix_web::test::TestRequest; // TODO: Fix this test with routes @@ -597,7 +599,7 @@ mod tests { async fn header_without_log_source() { let req = TestRequest::default().to_http_request(); let PutStreamHeaders { log_source, .. } = req.headers().into(); - assert_eq!(log_source, crate::event::format::LogSource::Json); + assert_eq!(log_source, LogSource::Json); } #[actix_web::test] @@ -606,19 +608,19 @@ mod tests { .insert_header(("X-P-Log-Source", "pmeta")) .to_http_request(); let PutStreamHeaders { log_source, .. } = req.headers().into(); - assert_eq!(log_source, crate::event::format::LogSource::Pmeta); + assert_eq!(log_source, LogSource::Pmeta); req = TestRequest::default() .insert_header(("X-P-Log-Source", "otel-logs")) .to_http_request(); let PutStreamHeaders { log_source, .. } = req.headers().into(); - assert_eq!(log_source, crate::event::format::LogSource::OtelLogs); + assert_eq!(log_source, LogSource::OtelLogs); req = TestRequest::default() .insert_header(("X-P-Log-Source", "kinesis")) .to_http_request(); let PutStreamHeaders { log_source, .. 
} = req.headers().into(); - assert_eq!(log_source, crate::event::format::LogSource::Kinesis); + assert_eq!(log_source, LogSource::Kinesis); } #[actix_web::test] @@ -627,6 +629,9 @@ mod tests { .insert_header(("X-P-Log-Source", "teststream")) .to_http_request(); let PutStreamHeaders { log_source, .. } = req.headers().into(); - assert_eq!(log_source, crate::event::format::LogSource::Json); + assert!(matches!( + log_source, + LogSource::Custom(src) if src == "teststream" + )); } } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index cb932a5c3..86615d215 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -83,10 +83,9 @@ pub async fn flatten_and_push_logs( push_logs(stream_name, record, log_source, p_custom_fields).await?; } } - _ => { - push_logs(stream_name, json, log_source, p_custom_fields).await?; - } + _ => push_logs(stream_name, json, log_source, p_custom_fields).await?, } + Ok(()) } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index f584dcd97..42f9f31ee 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -22,6 +22,7 @@ pub mod livetail; pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; const LOG_SOURCE_KEY: &str = "x-p-log-source"; +const EXTRACT_LOG_KEY: &str = "x-p-extract-log"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; diff --git a/src/migration/stream_metadata_migration.rs b/src/migration/stream_metadata_migration.rs index ab29e124b..74439c6a5 100644 --- a/src/migration/stream_metadata_migration.rs +++ b/src/migration/stream_metadata_migration.rs @@ -256,7 +256,7 @@ mod tests { #[test] fn test_v5_v6_unknown_log_source() { let stream_metadata = 
serde_json::json!({"version":"v5","schema_version":"v0","objectstore-format":"v5","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":"Invalid"}); - let expected = serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Json","fields":[]}]}); + let expected = 
serde_json::json!({"version":"v6","schema_version":"v0","objectstore-format":"v6","created-at":"2025-03-10T14:38:29.355131524-04:00","first-event-at":"2025-03-10T14:38:29.356-04:00","owner":{"id":"admin","group":"admin"},"permissions":[{"id":"admin","group":"admin","access":["all"]}],"stats":{"lifetime_stats":{"events":3,"ingestion":70,"storage":1969},"current_stats":{"events":3,"ingestion":70,"storage":1969},"deleted_stats":{"events":0,"ingestion":0,"storage":0}},"snapshot":{"version":"v2","manifest_list":[{"manifest_path":"home/nikhilsinha/Parseable/parseable/data/test10/date=2025-03-10/manifest.json","time_lower_bound":"2025-03-10T00:00:00Z","time_upper_bound":"2025-03-10T23:59:59.999999999Z","events_ingested":3,"ingestion_size":70,"storage_size":1969}]},"hot_tier_enabled":false,"stream_type":"UserDefined","log_source":[{"log_source_format":"Invalid","fields":[]}]}); let updated_stream_metadata = super::v5_v6(stream_metadata.clone()); assert_eq!(updated_stream_metadata, expected); }