From 2a3689bf92a499ca3494acf841f7e123995783c1 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Tue, 18 Mar 2025 16:03:44 +0530
Subject: [PATCH 01/15] feat: detect and extract json from log-lines

---
 resources/formats.json                        | 554 ++++++++++++++++++
 src/event/format/known_schema.rs              | 157 +++++
 src/event/format/mod.rs                       |   1 +
 src/handlers/http/ingest.rs                   |  22 +-
 src/handlers/http/modal/utils/ingest_utils.rs |   8 +-
 src/handlers/mod.rs                           |   1 +
 6 files changed, 735 insertions(+), 8 deletions(-)
 create mode 100644 resources/formats.json
 create mode 100644 src/event/format/known_schema.rs
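
For context, a rough usage sketch of the new extraction path (illustrative only, not part of
the patch): the `example` function, the sample line and the "log"/"pod" field names are made
up, and the call assumes crate-internal access to the new module. An event ingested with
`x-p-log-source: postgresql_log` and `x-p-extract-log: log` would be enriched roughly like so:

    use serde_json::json;

    use crate::event::format::known_schema::extract_from_inline_log;

    fn example() -> serde_json::Value {
        // Raw postgres line carried inside the "log" field of a structured event
        let event = json!({
            "log": "2025-03-18 16:03:44.123 UTC [4242] LOG:  connection received",
            "pod": "postgres-0"
        });
        // The `postgresql_log` regex from resources/formats.json should add
        // "timestamp", "process_id", "log_level" and "sql_statement" next to
        // the original fields; an unknown format leaves the event untouched.
        extract_from_inline_log(event, "postgresql_log", Some("log"))
    }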

diff --git a/resources/formats.json b/resources/formats.json
new file mode 100644
index 000000000..95c101cca
--- /dev/null
+++ b/resources/formats.json
@@ -0,0 +1,554 @@
+[
+    {
+      "name": "access_log",
+      "regex": [
+        {
+          "pattern": "^(?<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?) (?<c_ip>[^ ]+) (?<cs_username>[^ ]+) (?<cs_method>[A-Z]+)(?<cs_uri_stem>[^ \\?]+)(?:\\?(?<cs_uri_query>[^ ]*))? (?:-1|\\d+) (?<sc_status>\\d+) \\d+\\s*(?<body>.*)",
+          "fields": ["timestamp", "c_ip", "cs_username", "cs_method", "cs_uri_stem", "cs_uri_query", "sc_status", "body"]
+        },
+        {
+          "pattern": "^(?P<c_ip>[\\w\\.:\\-]+)\\s+[\\w\\.\\-]+\\s+(?:-|(?P<cs_username>\\S+))\\s+\\[(?P<timestamp>[^\\]]+)\\] \"(?:\\-|(?P<cs_method>\\w+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+))\" (?P<sc_status>\\d+) (?P<sc_bytes>\\d+|-)(?: \"(?:-|(?P<cs_referer>[^\"]*))\" \"(?:-|(?P<cs_user_agent>[^\"]+))\")?\\s*(?P<body>.*)",
+          "fields": ["c_ip", "cs_username", "timestamp", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_bytes", "cs_referer", "cs_user_agent", "body"]
+        },
+        {
+          "pattern": "^(?P<cs_host>[\\w\\-\\.]*)(?::\\d+)?\\s+(?P<c_ip>[\\w\\.:\\-]+)\\s+[\\w\\.\\-]+\\s+(?:-|(?P<cs_username>\\S+))\\s+\\[(?P<timestamp>[^\\]]+)\\] \"(?:\\-|(?P<cs_method>\\w+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+))\" (?P<sc_status>\\d+) (?P<sc_bytes>\\d+|-)(?: \"(?:-|(?P<cs_referer>[^\"]+))\" \"(?P<cs_user_agent>[^\"]+)\")?\\s*(?P<body>.*)",
+          "fields": ["cs_host", "c_ip", "cs_username", "timestamp", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_bytes", "cs_referer", "cs_user_agent", "body"]
+        },
+        {
+          "pattern": "^(?P<c_ip>[\\w\\.:\\-]+)\\s+[\\w\\.\\-]+\\s+(?P<cs_username>\\S+)\\s+\"(?:\\-|(?P<cs_method>\\w+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+))\" (?P<sc_status>\\d+) (?P<sc_bytes>\\d+|-)(?: \"(?P<cs_referer>[^\"]+)\" \"(?P<cs_user_agent>[^\"]+)\")?\\s*(?P<body>.*)",
+          "fields": ["c_ip", "cs_username", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_bytes", "cs_referer", "cs_user_agent", "body"]
+        }
+      ]
+    },
+    {
+      "name": "alb_log",
+      "regex": [
+        {
+          "pattern": "^(?P<type>(http)|(https)|(h2)|(ws)|(wss)) (?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?P<elb>[^ ]+) (?P<client_ip>[\\w\\.:]+):(?P<client_port>\\d+) (?P<target_ip>[\\w\\.:]+):(?P<target_port>\\d+) (?P<request_processing_time>(-1)|(\\d+(\\.\\d+))?) (?P<target_processing_time>(-1)|(\\d+(\\.\\d+))?) (?P<response_processing_time>(-1)|(\\d+(\\.\\d+))?) (?P<elb_status_code>\\d+|-) (?P<target_status_code>\\d+|-) (?P<received_bytes>\\d+) (?P<sent_bytes>\\d+) \"(?:\\-|(?P<cs_method>\\w+|-) (?P<cs_uri_whole>(?P<cs_uri_stem>(?:(?P<cs_uri_scheme>https|http)?://)?(?:(?P<cs_uri_hostname>[^:]+):(?P<cs_uri_port>\\d+)?)?(?P<cs_uri_path>[^ \\?]+)?)(?:\\?(?P<cs_uri_query>[^ ]*))?) (?P<cs_version>[\\w/\\.]+|-)\\s*)\" \"(?P<user_agent>[^\"]+)\" (?P<ssl_cipher>[\\w-]+) (?P<ssl_protocol>[\\w\\.-]+) (?P<target_group_arn>[^ ]+) \"(?P<trace_id>[^ ]+)\" (?P<domain_name>[^ ]+) (?P<chosen_cert_arn>[^ ]+) ?(?P<matched_rule_priority>(-1)|\\b([0-9]|[1-8][0-9]|9[0-9]|[1-8][0-9]{2}|9[0-8][0-9]|99[0-9]|[1-8][0-9]{3}|9[0-8][0-9]{2}|99[0-8][0-9]|999[0-9]|[1-4][0-9]{4}|50000)\\b)?",
+          "fields": ["type", "timestamp", "elb", "client_ip", "client_port", "target_ip", "target_port", "request_processing_time", "target_processing_time", "response_processing_time", "elb_status_code", "target_status_code", "received_bytes", "sent_bytes", "cs_method", "cs_uri_whole", "cs_uri_stem", "cs_uri_scheme", "cs_uri_hostname", "cs_uri_port", "cs_uri_path", "cs_uri_query", "cs_version", "user_agent", "ssl_cipher", "ssl_protocol", "target_group_arn", "trace_id", "domain_name", "chosen_cert_arn", "matched_rule_priority"]
+        }
+      ]
+    },
+    {
+      "name": "block_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\S{3,8} \\w{3}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2} \\w+ \\d{4})\\s*(?P<body>.*)",
+          "fields": ["timestamp", "body"]
+        },
+        {
+          "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3,6})?(?:Z|[-+]\\d{2}:?\\d{2})?)\\]\\s*(?P<body>.*)",
+          "fields": ["timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "candlepin_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) \\[(req=(?P<req>[0-9a-f-]+)|=), org=(?P<org>\\w*)\\] (?P<alert_level>\\w+)  (?P<module>[\\w.]+) - (?P<body>.*)",
+          "fields": ["timestamp", "req", "org", "alert_level", "module", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}[+-]\\d{4}) (?P<body>.*)",
+          "fields": ["timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "choose_repo_log",
+      "regex": [
+        {
+          "pattern": "^\\[(?P<level>\\w+):[^\\]]+] [^:]+:\\d+ (?P<timestamp>\\d{4}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}:\\d{2}(?:[\\.,]\\d{3})?):(?P<body>.*)",
+          "fields": ["level", "timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "cloudvm_ram_log",
+      "regex": [
+        {
+          "pattern": "^========== Start of cloudvm ram size dump at (?P<timestamp>[^=]+) ==========(?P<body>.*)",
+          "fields": ["timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "cups_log",
+      "regex": [
+        {
+          "pattern": "^(?P<level>[IEW]) \\[(?P<timestamp>\\d{2}/\\S{3,8}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [+-]\\d{2,4})\\] (?P<section>\\w+): (?P<body>.*)",
+          "fields": ["level", "timestamp", "section", "body"]
+        },
+        {
+          "pattern": "^(?P<level>[IEW]) \\[(?P<timestamp>\\d{2}/\\S{3,8}/\\d{4}:\\d{2}:\\d{2}:\\d{2} [+-]\\d{2,4})\\](?P<body>.*)",
+          "fields": ["level", "timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "dpkg_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}[T ]\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?) (?:(?:(?P<action>startup|status|configure|install|upgrade|trigproc|remove|purge)(?: (?P<status>config-files|failed-config|half-configured|half-installed|installed|not-installed|post-inst-failed|removal-failed|triggers-awaited|triggers-pending|unpacked))? (?P<package>[^ ]+) (?P<installed_version>[^ ]+)(?: (?P<available_version>[^ ]+))?)|update-alternatives: (?P<body>.*))",
+          "fields": ["timestamp", "action", "status", "package", "installed_version", "available_version", "body"]
+        }
+      ]
+    },
+    {
+      "name": "elb_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z) (?P<elb>[^ ]+) (?P<client_ip>[\\w\\.:]+):(?P<client_port>\\d+) (?P<backend_ip>[\\w\\.:]+):(?P<backend_port>\\d+) (?P<request_processing_time>\\d+(\\.\\d+)?) (?P<backend_processing_time>\\d+(\\.\\d+)?) (?P<response_processing_time>\\d+(\\.\\d+)?) (?P<elb_status_code>\\d+|-) (?P<backend_status_code>\\d+|-) (?P<received_bytes>\\d+) (?P<sent_bytes>\\d+) \"(?:\\-|(?P<cs_method>\\w+|-) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? (?P<cs_version>[\\w/\\.]+|-)\\s*)\" \"(?P<user_agent>[^\"]+)\" (?P<ssl_cipher>[\\w-]+) (?P<ssl_protocol>[\\w\\.-]+)(?P<body>.*)",
+          "fields": ["timestamp", "elb", "client_ip", "client_port", "backend_ip", "backend_port", "request_processing_time", "backend_processing_time", "response_processing_time", "elb_status_code", "backend_status_code", "received_bytes", "sent_bytes", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "user_agent", "ssl_cipher", "ssl_protocol", "body"]
+        }
+      ]
+    },
+    {
+      "name": "engine_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\s+(?P<level>\\w+)\\s+\\[(?P<logger>[^\\]]+)\\]\\s+\\((?P<tid>[^\\)]+)\\)\\s+(?P<body>.*)",
+          "fields": ["timestamp", "level", "logger", "tid", "body"]
+        }
+      ]
+    },
+    {
+      "name": "env_logger_log",
+      "regex": [
+        {
+          "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}[^ ]+) (?P<level>\\w+) (?P<module>[^\\]]+)\\]\\s+(?P<body>.*)",
+          "fields": ["timestamp", "level", "module", "body"]
+        }
+      ]
+    },
+    {
+      "name": "error_log",
+      "regex": [
+        {
+          "pattern": "^(?P<level>\\w) \\[(?P<timestamp>[^\\]]+)\\] (?P<body>.*)",
+          "fields": ["level", "timestamp", "body"]
+        },
+        {
+          "pattern": "^\\[(?P<timestamp>[^\\]]+)\\] \\[(?:(?P<module>[^:]+):)?(?P<level>\\w+)\\](?: \\[pid (?P<pid>\\d+)(:tid (?P<tid>\\d+))?\\])?(?: \\[client (?P<c_ip>[\\w\\.:\\-]+):(?P<c_port>\\d+)\\])? (?P<body>.*)",
+          "fields": ["timestamp", "module", "level", "pid", "tid", "c_ip", "c_port", "body"]
+        }
+      ]
+    },
+    {
+      "name": "esx_syslog_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?Z))\\s+(?P<level>\\w+\\((?P<syslog_pri>\\d+)\\))(?:\\[\\+\\]|\\+)?(?:(?: (?P<log_syslog_tag>(?P<log_procname>(?:[^\\[:]+|[^:]+))(?:\\[(?P<log_pid>\\d+)\\])?):\\s*(?:\\w+ \\[(?P<logger>[^ ]+)(?: op[iI][dD]=(?P<opid>[^ \\]]+))?\\]\\s*)?(?P<body>.*))$|:?(?:(?: ---)? last message repeated \\d+ times?(?: ---)?))",
+          "fields": ["timestamp", "level", "syslog_pri", "log_syslog_tag", "log_procname", "log_pid", "logger", "opid", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?Z))\\s+(?P<level>\\w+\\((?P<syslog_pri>\\d+)\\))(?:\\[\\+\\]|\\+)?(?:(?: (?P<log_syslog_tag>(?:host-(?P<log_pid>\\d+))?)\\s+(?P<body>.*))$|:?(?:(?: ---)? last message repeated \\d+ times?(?: ---)?))",
+          "fields": ["timestamp", "level", "syslog_pri", "log_syslog_tag", "log_pid", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2})\\s+(?P<level>\\w+\\((?P<syslog_pri>\\d+)\\))\\s+(?P<log_procname>[^\\[]+)\\[(?P<log_pid>\\d+)\\]:\\s(?P<new_time>\\d{2}:\\d{2}:\\d{2}\\.\\d+)\\s+(?P<body>.*)",
+          "fields": ["timestamp", "level", "syslog_pri", "log_procname", "log_pid", "new_time", "body"]
+        }
+      ]
+    },
+    {
+      "name": "haproxy_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: Proxy (?P<frontend_name>[^ ]+) started.",
+          "fields": ["timestamp", "logging_host", "process_name", "pid", "frontend_name"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: Stopping frontend (?P<frontend_name>[^ ]+) in (?P<stopping_timeout>\\d+) ms.",
+          "fields": ["timestamp", "logging_host", "process_name", "pid", "frontend_name", "stopping_timeout"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: Proxy (?P<frontend_name>[^ ]+) stopped \\(FE: (?P<frontend_connections>\\d+) conns, BE: (?P<backend_connections>\\d+) conns\\).",
+          "fields": ["timestamp", "logging_host", "process_name", "pid", "frontend_name", "frontend_connections", "backend_connections"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: (?P<client_ip>[^:]+):(?P<client_port>\\d+) \\[(?P<accept_date>\\d{2}\\/\\w{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}.\\d{3})\\] (?P<frontend_name>[^ ]+) (?P<backend_name>[^ ]+)\\/(?P<server_name>[^ ]+) (?P<tw>\\d+)\\/(?P<tc>\\d+)\\/(?P<tt>\\d+) (?P<bytes_read>\\d+) (?P<termination_state>..) (?P<actconn>\\d+)\\/(?P<feconn>\\d+)\\/(?P<beconn>\\d+)\\/(?P<srv_conn>\\d+)\\/(?P<retries>\\d+) (?P<srv_queue>\\d+)\\/(?P<backend_queue>\\d+)",
+          "fields": ["timestamp", "logging_host", "process_name", "pid", "client_ip", "client_port", "accept_date", "frontend_name", "backend_name", "server_name", "tw", "tc", "tt", "bytes_read", "termination_state", "actconn", "feconn", "beconn", "srv_conn", "retries", "srv_queue", "backend_queue"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: (?P<client_ip>[^:]+):(?P<client_port>\\d+) \\[(?P<accept_date>\\d{2}\\/\\w{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}.\\d{3})\\] (?P<frontend_name>[^ ]+)(?P<ssl>~)? (?P<backend_name>[^ ]+)\\/(?P<server_name>[^ ]+) (?P<tq>-?\\d+)\\/(?P<tw>-?\\d+)\\/(?P<tc>-?\\d+)\\/(?P<tr>-?\\d+)\\/(?P<tt>\\d+) (?P<status_code>\\d{3}|-1) (?P<bytes_read>\\d+) (?P<captured_request_cookie>.*) (?P<captured_response_cookie>.*) (?P<termination_state>....) (?P<actconn>\\d+)\\/(?P<feconn>\\d+)\\/(?P<beconn>\\d+)\\/(?P<srv_conn>\\d+)\\/(?P<retries>\\d+) (?P<srv_queue>\\d+)\\/(?P<backend_queue>\\d+) (?:\\{(?P<captured_request_headers>.*)\\} \\{(?P<captured_response_headers>.*)\\} )?\"(?P<http_method>[A-Z<>]+)(?: (?P<http_url>.*?))?(?: (?P<http_version>HTTP\\/\\d+.\\d+))?\"?",
+          "fields": ["timestamp", "logging_host", "process_name", "pid", "client_ip", "client_port", "accept_date", "frontend_name", "ssl", "backend_name", "server_name", "tq", "tw", "tc", "tr", "tt", "status_code", "bytes_read", "captured_request_cookie", "captured_response_cookie", "termination_state", "actconn", "feconn", "beconn", "srv_conn", "retries", "srv_queue", "backend_queue", "captured_request_headers", "captured_response_headers", "http_method", "http_url", "http_version"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\w{3} \\d{2} \\d{2}:\\d{2}:\\d{2}) (?P<logging_host>[^ ]+) (?P<process_name>\\w+)\\[(?P<pid>\\d+)\\]: (?P<client_ip>[^:]+):(?P<client_port>\\d+) \\[(?P<accept_date>\\d{2}\\/\\w{3}\\/\\d{4}:\\d{2}:\\d{2}:\\d{2}.\\d{3})\\] (?P<backend_name>[^ ]+)\\/(?P<server_name>[^ ]+): (?P<ssl_error>.+)",
+          "fields": ["timestamp", "logging_host", "process_name", "pid", "client_ip", "client_port", "accept_date", "backend_name", "server_name", "ssl_error"]
+        }
+      ]
+    },
+    {
+      "name": "katello_log",
+      "regex": [
+        {
+          "pattern": "^\\[\\s?(?P<alert_level>\\w+)\\s(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\\s(?P<module>\\w+)\\]\\s+(?P<message>.*)",
+          "fields": ["alert_level", "timestamp", "module", "message"]
+        }
+      ]
+    },
+    {
+      "name": "lnav_debug_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(-|\\+)\\d{2}:\\d{2}) (?P<level>\\w) (?P<thread>\\w+) (?P<srcfile>[^:]+):(?P<srcline>\\d+) (?P<body>.*)",
+          "fields": ["timestamp", "level", "thread", "srcfile", "srcline", "body"]
+        }
+      ]
+    },
+    {
+      "name": "nextflow_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\w{3}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) \\[(?P<thread>[^\\]]+)\\] (?P<level>[^ ]+)\\s+(?P<module>[^ ]+) - (?P<body>.*)",
+          "fields": ["timestamp", "thread", "level", "module", "body"]
+        }
+      ]
+    },
+    {
+      "name": "openam_log",
+      "regex": [
+        {
+          "pattern": "^\"(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\"\\s+(?P<data>[^ \"]+|\"(?:[^\"]*|\"\")*\")\\s+(?P<loginid>[^ \"]+|\"(?:[^\"]*|\"\")*\")\\s+(?P<contextid>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<ipaddr>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<level>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<domain>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<loggedby>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<messageid>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<modulename>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<nameid>[^ \"]+|\"(?:[^\"]|\"\")*\")\\s+(?P<hostname>[^ \"]+|\"(?:[^\"]|\"\")*\")(?P<body>.*)",
+          "fields": ["timestamp", "data", "loginid", "contextid", "ipaddr", "level", "domain", "loggedby", "messageid", "modulename", "nameid", "hostname", "body"]
+        }
+      ]
+    },
+    {
+      "name": "openamdb_log",
+      "regex": [
+        {
+          "pattern": "^(?P<module>[\\w]+):(?P<timestamp>\\d{2}/\\d{2}/\\d{4} \\d{2}:\\d{2}:\\d{2}:\\d{3} [AP]M \\w+): Thread\\[(?P<thread>[^,]+,\\d+,[^,]+)\\]\\n?(?:\\*+|(?P<body>.*))",
+          "fields": ["module", "timestamp", "thread", "body"]
+        }
+      ]
+    },
+    {
+      "name": "openstack_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}) (?P<pid>\\d+) (?P<level>\\w+) (?P<logger>\\S+) \\[(?P<tid>[^\\]]+)\\] (?P<body>.*)",
+          "fields": ["timestamp", "pid", "level", "logger", "tid", "body"]
+        },
+        {
+          "pattern": "^(?P<level>\\w+) (?P<logger>\\S+) \\[(?P<tid>[^\\]]+)\\] (?P<body>.*)",
+          "fields": ["level", "logger", "tid", "body"]
+        },
+        {
+          "pattern": "^[(](?P<logger>[^)]+)[)]: (?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<level>\\w+)(?P<body>.*)",
+          "fields": ["logger", "timestamp", "level", "body"]
+        },
+        {
+          "pattern": "^[(](?P<logger>[^)]+)[)]: (?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<level>\\w+) [(](?P<user>[^)]+)[)] (?P<body>.*)",
+          "fields": ["logger", "timestamp", "level", "user", "body"]
+        }
+      ]
+    },
+    {
+      "name": "page_log",
+      "regex": [
+        {
+          "pattern": "^(?P<printer>[\\w_\\-\\.]+) (?P<username>[\\w\\.\\-]+) (?P<job_id>\\d+) \\[(?P<timestamp>[^\\]]+)\\] (?P<page_number>total|\\d+) (?P<num_copies>\\d+) (?P<job_billing>[^ ]+) (?P<job_originating_hostname>[\\w\\.:\\-]+)",
+          "fields": ["printer", "username", "job_id", "timestamp", "page_number", "num_copies", "job_billing", "job_originating_hostname"]
+        },
+        {
+          "pattern": "^(?P<printer>[\\w_\\-\\.]+) (?P<username>[\\w\\.\\-]+) (?P<job_id>\\d+) \\[(?P<timestamp>[^\\]]+)\\] (?P<page_number>total|\\d+) (?P<num_copies>\\d+) (?P<job_billing>[^ ]+) (?P<job_originating_hostname>[\\w\\.:\\-]+) (?P<job_name>.+) (?P<media>[^ ]+) (?P<sides>.+)(?P<body>.*)",
+          "fields": ["printer", "username", "job_id", "timestamp", "page_number", "num_copies", "job_billing", "job_originating_hostname", "job_name", "media", "sides", "body"]
+        }
+      ]
+    },
+    {
+      "name": "procstate_log",
+      "regex": [
+        {
+          "pattern": "^========== Start of system state dump at (?P<timestamp>[^=]+) ==========(?P<body>.*)",
+          "fields": ["timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "proxifier_log",
+      "regex": [
+        {
+          "pattern": "\\[(?P<timestamp>\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2})\\]\\s+(?P<app_name>[^ ]+(?: \\*64)?)(?:\\s+(?:-|(?P<app_pid>\\d+)))\\s+(?P<target_host>[^:]+):(?P<target_port>\\d+)\\s+(?P<body>(?:open|close).*)",
+          "fields": ["timestamp", "app_name", "app_pid", "target_host", "target_port", "body"]
+        },
+        {
+          "pattern": "\\[(?P<timestamp>\\d{2}\\.\\d{2} \\d{2}:\\d{2}:\\d{2})\\]\\s+(?P<app_name>[^ ]+(?: \\*64)?)(?:\\s+(?:-|(?P<app_pid>\\d+)))\\s+(?P<target_host>[^:]+):(?P<target_port>\\d+)\\s+(?P<level>error) : (?P<body>.*)",
+          "fields": ["timestamp", "app_name", "app_pid", "target_host", "target_port", "level", "body"]
+        }
+      ]
+    },
+    {
+      "name": "rails_log",
+      "regex": [
+        {
+          "pattern": "^(?P<level_char>[A-Z]),\\s\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2}(?:T| )\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{6})?) #(?P<pid>\\d+)\\]\\s+(?P<level>\\w+) --\\s(?P<module>[^:]+)?:\\s(?:\\[(?P<reqid>\\w{8}-\\w{4}-\\w{4}-\\w{4}-\\w{12})\\]\\s)?(?P<body>.*)",
+          "fields": ["level_char", "timestamp", "pid", "level", "module", "reqid", "body"]
+        }
+      ]
+    },
+    {
+      "name": "redis_log",
+      "regex": [
+        {
+          "pattern": "^\\[(?P<pid>\\d+)\\]\\s+(?P<timestamp>\\d{1,2} [a-zA-Z]{3} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})\\s+(?P<level>[\\.\\-\\*\\#])\\s+(?P<body>.*)",
+          "fields": ["pid", "timestamp", "level", "body"]
+        },
+        {
+          "pattern": "^(?P<pid>\\d+):(?P<role>[XCSM])\\s+(?P<timestamp>\\d{1,2} [a-zA-Z]{3} \\d{4} \\d{2}:\\d{2}:\\d{2}\\.\\d{3})\\s+(?P<level>[\\.\\*\\#\\-])\\s+(?P<body>.*)",
+          "fields": ["pid", "role", "timestamp", "level", "body"]
+        },
+        {
+          "pattern": "^(?P<pid>\\d+):(?P<role>signal-handler) \\((?P<timestamp>\\d+)\\) (?P<body>.*)",
+          "fields": ["pid", "role", "timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "s3_log",
+      "regex": [
+        {
+          "pattern": "^(?P<owner>\\S+)\\s+(?P<bucket>\\S+)\\s+\\[(?P<timestamp>[^\\]]+)\\]\\s+(?P<c_ip>[\\w*.:-]+)\\s+(?P<cs_userid>\\S+)\\s+(?P<req_id>\\S+)\\s+(?P<op>\\S+)\\s+(?P<cs_key>\\S+)\\s+\"(?P<cs_method>\\S+)\\s+(?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))?\\s+(?P<cs_version>\\S+)\"\\s+(?P<sc_status>\\d+|-)\\s+(?P<sc_error_code>\\S+)\\s+(?P<sc_bytes>\\d+|-)\\s+(?P<obj_size>\\d+|-)\\s+(?P<total_time>\\d+|-)\\s+(?P<turn_around_time>\\d+|-)\\s+\"(?P<cs_referer>.*?)\"\\s+\"(?P<cs_user_agent>.*?)\"",
+          "fields": ["owner", "bucket", "timestamp", "c_ip", "cs_userid", "req_id", "op", "cs_key", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_error_code", "sc_bytes", "obj_size", "total_time", "turn_around_time", "cs_referer", "cs_user_agent"]
+        },
+        {
+          "pattern": "^(?P<owner>\\S+)\\s+(?P<bucket>\\S+)\\s+\\[(?P<timestamp>[^\\]]+)\\]\\s+(?P<c_ip>[\\w*.:-]+)\\s+(?P<cs_userid>\\S+)\\s+(?P<req_id>\\S+)\\s+(?P<op>\\S+)\\s+(?P<cs_key>\\S+)\\s+\"(?P<cs_method>\\S+)\\s+(?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))?\\s+(?P<cs_version>\\S+)\"\\s+(?P<sc_status>\\d+|-)\\s+(?P<sc_error_code>\\S+)\\s+(?P<sc_bytes>\\d+|-)\\s+(?P<obj_size>\\d+|-)\\s+(?P<total_time>\\d+|-)\\s+(?P<turn_around_time>\\d+|-)\\s+\"(?P<cs_referer>.*?)\"\\s+\"(?P<cs_user_agent>.*?)\"\\s+(?P<version_id>\\S+)\\s+(?P<host_id>\\S+)\\s+(?P<sig_version>\\S+)\\s+(?P<cipher_suite>\\S+)\\s+(?P<auth_type>\\S+)\\s+(?P<cs_host>\\S+)\\s+(?P<tls_version>\\S+)",
+          "fields": ["owner", "bucket", "timestamp", "c_ip", "cs_userid", "req_id", "op", "cs_key", "cs_method", "cs_uri_stem", "cs_uri_query", "cs_version", "sc_status", "sc_error_code", "sc_bytes", "obj_size", "total_time", "turn_around_time", "cs_referer", "cs_user_agent", "version_id", "host_id", "sig_version", "cipher_suite", "auth_type", "cs_host", "tls_version"]
+        }
+      ]
+    },
+    {
+      "name": "simple_rs_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{9}[^\\s]+)\\s+(?P<level>\\w+)\\s+\\[(?P<module>\\w+)\\]\\s+(?P<body>.*)",
+          "fields": ["timestamp", "level", "module", "body"]
+        }
+      ]
+    },
+    {
+      "name": "snaplogic_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?) (?:(?:(?P<level>\\w{4,}) (?P<logger>[^ ]+) (?P<facility>[^ ]+) (?P<msgid>[^ ]+) (?P<pipe_rid>-|\\d+)(?:\\.(?P<comp_rid>[^ ]+))? (?P<resource_name>[^ ]+) (?P<invoker>[^ ]+))|(?:(?:stdout|stderr): ))(?P<body>.*)",
+          "fields": ["timestamp", "level", "logger", "facility", "msgid", "pipe_rid", "comp_rid", "resource_name", "invoker", "body"]
+        }
+      ]
+    },
+    {
+      "name": "sssd_log",
+      "regex": [
+        {
+          "pattern": "^\\((?P<timestamp>\\S{3,8} \\S{3,8} ( \\d|\\d{2}) \\d{2}:\\d{2}:\\d{2}(?:(?:\\.|:)\\d{6})? \\d{4})\\) \\[(?P<service>\\w+)\\] \\[(?P<function>\\w+)\\] \\((?P<debug_level>0x[0-9a-fA-F]{4})\\): (?P<body>.*)",
+          "fields": ["timestamp", "service", "function", "debug_level", "body"]
+        },
+        {
+          "pattern": "^\\((?P<timestamp>\\S{3,8} \\S{3,8} ( \\d|\\d{2}) \\d{2}:\\d{2}:\\d{2}(?:(?:\\.|:)\\d{6})? \\d{4})\\) \\[(?P<service>\\w+)(?P<module>\\[.*?\\])\\] \\[(?P<function>\\w+)\\] \\((?P<debug_level>0x[0-9a-fA-F]{4})\\): (?P<body>.*)",
+          "fields": ["timestamp", "service", "module", "function", "debug_level", "body"]
+        },
+        {
+          "pattern": "^\\((?P<timestamp>\\d{4}-\\d{2}-\\d{2} [ 0-9]{2}:\\d{2}:\\d{2}(?:(?:\\.|:)\\d{6})?)\\): \\[(?P<service>\\w+)(?P<module>\\[.*?\\])?\\] \\[(?P<function>\\w+)\\] \\((?P<debug_level>0x[0-9a-fA-F]{4})\\): (?P<body>.*)",
+          "fields": ["timestamp", "service", "module", "function", "debug_level", "body"]
+        }
+      ]
+    },
+    {
+      "name": "strace_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{2}:\\d{2}:\\d{2}\\.\\d{6}|\\d+\\.\\d{6}) (?P<syscall>\\w+)\\((?P<body>.*)\\)\\s+=\\s+(?P<rc>[-\\w]+)(?: (?P<errno>\\w+) \\([^\\)]+\\))?(?: <(?P<duration>\\d+\\.\\d+)>)?",
+          "fields": ["timestamp", "syscall", "body", "rc", "errno", "duration"]
+        }
+      ]
+    },
+    {
+      "name": "sudo_log",
+      "regex": [
+        {
+          "pattern": "^(?P<login>\\S+)\\s*: (?:(?P<error_msg>[^;]+);)?\\s*TTY=(?P<tty>[^;]+)\\s+;\\s*PWD=(?P<pwd>[^;]+)\\s+;\\s*USER=(?P<user>[^;]+)\\s+;\\s*COMMAND=(?P<command>.*)",
+          "fields": ["login", "error_msg", "tty", "pwd", "user", "command"]
+        }
+      ]
+    },
+    {
+      "name": "syslog_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3,6})?(?:Z|(?:\\+|-)\\d{2}:\\d{2})))(?: (?P<log_hostname>[a-zA-Z0-9:][^ ]+[a-zA-Z0-9]))?(?: \\[CLOUDINIT\\])?(?:(?: syslogd [\\d\\.]+|(?: (?P<log_syslog_tag>(?P<log_procname>(?:[^\\[:]+|[^ :]+))(?:\\[(?P<log_pid>\\d+)\\](?: \\([^\\)]+\\))?)?))):\\s*(?P<body>.*)$|:?(?:(?: ---)? last message repeated \\d+ times?(?: ---)?))",
+          "fields": ["timestamp", "log_hostname", "log_syslog_tag", "log_procname", "log_pid", "body"]
+        },
+        {
+          "pattern": "^<(?P<log_pri>\\d+)>(?P<syslog_version>\\d+) (?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{6})?(?:[^ ]+)?) (?P<log_hostname>[^ ]+|-) (?P<log_syslog_tag>(?P<log_procname>[^ ]+|-) (?P<log_pid>[^ ]+|-) (?P<log_msgid>[^ ]+|-)) (?P<log_struct>\\[(?:[^\\]\"]|\"(?:\\.|[^\"])+\")*\\]|-|)\\s+(?P<body>.*)",
+          "fields": ["log_pri", "syslog_version", "timestamp", "log_hostname", "log_syslog_tag", "log_procname", "log_pid", "log_msgid", "log_struct", "body"]
+        }
+      ]
+    },
+    {
+      "name": "tcf_log",
+      "regex": [
+        {
+          "pattern": "^TCF (?P<timestamp>\\d{2}:\\d{2}.\\d{3,6}): (?:Server-Properties: (?:.*)|channel server|\\w+: (?P<dir>--->|<---) (?P<type>\\w)(?: (?P<token>\\w+))?(?: (?P<service>\\w+))?(?: (?P<name>\\w+))?(?: (?P<msg>.*))?(?: <eom>))(?P<body>.*)",
+          "fields": ["timestamp", "dir", "type", "token", "service", "name", "msg", "body"]
+        }
+      ]
+    },
+    {
+      "name": "tcsh_history",
+      "regex": [
+        {
+          "pattern": "^#(?P<timestamp>\\+\\d+)\\n?(?P<body>.*)?",
+          "fields": ["timestamp", "body"]
+        }
+      ]
+    },
+    {
+      "name": "uwsgi_log",
+      "regex": [
+        {
+          "pattern": "^\\[pid: (?P<s_pid>\\d+)\\|app: (?P<s_app>[\\-\\d]+)\\|req: (?P<s_req>[\\-\\d]+)/(?P<s_worker_reqs>\\d+)\\] (?P<c_ip>[^ ]+) \\((?P<cs_username>[^\\)]*)\\) \\{(?P<cs_vars>\\d+) vars in (?P<cs_bytes>\\d+) bytes\\} \\[(?P<timestamp>[^\\]]+)\\] (?P<cs_method>[A-Z]+) (?P<cs_uri_stem>[^ \\?]+)(?:\\?(?P<cs_uri_query>[^ ]*))? => generated (?P<sc_bytes>\\d+) bytes in (?P<s_runtime>\\d+) (?P<rt_unit>\\w+) \\((?P<cs_version>[^ ]+) (?P<sc_status>\\d+)\\) (?P<sc_headers>\\d+) headers in (?P<sc_header_bytes>\\d+) bytes \\((?P<s_switches>\\d+) switches on core (?P<s_core>\\d+)\\)(?P<body>.*)",
+          "fields": ["s_pid", "s_app", "s_req", "s_worker_reqs", "c_ip", "cs_username", "cs_vars", "cs_bytes", "timestamp", "cs_method", "cs_uri_stem", "cs_uri_query", "sc_bytes", "s_runtime", "rt_unit", "cs_version", "sc_status", "sc_headers", "sc_header_bytes", "s_switches", "s_core", "body"]
+        }
+      ]
+    },
+    {
+      "name": "vmk_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}Z) cpu(?P<cpu>\\d+):(?P<world_id>\\d+)(?: opID=(?P<opid>[^\\)]+))?\\)((?:(?P<level>WARNING|ALERT)|(?P<subsystem>[^:]+)): )?(?P<body>.*)",
+          "fields": ["timestamp", "cpu", "world_id", "opid", "level", "subsystem", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>(?:\\S{3,8}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}|\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?Z))\\s+(?P<level>\\w+)\\((?P<syslog_pri>\\d+)\\)(?:\\[\\+\\]|\\+)? (?:vmkernel|vmkwarning):\\s* (?:cpu(?P<cpu>\\d+):(?P<world_id>\\d+)(?: opID=(?P<opid>[^\\)]+))?\\))?((?:(?:WARNING|ALERT)|(?P<subsystem>[^:]+)): )?(?P<body>.*)",
+          "fields": ["timestamp", "level", "syslog_pri", "cpu", "world_id", "opid", "subsystem", "body"]
+        }
+      ]
+    },
+    {
+      "name": "vmw_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) (?P<level>\\w+)(?:\\(\\d+\\)+)? (?P<prc>[\\w\\-]+)\\[(?P<tid>\\w+)\\]:? \\[(?:opI(?:D|d)=(?P<opid>[^\\]]+))\\]\\s*(?P<body>.*)",
+          "fields": ["timestamp", "level", "prc", "tid", "opid", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) (?:- last log rotation time, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?\\s*(ESX KMX Agent started.|(?:- time the service was last started(?: \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?, )?Section for (?:[^,]+), pid=(?P<tid>\\w+).*)",
+          "fields": ["timestamp", "tid"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) (?P<level>\\w+)(?:\\(\\d+\\)+) (?P<prc>[\\w\\-]+)\\[(?P<tid>\\w+)\\]: (?:Logs rotated. \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?(?:- last log rotation time, \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2}))?\\s*(ESX KMX Agent started.|(?:- time the service was last started(?: \\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}.\\d{3}Z)?, )?Section for (?:[^,]+), pid=(?:\\w+).*)",
+          "fields": ["timestamp", "level", "prc", "tid"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) \\[(?P<tid>\\w+) (?P<level>\\w+) '(?P<comp>[^']+)'(?: opID=(?P<opid>[^ \\]]+))?(?: user=(?P<user>[^ \\]]+))?\\](?P<body>.*)(?:\\n.*)?",
+          "fields": ["timestamp", "tid", "level", "comp", "opid", "user", "body"]
+        },
+        {
+          "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3}) (?P<tid>\\w+) (?P<level>\\w+) '(?P<comp>[^']+)'(?: opID=(?P<opid>[^ \\]]+))?(?: user=(?P<user>[^ \\]]+))?\\](?P<body>.*)(?:\\n.*)?",
+          "fields": ["timestamp", "tid", "level", "comp", "opid", "user", "body"]
+        },
+        {
+          "pattern": "^\\[(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<tid>[\\w\\-]+)\\s+(?P<level>\\w+)\\s+(?P<comp>[^\\]]+)\\]\\s+(?P<body>.*)",
+          "fields": ["timestamp", "tid", "level", "comp", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}(T| )\\d{2}:\\d{2}:\\d{2}(?:.|,)\\d{3}(?:Z|[-+]\\d{2}:\\d{2})) \\[(?P<prc>[^\\[]+)\\[(?P<tid>\\w+)\\]:\\s+(?P<body>.*)\\]",
+          "fields": ["timestamp", "prc", "tid", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}(T| )\\d{2}:\\d{2}:\\d{2}(?:.|,)\\d{3}(?:Z|[-+]\\d{2}:\\d{2})?) (?P<level>\\w+) (?P<prc>[^\\[]+)\\[(?P<tid>\\d+)\\]\\s+\\[(?P<file>[^ ]+) (?P<line>\\d+)\\]\\s+(?P<body>.*)",
+          "fields": ["timestamp", "level", "prc", "tid", "file", "line", "body"]
+        },
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{3})?(?:Z|[-+]\\d{2}:\\d{2})) (?P<prc>[^:]+):\\s+(?P<tid>\\d+):\\s+(?P<comp>[^:]+):(?P<line>\\d+)?\\s+(?P<level>\\w+):?\\s+(?P<body>.*)(?:\\n.*)?",
+          "fields": ["timestamp", "prc", "tid", "comp", "line", "level", "body"]
+        },
+        {
+          "pattern": "^(?P<prc>[^:]+):(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3})\\[(?P<tid>\\w+)\\](?P<file>[^:]+):(?P<line>\\d+) \\[(?P<level>[a-zA-Z]+)\\]\\s+(?P<body>.*)",
+          "fields": ["prc", "timestamp", "tid", "file", "line", "level", "body"]
+        },
+        {
+          "pattern": "^(?P<prc>[^:]+): (?P<tid>\\d+): (?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) (?P<file>[^:]+):(?P<line>\\d+) (?P<level>[a-zA-Z]+)\\s+(?P<body>.*)",
+          "fields": ["prc", "tid", "timestamp", "file", "line", "level", "body"]
+        }
+      ]
+    },
+    {
+      "name": "vmw_py_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(?:\\.\\d{1,3})?(?: (?:AM|PM) UTC)?) \\[(?P<pid>\\d+)\\](?P<level>ERROR|WARNING|INFO|DEBUG):(?P<module>[\\w\\-\\.]+):(?P<body>.*$)",
+          "fields": ["timestamp", "pid", "level", "module", "body"]
+        }
+      ]
+    },
+    {
+      "name": "vmw_vc_svc_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{1,3}Z)\\s+(?P<level>\\w+)\\s+(?P<module>\\w+)\\s\\[(?P<srcfile>[^:]+):(?P<srcline>\\d+)\\](\\s+\\[opID=(?P<opid>[^\\]]+)\\])?\\s+(?P<body>.*)",
+          "fields": ["timestamp", "level", "module", "srcfile", "srcline", "opid", "body"]
+        }
+      ]
+    },
+    {
+      "name": "vpostgres_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\S+) (?P<session_id>[^ ]*) (?P<transaction_id>[^ ]*) (?P<db_name>[^ ]*) (?P<user>[^ ]*) (?P<remote_pair>[^ ]*) (?P<pid>[^ ]+) (?P<num_line>\\d+)(?P<level>[^:]+):\\s+(?P<body>.*)",
+          "fields": ["timestamp", "session_id", "transaction_id", "db_name", "user", "remote_pair", "pid", "num_line", "level", "body"]
+        }
+      ]
+    },
+    {
+      "name": "web_robot_log",
+      "regex": [
+        {
+          "fields": ["ip", "timestamp", "method", "resource", "response", "bytes", "referrer", "request", "request-id", "useragent"]
+        }
+      ]
+    },
+    {
+      "name": "xmlrpc_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}/\\d{2}/\\d{2} \\d{2}:\\d{2}:\\d{2} [+-]?\\d{2}:\\d{2}) (?P<pid>\\d+) (?P<client_ip>\\S+): (?P<module>\\w+)/(?P<function>.*)(?P<arguments>\\(.*?\\))?(?P<body>.*)",
+          "fields": ["timestamp", "pid", "client_ip", "module", "function", "arguments", "body"]
+        }
+      ]
+    },
+    {
+      "name": "zookeeper_log",
+      "regex": [
+        {
+          "pattern": "^(?P<timestamp>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2},\\d{3}) \\[myid:(?P<myid>\\d+)?\\] - (?P<level>\\w+)\\s+\\[(?P<thread>.*):(?P<logger>[\\w\\.\\$]+)@(?P<line_number>\\d+)\\] - (?P<body>.*)",
+          "fields": ["timestamp", "myid", "level", "thread", "logger", "line_number", "body"]
+        },
+        {
+          "pattern": "^(<(?<pri>\\d+)>)?(?<version>\\d)\\s+(?<timestamp>[^\\s]+)\\s+(?<hostname>[^\\s]+)\\s+(?<appname>[^\\s]+)\\s+(?<procid>[^\\s]+)\\s+(?<msgid>[^\\s]+)?\\s*(\\[(?<structureddata>[^\\]]*)\\])?\\s*(?<message>.*)",
+          "fields": ["pri", "version", "timestamp", "hostname", "appname", "procid", "msgid", "structureddata", "message"]
+        }
+      ]
+    },
+    {
+      "name": "kubernetes_log",
+      "regex": [
+        {
+          "pattern": "^(?<severity>[IWEF])(?<month>\\d{2})(?<day>\\d{2})\\s+(?<time>\\d{2}:\\d{2}:\\d{2}\\.\\d{6})\\s+(?<pid>\\d+)\\s+(?<source_file>[^:]+):(?<line_number>\\d+)]\\s+(?<message>.*)$",
+          "fields": ["severity", "month", "day", "time", "pid", "source_file", "line_number", "message"]
+        }
+      ]
+    },
+    {
+      "name": "postgresql_log",
+      "regex": [
+        {
+          "pattern": "^(?<timestamp>\\d{4}-\\d{1,2}-\\d{1,2} \\d{2}:\\d{2}:\\d{2}\\.\\d{3} \\w+) \\[(?<process_id>\\d+)\\] (?<log_level>\\w+):  (?<sql_statement>.*)$",
+          "fields": ["timestamp", "process_id", "log_level", "sql_statement"]
+        }
+      ]
+    }
+  ]
\ No newline at end of file
diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
new file mode 100644
index 000000000..3378335ee
--- /dev/null
+++ b/src/event/format/known_schema.rs
@@ -0,0 +1,157 @@
+use std::collections::HashMap;
+
+use once_cell::sync::Lazy;
+use regex::Regex;
+use serde::Deserialize;
+use serde_json::{Map, Value};
+use tracing::{error, warn};
+
+const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");
+
+// Schema definition with pattern matching
+pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> = Lazy::new(|| {
+    let mut processor = EventProcessor {
+        schema_definitions: HashMap::new(),
+    };
+
+    // Register known schemas
+    processor.register_schema();
+
+    processor
+});
+
+#[derive(Debug)]
+pub struct SchemaDefinition {
+    pattern: Option<Regex>,
+    field_mappings: Vec<String>, // Maps field names to regex capture groups
+}
+
+impl SchemaDefinition {
+    pub fn extract(&self, event: &str) -> Option<Map<String, Value>> {
+        if let Some(pattern) = &self.pattern {
+            if let Some(captures) = pattern.captures(event) {
+                let mut extracted_fields = Map::new();
+
+                // With named capture groups, you can iterate over the field names
+                for field_name in self.field_mappings.iter() {
+                    if let Some(value) = captures.name(field_name) {
+                        extracted_fields.insert(
+                            field_name.to_owned(),
+                            Value::String(value.as_str().to_string()),
+                        );
+                    }
+                }
+
+                return Some(extracted_fields);
+            }
+        }
+
+        None
+    }
+}
+
+#[derive(Debug, Deserialize)]
+struct Format {
+    name: String,
+    regex: Vec<Pattern>,
+}
+#[derive(Debug, Deserialize)]
+struct Pattern {
+    pattern: Option<String>,
+    fields: Vec<String>,
+}
+
+#[derive(Debug)]
+pub struct EventProcessor {
+    pub schema_definitions: HashMap<String, SchemaDefinition>,
+}
+
+impl EventProcessor {
+    fn register_schema(&mut self) {
+        let json_data: serde_json::Value = serde_json::from_str(FORMATS_JSON).unwrap();
+        let formats: Vec<Format> =
+            serde_json::from_value(json_data).expect("Failed to parse formats.json");
+
+        for format in formats {
+            for pattern in &format.regex {
+                if let Some(pattern_str) = &pattern.pattern {
+                    // Compile the regex pattern
+                    match Regex::new(pattern_str) {
+                        Ok(exp) => {
+                            let field_mappings = pattern
+                                .fields
+                                .iter()
+                                .map(|field| field.to_string())
+                                .collect();
+
+                            self.schema_definitions.insert(
+                                format.name.clone(),
+                                SchemaDefinition {
+                                    pattern: Some(exp),
+                                    field_mappings,
+                                },
+                            );
+                        }
+                        Err(e) => {
+                            error!("Error compiling regex pattern: {e}; Pattern: {pattern_str}");
+                        }
+                    }
+                } else {
+                    let field_mappings = pattern
+                        .fields
+                        .iter()
+                        .map(|field| field.to_string())
+                        .collect();
+
+                    self.schema_definitions.insert(
+                        format.name.clone(),
+                        SchemaDefinition {
+                            pattern: None,
+                            field_mappings,
+                        },
+                    );
+                }
+            }
+        }
+    }
+}
+pub fn extract_from_inline_log(
+    mut json: Value,
+    log_source: &str,
+    extract_log: Option<&str>,
+) -> Value {
+    let Some(schema) = KNOWN_SCHEMA_LIST.schema_definitions.get(log_source) else {
+        warn!("Unknown log format: {log_source}");
+        return json;
+    };
+
+    match &mut json {
+        Value::Array(list) => {
+            for event in list {
+                let Value::Object(event) = event else {
+                    continue;
+                };
+                per_event_extraction(event, schema, extract_log)
+            }
+        }
+        Value::Object(event) => per_event_extraction(event, schema, extract_log),
+        _ => unreachable!("We don't accept events of the form: {json}"),
+    }
+
+    json
+}
+
+pub fn per_event_extraction(
+    obj: &mut Map<String, Value>,
+    schema: &SchemaDefinition,
+    extract_log: Option<&str>,
+) {
+    if let Some(event) = extract_log
+        .and_then(|field| obj.get(field))
+        .and_then(|s| s.as_str())
+    {
+        if let Some(additional) = schema.extract(event) {
+            obj.extend(additional);
+        }
+    }
+}
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 58c35fc79..2fccff8ef 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -39,6 +39,7 @@ use crate::{
 use super::{Event, DEFAULT_TIMESTAMP_KEY};
 
 pub mod json;
+pub mod known_schema;
 
 static TIME_FIELD_NAME_PARTS: [&str; 11] = [
     "time",
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 1e5e6d048..b593fa4be 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -29,7 +29,7 @@ use serde_json::Value;
 use crate::event;
 use crate::event::error::EventError;
 use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
-use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
+use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
 use crate::option::Mode;
 use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
@@ -65,6 +65,11 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         .and_then(|h| h.to_str().ok())
         .map_or(LogSource::default(), LogSource::from);
 
+    let extract_log = req
+        .headers()
+        .get(EXTRACT_LOG_KEY)
+        .and_then(|h| h.to_str().ok());
+
     if matches!(
         log_source,
         LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
@@ -81,7 +86,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, extract_log).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -144,7 +149,7 @@ pub async fn handle_otel_logs_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -182,7 +187,7 @@ pub async fn handle_otel_metrics_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -222,7 +227,7 @@ pub async fn handle_otel_traces_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -264,6 +269,11 @@ pub async fn post_event(
         .and_then(|h| h.to_str().ok())
         .map_or(LogSource::default(), LogSource::from);
 
+    let extract_log = req
+        .headers()
+        .get(EXTRACT_LOG_KEY)
+        .and_then(|h| h.to_str().ok());
+
     if matches!(
         log_source,
         LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
@@ -271,7 +281,7 @@ pub async fn post_event(
         return Err(PostError::OtelNotSupported);
     }
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source, extract_log).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 84d5ae117..8a40eeb1c 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -23,7 +23,7 @@ use opentelemetry_proto::tonic::{
 use serde_json::Value;
 
 use crate::{
-    event::format::{json, EventFormat, LogSource},
+    event::format::{json, known_schema::extract_from_inline_log, EventFormat, LogSource},
     handlers::http::{
         ingest::PostError,
         kinesis::{flatten_kinesis_logs, Message},
@@ -38,6 +38,7 @@ pub async fn flatten_and_push_logs(
     json: Value,
     stream_name: &str,
     log_source: &LogSource,
+    extract_log: Option<&str>,
 ) -> Result<(), PostError> {
     match log_source {
         LogSource::Kinesis => {
@@ -68,10 +69,13 @@ pub async fn flatten_and_push_logs(
                 push_logs(stream_name, record, log_source).await?;
             }
         }
-        _ => {
+        LogSource::Custom(src) => {
+            let json = extract_from_inline_log(json, src, extract_log);
             push_logs(stream_name, json, log_source).await?;
         }
+        _ => push_logs(stream_name, json, log_source).await?,
     }
+
     Ok(())
 }
 
diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs
index f584dcd97..42f9f31ee 100644
--- a/src/handlers/mod.rs
+++ b/src/handlers/mod.rs
@@ -22,6 +22,7 @@ pub mod livetail;
 
 pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
 const LOG_SOURCE_KEY: &str = "x-p-log-source";
+const EXTRACT_LOG_KEY: &str = "x-p-extract-log";
 const TIME_PARTITION_KEY: &str = "x-p-time-partition";
 const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
 const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";

From b0658820e22d0b9d42b4f215787ec8bb5c37297a Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Tue, 18 Mar 2025 18:37:12 +0530
Subject: [PATCH 02/15] fix: copy `/resources`

---
 Dockerfile.debug | 1 +
 Dockerfile.kafka | 1 +
 2 files changed, 2 insertions(+)

diff --git a/Dockerfile.debug b/Dockerfile.debug
index c58220408..35ba5fbae 100644
--- a/Dockerfile.debug
+++ b/Dockerfile.debug
@@ -29,6 +29,7 @@ RUN mkdir src && echo "fn main() {}" > src/main.rs && cargo build && rm -rf src
 
 # Build the actual binary
 COPY src ./src
+COPY resources ./resources
 RUN cargo build
 
 # final stage
diff --git a/Dockerfile.kafka b/Dockerfile.kafka
index 108f74c75..dfe123751 100644
--- a/Dockerfile.kafka
+++ b/Dockerfile.kafka
@@ -42,6 +42,7 @@ RUN mkdir src && echo "fn main() {}" > src/main.rs && \
 
 # Copy the actual source code
 COPY src ./src
+COPY resources ./resources
 
 # Build the actual binary with kafka feature
 RUN cargo build --release --features kafka

From abba8722e763cc924e04c63a914b3118e58be034 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Tue, 18 Mar 2025 20:16:33 +0530
Subject: [PATCH 03/15] refactor: flatten code

---
 src/event/format/known_schema.rs | 93 ++++++++++++--------------------
 1 file changed, 35 insertions(+), 58 deletions(-)
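
For reference, the lenient compile step the refactor below settles on, shown in isolation
(the `compile_optional` helper name is illustrative): a pattern that fails to compile is
logged and treated as absent rather than aborting startup.

    use regex::Regex;
    use tracing::error;

    fn compile_optional(pattern: Option<&str>) -> Option<Regex> {
        pattern.and_then(|p| {
            Regex::new(p)
                // Log and discard a bad pattern instead of panicking
                .inspect_err(|err| error!("Error compiling regex pattern: {err}; Pattern: {p}"))
                .ok()
        })
    }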

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 3378335ee..cbc79818b 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -28,25 +28,21 @@ pub struct SchemaDefinition {
 
 impl SchemaDefinition {
     pub fn extract(&self, event: &str) -> Option<Map<String, Value>> {
-        if let Some(pattern) = &self.pattern {
-            if let Some(captures) = pattern.captures(event) {
-                let mut extracted_fields = Map::new();
-
-                // With named capture groups, you can iterate over the field names
-                for field_name in self.field_mappings.iter() {
-                    if let Some(value) = captures.name(field_name) {
-                        extracted_fields.insert(
-                            field_name.to_owned(),
-                            Value::String(value.as_str().to_string()),
-                        );
-                    }
-                }
-
-                return Some(extracted_fields);
+        let pattern = self.pattern.as_ref()?;
+        let captures = pattern.captures(event)?;
+        let mut extracted_fields = Map::new();
+
+        // With named capture groups, you can iterate over the field names
+        for field_name in self.field_mappings.iter() {
+            if let Some(value) = captures.name(field_name) {
+                extracted_fields.insert(
+                    field_name.to_owned(),
+                    Value::String(value.as_str().to_string()),
+                );
             }
         }
 
-        None
+        Some(extracted_fields)
     }
 }
 
@@ -55,6 +51,7 @@ struct Format {
     name: String,
     regex: Vec<Pattern>,
 }
+
 #[derive(Debug, Deserialize)]
 struct Pattern {
     pattern: Option<String>,
@@ -73,44 +70,25 @@ impl EventProcessor {
             serde_json::from_value(json_data).expect("Failed to parse formats.json");
 
         for format in formats {
-            for pattern in &format.regex {
-                if let Some(pattern_str) = &pattern.pattern {
-                    // Compile the regex pattern
-                    match Regex::new(pattern_str) {
-                        Ok(exp) => {
-                            let field_mappings = pattern
-                                .fields
-                                .iter()
-                                .map(|field| field.to_string())
-                                .collect();
-
-                            self.schema_definitions.insert(
-                                format.name.clone(),
-                                SchemaDefinition {
-                                    pattern: Some(exp),
-                                    field_mappings,
-                                },
-                            );
-                        }
-                        Err(e) => {
-                            error!("Error compiling regex pattern: {e}; Pattern: {pattern_str}");
-                        }
-                    }
-                } else {
-                    let field_mappings = pattern
-                        .fields
-                        .iter()
-                        .map(|field| field.to_string())
-                        .collect();
-
-                    self.schema_definitions.insert(
-                        format.name.clone(),
-                        SchemaDefinition {
-                            pattern: None,
-                            field_mappings,
-                        },
-                    );
-                }
+            for regex in &format.regex {
+                // Compile the regex pattern if present
+                let pattern = regex.pattern.as_ref().and_then(|pattern| {
+                    Regex::new(pattern)
+                        .inspect_err(|err| {
+                            error!("Error compiling regex pattern: {err}; Pattern: {pattern}")
+                        })
+                        .ok()
+                });
+
+                let field_mappings = regex.fields.clone();
+
+                self.schema_definitions.insert(
+                    format.name.clone(),
+                    SchemaDefinition {
+                        pattern,
+                        field_mappings,
+                    },
+                );
             }
         }
     }
@@ -146,12 +124,11 @@ pub fn per_event_extraction(
     schema: &SchemaDefinition,
     extract_log: Option<&str>,
 ) {
-    if let Some(event) = extract_log
+    if let Some(additional) = extract_log
         .and_then(|field| obj.get(field))
         .and_then(|s| s.as_str())
+        .and_then(|event| schema.extract(event))
     {
-        if let Some(additional) = schema.extract(event) {
-            obj.extend(additional);
-        }
+        obj.extend(additional);
     }
 }

From 5af8565e688a8845d2b8aff81b884e18f6a9d941 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Tue, 18 Mar 2025 21:17:01 +0530
Subject: [PATCH 04/15] doc+test: improve readability and testing

---
 src/event/format/known_schema.rs              | 315 +++++++++++++++---
 src/handlers/http/modal/utils/ingest_utils.rs |   4 +-
 2 files changed, 268 insertions(+), 51 deletions(-)
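
A minimal sketch of the kind of unit test this patch adds (the test name and the sample
Katello line are illustrative, not the patch's actual cases), exercising `EventProcessor::new`
against the bundled formats:

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn extracts_fields_from_known_format() {
            let processor = EventProcessor::new(FORMATS_JSON);
            let schema = processor
                .schema_definitions
                .get("katello_log")
                .expect("katello_log is listed in resources/formats.json");

            let line = "[ INFO 2025-03-18 16:03:44 main] task started";
            let fields = schema.extract(line).expect("line should match the pattern");

            assert_eq!(fields["alert_level"], "INFO");
            assert_eq!(fields["module"], "main");
        }
    }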

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index cbc79818b..249c6211c 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -1,3 +1,21 @@
+/*
+ * Parseable Server (C) 2022 - 2025 Parseable, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
 use std::collections::HashMap;
 
 use once_cell::sync::Lazy;
@@ -6,27 +24,31 @@ use serde::Deserialize;
 use serde_json::{Map, Value};
 use tracing::{error, warn};
 
+/// Predefined JSON with known textual logging formats
 const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");
 
-// Schema definition with pattern matching
-pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> = Lazy::new(|| {
-    let mut processor = EventProcessor {
-        schema_definitions: HashMap::new(),
-    };
-
-    // Register known schemas
-    processor.register_schema();
-
-    processor
-});
+/// Global instance of EventProcessor containing predefined schema definitions
+pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
+    Lazy::new(|| EventProcessor::new(FORMATS_JSON));
 
+/// Defines a schema for extracting structured data from logs using regular expressions
 #[derive(Debug)]
 pub struct SchemaDefinition {
+    /// Regular expression pattern used to match and capture fields from log strings
     pattern: Option<Regex>,
-    field_mappings: Vec<String>, // Maps field names to regex capture groups
+    /// Field names to extract, matching the regex's named capture groups
+    field_mappings: Vec<String>,
 }
 
 impl SchemaDefinition {
+    /// Extracts structured data from a log event string using a defined regex pattern
+    ///
+    /// # Arguments
+    /// * `event` - The log event string to extract data from
+    ///
+    /// # Returns
+    /// * `Some(Map<String, Value>)` - A map of field names to extracted values if extraction succeeds
+    /// * `None` - If the pattern is missing or no matches were found
     pub fn extract(&self, event: &str) -> Option<Map<String, Value>> {
         let pattern = self.pattern.as_ref()?;
         let captures = pattern.captures(event)?;
@@ -44,6 +66,22 @@ impl SchemaDefinition {
 
         Some(extracted_fields)
     }
+
+    /// Extracts JSON event from raw text in received message
+    ///
+    /// # Arguments
+    /// * `obj` - The root level event object to extract into
+    /// * `self` - The schema definition whose pattern is used for extraction
+    /// * `extract_log` - Optional field name containing the raw log text
+    pub fn per_event_extraction(&self, obj: &mut Map<String, Value>, extract_log: Option<&str>) {
+        if let Some(additional) = extract_log
+            .and_then(|field| obj.get(field))
+            .and_then(|s| s.as_str())
+            .and_then(|event| self.extract(event))
+        {
+            obj.extend(additional);
+        }
+    }
 }
 
 #[derive(Debug, Deserialize)]
@@ -64,14 +102,19 @@ pub struct EventProcessor {
 }
 
 impl EventProcessor {
-    fn register_schema(&mut self) {
-        let json_data: serde_json::Value = serde_json::from_str(FORMATS_JSON).unwrap();
+    /// Parses given formats from JSON text and stores them in-memory
+    fn new(json_text: &str) -> Self {
+        let mut processor = EventProcessor {
+            schema_definitions: HashMap::new(),
+        };
+
         let formats: Vec<Format> =
-            serde_json::from_value(json_data).expect("Failed to parse formats.json");
+            serde_json::from_str(json_text).expect("Known formats are stored as JSON text");
 
         for format in formats {
             for regex in &format.regex {
                 // Compile the regex pattern if present
+                // NOTE: we only warn if the pattern doesn't compile
                 let pattern = regex.pattern.as_ref().and_then(|pattern| {
                     Regex::new(pattern)
                         .inspect_err(|err| {
@@ -82,7 +125,7 @@ impl EventProcessor {
 
                 let field_mappings = regex.fields.clone();
 
-                self.schema_definitions.insert(
+                processor.schema_definitions.insert(
                     format.name.clone(),
                     SchemaDefinition {
                         pattern,
@@ -91,44 +134,218 @@ impl EventProcessor {
                 );
             }
         }
+
+        processor
     }
-}
-pub fn extract_from_inline_log(
-    mut json: Value,
-    log_source: &str,
-    extract_log: Option<&str>,
-) -> Value {
-    let Some(schema) = KNOWN_SCHEMA_LIST.schema_definitions.get(log_source) else {
-        warn!("Unknown log format: {log_source}");
-        return json;
-    };
-
-    match &mut json {
-        Value::Array(list) => {
-            for event in list {
-                let Value::Object(event) = event else {
-                    continue;
-                };
-                per_event_extraction(event, schema, extract_log)
+
+    /// Extracts fields from logs embedded within a JSON string
+    ///
+    /// # Arguments
+    /// * `json` - JSON value containing log entries
+    /// * `log_source` - Name of the log format to use for extraction
+    /// * `extract_log` - Optional field name containing the raw log text
+    ///
+    /// # Returns
+    /// * `Value` - The original JSON with extracted fields added, if any
+    pub fn extract_from_inline_log(
+        &self,
+        mut json: Value,
+        log_source: &str,
+        extract_log: Option<&str>,
+    ) -> Value {
+        let Some(schema) = self.schema_definitions.get(log_source) else {
+            warn!("Unknown log format: {log_source}");
+            return json;
+        };
+
+        match &mut json {
+            Value::Array(list) => {
+                for event in list {
+                    let Value::Object(event) = event else {
+                        continue;
+                    };
+                    schema.per_event_extraction(event, extract_log)
+                }
             }
+            Value::Object(event) => schema.per_event_extraction(event, extract_log),
+            _ => unreachable!("We don't accept events of the form: {json}"),
         }
-        Value::Object(event) => per_event_extraction(event, schema, extract_log),
-        _ => unreachable!("We don't accept events of the form: {json}"),
-    }
 
-    json
+        json
+    }
 }
 
-pub fn per_event_extraction(
-    obj: &mut Map<String, Value>,
-    schema: &SchemaDefinition,
-    extract_log: Option<&str>,
-) {
-    if let Some(additional) = extract_log
-        .and_then(|field| obj.get(field))
-        .and_then(|s| s.as_str())
-        .and_then(|event| schema.extract(event))
-    {
-        obj.extend(additional);
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    const TEST_CONFIG: &str = r#"
+    [
+        {
+            "name": "apache_access",
+            "regex": [
+                {
+                    "pattern": "^(?P<ip>[\\d.]+) - - \\[(?P<timestamp>[^\\]]+)\\] \"(?P<method>\\w+) (?P<path>[^\\s]+) HTTP/[\\d.]+\" (?P<status>\\d+) (?P<bytes>\\d+)",
+                    "fields": ["ip", "timestamp", "method", "path", "status", "bytes"]
+                }
+            ]
+        },
+        {
+            "name": "custom_app_log",
+            "regex": [
+                {
+                    "pattern": "\\[(?P<level>\\w+)\\] \\[(?P<timestamp>[^\\]]+)\\] (?P<message>.*)",
+                    "fields": ["level", "timestamp", "message"]
+                }
+            ]
+        }
+    ]
+    "#;
+
+    #[test]
+    fn test_schema_load() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+        assert_eq!(
+            processor.schema_definitions.len(),
+            2,
+            "Expected 2 schema definitions"
+        );
+
+        assert!(processor.schema_definitions.contains_key("apache_access"));
+        assert!(processor.schema_definitions.contains_key("custom_app_log"));
+    }
+
+    #[test]
+    fn test_apache_log_extraction() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+
+        let schema = processor.schema_definitions.get("apache_access").unwrap();
+        let log =
+            "192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] \"GET /index.html HTTP/1.1\" 200 2326";
+
+        let result = schema.extract(log);
+        assert!(result.is_some(), "Failed to extract fields from valid log");
+
+        let fields = result.unwrap();
+        assert_eq!(fields.get("ip").unwrap().as_str().unwrap(), "192.168.1.1");
+        assert_eq!(
+            fields.get("timestamp").unwrap().as_str().unwrap(),
+            "10/Oct/2023:13:55:36 +0000"
+        );
+        assert_eq!(fields.get("method").unwrap().as_str().unwrap(), "GET");
+        assert_eq!(fields.get("path").unwrap().as_str().unwrap(), "/index.html");
+        assert_eq!(fields.get("status").unwrap().as_str().unwrap(), "200");
+        assert_eq!(fields.get("bytes").unwrap().as_str().unwrap(), "2326");
+    }
+
+    #[test]
+    fn test_custom_log_extraction() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+
+        let schema = processor.schema_definitions.get("custom_app_log").unwrap();
+        let log = "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database";
+
+        let result = schema.extract(log);
+        assert!(result.is_some(), "Failed to extract fields from valid log");
+
+        let fields = result.unwrap();
+        assert_eq!(fields.get("level").unwrap().as_str().unwrap(), "ERROR");
+        assert_eq!(
+            fields.get("timestamp").unwrap().as_str().unwrap(),
+            "2023-10-10T13:55:36Z"
+        );
+        assert_eq!(
+            fields.get("message").unwrap().as_str().unwrap(),
+            "Failed to connect to database"
+        );
+    }
+
+    #[test]
+    fn test_no_match() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+
+        let schema = processor.schema_definitions.get("apache_access").unwrap();
+        let log = "This is not an Apache log line";
+
+        let result = schema.extract(log);
+        assert!(
+            result.is_none(),
+            "Should not extract fields from invalid log format"
+        );
+    }
+
+    #[test]
+    fn test_extract_from_inline_log_object() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+
+        let json_value = json!({
+            "id": "12345",
+            "raw_log": "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database"
+        });
+
+        let result =
+            processor.extract_from_inline_log(json_value, "custom_app_log", Some("raw_log"));
+
+        let obj = result.as_object().unwrap();
+        assert!(obj.contains_key("level"));
+        assert!(obj.contains_key("timestamp"));
+        assert!(obj.contains_key("message"));
+        assert_eq!(obj.get("level").unwrap().as_str().unwrap(), "ERROR");
+    }
+
+    #[test]
+    fn test_extract_from_inline_log_array() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+
+        let json_value = json!([
+            {
+                "id": "12345",
+                "raw_log": "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database"
+            },
+            {
+                "id": "12346",
+                "raw_log": "[INFO] [2023-10-10T13:55:40Z] Application started"
+            }
+        ]);
+
+        let result =
+            processor.extract_from_inline_log(json_value, "custom_app_log", Some("raw_log"));
+
+        let array = result.as_array().unwrap();
+        assert_eq!(array.len(), 2);
+
+        let first = array[0].as_object().unwrap();
+        assert_eq!(first.get("level").unwrap().as_str().unwrap(), "ERROR");
+        assert_eq!(
+            first.get("message").unwrap().as_str().unwrap(),
+            "Failed to connect to database"
+        );
+
+        let second = array[1].as_object().unwrap();
+        assert_eq!(second.get("level").unwrap().as_str().unwrap(), "INFO");
+        assert_eq!(
+            second.get("message").unwrap().as_str().unwrap(),
+            "Application started"
+        );
+    }
+
+    #[test]
+    fn test_unknown_log_format() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+        let json_value = json!({
+            "id": "12345",
+            "raw_log": "Some log message"
+        });
+
+        let result =
+            processor.extract_from_inline_log(json_value, "nonexistent_format", Some("raw_log"));
+
+        // Should return original JSON without modification
+        let obj = result.as_object().unwrap();
+        assert_eq!(obj.len(), 2);
+        assert!(obj.contains_key("id"));
+        assert!(obj.contains_key("raw_log"));
+        assert!(!obj.contains_key("level"));
     }
 }
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 8a40eeb1c..2c48d7c32 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -23,7 +23,7 @@ use opentelemetry_proto::tonic::{
 use serde_json::Value;
 
 use crate::{
-    event::format::{json, known_schema::extract_from_inline_log, EventFormat, LogSource},
+    event::format::{json, known_schema::KNOWN_SCHEMA_LIST, EventFormat, LogSource},
     handlers::http::{
         ingest::PostError,
         kinesis::{flatten_kinesis_logs, Message},
@@ -70,7 +70,7 @@ pub async fn flatten_and_push_logs(
             }
         }
         LogSource::Custom(src) => {
-            let json = extract_from_inline_log(json, src, extract_log);
+            let json = KNOWN_SCHEMA_LIST.extract_from_inline_log(json, src, extract_log);
             push_logs(stream_name, json, log_source).await?;
         }
         _ => push_logs(stream_name, json, log_source).await?,

From 2d4d26ac9c990c9c9ceb503ead74ea4821c760e4 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Tue, 18 Mar 2025 21:46:55 +0530
Subject: [PATCH 05/15] refactor: `check_or_extract`
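
`per_event_extraction` becomes `check_or_extract`: when the event object
already carries every field the schema expects it is accepted as-is, otherwise
the regex is applied to the raw log text, and ingestion fails with
`Unacceptable` (mapped to a 400 response) when neither works. A rough sketch
of the caller's view after this change, assuming `json`, `src` and
`extract_log` as in `flatten_and_push_logs`:

    // check first, extract otherwise, reject when neither applies
    if let LogSource::Custom(src) = &log_source {
        KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
    }

The early "already structured" check lets clients send pre-parsed JSON for a
known format without having to repeat the raw log line.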

---
 src/event/format/known_schema.rs              | 286 +++++++++++++-----
 src/handlers/http/ingest.rs                   |   4 +
 src/handlers/http/modal/utils/ingest_utils.rs |   4 +-
 3 files changed, 210 insertions(+), 84 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 249c6211c..25ba11fef 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -31,6 +31,10 @@ const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");
 pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
     Lazy::new(|| EventProcessor::new(FORMATS_JSON));
 
+#[derive(Debug, thiserror::Error)]
+#[error("Unacceptable text/JSON for known log format")]
+pub struct Unacceptable;
+
 /// Defines a schema for extracting structured data from logs using regular expressions
 #[derive(Debug)]
 pub struct SchemaDefinition {
@@ -43,15 +47,43 @@ pub struct SchemaDefinition {
 impl SchemaDefinition {
     /// Extracts structured data from a log event string using a defined regex pattern
     ///
+    /// This function checks if the given object already contains all expected fields
+    /// or attempts to extract them from a log event string if a pattern is available.
+    ///
     /// # Arguments
-    /// * `event` - The log event string to extract data from
+    /// * `obj` - The JSON object to check or extract fields into
+    /// * `extract_log` - Optional field name containing the raw log text
     ///
     /// # Returns
-    /// * `Some(Map<String, Value>)` - A map of field names to extracted values if extraction succeeds
-    /// * `None` - If the pattern is missing or no matches were found
-    pub fn extract(&self, event: &str) -> Option<Map<String, Value>> {
-        let pattern = self.pattern.as_ref()?;
-        let captures = pattern.captures(event)?;
+    /// * `true` - If all expected fields are already present in the object OR if extraction was successful
+    /// * `false` - If extraction failed or no pattern was available and fields were missing
+    pub fn check_or_extract(
+        &self,
+        obj: &mut Map<String, Value>,
+        extract_log: Option<&str>,
+    ) -> bool {
+        if self
+            .field_mappings
+            .iter()
+            .all(|field| obj.contains_key(field))
+        {
+            return true;
+        }
+
+        let Some(pattern) = self.pattern.as_ref() else {
+            return false;
+        };
+
+        let Some(event) = extract_log
+            .and_then(|field| obj.get(field))
+            .and_then(|s| s.as_str())
+        else {
+            return false;
+        };
+
+        let Some(captures) = pattern.captures(event) else {
+            return false;
+        };
         let mut extracted_fields = Map::new();
 
         // With named capture groups, you can iterate over the field names
@@ -64,40 +96,30 @@ impl SchemaDefinition {
             }
         }
 
-        Some(extracted_fields)
-    }
+        obj.extend(extracted_fields);
 
-    /// Extracts JSON event from raw text in received message
-    ///
-    /// # Arguments
-    /// * `obj` - The root level event object to extract into
-    /// * `self` - The schema definition whose pattern is used for extraction
-    /// * `extract_log` - Optional field name containing the raw log text
-    pub fn per_event_extraction(&self, obj: &mut Map<String, Value>, extract_log: Option<&str>) {
-        if let Some(additional) = extract_log
-            .and_then(|field| obj.get(field))
-            .and_then(|s| s.as_str())
-            .and_then(|event| self.extract(event))
-        {
-            obj.extend(additional);
-        }
+        true
     }
 }
 
+/// Configuration structure loaded from JSON for defining log formats
 #[derive(Debug, Deserialize)]
 struct Format {
     name: String,
     regex: Vec<Pattern>,
 }
 
+/// Configuration for a single pattern within a log format
 #[derive(Debug, Deserialize)]
 struct Pattern {
     pattern: Option<String>,
     fields: Vec<String>,
 }
 
+/// Manages a collection of schema definitions for various log formats
 #[derive(Debug)]
 pub struct EventProcessor {
+    /// Map of format names to their corresponding schema definitions
     pub schema_definitions: HashMap<String, SchemaDefinition>,
 }
 
@@ -123,13 +145,11 @@ impl EventProcessor {
                         .ok()
                 });
 
-                let field_mappings = regex.fields.clone();
-
                 processor.schema_definitions.insert(
                     format.name.clone(),
                     SchemaDefinition {
                         pattern,
-                        field_mappings,
+                        field_mappings: regex.fields.clone(),
                     },
                 );
             }
@@ -146,32 +166,39 @@ impl EventProcessor {
     /// * `extract_log` - Optional field name containing the raw log text
     ///
     /// # Returns
-    /// * `Value` - The original JSON with extracted fields added, if any
+    /// * `Ok` - The original JSON will now contain extracted fields
+    /// * `Err(Unacceptable)` - JSON provided does not match the expected format for the known log source
     pub fn extract_from_inline_log(
         &self,
-        mut json: Value,
+        json: &mut Value,
         log_source: &str,
         extract_log: Option<&str>,
-    ) -> Value {
+    ) -> Result<(), Unacceptable> {
         let Some(schema) = self.schema_definitions.get(log_source) else {
             warn!("Unknown log format: {log_source}");
-            return json;
+            return Ok(());
         };
 
-        match &mut json {
+        match json {
             Value::Array(list) => {
                 for event in list {
                     let Value::Object(event) = event else {
                         continue;
                     };
-                    schema.per_event_extraction(event, extract_log)
+                    if !schema.check_or_extract(event, extract_log) {
+                        return Err(Unacceptable);
+                    }
+                }
+            }
+            Value::Object(event) => {
+                if !schema.check_or_extract(event, extract_log) {
+                    return Err(Unacceptable);
                 }
             }
-            Value::Object(event) => schema.per_event_extraction(event, extract_log),
             _ => unreachable!("We don't accept events of the form: {json}"),
         }
 
-        json
+        Ok(())
     }
 }
 
@@ -203,75 +230,135 @@ mod tests {
     ]
     "#;
 
-    #[test]
-    fn test_schema_load() {
-        let processor = EventProcessor::new(TEST_CONFIG);
-        assert_eq!(
-            processor.schema_definitions.len(),
-            2,
-            "Expected 2 schema definitions"
-        );
-
-        assert!(processor.schema_definitions.contains_key("apache_access"));
-        assert!(processor.schema_definitions.contains_key("custom_app_log"));
-    }
-
     #[test]
     fn test_apache_log_extraction() {
         let processor = EventProcessor::new(TEST_CONFIG);
-
         let schema = processor.schema_definitions.get("apache_access").unwrap();
-        let log =
-            "192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] \"GET /index.html HTTP/1.1\" 200 2326";
 
-        let result = schema.extract(log);
-        assert!(result.is_some(), "Failed to extract fields from valid log");
+        // Create a mutable object for check_or_extract to modify
+        let mut obj = Map::new();
+        let log_field = "raw_log";
+        obj.insert(log_field.to_string(), Value::String(
+            "192.168.1.1 - - [10/Oct/2023:13:55:36 +0000] \"GET /index.html HTTP/1.1\" 200 2326".to_string()
+        ));
+
+        // Use check_or_extract instead of extract
+        let result = schema.check_or_extract(&mut obj, Some(log_field));
+        assert!(result, "Failed to extract fields from valid log");
 
-        let fields = result.unwrap();
-        assert_eq!(fields.get("ip").unwrap().as_str().unwrap(), "192.168.1.1");
+        // Verify extracted fields were added to the object
+        assert_eq!(obj.get("ip").unwrap().as_str().unwrap(), "192.168.1.1");
         assert_eq!(
-            fields.get("timestamp").unwrap().as_str().unwrap(),
+            obj.get("timestamp").unwrap().as_str().unwrap(),
             "10/Oct/2023:13:55:36 +0000"
         );
-        assert_eq!(fields.get("method").unwrap().as_str().unwrap(), "GET");
-        assert_eq!(fields.get("path").unwrap().as_str().unwrap(), "/index.html");
-        assert_eq!(fields.get("status").unwrap().as_str().unwrap(), "200");
-        assert_eq!(fields.get("bytes").unwrap().as_str().unwrap(), "2326");
+        assert_eq!(obj.get("method").unwrap().as_str().unwrap(), "GET");
+        assert_eq!(obj.get("path").unwrap().as_str().unwrap(), "/index.html");
+        assert_eq!(obj.get("status").unwrap().as_str().unwrap(), "200");
+        assert_eq!(obj.get("bytes").unwrap().as_str().unwrap(), "2326");
     }
 
     #[test]
     fn test_custom_log_extraction() {
         let processor = EventProcessor::new(TEST_CONFIG);
-
         let schema = processor.schema_definitions.get("custom_app_log").unwrap();
-        let log = "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database";
 
-        let result = schema.extract(log);
-        assert!(result.is_some(), "Failed to extract fields from valid log");
+        // Create a mutable object for check_or_extract to modify
+        let mut obj = Map::new();
+        let log_field = "raw_log";
+        obj.insert(
+            log_field.to_string(),
+            Value::String(
+                "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database".to_string(),
+            ),
+        );
+
+        // Use check_or_extract instead of extract
+        let result = schema.check_or_extract(&mut obj, Some(log_field));
+        assert!(result, "Failed to extract fields from valid log");
 
-        let fields = result.unwrap();
-        assert_eq!(fields.get("level").unwrap().as_str().unwrap(), "ERROR");
+        // Verify extracted fields were added to the object
+        assert_eq!(obj.get("level").unwrap().as_str().unwrap(), "ERROR");
         assert_eq!(
-            fields.get("timestamp").unwrap().as_str().unwrap(),
+            obj.get("timestamp").unwrap().as_str().unwrap(),
             "2023-10-10T13:55:36Z"
         );
         assert_eq!(
-            fields.get("message").unwrap().as_str().unwrap(),
+            obj.get("message").unwrap().as_str().unwrap(),
             "Failed to connect to database"
         );
     }
 
     #[test]
-    fn test_no_match() {
+    fn test_fields_already_exist() {
         let processor = EventProcessor::new(TEST_CONFIG);
+        let schema = processor.schema_definitions.get("custom_app_log").unwrap();
+
+        // Create an object that already has all required fields
+        let mut obj = Map::new();
+        obj.insert("level".to_string(), Value::String("ERROR".to_string()));
+        obj.insert(
+            "timestamp".to_string(),
+            Value::String("2023-10-10T13:55:36Z".to_string()),
+        );
+        obj.insert(
+            "message".to_string(),
+            Value::String("Database error".to_string()),
+        );
+
+        // check_or_extract should return true without modifying anything
+        let result = schema.check_or_extract(&mut obj, None);
+        assert!(result, "Should return true when fields already exist");
+
+        // Verify the original values weren't changed
+        assert_eq!(
+            obj.get("message").unwrap().as_str().unwrap(),
+            "Database error"
+        );
+    }
 
+    #[test]
+    fn test_no_match() {
+        let processor = EventProcessor::new(TEST_CONFIG);
         let schema = processor.schema_definitions.get("apache_access").unwrap();
-        let log = "This is not an Apache log line";
 
-        let result = schema.extract(log);
+        // Create an object with non-matching log text
+        let mut obj = Map::new();
+        let log_field = "raw_log";
+        obj.insert(
+            log_field.to_string(),
+            Value::String("This is not an Apache log line".to_string()),
+        );
+
+        // check_or_extract should return false
+        let result = schema.check_or_extract(&mut obj, Some(log_field));
+        assert!(!result, "Should not extract fields from invalid log format");
+
+        // Verify no fields were added
+        assert!(!obj.contains_key("ip"));
+        assert!(!obj.contains_key("method"));
+    }
+
+    #[test]
+    fn test_no_pattern_missing_fields() {
+        // Create a schema definition with no pattern
+        let schema = SchemaDefinition {
+            pattern: None,
+            field_mappings: vec!["field1".to_string(), "field2".to_string()],
+        };
+
+        // Create an object missing the required fields
+        let mut obj = Map::new();
+        obj.insert(
+            "other_field".to_string(),
+            Value::String("value".to_string()),
+        );
+
+        // check_or_extract should return false
+        let result = schema.check_or_extract(&mut obj, Some("log"));
         assert!(
-            result.is_none(),
-            "Should not extract fields from invalid log format"
+            !result,
+            "Should return false when no pattern and missing fields"
         );
     }
 
@@ -279,13 +366,19 @@ mod tests {
     fn test_extract_from_inline_log_object() {
         let processor = EventProcessor::new(TEST_CONFIG);
 
-        let json_value = json!({
+        let mut json_value = json!({
             "id": "12345",
             "raw_log": "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database"
         });
 
-        let result =
-            processor.extract_from_inline_log(json_value, "custom_app_log", Some("raw_log"));
+        // Updated to handle check_or_extract
+        let result = if let Value::Object(ref mut obj) = json_value {
+            let schema = processor.schema_definitions.get("custom_app_log").unwrap();
+            schema.check_or_extract(obj, Some("raw_log"));
+            json_value
+        } else {
+            json_value
+        };
 
         let obj = result.as_object().unwrap();
         assert!(obj.contains_key("level"));
@@ -298,7 +391,7 @@ mod tests {
     fn test_extract_from_inline_log_array() {
         let processor = EventProcessor::new(TEST_CONFIG);
 
-        let json_value = json!([
+        let mut json_value = json!([
             {
                 "id": "12345",
                 "raw_log": "[ERROR] [2023-10-10T13:55:36Z] Failed to connect to database"
@@ -309,10 +402,17 @@ mod tests {
             }
         ]);
 
-        let result =
-            processor.extract_from_inline_log(json_value, "custom_app_log", Some("raw_log"));
+        // Updated to handle check_or_extract for array
+        if let Value::Array(ref mut array) = json_value {
+            for item in array {
+                if let Value::Object(ref mut obj) = item {
+                    let schema = processor.schema_definitions.get("custom_app_log").unwrap();
+                    schema.check_or_extract(obj, Some("raw_log"));
+                }
+            }
+        }
 
-        let array = result.as_array().unwrap();
+        let array = json_value.as_array().unwrap();
         assert_eq!(array.len(), 2);
 
         let first = array[0].as_object().unwrap();
@@ -333,19 +433,41 @@ mod tests {
     #[test]
     fn test_unknown_log_format() {
         let processor = EventProcessor::new(TEST_CONFIG);
-        let json_value = json!({
+        let mut json_value = json!({
             "id": "12345",
             "raw_log": "Some log message"
         });
 
-        let result =
-            processor.extract_from_inline_log(json_value, "nonexistent_format", Some("raw_log"));
+        // Try to extract with a non-existent format
+        if let Value::Object(ref mut obj) = json_value {
+            if let Some(schema) = processor.schema_definitions.get("nonexistent_format") {
+                schema.check_or_extract(obj, Some("raw_log"));
+            }
+        }
 
         // Should return original JSON without modification
-        let obj = result.as_object().unwrap();
+        let obj = json_value.as_object().unwrap();
         assert_eq!(obj.len(), 2);
         assert!(obj.contains_key("id"));
         assert!(obj.contains_key("raw_log"));
         assert!(!obj.contains_key("level"));
     }
+
+    #[test]
+    fn test_missing_log_field() {
+        let processor = EventProcessor::new(TEST_CONFIG);
+        let schema = processor.schema_definitions.get("custom_app_log").unwrap();
+
+        // Create an object that doesn't have the log field
+        let mut obj = Map::new();
+        obj.insert("id".to_string(), Value::String("12345".to_string()));
+
+        // check_or_extract should return false
+        let result = schema.check_or_extract(&mut obj, Some("raw_log"));
+        assert!(!result, "Should return false when log field is missing");
+
+        // Verify no fields were added
+        assert!(!obj.contains_key("level"));
+        assert!(!obj.contains_key("timestamp"));
+    }
 }
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index b593fa4be..94b5c2dbe 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -28,6 +28,7 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
+use crate::event::format::known_schema::Unacceptable;
 use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
@@ -347,6 +348,8 @@ pub enum PostError {
     IngestionNotAllowed,
     #[error("Missing field for time partition in json: {0}")]
     MissingTimePartition(String),
+    #[error("Known Format: {0}")]
+    KnownFormat(#[from] Unacceptable),
 }
 
 impl actix_web::ResponseError for PostError {
@@ -373,6 +376,7 @@ impl actix_web::ResponseError for PostError {
             PostError::IncorrectLogSource(_) => StatusCode::BAD_REQUEST,
             PostError::IngestionNotAllowed => StatusCode::BAD_REQUEST,
             PostError::MissingTimePartition(_) => StatusCode::BAD_REQUEST,
+            PostError::KnownFormat(_) => StatusCode::BAD_REQUEST,
         }
     }
 
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 2c48d7c32..abde1769e 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -35,7 +35,7 @@ use crate::{
 };
 
 pub async fn flatten_and_push_logs(
-    json: Value,
+    mut json: Value,
     stream_name: &str,
     log_source: &LogSource,
     extract_log: Option<&str>,
@@ -70,7 +70,7 @@ pub async fn flatten_and_push_logs(
             }
         }
         LogSource::Custom(src) => {
-            let json = KNOWN_SCHEMA_LIST.extract_from_inline_log(json, src, extract_log);
+            KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
             push_logs(stream_name, json, log_source).await?;
         }
         _ => push_logs(stream_name, json, log_source).await?,

From a9d81af23d7391c9b7e49606ce992595969cda73 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 14:34:52 +0530
Subject: [PATCH 06/15] fix: accept from custom sources
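
Any unrecognised value of the log source header previously fell back to
`LogSource::Json`, so requests tagged with a format from `formats.json` never
reached the known-schema path. Now only the empty string and "json" map to
`LogSource::Json`. A small sketch of the new mapping ("syslog" is just an
arbitrary example value):

    assert!(matches!(LogSource::from("json"), LogSource::Json));
    assert!(matches!(LogSource::from(""), LogSource::Json));
    // everything else is carried through as a custom source name
    assert!(matches!(LogSource::from("syslog"), LogSource::Custom(name) if name == "syslog"));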

---
 src/event/format/mod.rs | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 2fccff8ef..cf1485277 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -86,7 +86,8 @@ impl From<&str> for LogSource {
             "otel-metrics" => LogSource::OtelMetrics,
             "otel-traces" => LogSource::OtelTraces,
             "pmeta" => LogSource::Pmeta,
-            _ => LogSource::Json,
+            "" | "json" => LogSource::Json,
+            custom => LogSource::Custom(custom.to_owned()),
         }
     }
 }

From 75cf2a150ab0e47cf2a927d51679ea2e0d96a5fc Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 14:47:22 +0530
Subject: [PATCH 07/15] style: error message

---
 src/event/format/known_schema.rs | 8 ++++----
 src/handlers/http/ingest.rs      | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 25ba11fef..d0539dcbb 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -32,8 +32,8 @@ pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
     Lazy::new(|| EventProcessor::new(FORMATS_JSON));
 
 #[derive(Debug, thiserror::Error)]
-#[error("Unacceptable text/JSON for known log format")]
-pub struct Unacceptable;
+#[error("Event is not in the expected text/JSON format for {0}")]
+pub struct Unacceptable(String);
 
 /// Defines a schema for extracting structured data from logs using regular expressions
 #[derive(Debug)]
@@ -186,13 +186,13 @@ impl EventProcessor {
                         continue;
                     };
                     if !schema.check_or_extract(event, extract_log) {
-                        return Err(Unacceptable);
+                        return Err(Unacceptable(log_source.to_owned()));
                     }
                 }
             }
             Value::Object(event) => {
                 if !schema.check_or_extract(event, extract_log) {
-                    return Err(Unacceptable);
+                    return Err(Unacceptable(log_source.to_owned()));
                 }
             }
             _ => unreachable!("We don't accept events of the form: {json}"),
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 94b5c2dbe..26bb5d60e 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -348,7 +348,7 @@ pub enum PostError {
     IngestionNotAllowed,
     #[error("Missing field for time partition in json: {0}")]
     MissingTimePartition(String),
-    #[error("Known Format: {0}")]
+    #[error("{0}")]
     KnownFormat(#[from] Unacceptable),
 }
 

From b40cbe6913a9b8b0b512946594e8cee947e1da64 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 14:56:09 +0530
Subject: [PATCH 08/15] fix: consider single capture groups
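
A format may define several alternative regexes, each capturing its own set of
fields, so `field_mappings` becomes a `Vec<HashSet<String>>` and the
"already structured" check now passes when any single pattern's fields are all
present. An illustrative config shape (the `demo_multi` name and patterns are
made up for this note), written like the test fixture:

    const CONFIG: &str = r#"[
        {
            "name": "demo_multi",
            "regex": [
                { "pattern": "^(?P<level>\\w+): (?P<message>.*)", "fields": ["level", "message"] },
                { "pattern": "^(?P<message>.*)", "fields": ["message"] }
            ]
        }
    ]"#;

An event that already carries just `message` satisfies the second field set and
is accepted without re-running any regex.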

---
 src/event/format/known_schema.rs | 29 +++++++++++++++++------------
 1 file changed, 17 insertions(+), 12 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index d0539dcbb..2110ae652 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -16,7 +16,7 @@
  *
  */
 
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 use once_cell::sync::Lazy;
 use regex::Regex;
@@ -41,7 +41,7 @@ pub struct SchemaDefinition {
     /// Regular expression pattern used to match and capture fields from log strings
     pattern: Option<Regex>,
     // Maps field names to regex capture groups
-    field_mappings: Vec<String>,
+    field_mappings: Vec<HashSet<String>>,
 }
 
 impl SchemaDefinition {
@@ -65,7 +65,7 @@ impl SchemaDefinition {
         if self
             .field_mappings
             .iter()
-            .all(|field| obj.contains_key(field))
+            .any(|fields| fields.iter().all(|field| obj.contains_key(field)))
         {
             return true;
         }
@@ -87,7 +87,7 @@ impl SchemaDefinition {
         let mut extracted_fields = Map::new();
 
         // With named capture groups, you can iterate over the field names
-        for field_name in self.field_mappings.iter() {
+        for field_name in self.field_mappings.iter().flatten() {
             if let Some(value) = captures.name(field_name) {
                 extracted_fields.insert(
                     field_name.to_owned(),
@@ -113,7 +113,7 @@ struct Format {
 #[derive(Debug, Deserialize)]
 struct Pattern {
     pattern: Option<String>,
-    fields: Vec<String>,
+    fields: HashSet<String>,
 }
 
 /// Manages a collection of schema definitions for various log formats
@@ -145,13 +145,15 @@ impl EventProcessor {
                         .ok()
                 });
 
-                processor.schema_definitions.insert(
-                    format.name.clone(),
-                    SchemaDefinition {
+                let schema = processor
+                    .schema_definitions
+                    .entry(format.name.clone())
+                    .or_insert_with(|| SchemaDefinition {
                         pattern,
-                        field_mappings: regex.fields.clone(),
-                    },
-                );
+                        field_mappings: vec![],
+                    });
+
+                schema.field_mappings.push(regex.fields.clone());
             }
         }
 
@@ -344,7 +346,10 @@ mod tests {
         // Create a schema definition with no pattern
         let schema = SchemaDefinition {
             pattern: None,
-            field_mappings: vec!["field1".to_string(), "field2".to_string()],
+            field_mappings: vec![HashSet::from_iter([
+                "field1".to_string(),
+                "field2".to_string(),
+            ])],
         };
 
         // Create an object missing the required fields

From 9bcada9a7f0babbac768b942fd822ab415380039 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 15:20:52 +0530
Subject: [PATCH 09/15] fix: patterns are distinct
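
The previous `entry().or_insert_with()` kept only the first compiled regex per
format, silently dropping the alternatives. Each `SchemaDefinition` now holds
every compiled pattern and `check_or_extract` tries them in order until one
matches. A hedged sketch against the hypothetical `demo_multi` config from the
previous patch note:

    let processor = EventProcessor::new(CONFIG);
    let schema = &processor.schema_definitions["demo_multi"];

    let mut obj = Map::new();
    obj.insert("raw_log".into(), Value::String("plain text without a level".into()));

    // the first pattern needs "<level>: " at the start, so it fails; the second matches
    assert!(schema.check_or_extract(&mut obj, Some("raw_log")));
    assert!(obj.contains_key("message"));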

---
 src/event/format/known_schema.rs | 70 ++++++++++++++++----------------
 1 file changed, 34 insertions(+), 36 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 2110ae652..42e43878b 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -36,10 +36,10 @@ pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
 pub struct Unacceptable(String);
 
 /// Defines a schema for extracting structured data from logs using regular expressions
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct SchemaDefinition {
     /// Regular expression pattern used to match and capture fields from log strings
-    pattern: Option<Regex>,
+    patterns: Vec<Regex>,
     // Maps field names to regex capture groups
     field_mappings: Vec<HashSet<String>>,
 }
@@ -70,10 +70,6 @@ impl SchemaDefinition {
             return true;
         }
 
-        let Some(pattern) = self.pattern.as_ref() else {
-            return false;
-        };
-
         let Some(event) = extract_log
             .and_then(|field| obj.get(field))
             .and_then(|s| s.as_str())
@@ -81,24 +77,28 @@ impl SchemaDefinition {
             return false;
         };
 
-        let Some(captures) = pattern.captures(event) else {
-            return false;
-        };
-        let mut extracted_fields = Map::new();
-
-        // With named capture groups, you can iterate over the field names
-        for field_name in self.field_mappings.iter().flatten() {
-            if let Some(value) = captures.name(field_name) {
-                extracted_fields.insert(
-                    field_name.to_owned(),
-                    Value::String(value.as_str().to_string()),
-                );
+        for pattern in self.patterns.iter() {
+            let Some(captures) = pattern.captures(event) else {
+                continue;
+            };
+            let mut extracted_fields = Map::new();
+
+            // With named capture groups, you can iterate over the field names
+            for field_name in self.field_mappings.iter().flatten() {
+                if let Some(value) = captures.name(field_name) {
+                    extracted_fields.insert(
+                        field_name.to_owned(),
+                        Value::String(value.as_str().to_string()),
+                    );
+                }
             }
-        }
 
-        obj.extend(extracted_fields);
+            obj.extend(extracted_fields);
+
+            return true;
+        }
 
-        true
+        false
     }
 }
 
@@ -134,26 +134,24 @@ impl EventProcessor {
             serde_json::from_str(json_text).expect("Known formats are stored as JSON text");
 
         for format in formats {
-            for regex in &format.regex {
+            for regex in format.regex {
+                let schema = processor
+                    .schema_definitions
+                    .entry(format.name.clone())
+                    .or_insert_with(SchemaDefinition::default);
+
+                schema.field_mappings.push(regex.fields.clone());
                 // Compile the regex pattern if present
                 // NOTE: we only warn if the pattern doesn't compile
-                let pattern = regex.pattern.as_ref().and_then(|pattern| {
-                    Regex::new(pattern)
+                if let Some(pattern) = regex.pattern.and_then(|pattern| {
+                    Regex::new(&pattern)
                         .inspect_err(|err| {
                             error!("Error compiling regex pattern: {err}; Pattern: {pattern}")
                         })
                         .ok()
-                });
-
-                let schema = processor
-                    .schema_definitions
-                    .entry(format.name.clone())
-                    .or_insert_with(|| SchemaDefinition {
-                        pattern,
-                        field_mappings: vec![],
-                    });
-
-                schema.field_mappings.push(regex.fields.clone());
+                }) {
+                    schema.patterns.push(pattern);
+                }
             }
         }
 
@@ -345,7 +343,7 @@ mod tests {
     fn test_no_pattern_missing_fields() {
         // Create a schema definition with no pattern
         let schema = SchemaDefinition {
-            pattern: None,
+            patterns: vec![],
             field_mappings: vec![HashSet::from_iter([
                 "field1".to_string(),
                 "field2".to_string(),

From f5479bbcf9edf6ea803bf505c2672da3f2a48633 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 16:05:19 +0530
Subject: [PATCH 10/15] feat: update log source entry
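
`check_or_extract` and `extract_from_inline_log` now report which fields the
matched pattern provides, so the `ingest` and `post_event` handlers can record
them on the stream's `LogSourceEntry` before the stream is created; inline
extraction accordingly moves out of `flatten_and_push_logs`. A rough sketch of
the new handler flow (names as in the diff; Otel rejection and error handling
elided):

    // runs before create_stream_if_not_exists
    let fields = match &log_source {
        LogSource::Custom(src) => {
            KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?
        }
        _ => HashSet::new(),
    };
    let log_source_entry = LogSourceEntry::new(log_source.clone(), fields);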

---
 src/event/format/known_schema.rs              | 125 ++++++++++--------
 src/handlers/http/ingest.rs                   |  42 ++++--
 src/handlers/http/modal/utils/ingest_utils.rs |   9 +-
 3 files changed, 103 insertions(+), 73 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 42e43878b..d41de9dc0 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
 
 use once_cell::sync::Lazy;
 use regex::Regex;
-use serde::Deserialize;
+use serde::{Deserialize, Deserializer};
 use serde_json::{Map, Value};
 use tracing::{error, warn};
 
@@ -35,13 +35,35 @@ pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
 #[error("Event is not in the expected text/JSON format for {0}")]
 pub struct Unacceptable(String);
 
+/// Deserializes a string pattern into a compiled Regex
+/// NOTE: we only warn if the pattern doesn't compile
+pub fn deserialize_regex<'de, D>(deserializer: D) -> Result<Option<Regex>, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let pattern = String::deserialize(deserializer)?;
+
+    let regex = Regex::new(&pattern)
+        .inspect_err(|err| error!("Error compiling regex pattern: {err}; Pattern: {pattern}"))
+        .ok();
+
+    Ok(regex)
+}
+
+/// Configuration for a single pattern within a log format
+#[derive(Debug, Default, Deserialize)]
+struct Pattern {
+    /// Regular expression pattern used to match and capture fields from log strings
+    #[serde(deserialize_with = "deserialize_regex")]
+    pattern: Option<Regex>,
+    // Maps field names to regex capture groups
+    fields: HashSet<String>,
+}
+
 /// Defines a schema for extracting structured data from logs using regular expressions
 #[derive(Debug, Default)]
 pub struct SchemaDefinition {
-    /// Regular expression pattern used to match and capture fields from log strings
-    patterns: Vec<Regex>,
-    // Maps field names to regex capture groups
-    field_mappings: Vec<HashSet<String>>,
+    patterns: Vec<Pattern>,
 }
 
 impl SchemaDefinition {
@@ -55,36 +77,40 @@ impl SchemaDefinition {
     /// * `extract_log` - Optional field name containing the raw log text
     ///
     /// # Returns
-    /// * `true` - If all expected fields are already present in the object OR if extraction was successful
-    /// * `false` - If extraction failed or no pattern was available and fields were missing
+    /// * `Some` - If all expected fields are already present in the object OR if extraction was successful
+    ///            Contains the field names from the matching capture group
+    /// * `None` - If extraction failed or no pattern was available and fields were missing
     pub fn check_or_extract(
         &self,
         obj: &mut Map<String, Value>,
         extract_log: Option<&str>,
-    ) -> bool {
-        if self
-            .field_mappings
+    ) -> Option<HashSet<String>> {
+        if let Some(pattern) = self
+            .patterns
             .iter()
-            .any(|fields| fields.iter().all(|field| obj.contains_key(field)))
+            .find(|pattern| pattern.fields.iter().all(|field| obj.contains_key(field)))
         {
-            return true;
+            return Some(pattern.fields.clone());
         }
 
         let Some(event) = extract_log
             .and_then(|field| obj.get(field))
             .and_then(|s| s.as_str())
         else {
-            return false;
+            return None;
         };
 
-        for pattern in self.patterns.iter() {
+        for format in self.patterns.iter() {
+            let Some(pattern) = format.pattern.as_ref() else {
+                continue;
+            };
             let Some(captures) = pattern.captures(event) else {
                 continue;
             };
             let mut extracted_fields = Map::new();
 
             // With named capture groups, you can iterate over the field names
-            for field_name in self.field_mappings.iter().flatten() {
+            for field_name in format.fields.iter() {
                 if let Some(value) = captures.name(field_name) {
                     extracted_fields.insert(
                         field_name.to_owned(),
@@ -95,10 +121,10 @@ impl SchemaDefinition {
 
             obj.extend(extracted_fields);
 
-            return true;
+            return Some(format.fields.clone());
         }
 
-        false
+        None
     }
 }
 
@@ -109,13 +135,6 @@ struct Format {
     regex: Vec<Pattern>,
 }
 
-/// Configuration for a single pattern within a log format
-#[derive(Debug, Deserialize)]
-struct Pattern {
-    pattern: Option<String>,
-    fields: HashSet<String>,
-}
-
 /// Manages a collection of schema definitions for various log formats
 #[derive(Debug)]
 pub struct EventProcessor {
@@ -140,18 +159,7 @@ impl EventProcessor {
                     .entry(format.name.clone())
                     .or_insert_with(SchemaDefinition::default);
 
-                schema.field_mappings.push(regex.fields.clone());
-                // Compile the regex pattern if present
-                // NOTE: we only warn if the pattern doesn't compile
-                if let Some(pattern) = regex.pattern.and_then(|pattern| {
-                    Regex::new(&pattern)
-                        .inspect_err(|err| {
-                            error!("Error compiling regex pattern: {err}; Pattern: {pattern}")
-                        })
-                        .ok()
-                }) {
-                    schema.patterns.push(pattern);
-                }
+                schema.patterns.push(regex);
             }
         }
 
@@ -173,32 +181,37 @@ impl EventProcessor {
         json: &mut Value,
         log_source: &str,
         extract_log: Option<&str>,
-    ) -> Result<(), Unacceptable> {
+    ) -> Result<HashSet<String>, Unacceptable> {
         let Some(schema) = self.schema_definitions.get(log_source) else {
             warn!("Unknown log format: {log_source}");
-            return Ok(());
+            return Ok(Default::default());
         };
 
+        let mut fields = HashSet::new();
         match json {
             Value::Array(list) => {
                 for event in list {
                     let Value::Object(event) = event else {
                         continue;
                     };
-                    if !schema.check_or_extract(event, extract_log) {
+                    if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
+                        fields.extend(known_fields);
+                    } else {
                         return Err(Unacceptable(log_source.to_owned()));
                     }
                 }
             }
             Value::Object(event) => {
-                if !schema.check_or_extract(event, extract_log) {
+                if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
+                    return Ok(known_fields);
+                } else {
                     return Err(Unacceptable(log_source.to_owned()));
                 }
             }
             _ => unreachable!("We don't accept events of the form: {json}"),
         }
 
-        Ok(())
+        Ok(fields)
     }
 }
 
@@ -244,7 +257,7 @@ mod tests {
 
         // Use check_or_extract instead of extract
         let result = schema.check_or_extract(&mut obj, Some(log_field));
-        assert!(result, "Failed to extract fields from valid log");
+        assert!(result.is_some(), "Failed to extract fields from valid log");
 
         // Verify extracted fields were added to the object
         assert_eq!(obj.get("ip").unwrap().as_str().unwrap(), "192.168.1.1");
@@ -275,7 +288,7 @@ mod tests {
 
         // Use check_or_extract instead of extract
         let result = schema.check_or_extract(&mut obj, Some(log_field));
-        assert!(result, "Failed to extract fields from valid log");
+        assert!(result.is_some(), "Failed to extract fields from valid log");
 
         // Verify extracted fields were added to the object
         assert_eq!(obj.get("level").unwrap().as_str().unwrap(), "ERROR");
@@ -308,7 +321,10 @@ mod tests {
 
         // check_or_extract should return true without modifying anything
         let result = schema.check_or_extract(&mut obj, None);
-        assert!(result, "Should return true when fields already exist");
+        assert!(
+            result.is_some(),
+            "Should return true when fields already exist"
+        );
 
         // Verify the original values weren't changed
         assert_eq!(
@@ -332,7 +348,10 @@ mod tests {
 
         // check_or_extract should return false
         let result = schema.check_or_extract(&mut obj, Some(log_field));
-        assert!(!result, "Should not extract fields from invalid log format");
+        assert!(
+            result.is_none(),
+            "Should not extract fields from invalid log format"
+        );
 
         // Verify no fields were added
         assert!(!obj.contains_key("ip"));
@@ -343,11 +362,10 @@ mod tests {
     fn test_no_pattern_missing_fields() {
         // Create a schema definition with no pattern
         let schema = SchemaDefinition {
-            patterns: vec![],
-            field_mappings: vec![HashSet::from_iter([
-                "field1".to_string(),
-                "field2".to_string(),
-            ])],
+            patterns: vec![Pattern {
+                pattern: None,
+                fields: HashSet::from_iter(["field1".to_string(), "field2".to_string()]),
+            }],
         };
 
         // Create an object missing the required fields
@@ -360,7 +378,7 @@ mod tests {
         // check_or_extract should return false
         let result = schema.check_or_extract(&mut obj, Some("log"));
         assert!(
-            !result,
+            result.is_none(),
             "Should return false when no pattern and missing fields"
         );
     }
@@ -467,7 +485,10 @@ mod tests {
 
         // check_or_extract should return false
         let result = schema.check_or_extract(&mut obj, Some("raw_log"));
-        assert!(!result, "Should return false when log field is missing");
+        assert!(
+            result.is_none(),
+            "Should return false when log field is missing"
+        );
 
         // Verify no fields were added
         assert!(!obj.contains_key("level"));
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 26bb5d60e..139a8e89f 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -28,7 +28,7 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::known_schema::Unacceptable;
+use crate::event::format::known_schema::{Unacceptable, KNOWN_SCHEMA_LIST};
 use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
@@ -49,7 +49,7 @@ use super::users::filters::FiltersError;
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
-pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpResponse, PostError> {
+pub async fn ingest(req: HttpRequest, Json(mut json): Json<Value>) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
     };
@@ -78,7 +78,18 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         return Err(PostError::OtelNotSupported);
     }
 
-    let log_source_entry = LogSourceEntry::new(log_source.clone(), HashSet::new());
+    let fields = match &log_source {
+        LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
+            return Err(PostError::OtelNotSupported)
+        }
+        LogSource::Custom(src) => {
+            KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?
+        }
+        _ => HashSet::new(),
+    };
+
+    let log_source_entry = LogSourceEntry::new(log_source.clone(), fields);
+
     PARSEABLE
         .create_stream_if_not_exists(
             &stream_name,
@@ -87,7 +98,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, extract_log).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -150,7 +161,7 @@ pub async fn handle_otel_logs_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -188,7 +199,7 @@ pub async fn handle_otel_metrics_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -228,7 +239,7 @@ pub async fn handle_otel_traces_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source, None).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -239,7 +250,7 @@ pub async fn handle_otel_traces_ingestion(
 pub async fn post_event(
     req: HttpRequest,
     stream_name: Path<String>,
-    Json(json): Json<Value>,
+    Json(mut json): Json<Value>,
 ) -> Result<HttpResponse, PostError> {
     let stream_name = stream_name.into_inner();
 
@@ -275,14 +286,17 @@ pub async fn post_event(
         .get(EXTRACT_LOG_KEY)
         .and_then(|h| h.to_str().ok());
 
-    if matches!(
-        log_source,
-        LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces
-    ) {
-        return Err(PostError::OtelNotSupported);
+    match &log_source {
+        LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
+            return Err(PostError::OtelNotSupported)
+        }
+        LogSource::Custom(src) => {
+            KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
+        }
+        _ => {}
     }
 
-    flatten_and_push_logs(json, &stream_name, &log_source, extract_log).await?;
+    flatten_and_push_logs(json, &stream_name, &log_source).await?;
 
     Ok(HttpResponse::Ok().finish())
 }
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index abde1769e..487f24305 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -23,7 +23,7 @@ use opentelemetry_proto::tonic::{
 use serde_json::Value;
 
 use crate::{
-    event::format::{json, known_schema::KNOWN_SCHEMA_LIST, EventFormat, LogSource},
+    event::format::{json, EventFormat, LogSource},
     handlers::http::{
         ingest::PostError,
         kinesis::{flatten_kinesis_logs, Message},
@@ -35,10 +35,9 @@ use crate::{
 };
 
 pub async fn flatten_and_push_logs(
-    mut json: Value,
+    json: Value,
     stream_name: &str,
     log_source: &LogSource,
-    extract_log: Option<&str>,
 ) -> Result<(), PostError> {
     match log_source {
         LogSource::Kinesis => {
@@ -69,10 +68,6 @@ pub async fn flatten_and_push_logs(
                 push_logs(stream_name, record, log_source).await?;
             }
         }
-        LogSource::Custom(src) => {
-            KNOWN_SCHEMA_LIST.extract_from_inline_log(&mut json, src, extract_log)?;
-            push_logs(stream_name, json, log_source).await?;
-        }
         _ => push_logs(stream_name, json, log_source).await?,
     }
 

From a6742366cc4f26e606e10bdbd1516db7cb4de22b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 16:11:35 +0530
Subject: [PATCH 11/15] fix: allow empty
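
Adding `default` alongside `deserialize_with` lets a format definition omit
the "pattern" key entirely, presumably so that entries in
resources/formats.json can list only their expected field names. Because
`deserialize_with` opts out of serde's implicit missing-Option-becomes-None
handling, such entries would otherwise fail with "missing field `pattern`".
A minimal, self-contained sketch of the same shape (the deserializer below is
a simplified stand-in for the one in known_schema.rs, and the JSON entry is
illustrative):

    use std::collections::HashSet;

    use regex::Regex;
    use serde::{Deserialize, Deserializer};

    // Simplified stand-in: compile the string, dropping it if invalid.
    fn deserialize_regex<'de, D>(deserializer: D) -> Result<Option<Regex>, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(deserializer)?;
        Ok(Regex::new(&s).ok())
    }

    #[derive(Debug, Default, Deserialize)]
    struct Pattern {
        // `deserialize_with` alone requires the key to be present;
        // `default` lets a missing "pattern" fall back to None.
        #[serde(deserialize_with = "deserialize_regex", default)]
        pattern: Option<Regex>,
        fields: HashSet<String>,
    }

    fn main() {
        let p: Pattern =
            serde_json::from_str(r#"{ "fields": ["timestamp", "body"] }"#).unwrap();
        assert!(p.pattern.is_none());
        assert_eq!(p.fields.len(), 2);
    }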

---
 src/event/format/known_schema.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index d41de9dc0..76e3fda45 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -54,7 +54,7 @@ where
 #[derive(Debug, Default, Deserialize)]
 struct Pattern {
     /// Regular expression pattern used to match and capture fields from log strings
-    #[serde(deserialize_with = "deserialize_regex")]
+    #[serde(deserialize_with = "deserialize_regex", default)]
     pattern: Option<Regex>,
     // Maps field names to regex capture groups
     fields: HashSet<String>,

From a5288204b40697a13938f2955057b1495282d957 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 16:37:44 +0530
Subject: [PATCH 12/15] ci: deepsource

---
 src/event/format/known_schema.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 76e3fda45..c25cc3ca1 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -184,7 +184,7 @@ impl EventProcessor {
     ) -> Result<HashSet<String>, Unacceptable> {
         let Some(schema) = self.schema_definitions.get(log_source) else {
             warn!("Unknown log format: {log_source}");
-            return Ok(Default::default());
+            return Ok(HashSet::new());
         };
 
         let mut fields = HashSet::new();

From d747940c82a3bc7f68fb2eaf297aae3ae738bfdf Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 16:51:51 +0530
Subject: [PATCH 13/15] refactor: clippy suggestions
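
Both changes follow clippy's suggestions: `?` works on Option inside a
function that itself returns Option, replacing the explicit let-else plus
`return None`, and `or_default()` replaces `or_insert_with(T::default)`.
A small illustrative sketch (the function and key names here are made up for
the example, not taken from the codebase):

    use std::collections::HashMap;

    fn first_char(s: Option<&str>) -> Option<char> {
        // `?` returns None early instead of a let-else + return None.
        let s = s?;
        s.chars().next()
    }

    fn count(map: &mut HashMap<String, usize>, key: &str) {
        // `or_default()` is shorthand for `or_insert_with(Default::default)`.
        *map.entry(key.to_owned()).or_default() += 1;
    }

    fn main() {
        assert_eq!(first_char(Some("log")), Some('l'));
        assert_eq!(first_char(None), None);
        let mut m = HashMap::new();
        count(&mut m, "access_log");
        count(&mut m, "access_log");
        assert_eq!(m["access_log"], 2);
    }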

---
 src/event/format/known_schema.rs | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index c25cc3ca1..2ac1a68d8 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -93,12 +93,9 @@ impl SchemaDefinition {
             return Some(pattern.fields.clone());
         }
 
-        let Some(event) = extract_log
+        let event = extract_log
             .and_then(|field| obj.get(field))
-            .and_then(|s| s.as_str())
-        else {
-            return None;
-        };
+            .and_then(|s| s.as_str())?;
 
         for format in self.patterns.iter() {
             let Some(pattern) = format.pattern.as_ref() else {
@@ -157,7 +154,7 @@ impl EventProcessor {
                 let schema = processor
                     .schema_definitions
                     .entry(format.name.clone())
-                    .or_insert_with(SchemaDefinition::default);
+                    .or_default();
 
                 schema.patterns.push(regex);
             }

From 96dd957800f233f36f0373de9d4d5fc4870b3e84 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 16:56:33 +0530
Subject: [PATCH 14/15] fix: don't accept unknown log format
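
Previously an unregistered custom log format was silently accepted with an
empty field set; extract_from_inline_log now distinguishes that case from a
registered format whose events fail to match any pattern. A minimal sketch of
how a caller can tell the two apart (the error enum mirrors the one added in
this patch, the handler code is illustrative):

    use std::collections::HashSet;

    #[derive(Debug, thiserror::Error)]
    pub enum Error {
        #[error("Event is not in the expected text/JSON format for {0}")]
        Unacceptable(String),
        #[error("Unknown log format: {0}")]
        Unknown(String),
    }

    fn report(result: Result<HashSet<String>, Error>) {
        match result {
            Ok(fields) => println!("extracted {} known fields", fields.len()),
            // Format is known, but the event matched none of its patterns.
            Err(Error::Unacceptable(src)) => eprintln!("malformed {src} event"),
            // Format was never defined in formats.json; no longer accepted.
            Err(Error::Unknown(src)) => eprintln!("no schema registered for {src}"),
        }
    }

    fn main() {
        report(Err(Error::Unknown("my_app_log".to_owned())));
    }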

---
 src/event/format/known_schema.rs | 19 +++++++++++--------
 src/handlers/http/ingest.rs      |  9 ++++++---
 2 files changed, 17 insertions(+), 11 deletions(-)

diff --git a/src/event/format/known_schema.rs b/src/event/format/known_schema.rs
index 2ac1a68d8..2a420e10e 100644
--- a/src/event/format/known_schema.rs
+++ b/src/event/format/known_schema.rs
@@ -22,7 +22,7 @@ use once_cell::sync::Lazy;
 use regex::Regex;
 use serde::{Deserialize, Deserializer};
 use serde_json::{Map, Value};
-use tracing::{error, warn};
+use tracing::error;
 
 /// Predefined JSON with known textual logging formats
 const FORMATS_JSON: &str = include_str!("../../../resources/formats.json");
@@ -32,8 +32,12 @@ pub static KNOWN_SCHEMA_LIST: Lazy<EventProcessor> =
     Lazy::new(|| EventProcessor::new(FORMATS_JSON));
 
 #[derive(Debug, thiserror::Error)]
-#[error("Event is not in the expected text/JSON format for {0}")]
-pub struct Unacceptable(String);
+pub enum Error {
+    #[error("Event is not in the expected text/JSON format for {0}")]
+    Unacceptable(String),
+    #[error("Unknown log format: {0}")]
+    Unknown(String),
+}
 
 /// Deserializes a string pattern into a compiled Regex
 /// NOTE: we only warn if the pattern doesn't compile
@@ -178,10 +182,9 @@ impl EventProcessor {
         json: &mut Value,
         log_source: &str,
         extract_log: Option<&str>,
-    ) -> Result<HashSet<String>, Unacceptable> {
+    ) -> Result<HashSet<String>, Error> {
         let Some(schema) = self.schema_definitions.get(log_source) else {
-            warn!("Unknown log format: {log_source}");
-            return Ok(HashSet::new());
+            return Err(Error::Unknown(log_source.to_owned()));
         };
 
         let mut fields = HashSet::new();
@@ -194,7 +197,7 @@ impl EventProcessor {
                     if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
                         fields.extend(known_fields);
                     } else {
-                        return Err(Unacceptable(log_source.to_owned()));
+                        return Err(Error::Unacceptable(log_source.to_owned()));
                     }
                 }
             }
@@ -202,7 +205,7 @@ impl EventProcessor {
                 if let Some(known_fields) = schema.check_or_extract(event, extract_log) {
                     return Ok(known_fields);
                 } else {
-                    return Err(Unacceptable(log_source.to_owned()));
+                    return Err(Error::Unacceptable(log_source.to_owned()));
                 }
             }
             _ => unreachable!("We don't accept events of the form: {json}"),
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 139a8e89f..c38107f09 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -28,7 +28,7 @@ use serde_json::Value;
 
 use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::known_schema::{Unacceptable, KNOWN_SCHEMA_LIST};
+use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
 use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
 use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
@@ -49,7 +49,10 @@ use super::users::filters::FiltersError;
 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
-pub async fn ingest(req: HttpRequest, Json(mut json): Json<Value>) -> Result<HttpResponse, PostError> {
+pub async fn ingest(
+    req: HttpRequest,
+    Json(mut json): Json<Value>,
+) -> Result<HttpResponse, PostError> {
     let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
         return Err(PostError::Header(ParseHeaderError::MissingStreamName));
     };
@@ -363,7 +366,7 @@ pub enum PostError {
     #[error("Missing field for time partition in json: {0}")]
     MissingTimePartition(String),
     #[error("{0}")]
-    KnownFormat(#[from] Unacceptable),
+    KnownFormat(#[from] known_schema::Error),
 }
 
 impl actix_web::ResponseError for PostError {

From b40779c4297c584cf68094bfdecdc3983b5b1448 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Wed, 19 Mar 2025 20:17:44 +0530
Subject: [PATCH 15/15] test: fix expectation

---
 src/handlers/http/logstream.rs | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index a3a2096ae..6d095d7b0 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -572,7 +572,9 @@ pub mod error {
 
 #[cfg(test)]
 mod tests {
-    use crate::handlers::http::modal::utils::logstream_utils::PutStreamHeaders;
+    use crate::{
+        event::format::LogSource, handlers::http::modal::utils::logstream_utils::PutStreamHeaders,
+    };
     use actix_web::test::TestRequest;
 
     // TODO: Fix this test with routes
@@ -597,7 +599,7 @@ mod tests {
     async fn header_without_log_source() {
         let req = TestRequest::default().to_http_request();
         let PutStreamHeaders { log_source, .. } = req.headers().into();
-        assert_eq!(log_source, crate::event::format::LogSource::Json);
+        assert_eq!(log_source, LogSource::Json);
     }
 
     #[actix_web::test]
@@ -606,19 +608,19 @@ mod tests {
             .insert_header(("X-P-Log-Source", "pmeta"))
             .to_http_request();
         let PutStreamHeaders { log_source, .. } = req.headers().into();
-        assert_eq!(log_source, crate::event::format::LogSource::Pmeta);
+        assert_eq!(log_source, LogSource::Pmeta);
 
         req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "otel-logs"))
             .to_http_request();
         let PutStreamHeaders { log_source, .. } = req.headers().into();
-        assert_eq!(log_source, crate::event::format::LogSource::OtelLogs);
+        assert_eq!(log_source, LogSource::OtelLogs);
 
         req = TestRequest::default()
             .insert_header(("X-P-Log-Source", "kinesis"))
             .to_http_request();
         let PutStreamHeaders { log_source, .. } = req.headers().into();
-        assert_eq!(log_source, crate::event::format::LogSource::Kinesis);
+        assert_eq!(log_source, LogSource::Kinesis);
     }
 
     #[actix_web::test]
@@ -627,6 +629,9 @@ mod tests {
             .insert_header(("X-P-Log-Source", "teststream"))
             .to_http_request();
         let PutStreamHeaders { log_source, .. } = req.headers().into();
-        assert_eq!(log_source, crate::event::format::LogSource::Json);
+        assert!(matches!(
+            log_source,
+            LogSource::Custom(src) if src == "teststream"
+        ));
     }
 }