
Monitoring

liyinan926 edited this page Jan 28, 2015 · 29 revisions
  • Author: Yinan
  • Reviewer: Sahil

Overview

Gobblin monitors the status of the system and of job executions in several ways. Each approach is optional and can be turned on or off through configuration.

Metrics Collecting and Reporting

During the course of a job execution, Gobblin collects both system-defined and user-defined job and task metrics and reports them periodically to metric log files.

Metric Collecting

Pre-defined Metrics

Internally, Gobblin pre-defines a minimal set of metrics, listed below, in two metric groups: JOB for job-level metrics and TASK for task-level metrics.

  • ${metric_group}.${id}.records: this metric keeps track of the total number of data records extracted by the job or task depending on the ${metric_group}. The ${id} is either a job ID or a task ID depending on the ${metric_group}.
  • ${metric_group}.${id}.recordsPerSecond: this metric keeps track of the rate of data extraction as data records extracted per second by the job or task depending on the ${metric_group}.
  • ${metric_group}.${id}.bytes: this metric keeps track of the total number of bytes extracted by the job or task depending on the ${metric_group}.
  • ${metric_group}.${id}.bytesPerSecond: this metric keeps track of the rate of data extraction as bytes extracted per second by the job or task depending on the ${metric_group}.
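The naming pattern above can be sketched in a few lines of Java. This is only an illustration of how the three parts combine into a dotted name: the class MetricNameSketch, its name helper, and the IDs used are hypothetical and are not part of Gobblin.

```java
class MetricNameSketch {

    // Hypothetical helper: joins the metric group, the job or task ID, and the
    // metric name with dots, following the ${metric_group}.${id}.${name} pattern.
    static String name(String metricGroup, String id, String metric) {
        return String.join(".", metricGroup, id, metric);
    }

    public static void main(String[] args) {
        // The IDs below are made-up placeholders, not real Gobblin job or task IDs.
        System.out.println(name("JOB", "job_example_1", "records"));
        System.out.println(name("TASK", "task_example_1_0", "bytesPerSecond"));
    }
}
```

With these placeholder IDs, the first call yields JOB.job_example_1.records and the second yields TASK.task_example_1_0.bytesPerSecond.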

User-defined Metrics

Gobblin also allows users to define custom metrics in custom Source, Extractor, Converter, QualityChecker, ForkOperator, DataWriter, and DataPublisher classes using the JobMetrics class. For example, suppose a user wants to keep track of the total number of connection attempts a job makes to the data source. This can be done with a Counter named, say, sourceConnections. The following code snippet shows how the user gets and uses the Counter.

// Get a new Counter with the given name in the form of JOB.${jobId}.sourceConnections 
Counter sourceConnectionCounter = JobMetrics.getCounter(JobMetrics.name(MetricGroup.JOB, jobId, "sourceConnections"));
// Code to establish a connection to the data source
// Increment the counter by one for the connection attempt made above 
sourceConnectionCounter.inc();
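When connections are retried, the increment belongs inside the retry loop so that failed attempts are counted too. The following is a minimal, self-contained sketch of that placement. It uses a plain AtomicLong as a stand-in for the Counter above, and the class ConnectionAttemptSketch, the connectWithRetries method, and the simulated connection are all hypothetical, not Gobblin APIs.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

class ConnectionAttemptSketch {

    // Stand-in for the sourceConnections Counter; not a Gobblin class.
    static final AtomicLong sourceConnections = new AtomicLong();

    // Tries to connect up to maxAttempts times and counts every attempt,
    // whether or not it succeeds.
    static boolean connectWithRetries(BooleanSupplier connect, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            // Count the attempt before trying, so failures are included.
            sourceConnections.incrementAndGet();
            if (connect.getAsBoolean()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Simulated data source that only accepts the third attempt.
        int[] calls = {0};
        boolean connected = connectWithRetries(() -> ++calls[0] >= 3, 5);
        // Prints "true after 3 attempts"
        System.out.println(connected + " after " + sourceConnections.get() + " attempts");
    }
}
```

In a real job the stand-in would be replaced by the Counter obtained from JobMetrics as shown above, with inc() called at the same point in the loop.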

Metric Reporting

Job Execution History Store

CREATE TABLE IF NOT EXISTS gobblin_job_executions (
	job_name VARCHAR(128) NOT NULL,
	job_id VARCHAR(128) NOT NULL,
	start_time TIMESTAMP,
	end_time TIMESTAMP,
	duration BIGINT(21),
	state ENUM('PENDING', 'RUNNING', 'SUCCESSFUL', 'COMMITTED', 'FAILED', 'CANCELLED'),
	launched_tasks INT,
	completed_tasks INT,
	created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	last_modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (job_id),
	INDEX (job_name),
	INDEX (state)
);

CREATE TABLE IF NOT EXISTS gobblin_task_executions (
	task_id VARCHAR(128) NOT NULL,
	job_id VARCHAR(128) NOT NULL,
	start_time TIMESTAMP,
	end_time TIMESTAMP,
	duration BIGINT(21),
	state ENUM('PENDING', 'RUNNING', 'SUCCESSFUL', 'COMMITTED', 'FAILED', 'CANCELLED'),
	low_watermark BIGINT(21),
	high_watermark BIGINT(21),
	table_namespace VARCHAR(128),
	table_name VARCHAR(128),
	table_type ENUM('SNAPSHOT_ONLY', 'SNAPSHOT_APPEND', 'APPEND_ONLY'),
	created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	last_modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (task_id),
	FOREIGN KEY (job_id) 
	REFERENCES gobblin_job_executions(job_id) 
	ON DELETE CASCADE,
	INDEX (state),
	INDEX (table_namespace),
	INDEX (table_name),
	INDEX (table_type)
);

CREATE TABLE IF NOT EXISTS gobblin_job_metrics (
	metric_id BIGINT(21) NOT NULL AUTO_INCREMENT,
	job_id VARCHAR(128) NOT NULL,
	metric_group VARCHAR(128) NOT NULL,
	metric_name VARCHAR(128) NOT NULL,
	metric_type ENUM('COUNTER', 'METER', 'GAUGE') NOT NULL,
	metric_value VARCHAR(256) NOT NULL,
	created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	last_modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (metric_id),
	FOREIGN KEY (job_id) 
	REFERENCES gobblin_job_executions(job_id) 
	ON DELETE CASCADE,
	INDEX (metric_group),
	INDEX (metric_name),
	INDEX (metric_type)
);

CREATE TABLE IF NOT EXISTS gobblin_task_metrics (
	metric_id BIGINT(21) NOT NULL AUTO_INCREMENT,
	task_id VARCHAR(128) NOT NULL,
	metric_group VARCHAR(128) NOT NULL,
	metric_name VARCHAR(128) NOT NULL,
	metric_type ENUM('COUNTER', 'METER', 'GAUGE') NOT NULL,
	metric_value VARCHAR(256) NOT NULL,
	created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	last_modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (metric_id),
	FOREIGN KEY (task_id) 
	REFERENCES gobblin_task_executions(task_id) 
	ON DELETE CASCADE,
	INDEX (metric_group),
	INDEX (metric_name),
	INDEX (metric_type)
);

CREATE TABLE IF NOT EXISTS gobblin_job_properties (
	property_id BIGINT(21) NOT NULL AUTO_INCREMENT,
	job_id VARCHAR(128) NOT NULL,
	property_key VARCHAR(128) NOT NULL,
	property_value VARCHAR(128) NOT NULL,
	created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	last_modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (property_id),
	FOREIGN KEY (job_id)
	REFERENCES gobblin_job_executions(job_id)
	ON DELETE CASCADE,
	INDEX (property_key)
);

CREATE TABLE IF NOT EXISTS gobblin_task_properties (
	property_id BIGINT(21) NOT NULL AUTO_INCREMENT,
	task_id VARCHAR(128) NOT NULL,
	property_key VARCHAR(128) NOT NULL,
	property_value VARCHAR(128) NOT NULL,
	created_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
	last_modified_ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
	PRIMARY KEY (property_id),
	FOREIGN KEY (task_id)
	REFERENCES gobblin_task_executions(task_id)
	ON DELETE CASCADE,
	INDEX (property_key)
);

Email Notification
