Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚡️ Speed up function _attach_resources_to_jobs_and_instigator_jobs by 45% in python_modules/dagster/dagster/_core/definitions/definitions_class.py #65

Open
wants to merge 1 commit into
base: codeflash/optimize-remove_none_recursively-2024-06-26T09.20.53
Choose a base branch
from

Conversation

codeflash-ai[bot]
Copy link

@codeflash-ai codeflash-ai bot commented Jul 25, 2024

📄 _attach_resources_to_jobs_and_instigator_jobs() in python_modules/dagster/dagster/_core/definitions/definitions_class.py

📈 Performance improved by 45% (0.45x faster)

⏱️ Runtime went down from 20.3 microseconds to 14.0 microseconds

Explanation and details

Certainly! Here's an optimized version of the provided program. I've made improvements to minimize redundant looping and dictionary look-up operations.

Key Optimizations.

  1. Combine Job Collections: Collect all jobs from the primary jobs list, schedules, and sensors into a single dictionary first to eliminate redundant duplicates.
  2. Single Pass for Jobs: Handle unsatisfied jobs and prepare resource-bound versions in a single pass over the jobs list.
  3. Single Pass for Updates: Update all schedules and sensors to use the resource-bound job versions in one loop.

Correctness verification

The new optimized code was tested for correctness. The results are listed below.

🔘 (none found) − ⚙️ Existing Unit Tests

✅ 2 Passed − 🌀 Generated Regression Tests

(click to show generated tests)
# imports
from typing import Any, Iterable, Mapping, Optional, Union
from unittest.mock import Mock

import pytest  # used for our unit tests
from dagster._core.definitions import JobDefinition
from dagster._core.definitions.definitions_class import \
    _attach_resources_to_jobs_and_instigator_jobs
# function to test
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.partitioned_schedule import \
    UnresolvedPartitionedAssetScheduleDefinition
from dagster._core.definitions.schedule_definition import ScheduleDefinition
from dagster._core.definitions.sensor_definition import SensorDefinition
from dagster._core.definitions.unresolved_asset_job_definition import \
    UnresolvedAssetJobDefinition

# unit tests

def test_empty_inputs():
    # Test with all inputs as None
    result = _attach_resources_to_jobs_and_instigator_jobs(None, None, None, {})
    assert result.jobs_with_resources == []
    assert result.updated_schedules == []
    assert result.updated_sensors == []

    # Test with all inputs as empty lists
    result = _attach_resources_to_jobs_and_instigator_jobs([], [], [], {})
    assert result.jobs_with_resources == []
    assert result.updated_schedules == []
    assert result.updated_sensors == []

def test_single_job_no_resources_needed():
    job = JobDefinition(name="simple_job")
    result = _attach_resources_to_jobs_and_instigator_jobs([job], None, None, {})
    assert result.jobs_with_resources == [job]
    assert result.updated_schedules == []
    assert result.updated_sensors == []

def test_single_job_resources_needed():
    job = JobDefinition(name="complex_job", resource_defs={"resource1": Mock()})
    resource_defs = {"resource1": Mock()}
    expected_job = job.with_top_level_resources(resource_defs)
    result = _attach_resources_to_jobs_and_instigator_jobs([job], None, None, resource_defs)
    assert result.jobs_with_resources == [expected_job]
    assert result.updated_schedules == []
    assert result.updated_sensors == []

def test_jobs_in_schedules():
    job = JobDefinition(name="job_in_schedule")
    schedule = ScheduleDefinition(name="schedule_with_job", job=job)
    schedule.has_loadable_target = Mock(return_value=True)
    result = _attach_resources_to_jobs_and_instigator_jobs(None, [schedule], None, {})
    assert result.jobs_with_resources == [job]
    assert result.updated_schedules == [schedule]
    assert result.updated_sensors == []

def test_jobs_in_sensors():
    job = JobDefinition(name="job_in_sensor")
    sensor = SensorDefinition(name="sensor_with_jobs", jobs=[job])
    sensor.has_loadable_targets = Mock(return_value=True)
    result = _attach_resources_to_jobs_and_instigator_jobs(None, None, [sensor], {})
    assert result.jobs_with_resources == [job]
    assert result.updated_schedules == []
    assert result.updated_sensors == [sensor]

def test_mixed_jobs_in_schedules_and_sensors():
    job = JobDefinition(name="job_in_both")
    schedule = ScheduleDefinition(name="schedule_with_job", job=job)
    sensor = SensorDefinition(name="sensor_with_jobs", jobs=[job])
    schedule.has_loadable_target = Mock(return_value=True)
    sensor.has_loadable_targets = Mock(return_value=True)
    result = _attach_resources_to_jobs_and_instigator_jobs(None, [schedule], [sensor], {})
    assert result.jobs_with_resources == [job]
    assert result.updated_schedules == [schedule]
    assert result.updated_sensors == [sensor]

def test_duplicate_jobs():
    job = JobDefinition(name="duplicate_job")
    result = _attach_resources_to_jobs_and_instigator_jobs([job, job], None, None, {})
    assert result.jobs_with_resources == [job]

def test_jobs_needing_resource_binding():
    job = JobDefinition(name="job_needing_resources", resource_defs={"resource1": Mock()})
    resource_defs = {"resource1": Mock()}
    expected_job = job.with_top_level_resources(resource_defs)
    result = _attach_resources_to_jobs_and_instigator_jobs([job], None, None, resource_defs)
    assert result.jobs_with_resources == [expected_job]

def test_jobs_not_needing_resource_binding():
    job = JobDefinition(name="job_with_resources", resource_defs={"resource1": Mock()})
    result = _attach_resources_to_jobs_and_instigator_jobs([job], None, None, {})
    assert result.jobs_with_resources == [job]

def test_invalid_job_types():
    invalid_job = "invalid_job"
    with pytest.raises(AttributeError):
        _attach_resources_to_jobs_and_instigator_jobs([invalid_job], None, None, {})

def test_partial_resource_definitions():
    job = JobDefinition(name="job_needing_partial_resources", resource_defs={"resource1": Mock()})
    partial_resource_defs = {"resource1": Mock()}
    expected_job = job.with_top_level_resources(partial_resource_defs)
    result = _attach_resources_to_jobs_and_instigator_jobs([job], None, None, partial_resource_defs)
    assert result.jobs_with_resources == [expected_job]

def test_large_number_of_jobs():
    jobs = [JobDefinition(name=f"job_{i}") for i in range(1000)]
    result = _attach_resources_to_jobs_and_instigator_jobs(jobs, None, None, {})
    assert result.jobs_with_resources == jobs

def test_complex_resource_definitions():
    job = JobDefinition(name="complex_job", resource_defs={"resource1": Mock(), "resource2": Mock()})
    resource_defs = {"resource1": Mock(), "resource2": Mock(), "resource3": Mock()}
    expected_job = job.with_top_level_resources(resource_defs)
    result = _attach_resources_to_jobs_and_instigator_jobs([job], None, None, resource_defs)
    assert result.jobs_with_resources == [expected_job]

def test_resource_binding_side_effects():
    job = JobDefinition(name="job_needing_resources", resource_defs={"resource1": Mock()})
    resource_defs = {"resource1": Mock()}
    original_jobs = [job]
    _attach_resources_to_jobs_and_instigator_jobs(original_jobs, None, None, resource_defs)
    assert original_jobs == [job]  # Ensure original list is not mutated

🔘 (none found) − ⏪ Replay Tests

…by 45%

Certainly! Here's an optimized version of the provided program. I've made improvements to minimize redundant looping and dictionary look-up operations.



### Key Optimizations.
1. **Combine Job Collections**: Collect all jobs from the primary `jobs` list, `schedules`, and `sensors` into a single dictionary first to eliminate redundant duplicates.
2. **Single Pass for Jobs**: Handle unsatisfied jobs and prepare resource-bound versions in a single pass over the `jobs` list.
3. **Single Pass for Updates**: Update all `schedules` and `sensors` to use the resource-bound job versions in one loop.
@codeflash-ai codeflash-ai bot added the ⚡️ codeflash Optimization PR opened by Codeflash AI label Jul 25, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
⚡️ codeflash Optimization PR opened by Codeflash AI
Projects
None yet
Development

Successfully merging this pull request may close these issues.

0 participants