Skip to content

Commit

Permalink
Allow more than two unions; Validate records better.
Browse files Browse the repository at this point in the history
  • Loading branch information
arunmk committed May 29, 2015
1 parent 1cae064 commit 262ccc0
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 8 deletions.
11 changes: 5 additions & 6 deletions datum_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,11 @@ func (this *GenericDatumWriter) writeEnum(v interface{}, enc Encoder, s Schema)

func (this *GenericDatumWriter) writeUnion(v interface{}, enc Encoder, s Schema) error {
unionSchema := s.(*UnionSchema)
if this.isWritableAs(v, unionSchema.Types[0]) {
enc.WriteInt(0)
return this.write(v, enc, unionSchema.Types[0])
} else if this.isWritableAs(v, unionSchema.Types[1]) {
enc.WriteInt(1)
return this.write(v, enc, unionSchema.Types[1])

index := unionSchema.GetType(reflect.ValueOf(v))
if (index != -1) {
enc.WriteInt(int32(index))
return this.write(v, enc, unionSchema.Types[index])
}

return fmt.Errorf("Could not write %v as %s", v, s)
Expand Down
26 changes: 26 additions & 0 deletions examples/generic_datum/generic_datum.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ var rawSchema = `{
"name":"value",
"type":"int"
},
{
"name":"union",
"type":[ {
"name":"union1",
"type":"string"
},
{
"name":"union2",
"type":"boolean"
},
{
"name":"union3",
"type":"int"
}],
"default":"null"
},
{
"name":"rec",
"type":{
Expand Down Expand Up @@ -71,6 +87,9 @@ func main() {
value := int32(3)
record.Set("value", value)

var unionValue int32 = 1234
record.Set("union", unionValue)

subRecords := make([]*avro.GenericRecord, 2)
subRecord0 := avro.NewGenericRecord(schema)
subRecord0.Set("stringValue", "Hello")
Expand Down Expand Up @@ -120,6 +139,12 @@ func main() {
}
fmt.Printf("Read a value: %d\n", decodedValue)

decodedUnionValue := decodedRecord.Get("union").(int32)
if unionValue != decodedUnionValue {
panic("Something went terribly wrong!")
}
fmt.Printf("Read a union value: %d\n", decodedUnionValue)

decodedArray := decodedRecord.Get("rec").([]interface{})
if len(decodedArray) != 2 {
panic("Something went terribly wrong!")
Expand Down Expand Up @@ -170,4 +195,5 @@ func main() {
panic("Something went terribly wrong!")
}
fmt.Printf("Read a primitive value: %s\n", decodedPrimitive)

}
32 changes: 30 additions & 2 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,8 +451,37 @@ func (this *RecordSchema) Prop(key string) (string, bool) {
// Checks whether the given value is writeable to this schema.
func (rs *RecordSchema) Validate(v reflect.Value) bool {
v = dereference(v)
if v.Kind() != reflect.Struct || !v.CanAddr() || !v.CanInterface() {
return false
}
rec, ok := v.Interface().(GenericRecord)
if !ok {
return false
}

field_count := 0
for key, val := range rec.fields {
for idx := range rs.Fields {
// key.Name must have rs.Fields[idx].Name as a suffix
if len(rs.Fields[idx].Name) <= len(key) {
lhs := key[len(key) - len(rs.Fields[idx].Name) : len(key)]
if lhs == rs.Fields[idx].Name {
if !rs.Fields[idx].Type.Validate(reflect.ValueOf(val)) {
return false
}
field_count ++
break
}
}
}
}

// All of the fields set must be accounted for in the union.
if field_count < len(rec.fields) {
return false
}

return v.Kind() == reflect.Struct
return true
}

// RecursiveSchema implements Schema and represents Avro record type without a definition (e.g. that should be looked up).
Expand Down Expand Up @@ -752,7 +781,6 @@ func (this *UnionSchema) GetType(v reflect.Value) int {
// Checks whether the given value is writeable to this schema.
func (this *UnionSchema) Validate(v reflect.Value) bool {
v = dereference(v)

for i := range this.Types {
if t := this.Types[i]; t.Validate(v) {
return true
Expand Down

0 comments on commit 262ccc0

Please sign in to comment.