Skip to content

Commit

Permalink
Added godoc. re #26, git #16
Browse files Browse the repository at this point in the history
  • Loading branch information
serejja committed Mar 27, 2015
1 parent 73731ad commit 07f899b
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 80 deletions.
5 changes: 5 additions & 0 deletions codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
)

// CodeGenerator is a code generation tool for structs from given Avro schemas.
type CodeGenerator struct {
rawSchemas []string

Expand All @@ -31,6 +32,7 @@ type CodeGenerator struct {
schemaDefinitions *bytes.Buffer
}

// Creates a new CodeGenerator for given Avro schemas.
func NewCodeGenerator(schemas []string) *CodeGenerator {
return &CodeGenerator{
rawSchemas: schemas,
Expand Down Expand Up @@ -78,6 +80,9 @@ func newEnumSchemaInfo(schema *EnumSchema) (*enumSchemaInfo, error) {
}, nil
}

// Generates source code for Avro schemas specified on creation.
// The output is Go formatted source code that contains struct definitions for all given schemas.
// May return an error if code generation fails, e.g. due to unparsable schema.
func (this *CodeGenerator) Generate() (string, error) {
for index, rawSchema := range this.rawSchemas {
parsedSchema, err := ParseSchema(rawSchema)
Expand Down
30 changes: 21 additions & 9 deletions data_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ import (
"math"
)

var VERSION byte = 1
var MAGIC []byte = []byte{'O', 'b', 'j', VERSION}
const (
version byte = 1
sync_size = 16
schema_key = "avro.schema"
codec_key = "avro.codec"
)

var SYNC_SIZE = 16
var SCHEMA_KEY = "avro.schema"
var CODEC_KEY = "avro.codec"
var magic []byte = []byte{'O', 'b', 'j', version}

var syncBuffer = make([]byte, SYNC_SIZE)
var syncBuffer = make([]byte, sync_size)

// DataFileReader is a reader for Avro Object Container Files. More here: https://avro.apache.org/docs/current/spec.html#Object+Container+Files
type DataFileReader struct {
data []byte
header *header
Expand All @@ -34,16 +37,18 @@ type header struct {
func newHeader() *header {
header := &header{}
header.meta = make(map[string][]byte)
header.sync = make([]byte, SYNC_SIZE)
header.sync = make([]byte, sync_size)

return header
}

// Creates a new DataFileReader for a given file and using the given DatumReader to read the data from that file.
// May return an error if the file contains invalid data or is just missing.
func NewDataFileReader(filename string, datumReader DatumReader) (*DataFileReader, error) {
if buf, err := ioutil.ReadFile(filename); err != nil {
return nil, err
} else {
if len(buf) < len(MAGIC) || !bytes.Equal(MAGIC, buf[0:4]) {
if len(buf) < len(magic) || !bytes.Equal(magic, buf[0:4]) {
return nil, NotAvroFile
}

Expand Down Expand Up @@ -87,7 +92,7 @@ func NewDataFileReader(filename string, datumReader DatumReader) (*DataFileReade
dec.ReadFixed(reader.header.sync)
//TODO codec?

schema, err := ParseSchema(string(reader.header.meta[SCHEMA_KEY]))
schema, err := ParseSchema(string(reader.header.meta[schema_key]))
if err != nil {
return nil, err
}
Expand All @@ -104,6 +109,7 @@ func NewDataFileReader(filename string, datumReader DatumReader) (*DataFileReade
}
}

// Seek moves the reading position of this DataFileReader to pos.
// The call is forwarded as-is to the Seek method of the reader's dec field.
func (this *DataFileReader) Seek(pos int64) {
	this.dec.Seek(pos)
}
Expand All @@ -128,6 +134,10 @@ func (this *DataFileReader) hasNextBlock() bool {
return int64(len(this.data)) > this.dec.Tell()
}

// Reads the next value from file and fills the given value with data.
// First return value indicates whether the read was successful.
// Second return value indicates whether there was an error while reading data.
// Returns (false, nil) when no more data left to read.
func (this *DataFileReader) Next(v interface{}) (bool, error) {
if hasNext, err := this.hasNext(); err != nil {
return false, err
Expand All @@ -146,6 +156,8 @@ func (this *DataFileReader) Next(v interface{}) (bool, error) {
return false, nil
}

// Tells this DataFileReader to skip current block and move to next one.
// May return an error if the block is malformed or no more blocks left to read.
func (this *DataFileReader) NextBlock() error {
if blockCount, err := this.dec.ReadLong(); err != nil {
return err
Expand Down
40 changes: 38 additions & 2 deletions datum_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,27 @@ import (
"reflect"
)

// DatumReader reads structured data from a Decoder according to a Schema.
type DatumReader interface {
	// Read decodes a single structured entry from dec into v, following the
	// Schema previously supplied through SetSchema.
	// v MUST be of pointer type.
	// A non-nil error indicates a read failure.
	Read(v interface{}, dec Decoder) error

	// SetSchema tells this DatumReader which data structure to expect.
	// It must be called before Read.
	SetSchema(schema Schema)
}

// GenericEnum is a generic representation of an Avro enum value.
// This type is still subject to change and may be rethought.
type GenericEnum struct {
	// Symbols holds the Avro enum symbols.
	Symbols []string

	// symbolsToIndex is used to look up the numeric index of a symbol.
	symbolsToIndex map[string]int32

	// index is the numeric value currently held by this enum.
	index int32
}

// Returns a new GenericEnum that uses provided enum symbols.
func NewGenericEnum(symbols []string) *GenericEnum {
symbolsToIndex := make(map[string]int32)
for index, symbol := range symbols {
Expand All @@ -29,18 +39,23 @@ func NewGenericEnum(symbols []string) *GenericEnum {
}
}

// GetIndex returns the numeric value currently held by this enum.
func (this *GenericEnum) GetIndex() int32 {
	return this.index
}

// Get returns the string value (i.e. the symbol) currently held by this enum.
// The symbol is looked up in Symbols by the current index, so an index that
// is outside of Symbols causes a panic.
func (this *GenericEnum) Get() string {
	current := this.index
	return this.Symbols[current]
}

// SetIndex sets the numeric value for this enum.
// The given index is stored as-is; it is not checked against Symbols.
func (this *GenericEnum) SetIndex(index int32) {
	this.index = index
}

// Sets the string value for this enum (e.g. symbol).
// Panics if the given symbol does not exist in this enum.
func (this *GenericEnum) Set(symbol string) {
if index, exists := this.symbolsToIndex[symbol]; !exists {
panic("Unknown enum symbol")
Expand All @@ -49,19 +64,30 @@ func (this *GenericEnum) Set(symbol string) {
}
}

// SpecificDatumReader implements DatumReader and is used for filling Go structs with data.
// Each value passed to Read is expected to be a pointer.
type SpecificDatumReader struct {
dataType reflect.Type
schema Schema
schema Schema
}

// NewSpecificDatumReader creates a new, empty SpecificDatumReader.
// No schema is set on the returned reader.
func NewSpecificDatumReader() *SpecificDatumReader {
	return new(SpecificDatumReader)
}

// SetSchema sets the schema that describes the data structure this
// SpecificDatumReader is going to read.
// Note that it must be called before calling Read.
func (this *SpecificDatumReader) SetSchema(schema Schema) {
	this.schema = schema
}

// Reads a single structured entry using this SpecificDatumReader.
// Accepts a Go struct with exported fields to fill with data and a Decoder to read from. Given value MUST be of
// pointer type. Field names should match field names in Avro schema but be exported (e.g. "some_value" in Avro
// schema is expected to be Some_value in struct) or you may provide Go struct tags to explicitly show how
// to map fields (e.g. if you want to map "some_value" field of type int to SomeValue in Go struct you should define
// your struct field as follows: SomeValue int32 `avro:"some_value"`).
// May return an error indicating a read failure.
func (this *SpecificDatumReader) Read(v interface{}, dec Decoder) error {
rv := reflect.ValueOf(v)
if rv.Kind() != reflect.Ptr || rv.IsNil() {
Expand Down Expand Up @@ -262,18 +288,28 @@ func (this *SpecificDatumReader) mapRecord(field Schema, reflectField reflect.Va
return reflect.ValueOf(record), nil
}

// GenericDatumReader implements DatumReader and fills GenericRecords or other
// Avro supported types with data. The full list of supported types is:
// interface{}, bool, int32, int64, float32, float64, string, slices of any
// type, maps with string keys and any values, GenericEnums.
// Each value passed to Read is expected to be a pointer.
type GenericDatumReader struct {
	// schema is the schema assigned through SetSchema.
	schema Schema
}

// NewGenericDatumReader creates a new, empty GenericDatumReader.
// No schema is set on the returned reader.
func NewGenericDatumReader() *GenericDatumReader {
	return new(GenericDatumReader)
}

// SetSchema sets the schema that describes the data structure this
// GenericDatumReader is going to read.
// Note that it must be called before calling Read.
func (this *GenericDatumReader) SetSchema(schema Schema) {
	this.schema = schema
}

// Reads a single entry using this GenericDatumReader.
// Accepts a value to fill with data and a Decoder to read from. Given value MUST be of pointer type.
// May return an error indicating a read failure.
func (this *GenericDatumReader) Read(v interface{}, dec Decoder) error {
rv := reflect.ValueOf(v)
if rv.Kind() != reflect.Ptr || rv.IsNil() {
Expand Down
35 changes: 27 additions & 8 deletions datum_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,40 @@ import (
"reflect"
)

// DatumWriter is an interface that is responsible for writing structured data according to schema to an encoder.
type DatumWriter interface {
SetSchema(Schema)
// Write accepts a pointer to the object to be written and an Encoder, and writes
// the Avro representation of the object to the Encoder’s buffer.
// Note that `obj` **must** be a pointer to an object or you will probably get
// a panic.
// Write writes a single entry using this DatumWriter according to provided Schema.
// Accepts a value to write and Encoder to write to.
// May return an error indicating a write failure.
Write(interface{}, Encoder)

// Sets the schema for this DatumWriter to know the data structure.
// Note that it must be called before calling Write.
SetSchema(Schema)
}

// SpecificDatumWriter implements DatumWriter and writes Go structs in Avro
// format.
type SpecificDatumWriter struct {
	// schema is the schema assigned through SetSchema.
	schema Schema
}

// NewSpecificDatumWriter creates a new, empty SpecificDatumWriter.
// No schema is set on the returned writer.
func NewSpecificDatumWriter() *SpecificDatumWriter {
	return new(SpecificDatumWriter)
}

// SetSchema sets the schema that describes the data structure this
// SpecificDatumWriter is going to write.
// Note that it must be called before calling Write.
func (this *SpecificDatumWriter) SetSchema(schema Schema) {
	this.schema = schema
}

// Write accepts a pointer to the object to be written and an Encoder, and writes
// the Avro representation of the object to the Encoder’s buffer.
// Note that `obj` **must** be a pointer to an object or you will probably get a panic.
// Write writes a single Go struct using this SpecificDatumWriter according to provided Schema.
// Accepts a value to write and Encoder to write to. Field names should match field names in Avro schema but be exported
// (e.g. "some_value" in Avro schema is expected to be Some_value in struct) or you may provide Go struct tags to
// explicitly show how to map fields (e.g. if you want to map "some_value" field of type int to SomeValue in Go struct
// you should define your struct field as follows: SomeValue int32 `avro:"some_value"`).
// May return an error indicating a write failure.
func (this *SpecificDatumWriter) Write(obj interface{}, enc Encoder) error {
rv := reflect.ValueOf(obj)

Expand Down Expand Up @@ -228,18 +238,27 @@ func (this *SpecificDatumWriter) writeRecord(v reflect.Value, enc Encoder, s Sch
return nil
}

// GenericDatumWriter implements DatumWriter and writes GenericRecords or other
// Avro supported types to a given Encoder. The full list of supported types
// is: interface{}, bool, int32, int64, float32, float64, string, slices of any
// type, maps with string keys and any values, GenericEnums.
type GenericDatumWriter struct {
	// schema is the schema assigned through SetSchema.
	schema Schema
}

// NewGenericDatumWriter creates a new, empty GenericDatumWriter.
// No schema is set on the returned writer.
func NewGenericDatumWriter() *GenericDatumWriter {
	return new(GenericDatumWriter)
}

// SetSchema sets the schema that describes the data structure this
// GenericDatumWriter is going to write.
// Note that it must be called before calling Write.
func (this *GenericDatumWriter) SetSchema(schema Schema) {
	this.schema = schema
}

// Write writes a single entry to enc using this GenericDatumWriter, following
// the schema assigned through SetSchema.
// obj is the value to write and enc is the Encoder to write to.
// The returned error is whatever the internal write method reports; a non-nil
// error indicates a write failure.
func (this *GenericDatumWriter) Write(obj interface{}, enc Encoder) error {
	err := this.write(obj, enc, this.schema)
	return err
}
Expand Down
Loading

0 comments on commit 07f899b

Please sign in to comment.