-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathkinesis_formatter.go
115 lines (97 loc) · 2.95 KB
/
kinesis_formatter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package shuttle
import (
"bytes"
"io"
"net/http"
"net/url"
"strings"
"github.com/bmizerany/aws4"
)
// KinesisFormatter formats batches destined for AWS Kinesis HTTP endpoints
// Kinesis has a very small payload side, so recommend setting config.BatchSize in the 1-3 range so as to not loose logs because we go over the batch size.
// Kinesis formats the Data using the LogplexLineFormatter, which is additionally base64 encoded.
type KinesisFormatter struct {
records []KinesisRecord
keys *aws4.Keys
url *url.URL
io.Reader
}
// NewKinesisFormatter constructs a proper HTTPFormatter for Kinesis http targets
func NewKinesisFormatter(b Batch, eData []errData, config *Config) HTTPFormatter {
u, err := url.Parse(config.LogsURL)
if err != nil {
panic(err)
}
awsKey := u.User.Username()
awsSecret, _ := u.User.Password()
streamName := strings.TrimPrefix(u.Path, "/")
u.User = nil // Ensure there is no auth info
u.Path = "" // Ensure there is no path
kf := &KinesisFormatter{
records: make([]KinesisRecord, 0, b.MsgCount()+len(eData)),
keys: &aws4.Keys{AccessKey: awsKey, SecretKey: awsSecret},
url: u,
}
for _, edata := range eData {
kf.records = append(kf.records, KinesisRecord{llf: NewLogplexErrorFormatter(edata, config)})
}
for _, l := range b.logLines {
kf.records = append(kf.records, KinesisRecord{llf: NewLogplexLineFormatter(l, config)})
}
recordsReader, recordsWriter := io.Pipe()
kf.Reader = io.MultiReader(
bytes.NewReader([]byte(`{"StreamName":"`+streamName+`","Records":[`)),
recordsReader,
bytes.NewReader([]byte("]}")),
)
go func() {
var cs int
for i, record := range kf.records {
cs = determineShard(cs, config.KinesisShards)
record.shard = cs
if _, err := record.WriteTo(recordsWriter); err != nil {
recordsWriter.CloseWithError(err)
return
}
if i < len(kf.records)-1 {
if _, err := recordsWriter.Write([]byte(`,`)); err != nil {
recordsWriter.CloseWithError(err)
return
}
}
}
recordsWriter.Close()
}()
return kf
}
// Given a current shard number and the number number of shards return the next shard number
// In the case of 0 KinesisRecord does not add the shard number to the PartitionKey
func determineShard(c, max int) int {
if max == 1 {
return 0
}
if c == max {
c = 0
}
return c + 1
}
// Request constructs a request for this formatter
// See: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
func (kf *KinesisFormatter) Request() (*http.Request, error) {
req, err := http.NewRequest("POST", kf.url.String(), kf)
if err != nil {
return nil, err
}
req.Header.Add("Content-Type", "application/x-amz-json-1.1")
req.Header.Add("X-Amz-Target", "Kinesis_20131202.PutRecords")
req.Host = kf.url.Host
err = aws4.Sign(kf.keys, req)
if err != nil {
return nil, err
}
return req, nil
}
//MsgCount returns the number of records that the formatter is formatting
func (kf *KinesisFormatter) MsgCount() int {
return len(kf.records)
}