Merge ci_feature into ci_feature_prod (#291)
* Bug Fixes for exceptions in telemetry, remove limit set check (#289)

* Bug Fixes 10222019

* Initialize container_cpu_memory_records in fhmb

* Added telemetry to investigate health exceptions

* Set frozen_string_literal to true

* Send event once per container when lookup is empty, or limit is an array

* Unit Tests, Use RS and POD to determine workload

* Fixed node condition bug; added exception handling to get_rs_owner_ref
r-dilip authored Nov 2, 2019
1 parent 3533cf9 commit 1322eab
Showing 30 changed files with 680 additions and 647 deletions.
31 changes: 18 additions & 13 deletions source/code/plugin/filter_health_model_builder.rb
@@ -39,17 +39,16 @@ def initialize
@kube_api_down_handler = HealthKubeApiDownHandler.new
@resources = HealthKubernetesResources.instance
@reducer = HealthSignalReducer.new
@state = HealthMonitorState.new
@generator = HealthMissingSignalGenerator.new
#TODO: cluster_labels needs to be initialized
@provider = HealthMonitorProvider.new(@@cluster_id, HealthMonitorUtils.get_cluster_labels, @resources, @health_monitor_config_path)
deserialized_state_info = @cluster_health_state.get_state
@state = HealthMonitorState.new
@state.initialize_state(deserialized_state_info)
@cluster_old_state = 'none'
@cluster_new_state = 'none'
@container_cpu_memory_records = []
@telemetry = HealthMonitorTelemetry.new
@state = HealthMonitorState.new
# move network calls to the end. This will ensure all the instance variables get initialized
deserialized_state_info = @cluster_health_state.get_state
@state.initialize_state(deserialized_state_info)
rescue => e
ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"})
end
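
Editor's note: the reshuffled initializer above follows one rule: assign every instance variable before the first network call, so a failure inside get_state can no longer leave fields such as @container_cpu_memory_records nil. A minimal sketch of the pattern, with illustrative names rather than the plugin's own classes:

    # Sketch: assign plain defaults first, do I/O last, so an exception
    # cannot leave instance variables undefined. Names are illustrative.
    class StatefulFilter
      attr_reader :records, :state

      def initialize(state_store)
        # 1. Pure assignments -- these cannot fail.
        @records = []
        @old_state = 'none'
        @new_state = 'none'
        @state = {}
        # 2. Network/disk I/O last; everything above is already set.
        @state = state_store.get_state
      rescue => e
        # @records and friends are still usable even though get_state failed.
        puts "init failed, continuing with defaults: #{e.message}"
      end
    end

    # A store whose get_state raises, to show the instance stays usable.
    class FailingStore
      def get_state
        raise IOError, 'health state endpoint unreachable'
      end
    end

    f = StatefulFilter.new(FailingStore.new)
    p f.records # => [] -- initialized despite the failed network call
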
@@ -99,21 +98,27 @@ def filter_stream(tag, es)
end
container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider)
deduped_records = container_records_aggregator.dedupe_records(container_records)
if @container_cpu_memory_records.nil?
@log.info "@container_cpu_memory_records was not initialized"
@container_cpu_memory_records = [] #in some clusters, this is null, so initialize it again.
end
@container_cpu_memory_records.push(*deduped_records) # push the records for aggregation later
return MultiEventStream.new
elsif tag.start_with?("kubehealth.ReplicaSet")
records = []
es.each{|time, record|
records.push(record)
}
@buffer.add_to_buffer(records)

container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider)
container_records_aggregator.aggregate(@container_cpu_memory_records)
container_records_aggregator.compute_state
aggregated_container_records = container_records_aggregator.get_records
@buffer.add_to_buffer(aggregated_container_records)

@buffer.add_to_buffer(records) # in_kube_health records

aggregated_container_records = []
if !@container_cpu_memory_records.nil? && !@container_cpu_memory_records.empty?
container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider)
container_records_aggregator.aggregate(@container_cpu_memory_records)
container_records_aggregator.compute_state
aggregated_container_records = container_records_aggregator.get_records
end
@buffer.add_to_buffer(aggregated_container_records) #container cpu/memory records
records_to_process = @buffer.get_buffer
@buffer.reset_buffer
@container_cpu_memory_records = []
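
Editor's note: the reworked filter_stream above accumulates deduped container CPU/memory records while kubehealth.DaemonSet events flush, and only aggregates them once the kubehealth.ReplicaSet tag arrives, with a nil guard because the accumulator has been observed unset in some clusters. A condensed sketch of that control flow (the buffer and aggregator here are simplified stand-ins, not the plugin's classes):

    # Sketch of the two-phase flow: accumulate on one tag, aggregate and
    # flush on the other. Buffer/aggregator are simplified stand-ins.
    class FlowSketch
      def initialize
        @container_records = [] # per-interval accumulator
        @buffer = []
      end

      def on_event(tag, records)
        if tag.start_with?('kubehealth.DaemonSet')
          @container_records ||= [] # defensive: observed nil in some clusters
          @container_records.concat(records)
          [] # nothing to emit yet
        elsif tag.start_with?('kubehealth.ReplicaSet')
          @buffer.concat(records) # in_kube_health records first
          unless @container_records.nil? || @container_records.empty?
            @buffer.concat(aggregate(@container_records)) # container cpu/memory
          end
          out = @buffer
          @buffer = []
          @container_records = [] # reset for the next interval
          out
        else
          []
        end
      end

      # Placeholder: the real aggregator dedupes, aggregates and computes state.
      def aggregate(records)
        [{ 'aggregated_count' => records.size }]
      end
    end

    flow = FlowSketch.new
    flow.on_event('kubehealth.DaemonSet', [{ 'cpu' => 1 }, { 'cpu' => 2 }])
    p flow.on_event('kubehealth.ReplicaSet', [{ 'node' => 'ok' }])
    # => [{"node"=>"ok"}, {"aggregated_count"=>2}]
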
1 change: 1 addition & 0 deletions source/code/plugin/health/agg_monitor_id_labels.rb
@@ -1,3 +1,4 @@
# frozen_string_literal: true
require_relative 'health_model_constants'

module HealthModel
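
Editor's note: many of the remaining files receive the same one-line change, the frozen_string_literal magic comment, which makes every string literal in the file immutable: accidental in-place mutation raises instead of silently aliasing, and identical literals can be deduplicated. A quick self-contained illustration:

    # frozen_string_literal: true
    s = 'health'
    puts s.frozen?      # => true
    begin
      s << '-model'     # in-place mutation now raises
    rescue FrozenError => e
      puts e.class      # => FrozenError
    end
    puts s + '-model'   # => health-model (a new string; no mutation)
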
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module HealthModel
class AggregateMonitorStateFinalizer

2 changes: 2 additions & 0 deletions source/code/plugin/health/cluster_health_state.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require "net/http"
require "net/https"
require "uri"
71 changes: 57 additions & 14 deletions source/code/plugin/health/health_container_cpu_memory_aggregator.rb
@@ -1,4 +1,12 @@
# frozen_string_literal: true

require_relative 'health_model_constants'

# Require only when running inside container.
# otherwise unit tests will fail due to ApplicationInsightsUtility dependency on base omsagent ruby files. If you have your dev machine starting with omsagent-rs, then GOOD LUCK!
if Socket.gethostname.start_with?('omsagent-rs')
require_relative '../ApplicationInsightsUtility'
end
=begin
@cpu_records/@memory_records
[
@@ -37,6 +45,10 @@ class HealthContainerCpuMemoryAggregator

@@memory_counter_name = 'memoryRssBytes'
@@cpu_counter_name = 'cpuUsageNanoCores'
@@workload_container_count_empty_event_sent = {}
@@limit_is_array_event_sent = {}
@@WORKLOAD_CONTAINER_COUNT_EMPTY_EVENT = "WorkloadContainerCountEmptyEvent"
@@LIMIT_IS_ARRAY_EVENT = "ResourceLimitIsAnArrayEvent"
def initialize(resources, provider)
@pod_uid_lookup = resources.get_pod_uid_lookup
@workload_container_count = resources.get_workload_container_count
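
Editor's note: the new class-level hashes implement a send-once-per-key guard: a noisy condition (an empty workload container count lookup, or a limit that arrives as an array) yields one telemetry event per resource key instead of one per flush interval. The pattern in isolation (the event sink below stands in for ApplicationInsightsUtility.sendCustomEvent):

    # Sketch: dedupe telemetry by key with a class-level hash so each
    # anomalous resource is reported once per process lifetime.
    class OncePerKeyReporter
      @@sent = {} # class-level: shared across instances, reset on restart

      def report(event_name, key, properties)
        return if @@sent.key?(key) # already reported this resource
        @@sent[key] = true
        send_event(event_name, properties)
      end

      def send_event(name, props)
        # stand-in for ApplicationInsightsUtility.sendCustomEvent
        puts "#{name}: #{props}"
      end
    end

    r = OncePerKeyReporter.new
    3.times { r.report('LimitIsArrayEvent', 'ns~~web~~nginx', { 'limit' => [1, 2] }) }
    # prints exactly once
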
@@ -163,11 +175,30 @@ def get_records
container_cpu_memory_records = []

@cpu_records.each{|resource_key, record|

cpu_limit_mc = 1.0
if record["limit"].is_a?(Numeric)
cpu_limit_mc = record["limit"]/1000000.to_f
else
@log.info "CPU Limit is not a number #{record['limit']}"
if !@@limit_is_array_event_sent.key?(resource_key)
custom_properties = {}
custom_properties['limit'] = record['limit']
if record['limit'].is_a?(Array)
record['limit'].each_index{|i|
custom_properties[i] = record['limit'][i]
}
end
@@limit_is_array_event_sent[resource_key] = true
#send once per resource key
ApplicationInsightsUtility.sendCustomEvent(@@LIMIT_IS_ARRAY_EVENT, custom_properties)
end
end
health_monitor_record = {
"timestamp" => time_now,
"state" => record["state"],
"details" => {
"cpu_limit_millicores" => record["limit"]/1000000.to_f,
"cpu_limit_millicores" => cpu_limit_mc,
"cpu_usage_instances" => record["records"].map{|r| r.each {|k,v|
k == "counter_value" ? r[k] = r[k] / 1000000.to_f : r[k]
}},
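
Editor's note: get_records now defends against a non-numeric limit (observed as an array in some clusters) before converting: dividing nanocores by 1000000.to_f yields millicores, since 1 millicore is 10^6 nanocores, and a 1.0 sentinel is substituted when the limit is malformed. A worked example of the guard and conversion (method name is illustrative):

    # Sketch: nanocores -> millicores with a defensive default when the
    # recorded limit is not numeric.
    def cpu_limit_millicores(limit)
      return 1.0 unless limit.is_a?(Numeric) # default rather than crash
      limit / 1_000_000.0                    # 1 millicore == 10**6 nanocores
    end

    puts cpu_limit_millicores(500_000_000)   # 0.5 CPU  => 500.0 mc
    puts cpu_limit_millicores(2_000_000_000) # 2 CPUs   => 2000.0 mc
    puts cpu_limit_millicores([250, 250])    # malformed => 1.0 sentinel
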
@@ -219,29 +250,41 @@ def get_records

private
def calculate_monitor_state(v, config)
if !v['limit_set'] && v['namespace'] != 'kube-system'
v["state"] = HealthMonitorStates::WARNING
else
# sort records by descending order of metric
v["records"] = v["records"].sort_by{|record| record["counter_value"]}.reverse
size = v["records"].size
# sort records by descending order of metric
v["records"] = v["records"].sort_by{|record| record["counter_value"]}.reverse
size = v["records"].size
if !v["record_count"].nil?
if size < v["record_count"]
unknown_count = v["record_count"] - size
for i in unknown_count.downto(1)
# it requires a lot of computation to figure out which actual pod is not sending the signal
v["records"].insert(0, {"counter_value" => -1, "container" => v["container"], "pod_name" => "???", "state" => HealthMonitorStates::UNKNOWN }) #insert -1 for unknown records
end
end
else
v["state"] = HealthMonitorStates::UNKNOWN
container_key = "#{v['workload_name']}~~#{v['container']}"
@log.info "ContainerKey: #{container_key} Records Size: #{size} Records: #{v['records']} Record Count: #{v['record_count']} #{@workload_container_count}"

if size == 1
state_index = 0
else
state_threshold = config['StateThresholdPercentage'].to_f
count = ((state_threshold*size)/100).ceil
state_index = size - count
if !@@workload_container_count_empty_event_sent.key?(container_key)
custom_properties = {}
custom_properties = custom_properties.merge(v)
custom_properties = custom_properties.merge(@workload_container_count)
@log.info "Custom Properties : #{custom_properties}"
@@workload_container_count_empty_event_sent[container_key] = true
ApplicationInsightsUtility.sendCustomEvent(@@WORKLOAD_CONTAINER_COUNT_EMPTY_EVENT, custom_properties)
end
v["state"] = v["records"][state_index]["state"]
return #simply return the state as unknown here
end

if size == 1
state_index = 0
else
state_threshold = config['StateThresholdPercentage'].to_f
count = ((state_threshold*size)/100).ceil
state_index = size - count
end
v["state"] = v["records"][state_index]["state"]
end

def calculate_container_instance_state(counter_value, limit, config)
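
Editor's note: calculate_monitor_state no longer short-circuits on a missing limit; it always sorts records in descending metric order, pads missing signals with state-unknown placeholders, and when the container count lookup is empty it sends telemetry once and returns with the state left unknown. Otherwise the state is read at an index derived from StateThresholdPercentage: with 10 records and a 90% threshold, count = ceil(90 * 10 / 100) = 9, so state_index = 10 - 9 = 1 and the second-highest record decides. The index arithmetic in isolation:

    # Sketch of the threshold index selection used above.
    def state_index_for(size, state_threshold_percentage)
      return 0 if size == 1
      count = ((state_threshold_percentage * size) / 100.0).ceil
      size - count
    end

    puts state_index_for(10, 90.0) # => 1 (second-highest of 10 records)
    puts state_index_for(3, 100.0) # => 0 (highest record decides)
    puts state_index_for(1, 90.0)  # => 0
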
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module HealthModel
class HealthContainerCpuMemoryRecordFormatter

2 changes: 2 additions & 0 deletions source/code/plugin/health/health_hierarchy_builder.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require 'json'
module HealthModel
class HealthHierarchyBuilder
2 changes: 2 additions & 0 deletions source/code/plugin/health/health_kube_api_down_handler.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require_relative 'health_model_constants'
module HealthModel
class HealthKubeApiDownHandler
90 changes: 44 additions & 46 deletions source/code/plugin/health/health_kubernetes_resources.rb
@@ -1,24 +1,26 @@
# frozen_string_literal: true

require 'singleton'
require_relative 'health_model_constants'

module HealthModel
class HealthKubernetesResources

include Singleton
attr_accessor :node_inventory, :pod_inventory, :deployment_inventory, :pod_uid_lookup, :workload_container_count
attr_accessor :node_inventory, :pod_inventory, :replicaset_inventory, :pod_uid_lookup, :workload_container_count
attr_reader :nodes, :pods, :workloads, :deployment_lookup

def initialize
@node_inventory = []
@pod_inventory = []
@deployment_inventory = []
@node_inventory = {}
@pod_inventory = {}
@replicaset_inventory = {}
@nodes = []
@pods = []
@workloads = []
@log = HealthMonitorHelpers.get_log_handle
@pod_uid_lookup = {}
@deployment_lookup = {}
@workload_container_count = {}
@workload_name_cache = {}
end

def get_node_inventory
@@ -36,9 +38,8 @@ def get_nodes
return @nodes
end

def set_deployment_inventory(deployments)
@deployment_inventory = deployments
@deployment_lookup = {}
def set_replicaset_inventory(replicasets)
@replicaset_inventory = replicasets
end

def get_workload_names
@@ -51,7 +52,12 @@ def get_workload_names
end

def build_pod_uid_lookup
if @pod_inventory.nil? || @pod_inventory['items'].nil? || @pod_inventory['items'].empty? || @pod_inventory['items'].size == 0
@log.info "Not Clearing pod_uid_lookup and workload_container_count since pod inventory is nil"
return
end
@workload_container_count = {}
@pod_uid_lookup = {}
@pod_inventory['items'].each do |pod|
begin
namespace = pod['metadata']['namespace']
@@ -92,7 +98,7 @@ def build_pod_uid_lookup
end
end
rescue => e
@log.info "Error in build_pod_uid_lookup #{pod} #{e.message}"
@log.info "Error in build_pod_uid_lookup for POD: #{pod_name} #{e.message} #{e.backtrace}"
end
end
end
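
Editor's note: build_pod_uid_lookup now returns early when the pod inventory is nil or empty instead of clearing @pod_uid_lookup and @workload_container_count first, so one bad or empty scrape no longer wipes lookups that later aggregation still needs. The guard-before-clear pattern, reduced to essentials (class and key names are illustrative):

    # Sketch: keep the previous cache when a refresh delivers no data.
    class InventoryCache
      attr_reader :pod_uid_lookup

      def initialize
        @pod_uid_lookup = {}
      end

      def rebuild(pod_inventory)
        items = pod_inventory && pod_inventory['items']
        # nothing usable arrived; keep serving the last good lookup
        return if items.nil? || items.empty?
        @pod_uid_lookup = {} # safe to clear only now
        items.each do |pod|
          uid = pod.dig('metadata', 'uid')
          @pod_uid_lookup[uid] = pod
        end
      end
    end

    cache = InventoryCache.new
    cache.rebuild('items' => [{ 'metadata' => { 'uid' => 'abc' } }])
    cache.rebuild(nil)                     # bad scrape: lookup survives
    puts cache.pod_uid_lookup.keys.inspect # => ["abc"]
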
@@ -105,19 +111,7 @@ def get_workload_container_count
return @workload_container_count
end

private
def get_workload_name(pod)

if @deployment_lookup.empty?
@deployment_inventory['items'].each do |deployment|
match_labels = deployment['spec']['selector']['matchLabels'].to_h
namespace = deployment['metadata']['namespace']
match_labels.each{|k,v|
@deployment_lookup["#{namespace}-#{k}=#{v}"] = "#{deployment['metadata']['namespace']}~~#{deployment['metadata']['name']}"
}
end
end

begin
has_owner = !pod['metadata']['ownerReferences'].nil?
owner_kind = ''
@@ -129,7 +123,6 @@ def get_workload_name(pod)
controller_name = pod['metadata']['name']
end
namespace = pod['metadata']['namespace']

workload_name = ''
if owner_kind.nil?
owner_kind = 'Pod'
@@ -139,41 +132,22 @@ def get_workload_name(pod)
# we are excluding jobs
return nil
when 'replicaset'
# get the labels, and see if there is a match. If there is, it is the deployment. If not, use replica set name/controller name
labels = pod['metadata']['labels'].to_h
labels.each {|k,v|
lookup_key = "#{namespace}-#{k}=#{v}"
if @deployment_lookup.key?(lookup_key)
workload_name = @deployment_lookup[lookup_key]
break
end
}
if workload_name.empty?
workload_name = "#{namespace}~~#{controller_name}"
end
#TODO:
workload_name = get_replica_set_owner_ref(controller_name)
workload_name = "#{namespace}~~#{workload_name}"
when 'daemonset'
workload_name = "#{namespace}~~#{controller_name}"
else
workload_name = "#{namespace}~~#{pod['metadata']['name']}"
workload_name = "#{namespace}~~#{controller_name}"
end
return workload_name
rescue => e
@log.info "Error in get_workload_name(pod) #{e.message}"
@log.info "Error in get_workload_name(pod) #{e.message} #{e.backtrace}"
return nil
end
end

def get_workload_kind(pod)
if @deployment_lookup.empty?
@deployment_inventory['items'].each do |deployment|
match_labels = deployment['spec']['selector']['matchLabels'].to_h
namespace = deployment['metadata']['namespace']
match_labels.each{|k,v|
@deployment_lookup["#{namespace}-#{k}=#{v}"] = "#{deployment['metadata']['namespace']}~~#{deployment['metadata']['name']}"
}
end
end

begin
has_owner = !pod['metadata']['ownerReferences'].nil?
owner_kind = ''
@@ -193,6 +167,30 @@ def get_workload_kind(pod)
end
end

private
def get_replica_set_owner_ref(controller_name)
if @workload_name_cache.key?(controller_name)
return @workload_name_cache[controller_name]
end
begin
owner_ref = controller_name
@replicaset_inventory['items'].each{|rs|
rs_name = rs['metadata']['name']
if controller_name.casecmp(rs_name) == 0
if !rs['metadata']['ownerReferences'].nil?
owner_ref = rs['metadata']['ownerReferences'][0]['name'] if rs['metadata']['ownerReferences'][0]['name']
end
break
end
}
@workload_name_cache[controller_name] = owner_ref
return owner_ref
rescue => e
@log.info "Error in get_replica_set_owner_ref(controller_name) #{e.message}"
return controller_name
end
end

def get_node_capacity(node_name, type)
if node_name.nil? #unscheduled pods will not have a node name
return -1
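
Editor's note: the headline change in this file is that workload names are now resolved through ReplicaSet ownerReferences rather than by matching Deployment selector labels. A pod owned by a ReplicaSet takes the ReplicaSet's own owner (normally its Deployment) as its workload, results are cached in @workload_name_cache, and any error falls back to the controller name. A condensed sketch against the same inventory shape (class name is illustrative):

    # Sketch: resolve a ReplicaSet's owning Deployment from ownerReferences,
    # caching results and falling back to the controller name on error.
    class OwnerResolver
      def initialize(replicaset_inventory)
        @replicaset_inventory = replicaset_inventory
        @cache = {}
      end

      def owner_of(controller_name)
        return @cache[controller_name] if @cache.key?(controller_name)
        owner = controller_name # default: the ReplicaSet is its own workload
        @replicaset_inventory['items'].each do |rs|
          next unless controller_name.casecmp(rs['metadata']['name']).zero?
          refs = rs['metadata']['ownerReferences']
          owner = refs[0]['name'] if refs && refs[0] && refs[0]['name']
          break
        end
        @cache[controller_name] = owner
      rescue => e
        puts "owner lookup failed: #{e.message}"
        controller_name
      end
    end

    inventory = { 'items' => [
      { 'metadata' => { 'name' => 'web-5d9c8b',
                        'ownerReferences' => [{ 'name' => 'web' }] } }
    ] }
    r = OwnerResolver.new(inventory)
    puts r.owner_of('web-5d9c8b') # => "web" (the owning Deployment)
    puts r.owner_of('orphan-rs')  # => "orphan-rs" (no matching ReplicaSet)
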
2 changes: 2 additions & 0 deletions source/code/plugin/health/health_missing_signal_generator.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

require_relative 'health_model_constants'
require_relative 'health_monitor_record'

2 changes: 2 additions & 0 deletions source/code/plugin/health/health_model_buffer.rb
@@ -1,3 +1,5 @@
# frozen_string_literal: true

module HealthModel

=begin
1 change: 1 addition & 0 deletions source/code/plugin/health/health_model_builder.rb
@@ -1,3 +1,4 @@
# frozen_string_literal: true
require 'time'

module HealthModel
1 change: 1 addition & 0 deletions source/code/plugin/health/health_model_constants.rb
@@ -1,3 +1,4 @@
# frozen_string_literal: true
module HealthModel
class MonitorState
CRITICAL = "fail"