Unable to ingest data from kafka in protobuf format #17620

obozhinov opened this issue Jan 13, 2025 · 0 comments
I'm running apache/druid:31.0.0 in Docker locally.

syntax = "proto3";

package com.example.kafka.event.v1;

option java_multiple_files=true;
option java_package = "com.example.kafka.event.v1";

message Event {
  int64 timestamp = 1; 
  optional string identity_id = 2;
  optional string clientId = 3;
  optional string firstName = 4;
  optional string lastName = 5;
  optional string email = 6;
  optional string phone = 7;
  optional string productId = 8;
  optional int32 allItems = 9;
  optional int32 usedItems = 10;
}
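
For reference, this is roughly how I build an Event on the producer side (a minimal sketch with made-up sample values; note that protoc's generated Java setters are camelCase, so identity_id becomes setIdentityId):

import com.example.kafka.event.v1.Event;

// Build a sample Event; field numbers/names match the proto above
Event event = Event.newBuilder()
        .setTimestamp(System.currentTimeMillis() / 1000) // posix seconds
        .setIdentityId("user-123")   // sample value
        .setClientId("client-1")     // sample value
        .setAllItems(10)
        .setUsedItems(3)
        .build();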

I used this command to generate the .desc file:

protoc -o event.desc --proto_path=proto proto/com/example/kafka/event/v1/event.proto

Then I mounted it at /opt/druid/proto-descriptors/ in the containers.
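
The mount itself looks roughly like this (a sketch with one service shown; the real command needs the usual ZooKeeper/metadata configuration, and the container name is from my local setup):

docker run -d --name historical \
  -v "$PWD/event.desc:/opt/druid/proto-descriptors/event.desc" \
  apache/druid:31.0.0 historical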
In my KafkaProducer I use the Event class and these properties (relevant imports included):

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;

// String keys, protobuf values serialized via Confluent Schema Registry
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8084");
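
Putting it together, the send path is roughly this (a minimal sketch; the key and field values are made up, and the topic name matches the supervisor spec below):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializerConfig;
import com.example.kafka.event.v1.Event;

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
properties.put(KafkaProtobufSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8084");

// Send one Event to the "events" topic read by the supervisor below
try (KafkaProducer<String, Event> producer = new KafkaProducer<>(properties)) {
    Event event = Event.newBuilder()
            .setTimestamp(System.currentTimeMillis() / 1000)
            .setClientId("client-1")
            .build();
    producer.send(new ProducerRecord<>("events", "key-1", event));
}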

I can access event.desc when I exec into the broker, historical, and middleManager containers.
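For example (the container name here is from my local setup, so treat it as a placeholder):

docker exec historical ls -l /opt/druid/proto-descriptors/event.desc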
This is the protoBytesDecoder config for Druid:

"protoBytesDecoder": {
          "type": "file",
          "descriptor": "file:///opt/druid/proto-descriptors/event.desc",
          "protoMessageType": "com.example.kafka.event.v1.Event"
        }
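
For context, the decoder sits inside the protobuf inputFormat of the ioConfig, so the full block I submit looks like this:

"inputFormat": {
  "type": "protobuf",
  "protoBytesDecoder": {
    "type": "file",
    "descriptor": "file:///opt/druid/proto-descriptors/event.desc",
    "protoMessageType": "com.example.kafka.event.v1.Event"
  }
}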

But I get this error, both when submitting a supervisor and when using the Load data tab:

Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.FileBasedProtobufBytesDecoder`, problem: Cannot read descriptor file: file:/opt/druid/proto-descriptors/event.desc at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 341] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])

I tried Schema Registry as well. I did a POST to localhost:8084/subjects/test/versions with this payload:
{
  "schemaType": "PROTOBUF",
  "schema": "syntax = \"proto3\";\n\npackage com.example.kafka.event.v1;\n\noption java_multiple_files=true;\noption java_package = \"com.example.kafka.event.v1\";\n\nmessage Event {\n  int64 timestamp = 1;\n  optional string identity_id = 2;\n  optional string clientId = 3;\n  optional string firstName = 4;\n  optional string lastName = 5;\n  optional string email = 6;\n  optional string phone = 7;\n  optional string productId = 8;\n  optional int32 allItems = 9;\n  optional int32 usedItems = 10;\n}"
}
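
Concretely, the request looked like this (a sketch; the payload above is saved as schema.json, and the subject name is just the one I chose):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data @schema.json \
  http://localhost:8084/subjects/test/versions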

I downloaded all the suggested libraries and put them in both /opt/druid/extensions/protobuf-extensions and /opt/druid/extensions/druid-protobuf-extensions (tried each). I also added DRUID_CLASSPATH=/opt/druid/extensions/druid-protobuf-extensions/*:/opt/druid/proto-descriptors/* to the environment file, although echo $DRUID_CLASSPATH printed only -CLASSPATH, so that doesn't really seem to take effect. The jars:

  • common-config-6.0.1.jar
  • common-utils-6.0.1.jar
  • kafka-protobuf-provider-6.0.1.jar
  • kafka-protobuf-serializer-6.0.1.jar
  • kafka-schema-registry-client-6.0.1.jar
  • kotlin-stdlib-1.4.0.jar
  • wire-schema-3.2.2.jar

I also tried only the jars suggested in the repo (still not working).
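
For reference, the extension itself is normally enabled via druid.extensions.loadList in common.runtime.properties, with extra jars dropped into the extension's directory, e.g.:

druid.extensions.directory=/opt/druid/extensions
druid.extensions.loadList=["druid-kafka-indexing-service", "druid-protobuf-extensions"]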

When I try to submit this supervisor spec:

{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "type": "kafka",
      "consumerProperties": {
        "bootstrap.servers": "localhost:9092"
      },
      "topic": "events",
      "inputFormat": {
        "type": "protobuf",
        "protoBytesDecoder": {
          "type": "schema_registry",
          "url": "http://localhost:8084"
        }
      },
      "useEarliestOffset": true
    },
    "tuningConfig": {
      "type": "kafka"
    },
    "dataSchema": {
      "dataSource": "events-proto",
      "timestampSpec": {
        "column": "time",
        "format": "posix"
      },
      "dimensionsSpec": {
        "dimensions": [
          "identity_id",
          "clientId",
          "firstName",
          "lastName",
          "email",
          "phone",
          "productId",
          {
            "type": "long",
            "name": "allItems"
          },
          {
            "type": "long",
            "name": "usedItems"
          }
        ]
      },
      "granularitySpec": {
        "queryGranularity": "none",
        "rollup": false,
        "segmentGranularity": "day"
      }
    }
  }
}

I get this:

Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder`, problem: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 237] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])

I tried all the steps provided and even more, but I'm not getting anywhere. The data is being read from the topic, but I cannot parse it. I followed the documentation here:
https://github.com/Godin/apache-druid/blob/master/docs/development/extensions-core/protobuf.md

When I tried the /extensions/protobuf-extensions folder from this example, the broker, historical, and middle manager all crashed:
https://blog.hellmar-becker.de/2022/05/26/ingesting-protobuf-messages-into-apache-druid/

It feels like something is missing from the explanations in the documentation.
