Skip to content

Commit b438176

Browse files
Modified for Readis
1 parent 9c44e45 commit b438176

8 files changed

+536
-242
lines changed

Diff for: docker/container_monitor_setup.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var (
2323

2424
// Create new system monitor instance.
2525
var (
26-
monitor *container_monitor.ContainerMonitor
26+
listener *container_monitor.RedisListener
2727
)
2828

2929
// Main function.
@@ -72,10 +72,10 @@ func main() {
7272
break
7373
}
7474
}
75-
monitor = container_monitor.NewContainerMonitor(redis_url)
7675

77-
go monitor.Run()
78-
defer monitor.Stop()
76+
listener = container_monitor.NewRedisListener(redis_url,"",0)
77+
go listener.Listen()
78+
defer listener.Close()
7979

8080
err = daemon.ServeSignals()
8181
if err != nil {
@@ -87,6 +87,6 @@ func main() {
8787
// Terminate daemon.
8888
func terminateHandler(sig os.Signal) error {
8989
log.Println("terminating system monitor...")
90-
monitor.Stop()
90+
listener.Close()
9191
return daemon.ErrStop
9292
}

Diff for: monitor.go

+13-30
Original file line numberDiff line numberDiff line change
@@ -11,54 +11,37 @@ import (
1111
type ContainerMonitor struct {
1212
close_channel chan bool // Channel for close signal.
1313
info_factory *SystemInfoFactory // System info factory.
14-
client *redis.Client
14+
testID string
1515
}
1616

1717
// Returns new ContainerMonitor instance.
18-
func NewContainerMonitor(redis_url string) *ContainerMonitor {
18+
//
19+
// params: client *redis.Client Instance of Redis client.
20+
// test_id string Test ID.
21+
func newContainerMonitor(client *redis.Client, test_id string) *ContainerMonitor {
1922
return &ContainerMonitor{
2023
close_channel: make(chan bool),
21-
info_factory: NewSystemInfoFactory(),
22-
client: redis.NewClient(&redis.Options{
23-
Addr: redis_url,
24-
Password: "",
25-
DB: 0,
26-
}),
24+
info_factory: NewSystemInfoFactory(client),
25+
testID:test_id,
2726
}
2827
}
2928

3029
// Runs the container monitor.
3130
// Just starts listen of unix socket.
3231
func (m *ContainerMonitor) Run() {
33-
pong, err := m.client.Ping().Result()
34-
defer m.client.Close()
35-
if err != nil {
36-
log.Printf("redis error: %s", err.Error())
32+
for{
33+
select{
34+
case <- time.After(time.Second *2):
35+
m.info_factory.UpdateSystemInfo(m.testID)
36+
case <- m.close_channel:
3737
return
38-
}
39-
log.Printf("pong: %v", pong)
40-
for {
41-
select {
42-
case <-time.After(2 * time.Second):
43-
m.writeSystemInfo()
44-
case <-m.close_channel:
45-
return
4638
}
4739
}
4840
}
4941

50-
// Writes system info to redis DB.
51-
func (m *ContainerMonitor) writeSystemInfo() {
52-
info := m.info_factory.GetSystemInfo()
53-
log.Printf(string(info))
54-
err := m.client.Set("system:info", info, 0).Err()
55-
if err != nil {
56-
log.Printf("WRITE DATA ERROR: %s", err.Error())
57-
m.Stop()
58-
}
59-
}
6042

6143
// Close redis connection.
6244
func (m *ContainerMonitor) Stop() {
45+
log.Println("stop test: %s",m.testID)
6346
m.close_channel <- true
6447
}

Diff for: monitor_client.go

-58
This file was deleted.

Diff for: process_info.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ type ProcessInfo struct {
1212
Cwd string // Process file path.
1313
CreateTime string // Process creation UNIX time.
1414
MemoryInfo *process.MemoryInfoStat // Process memory usage info.
15-
MemoryPersent float32 // Usage virtual memory in percents.
16-
NumThreads int32 // Process threads count.
17-
CPUPersent float64 // CPU usage in percents.
15+
MemoryPercent float64 // Usage virtual memory in percents.
16+
NumThreads int64 // Process threads count.
17+
CPUPercent float64 // CPU usage in percents.
1818
}

Diff for: process_sorter.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ func (b ByCPU) Less(i, j int) bool {
2121
if b[i] == nil || b[j] == nil {
2222
return false
2323
}
24-
if b[i].CPUPersent > 0 || b[j].CPUPersent > 0 {
25-
return b[i].CPUPersent > b[j].CPUPersent
24+
if b[i].CPUPercent > 0 || b[j].CPUPercent > 0 {
25+
return b[i].CPUPercent > b[j].CPUPercent
2626
}
27-
return b[i].MemoryPersent > b[j].MemoryPersent
27+
return b[i].MemoryPercent > b[j].MemoryPercent
2828
}

Diff for: redis_listener.go

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package container_monitor
2+
import (
3+
"gopkg.in/redis.v4"
4+
"log"
5+
)
6+
type RedisListener struct{
7+
Client *redis.Client // Redis client
8+
monitor *ContainerMonitor // Container monitor.
9+
}
10+
11+
// Returns new instance of Redis listener.
12+
// props: r_url string Redis server URL.
13+
// password string Redis server password.
14+
// db int Redis server Data Base ID.
15+
func NewRedisListener(r_url string, password string, db int)(*RedisListener){
16+
return &RedisListener{
17+
Client: redis.NewClient(&redis.Options{
18+
Addr: r_url,
19+
Password: password,
20+
DB: db,
21+
}),
22+
}
23+
}
24+
25+
// Listens Redis pub/sub channel.
26+
func (l *RedisListener)Listen(){
27+
defer l.Client.Close()
28+
pubsub,err:= l.Client.Subscribe(STRESS_TEST_CHANNEL)
29+
if(err != nil){
30+
log.Printf("CLIENT SUBSCRIBE ERROR: %s",err.Error())
31+
return
32+
}
33+
defer pubsub.Unsubscribe(STRESS_TEST_CHANNEL)
34+
for{
35+
err:=l.ping()
36+
if(err != nil){
37+
log.Printf("CLIENT PING ERROR: %s",err.Error())
38+
return
39+
}
40+
mess,err:= pubsub.ReceiveMessage()
41+
if(err == nil){
42+
l.readRedisMessage(mess)
43+
}
44+
}
45+
}
46+
47+
// Calls redis pub/sub channel.
48+
func (l *RedisListener)Call(test_id string, command string){
49+
mess:= newRedisMessage(command, test_id)
50+
message_string,err:= marshalRedisMessage(mess)
51+
if(err != nil){
52+
log.Printf("can not create start redis message %s",err.Error())
53+
return
54+
}
55+
l.Client.Publish(STRESS_TEST_CHANNEL,message_string)
56+
}
57+
58+
// Closes listener
59+
func (l *RedisListener)Close(){
60+
l.Client.Close()
61+
}
62+
63+
// Reads Redis pub/sub messages.
64+
func(l *RedisListener)readRedisMessage(mess *redis.Message){
65+
message,err:=unmarshalRedisMessage(mess.Payload)
66+
if(err != nil){
67+
log.Printf("Can not unmarshall redis message: %s",err.Error())
68+
return
69+
}
70+
if(message.Command == START_COMMAND) {
71+
l.startTest(message.TestID)
72+
}else{
73+
l.stopTest(message.TestID)
74+
}
75+
}
76+
77+
// Starts gathering information about the container system.
78+
func (l *RedisListener)startTest(test_id string){
79+
if(l.monitor != nil){
80+
log.Println("ERROR: last test not finiched!")
81+
return
82+
}
83+
l.monitor = newContainerMonitor(l.Client,test_id)
84+
go l.monitor.Run()
85+
}
86+
// Stops gathering information about the container system.
87+
func (l *RedisListener)stopTest(test_id string){
88+
if(l.monitor == nil){
89+
log.Println("ERROR: test not started!")
90+
return
91+
}
92+
l.monitor.Stop()
93+
l.monitor = nil
94+
}
95+
96+
// Pings redis pub/sub channel.
97+
func (l *RedisListener)ping()(error){
98+
err:=l.Client.Ping().Err()
99+
if(err != nil){
100+
return err
101+
}
102+
return nil
103+
}
104+
105+

Diff for: redis_message.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package container_monitor
2+
import (
3+
"encoding/json"
4+
"log"
5+
)
6+
const (
7+
START_COMMAND="start_test" // Stress test started.
8+
STOP_COMMAND="stop_test" // Stress test stopped.
9+
STRESS_TEST_CHANNEL="stress_test_client" // Redis channel name.
10+
)
11+
12+
// Redis pub/sub message
13+
type redisMessage struct{
14+
Command string
15+
TestID string
16+
}
17+
18+
// Returns new instance of Redis pub/sub message.
19+
//
20+
// params: command string Kind of command.
21+
// test_id string Stress test ID.
22+
func newRedisMessage(command string, test_id string)(*redisMessage){
23+
return &redisMessage{
24+
Command:command,
25+
TestID: test_id,
26+
}
27+
}
28+
29+
// Encodes Redis pub/sub message instance to JSON string.
30+
//
31+
// param: message *redisMessage Instance of Redis pub/sub message.
32+
// return JSON string or error instance.
33+
func marshalRedisMessage(message *redisMessage)(string, error){
34+
message_string,err:= json.Marshal(message)
35+
if(err != nil){
36+
log.Printf("Can not marshal redis message %s",err.Error())
37+
return err.Error(),err
38+
}
39+
return string(message_string), nil
40+
}
41+
42+
// Decodes Redis pub/sub message JSON string to instance of *redisMessage
43+
//
44+
// param: message_string string JSON message string.
45+
// return: Instance of *redisMessage or error instance.
46+
func unmarshalRedisMessage(message_string string)(*redisMessage, error){
47+
mess:= &redisMessage{}
48+
err:= json.Unmarshal([]byte(message_string),mess)
49+
if(err != nil){
50+
log.Printf("Can not unmarshal redis message %s",err.Error())
51+
return nil, err
52+
}
53+
return mess,nil
54+
}

0 commit comments

Comments
 (0)