Skip to content

Commit

Permalink
Added example for loading schemas and fixed schema parser to use namespaces. re #16
Browse files (browse the repository at this point in the history)
  • Loading branch information
serejja committed Mar 25, 2015
1 parent ab31b48 commit 8624703
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 32 deletions.
3 changes: 2 additions & 1 deletion datum_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ func (this *GenericDatumReader) readValue(field Schema, dec Decoder) (interface{
return this.mapFixed(field, dec)
case Record:
return this.mapRecord(field, dec)
//TODO recursive types
case Recursive:
return this.mapRecord(field.(*RecursiveSchema).Actual, dec)
}

return nil, fmt.Errorf("Unknown field type: %s", field.Type())
Expand Down
85 changes: 85 additions & 0 deletions examples/load_schema/load_schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/* Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

package main

import (
"bytes"
"fmt"
"github.com/stealthly/go-avro"
)

// main demonstrates parsing a schema that depends on a type defined in a
// separate schema file. It loads the schemas from the "schemas/" folder,
// parses a "System" record whose "user" field refers to the externally
// defined "User" type, then writes a record and reads it back to verify the
// nested user name survives the round trip. Any failure panics.
func main() {
	// Define a schema that does not contain the definition of type "User".
	dependentSchema := `{ "type": "record",
"namespace": "example.avro",
"name": "System",
"fields": [
{"name": "user", "type": "User"}
]
}`

	// Load the schemas found in the given folder; the definition of type
	// "User" is provided there.
	schemas := avro.LoadSchemas("schemas/")

	// Parse the dependent schema against the loaded schemas. Type "User" can
	// be resolved because it lives in the same namespace as type "System".
	schema, err := avro.ParseSchemaWithRegistry(dependentSchema, schemas)
	if err != nil {
		panic(err)
	}

	// The nested record has to be described by the User schema, not by the
	// enclosing System schema. Named types are registered under their full
	// name (namespace + "." + name), which is also the key the parser used
	// above to resolve "User".
	userSchema, ok := schemas["example.avro.User"]
	if !ok {
		panic("schema example.avro.User was not loaded from schemas/")
	}

	// Build a record to write so that we can verify everything works.
	user := avro.NewGenericRecord(userSchema)
	user.Set("name", "Some User")

	record := avro.NewGenericRecord(schema)
	record.Set("user", user)

	writer := avro.NewGenericDatumWriter()
	// SetSchema must be called before calling Write.
	writer.SetSchema(schema)

	// Create a new Buffer and an Encoder that writes to this Buffer.
	buffer := new(bytes.Buffer)
	encoder := avro.NewBinaryEncoder(buffer)

	// Write the record.
	writer.Write(record, encoder)

	reader := avro.NewGenericDatumReader()
	// SetSchema must be called before calling Read.
	reader.SetSchema(schema)

	// Create a new Decoder over the bytes that were just written.
	decoder := avro.NewBinaryDecoder(buffer.Bytes())

	// Read a new GenericRecord with the given Decoder. The first parameter to
	// Read should be nil for GenericDatumReader.
	decodedRecord, err := reader.Read(nil, decoder)
	if err != nil {
		panic(err)
	}

	// GenericDatumReader always returns a *avro.GenericRecord, so it's safe
	// to do a type assertion.
	decodedGenericRecord := decodedRecord.(*avro.GenericRecord)

	// Check whether the nested user got decoded correctly. The comma-ok forms
	// turn an unexpected type into a descriptive panic instead of a bare
	// runtime type-assertion failure.
	decodedUser, ok := decodedGenericRecord.Get("user").(*avro.GenericRecord)
	if !ok {
		panic("Something went terribly wrong!")
	}
	decodedUserName, ok := decodedUser.Get("name").(string)
	if !ok || decodedUserName != "Some User" {
		panic("Something went terribly wrong!")
	}

	fmt.Printf("Got a user name back: %s\n", decodedUserName)
}
7 changes: 7 additions & 0 deletions examples/load_schema/schemas/user.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"}
]
}
57 changes: 28 additions & 29 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,10 +687,10 @@ func ParseSchemaWithRegistry(rawSchema string, schemas map[string]Schema) (Schem
schema = rawSchema
}

return schemaByType(schema, schemas)
return schemaByType(schema, schemas, "")
}

func schemaByType(i interface{}, registry map[string]Schema) (Schema, error) {
func schemaByType(i interface{}, registry map[string]Schema, namespace string) (Schema, error) {
switch v := i.(type) {
case nil:
return new(NullSchema), nil
Expand All @@ -713,15 +713,15 @@ func schemaByType(i interface{}, registry map[string]Schema) (Schema, error) {
case type_string:
return new(StringSchema), nil
default:
schema, ok := registry[v]
schema, ok := registry[getFullName(v, namespace)]
if !ok {
return nil, fmt.Errorf("Unknown type name: %s", v)
}

return schema, nil
}
case map[string][]interface{}:
return parseUnionSchema(v[schema_typeField], registry)
return parseUnionSchema(v[schema_typeField], registry, namespace)
case map[string]interface{}:
switch v[schema_typeField] {
case type_null:
Expand All @@ -741,32 +741,32 @@ func schemaByType(i interface{}, registry map[string]Schema) (Schema, error) {
case type_string:
return new(StringSchema), nil
case type_array:
items, err := schemaByType(v[schema_itemsField], registry)
items, err := schemaByType(v[schema_itemsField], registry, namespace)
if err != nil {
return nil, err
}
return &ArraySchema{Items: items, Properties: getProperties(v)}, nil
case type_map:
values, err := schemaByType(v[schema_valuesField], registry)
values, err := schemaByType(v[schema_valuesField], registry, namespace)
if err != nil {
return nil, err
}
return &MapSchema{Values: values, Properties: getProperties(v)}, nil
case type_enum:
return parseEnumSchema(v, registry)
return parseEnumSchema(v, registry, namespace)
case type_fixed:
return parseFixedSchema(v, registry)
return parseFixedSchema(v, registry, namespace)
case type_record:
return parseRecordSchema(v, registry)
return parseRecordSchema(v, registry, namespace)
}
case []interface{}:
return parseUnionSchema(v, registry)
return parseUnionSchema(v, registry, namespace)
}

return nil, InvalidSchema
}

func parseEnumSchema(v map[string]interface{}, registry map[string]Schema) (Schema, error) {
func parseEnumSchema(v map[string]interface{}, registry map[string]Schema, namespace string) (Schema, error) {
symbols := make([]string, len(v[schema_symbolsField].([]interface{})))
for i, symbol := range v[schema_symbolsField].([]interface{}) {
symbols[i] = symbol.(string)
Expand All @@ -777,54 +777,55 @@ func parseEnumSchema(v map[string]interface{}, registry map[string]Schema) (Sche
setOptionalField(&schema.Doc, v, schema_docField)
schema.Properties = getProperties(v)

return addSchema(getFullName(v), schema, registry)
return addSchema(getFullName(v[schema_nameField].(string), namespace), schema, registry)
}

func parseFixedSchema(v map[string]interface{}, registry map[string]Schema) (Schema, error) {
func parseFixedSchema(v map[string]interface{}, registry map[string]Schema, namespace string) (Schema, error) {
if size, ok := v[schema_sizeField].(float64); !ok {
return nil, InvalidFixedSize
} else {
return addSchema(getFullName(v), &FixedSchema{Name: v[schema_nameField].(string), Size: int(size), Properties: getProperties(v)}, registry)
return addSchema(getFullName(v[schema_nameField].(string), namespace), &FixedSchema{Name: v[schema_nameField].(string), Size: int(size), Properties: getProperties(v)}, registry)
}
}

func parseUnionSchema(v []interface{}, registry map[string]Schema) (Schema, error) {
func parseUnionSchema(v []interface{}, registry map[string]Schema, namespace string) (Schema, error) {
types := make([]Schema, len(v))
var err error
for i := range v {
types[i], err = schemaByType(v[i], registry)
types[i], err = schemaByType(v[i], registry, namespace)
if err != nil {
return nil, err
}
}
return &UnionSchema{Types: types}, nil
}

func parseRecordSchema(v map[string]interface{}, registry map[string]Schema) (Schema, error) {
func parseRecordSchema(v map[string]interface{}, registry map[string]Schema, namespace string) (Schema, error) {
schema := &RecordSchema{Name: v[schema_nameField].(string)}
registry[schema.Name] = newRecursiveSchema(schema)
setOptionalField(&schema.Namespace, v, schema_namespaceField)
setOptionalField(&namespace, v, schema_namespaceField)
setOptionalField(&schema.Doc, v, schema_docField)
addSchema(getFullName(v[schema_nameField].(string), namespace), newRecursiveSchema(schema), registry)
fields := make([]*SchemaField, len(v[schema_fieldsField].([]interface{})))
for i := range fields {
field, err := parseSchemaField(v[schema_fieldsField].([]interface{})[i], registry)
field, err := parseSchemaField(v[schema_fieldsField].([]interface{})[i], registry, namespace)
if err != nil {
return nil, err
}
fields[i] = field
}
schema.Fields = fields
setOptionalField(&schema.Namespace, v, schema_namespaceField)
setOptionalField(&schema.Doc, v, schema_docField)
schema.Properties = getProperties(v)

return schema, nil
}

func parseSchemaField(i interface{}, registry map[string]Schema) (*SchemaField, error) {
func parseSchemaField(i interface{}, registry map[string]Schema, namespace string) (*SchemaField, error) {
switch v := i.(type) {
case map[string]interface{}:
schemaField := &SchemaField{Name: v[schema_nameField].(string)}
setOptionalField(&schemaField.Doc, v, schema_docField)
fieldType, err := schemaByType(v[schema_typeField], registry)
fieldType, err := schemaByType(v[schema_typeField], registry, namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -856,13 +857,11 @@ func addSchema(name string, schema Schema, schemas map[string]Schema) (Schema, e
return schema, nil
}

func getFullName(v map[string]interface{}) string {
ns, ok := v[schema_namespaceField].(string)

if len(ns) > 0 && ok {
return ns + "." + v[schema_nameField].(string)
func getFullName(name string, namespace string) string {
if len(namespace) > 0 {
return namespace + "." + name
} else {
return v[schema_nameField].(string)
return name
}
}

Expand Down
4 changes: 2 additions & 2 deletions schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,9 +301,9 @@ func TestLoadSchemas(t *testing.T) {
schemas := LoadSchemas("test/schemas/")
assert(t, len(schemas), 4)

_, exists := schemas["Complex"]
_, exists := schemas["example.avro.Complex"]
assert(t, exists, true)
_, exists = schemas["foo"]
_, exists = schemas["example.avro.foo"]
assert(t, exists, true)
}

Expand Down

0 comments on commit 8624703

Please sign in to comment.