Skip to content

Commit

Permalink
Added complex and recursive type encoding for Specific data, added so…
Browse files Browse the repository at this point in the history
…me tests. re #16
  • Loading branch information
serejja committed Mar 24, 2015
1 parent 4a4a67e commit adf6ab9
Show file tree
Hide file tree
Showing 11 changed files with 744 additions and 187 deletions.
46 changes: 3 additions & 43 deletions codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,6 @@ func newRecordSchemaInfo(schema *RecordSchema) (*recordSchemaInfo, error) {
}, nil
}

type fixedSchemaInfo struct {
schema *FixedSchema
typeName string
}

func newFixedSchemaInfo(schema *FixedSchema) (*fixedSchemaInfo, error) {
if schema.Name == "" {
return nil, errors.New("Name not set.")
}

return &fixedSchemaInfo{
schema: schema,
typeName: fmt.Sprintf("%s%s", strings.ToUpper(schema.Name[:1]), schema.Name[1:]),
}, nil
}

type enumSchemaInfo struct {
schema *EnumSchema
typeName string
Expand Down Expand Up @@ -187,20 +171,6 @@ func (this *CodeGenerator) writeStruct(info *recordSchemaInfo) error {
return nil
}

func (this *CodeGenerator) writeFixed(info *fixedSchemaInfo) error {
buffer := &bytes.Buffer{}
if _, exists := this.structs[info.typeName]; exists {
return nil
} else {
this.codeSnippets = append(this.codeSnippets, buffer)
this.structs[info.typeName] = buffer
}

buffer.WriteString(fmt.Sprintf("type %s [%d]byte\n", info.typeName, info.schema.Size))

return nil
}

func (this *CodeGenerator) writeEnum(info *enumSchemaInfo) error {
buffer := &bytes.Buffer{}
if _, exists := this.structs[info.typeName]; exists {
Expand Down Expand Up @@ -339,17 +309,7 @@ func (this *CodeGenerator) writeStructFieldType(schema Schema, buffer *bytes.Buf
}
}
case Fixed:
{
fixedSchema := schema.(*FixedSchema)
info, err := newFixedSchemaInfo(fixedSchema)
if err != nil {
return err
}

buffer.WriteString(info.typeName)

return this.writeFixed(info)
}
buffer.WriteString("[]byte")
case Record:
{
buffer.WriteString("*")
Expand Down Expand Up @@ -512,7 +472,7 @@ func (this *CodeGenerator) writeStructConstructorFieldValue(info *recordSchemaIn
}
case *FixedSchema:
{
buffer.WriteString(fmt.Sprintf("make(%s)", field.Type.GetName()))
buffer.WriteString(fmt.Sprintf("make([]byte, %d)", field.Type.(*FixedSchema).Size))
}
case *RecordSchema:
{
Expand All @@ -533,7 +493,7 @@ func (this *CodeGenerator) needWriteField(field *SchemaField) bool {
}

switch field.Type.(type) {
case *BytesSchema, *ArraySchema, *MapSchema, *EnumSchema, *RecordSchema:
case *BytesSchema, *ArraySchema, *MapSchema, *EnumSchema, *FixedSchema, *RecordSchema:
return true
}

Expand Down
17 changes: 11 additions & 6 deletions datum_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func (this *SpecificDatumReader) readValue(field Schema, reflectField reflect.Va
return this.mapFixed(field, dec)
case Record:
return this.mapRecord(field, reflectField, dec)
//TODO recursive types
case Recursive:
return this.mapRecord(field.(*RecursiveSchema).Actual, reflectField, dec)
}

return reflect.ValueOf(nil), fmt.Errorf("Unknown field type: %s", field.Type())
Expand Down Expand Up @@ -223,10 +224,12 @@ func (this *SpecificDatumReader) mapMap(field Schema, reflectField reflect.Value
}

func (this *SpecificDatumReader) mapEnum(field Schema, dec Decoder) (reflect.Value, error) {
if enum, err := dec.ReadEnum(); err != nil {
return reflect.ValueOf(enum), err
if enumIndex, err := dec.ReadEnum(); err != nil {
return reflect.ValueOf(enumIndex), err
} else {
return reflect.ValueOf(GenericEnum{Symbols: field.(*EnumSchema).Symbols, index: enum}), nil
enum := NewGenericEnum(field.(*EnumSchema).Symbols)
enum.SetIndex(enumIndex)
return reflect.ValueOf(enum), nil
}
}

Expand Down Expand Up @@ -363,10 +366,12 @@ func (this *GenericDatumReader) mapArray(field Schema, dec Decoder) ([]interface
}

func (this *GenericDatumReader) mapEnum(field Schema, dec Decoder) (*GenericEnum, error) {
if enum, err := dec.ReadEnum(); err != nil {
if enumIndex, err := dec.ReadEnum(); err != nil {
return nil, err
} else {
return &GenericEnum{Symbols: field.(*EnumSchema).Symbols, index: enum}, nil
enum := NewGenericEnum(field.(*EnumSchema).Symbols)
enum.SetIndex(enumIndex)
return enum, nil
}
}

Expand Down
8 changes: 4 additions & 4 deletions datum_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

//primitives
type Primitive struct {
type primitive struct {
BooleanField bool
IntField int32
LongField int64
Expand Down Expand Up @@ -37,7 +37,7 @@ func TestPrimitiveBinding(t *testing.T) {
t.Fatal(err)
}
for {
p := &Primitive{}
p := &primitive{}
ok, err := reader.Next(p)
if !ok {
if err != nil {
Expand All @@ -61,7 +61,7 @@ func TestPrimitiveBinding(t *testing.T) {
type complex struct {
StringArray []string
LongArray []int64
EnumField GenericEnum
EnumField *GenericEnum
MapOfInts map[string]int32
UnionField string
FixedField []byte
Expand Down Expand Up @@ -171,7 +171,7 @@ type testRecord2 struct {

type testRecord3 struct {
StringArray []string
EnumRecordField GenericEnum
EnumRecordField *GenericEnum
}

func TestComplexOfComplexBinding(t *testing.T) {
Expand Down
154 changes: 139 additions & 15 deletions datum_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,63 +45,185 @@ func (this *SpecificDatumWriter) write(v reflect.Value, enc Encoder, s Schema) e
switch s.Type() {
case Null:
case Boolean:
this.writeBoolean(v, enc)
return this.writeBoolean(v, enc, s)
case Int:
this.writeInt(v, enc)
return this.writeInt(v, enc, s)
case Long:
this.writeLong(v, enc)
return this.writeLong(v, enc, s)
case Float:
this.writeFloat(v, enc)
return this.writeFloat(v, enc, s)
case Double:
this.writeDouble(v, enc)
return this.writeDouble(v, enc, s)
case Bytes:
this.writeBytes(v, enc)
return this.writeBytes(v, enc, s)
case String:
this.writeString(v, enc)
return this.writeString(v, enc, s)
case Array:
return this.writeArray(v, enc, s)
case Map:
return this.writeMap(v, enc, s)
case Enum:
return this.writeEnum(v, enc, s)
case Union:
return this.writeUnion(v, enc, s)
case Fixed:
return this.writeFixed(v, enc, s)
case Record:
return this.writeRecord(v, enc, s)
case Recursive:
return this.writeRecord(v, enc, s.(*RecursiveSchema).Actual)
}

return nil
}

func (this *SpecificDatumWriter) writeBoolean(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeBoolean(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid boolean value: %v", v.Interface())
}

enc.WriteBoolean(v.Interface().(bool))
return nil
}

func (this *SpecificDatumWriter) writeInt(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeInt(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid int value: %v", v.Interface())
}

enc.WriteInt(v.Interface().(int32))
return nil
}

func (this *SpecificDatumWriter) writeLong(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeLong(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid long value: %v", v.Interface())
}

enc.WriteLong(v.Interface().(int64))
return nil
}

func (this *SpecificDatumWriter) writeFloat(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeFloat(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid float value: %v", v.Interface())
}

enc.WriteFloat(v.Interface().(float32))
return nil
}

func (this *SpecificDatumWriter) writeDouble(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeDouble(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid double value: %v", v.Interface())
}

enc.WriteDouble(v.Interface().(float64))
return nil
}

func (this *SpecificDatumWriter) writeBytes(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeBytes(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid bytes value: %v", v.Interface())
}

enc.WriteBytes(v.Interface().([]byte))
return nil
}

func (this *SpecificDatumWriter) writeString(v reflect.Value, enc Encoder) {
func (this *SpecificDatumWriter) writeString(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid string value: %v", v.Interface())
}

enc.WriteString(v.Interface().(string))
return nil
}

func (this *SpecificDatumWriter) writeArray(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid array value: %v", v.Interface())
}

//TODO should probably write blocks of some length
enc.WriteArrayStart(int64(v.Len()))
for i := 0; i < v.Len(); i++ {
if err := this.write(v.Index(i), enc, s.(*ArraySchema).Items); err != nil {
return err
}
}
enc.WriteArrayNext(0)

return nil
}

func (this *SpecificDatumWriter) writeMap(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid map value: %v", v.Interface())
}

//TODO should probably write blocks of some length
enc.WriteMapStart(int64(v.Len()))
for _, key := range v.MapKeys() {
this.writeString(key, enc, &StringSchema{})
if err := this.write(v.MapIndex(key), enc, s.(*MapSchema).Values); err != nil {
return err
}
}
enc.WriteMapNext(0)

return nil
}

func (this *SpecificDatumWriter) writeEnum(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid enum value: %v", v.Interface())
}

enc.WriteInt(v.Interface().(*GenericEnum).GetIndex())

return nil
}

func (this *SpecificDatumWriter) writeUnion(v reflect.Value, enc Encoder, s Schema) error {
unionSchema := s.(*UnionSchema)
index := unionSchema.GetType(v)

if unionSchema.Types == nil || index < 0 || index >= len(unionSchema.Types) {
return fmt.Errorf("Invalid union value: %v", v.Interface())
}

enc.WriteLong(int64(index))
return this.write(v, enc, unionSchema.Types[index])
}

func (this *SpecificDatumWriter) writeFixed(v reflect.Value, enc Encoder, s Schema) error {
fs := s.(*FixedSchema)

if !fs.Validate(v) {
return fmt.Errorf("Invalid fixed value: %v", v.Interface())
}

// Write the raw bytes. The length is known by the schema
enc.WriteRaw(v.Interface().([]byte))
return nil
}

func (this *SpecificDatumWriter) writeRecord(v reflect.Value, enc Encoder, s Schema) error {
if !s.Validate(v) {
return fmt.Errorf("Invalid record value: %v", v.Interface())
}

rs := s.(*RecordSchema)
for i := range rs.Fields {
schemaField := rs.Fields[i]
field, err := this.findField(v, schemaField.Name)
if err != nil {
return err
}
this.write(field, enc, schemaField.Type)
if err := this.write(field, enc, schemaField.Type); err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -177,6 +299,8 @@ func (this *GenericDatumWriter) write(v interface{}, enc Encoder, s Schema) erro
return this.writeFixed(v, enc, s)
case Record:
return this.writeRecord(v, enc, s)
case Recursive:
return this.writeRecord(v, enc, s.(*RecursiveSchema).Actual)
}

return nil
Expand Down
Loading

0 comments on commit adf6ab9

Please sign in to comment.