Skip to content

Commit

Permalink
[Task Manager] Add caching to the task partitioning logic (#189562)
Browse files Browse the repository at this point in the history
Resolves #189119

## Summary

This PR adds a mechanism to keep the node's calculated partitions in
cache for 10 seconds before calling the discovery service again and
recalculating them.


### Checklist

- [ ] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios


### To verify
- Add the following to kibana.yml:
```
xpack.task_manager.claim_strategy: 'unsafe_mget'
```
- Verify that the cache is updated every 10 seconds. You can test this
by adding a log statement with a timestamp to the updated
TaskPartitioner code
- Verify that, on startup, the cache is populated during the initial
claiming cycle
  • Loading branch information
doakalexi authored Aug 5, 2024
1 parent 8cffec5 commit d79bdfd
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 25 deletions.
71 changes: 54 additions & 17 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
createDiscoveryServiceMock,
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';
import { TaskPartitioner } from './task_partitioner';
import { CACHE_INTERVAL, TaskPartitioner } from './task_partitioner';

const POD_NAME = 'test-pod';

Expand Down Expand Up @@ -47,24 +47,61 @@ describe('getPodName()', () => {
describe('getPartitions()', () => {
  const lastSeen = '2024-08-10T10:00:00.000Z';
  const discoveryServiceMock = createDiscoveryServiceMock(POD_NAME);
  // Partitions expected for POD_NAME when the three pods configured in beforeEach are active.
  const expectedPartitions = [
    0, 1, 3, 4, 6, 7, 9, 10, 12, 13, 15, 16, 18, 19, 21, 22, 24, 25, 27, 28, 30, 31, 33, 34, 36, 37,
    39, 40, 42, 43, 45, 46, 48, 49, 51, 52, 54, 55, 57, 58, 60, 61, 63, 64, 66, 67, 69, 70, 72, 73,
    75, 76, 78, 79, 81, 82, 84, 85, 87, 88, 90, 91, 93, 94, 96, 97, 99, 100, 102, 103, 105, 106,
    108, 109, 111, 112, 114, 115, 117, 118, 120, 121, 123, 124, 126, 127, 129, 130, 132, 133, 135,
    136, 138, 139, 141, 142, 144, 145, 147, 148, 150, 151, 153, 154, 156, 157, 159, 160, 162, 163,
    165, 166, 168, 169, 171, 172, 174, 175, 177, 178, 180, 181, 183, 184, 186, 187, 189, 190, 192,
    193, 195, 196, 198, 199, 201, 202, 204, 205, 207, 208, 210, 211, 213, 214, 216, 217, 219, 220,
    222, 223, 225, 226, 228, 229, 231, 232, 234, 235, 237, 238, 240, 241, 243, 244, 246, 247, 249,
    250, 252, 253, 255,
  ];

  beforeEach(() => {
    // Fake timers let each test step the clock relative to CACHE_INTERVAL instead of waiting.
    jest.useFakeTimers();
    // Re-applied for every test so a test that overrides the mock cannot affect the next one.
    discoveryServiceMock.getActiveKibanaNodes.mockResolvedValue([
      createFindSO(POD_NAME, lastSeen),
      createFindSO('test-pod-2', lastSeen),
      createFindSO('test-pod-3', lastSeen),
    ]);
  });

  afterEach(() => {
    jest.clearAllMocks();
    jest.clearAllTimers();
    // Restore real timers so the fake clock does not leak into tests outside this suite.
    jest.useRealTimers();
  });

  test('correctly gets the partitons for this pod', async () => {
    const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
    expect(await taskPartitioner.getPartitions()).toEqual(expectedPartitions);
  });

  test('correctly caches the partitions on 10 second interval', async () => {
    const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);
    const shorterInterval = CACHE_INTERVAL / 2;

    // First call: the cache starts expired, so the discovery service is queried.
    await taskPartitioner.getPartitions();

    // Half an interval later the cached value is still fresh: no new query.
    jest.advanceTimersByTime(shorterInterval);
    await taskPartitioner.getPartitions();

    // A full interval after the first call the cache has expired: second query.
    jest.advanceTimersByTime(shorterInterval);
    await taskPartitioner.getPartitions();

    expect(discoveryServiceMock.getActiveKibanaNodes).toHaveBeenCalledTimes(2);
  });

  test('correctly catches the error from the discovery service and returns the cached value', async () => {
    const taskPartitioner = new TaskPartitioner(POD_NAME, discoveryServiceMock);

    await taskPartitioner.getPartitions();
    expect(taskPartitioner.getPodPartitions()).toEqual(expectedPartitions);

    // Reject with a real Error, as a failing discovery service call would.
    discoveryServiceMock.getActiveKibanaNodes.mockRejectedValueOnce(
      new Error('failed to fetch active Kibana nodes')
    );
    jest.advanceTimersByTime(CACHE_INTERVAL);
    await taskPartitioner.getPartitions();
    expect(taskPartitioner.getPodPartitions()).toEqual(expectedPartitions);
  });
});
27 changes: 24 additions & 3 deletions x-pack/plugins/task_manager/server/lib/task_partitioner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ function range(start: number, end: number) {
}

export const MAX_PARTITIONS = 256;
export const CACHE_INTERVAL = 10000;

export class TaskPartitioner {
private readonly allPartitions: number[];
private readonly podName: string;
private kibanaDiscoveryService: KibanaDiscoveryService;
private podPartitions: number[];
private podPartitionsLastUpdated: number;

/**
 * Creates a partitioner for a single pod.
 *
 * @param podName name of the pod this instance computes partitions for
 * @param kibanaDiscoveryService discovery service kept on the instance for later use
 */
constructor(podName: string, kibanaDiscoveryService: KibanaDiscoveryService) {
// NOTE(review): presumably every partition id from 0 up to MAX_PARTITIONS; confirm against range()
this.allPartitions = range(0, MAX_PARTITIONS);
this.podName = podName;
this.kibanaDiscoveryService = kibanaDiscoveryService;
// The cache starts empty and is only filled by getPartitions().
this.podPartitions = [];
// Backdate the timestamp by one full interval so the first getPartitions()
// call sees the cache as expired and refreshes it immediately.
this.podPartitionsLastUpdated = Date.now() - CACHE_INTERVAL;
}

getAllPartitions(): number[] {
Expand All @@ -37,10 +42,26 @@ export class TaskPartitioner {
return this.podName;
}

/**
 * Returns the partitions currently held in the cache without refreshing it.
 * The array is empty until getPartitions() has completed a successful refresh.
 */
getPodPartitions(): number[] {
return this.podPartitions;
}

/**
 * Returns the partitions assigned to this pod.
 *
 * The result is cached: the pod names are only fetched again, and the
 * partitions recalculated, once the cached value is at least CACHE_INTERVAL
 * milliseconds old. If the refresh fails, the previously cached partitions are
 * returned and the timestamp is left untouched, so the next call retries.
 *
 * @returns the cached (possibly just refreshed) partitions for this pod
 */
async getPartitions(): Promise<number[]> {
  const now = Date.now();

  // podPartitionsLastUpdated is already an epoch-millisecond number, so it can
  // be compared with `now` directly.
  if (now - this.podPartitionsLastUpdated >= CACHE_INTERVAL) {
    try {
      const allPodNames = await this.getAllPodNames();
      this.podPartitions = assignPodPartitions(this.podName, allPodNames, this.allPartitions);
      // Only mark the cache fresh after a successful recalculation.
      this.podPartitionsLastUpdated = now;
    } catch (error) {
      // Best effort: keep serving the cached value when the refresh fails.
    }
  }

  return this.podPartitions;
}

private async getAllPodNames(): Promise<string[]> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ import {
createFindSO,
} from '../kibana_discovery_service/mock_kibana_discovery_service';

jest.mock('../lib/assign_pod_partitions', () => ({
assignPodPartitions: jest.fn().mockReturnValue([1, 3]),
}));

jest.mock('../constants', () => ({
CONCURRENCY_ALLOW_LIST_BY_TASK_TYPE: [
'limitedToZero',
Expand Down Expand Up @@ -107,6 +103,7 @@ describe('TaskClaiming', () => {
.spyOn(apm, 'startTransaction')
// eslint-disable-next-line @typescript-eslint/no-explicit-any
.mockImplementation(() => mockApmTrans as any);
jest.spyOn(taskPartitioner, 'getPartitions').mockResolvedValue([1, 3]);
});

describe('claimAvailableTasks', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type * as estypes from '@elastic/elasticsearch/lib/api/typesWithBodyKey';
import { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
import { taskMappings as TaskManagerMapping } from '@kbn/task-manager-plugin/server/saved_objects/mappings';
import { asyncForEach } from '@kbn/std';
import { setTimeout as setTimeoutAsync } from 'timers/promises';
import { FtrProviderContext } from '../../ftr_provider_context';

const { properties: taskManagerIndexMapping } = TaskManagerMapping;
Expand Down Expand Up @@ -154,13 +155,17 @@ export default function ({ getService }: FtrProviderContext) {
});
});

it('should tasks with partitions assigned to this kibana node', async () => {
it('should run tasks with partitions assigned to this kibana node', async () => {
const partitions: Record<string, number> = {
'0': 127,
'1': 147,
'2': 23,
};

// wait for the pod partitions cache to update before scheduling tasks
await updateKibanaNodes();
await setTimeoutAsync(10000);

const tasksToSchedule = [];
for (let i = 0; i < 3; i++) {
tasksToSchedule.push(
Expand Down

0 comments on commit d79bdfd

Please sign in to comment.