Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Adds max retries to Job #48

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ config :elixir_bench,
:supported_erlang_versions,
{:system, :list, "SUPPORTED_ERLANG_VERSIONS", ["20.1.2"]}

config :elixir_bench, :job_max_retries, {:system, :integer, "JOB_MAX_RETRIES", 3}

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
22 changes: 11 additions & 11 deletions lib/elixir_bench/benchmarks/benchmarks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -146,17 +146,17 @@ defmodule ElixirBench.Benchmarks do
end

defp fetch_unclaimed_job(runner) do
# Unclaimed or claimed by this runner but not completed
Repo.fetch(
from(
j in Job,
where: is_nil(j.claimed_by) and is_nil(j.claimed_at) and is_nil(j.completed_at),
or_where:
j.claimed_by == ^runner.id and not is_nil(j.claimed_at) and is_nil(j.completed_at),
lock: "FOR UPDATE SKIP LOCKED",
order_by: j.inserted_at,
limit: 1
)
# Unclaimed or claimed by this runner but not completed and claim_count < MAX_RETRIES
from(
j in Job,
where: is_nil(j.claimed_by) and is_nil(j.claimed_at) and is_nil(j.completed_at),
or_where:
j.claimed_by == ^runner.id and not is_nil(j.claimed_at) and is_nil(j.completed_at),
lock: "FOR UPDATE SKIP LOCKED",
order_by: j.inserted_at,
limit: 1
)
|> Job.claimable()
|> Repo.fetch()
end
end
21 changes: 20 additions & 1 deletion lib/elixir_bench/benchmarks/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ defmodule ElixirBench.Benchmarks.Job do
use Ecto.Schema

import Ecto.Changeset
import Ecto.Query, only: [from: 2]
import Ecto.Query, only: [from: 2, where: 2]

alias ElixirBench.Repos
alias ElixirBench.Repo
alias ElixirBench.Benchmarks.{Runner, Job, Config}

schema "jobs" do
Expand All @@ -28,12 +29,15 @@ defmodule ElixirBench.Benchmarks.Job do
field :elixir_version, :string
field :erlang_version, :string
field :memory_mb, :integer
field :claim_count, :integer, default: 0

embeds_one(:config, Config)

timestamps()
end

@max_retries Confex.fetch_env!(:elixir_bench, :job_max_retries)

@submit_fields [
:elixir_version,
:erlang_version,
Expand All @@ -54,6 +58,9 @@ defmodule ElixirBench.Benchmarks.Job do
def claim_changeset(%Job{} = job, claimed_by) do
job
|> change(claimed_by: claimed_by, claimed_at: DateTime.utc_now())
|> change(claim_count: job.claim_count + 1)

# |> prepare_changes(&increment_claim_count/1)
end

def create_changeset(%Job{} = job, attrs) do
Expand All @@ -73,4 +80,16 @@ defmodule ElixirBench.Benchmarks.Job do
def filter_by_repo(query, repo_id) do
from(j in query, where: j.repo_id == ^repo_id)
end

def claimable(query) do
from(j in query, where: j.claim_count < @max_retries)
end

defp increment_claim_count(changeset) do
Job
|> where(id: ^changeset.data.id)
|> Repo.update_all(inc: [claim_count: 1])

changeset
end
end
14 changes: 14 additions & 0 deletions priv/repo/migrations/20180815140601_adds_claims_count_to_jobs.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
defmodule ElixirBench.Repo.Migrations.AddsClaimsCountToJobs do
use Ecto.Migration

def change do
alter table(:jobs) do
add :claim_count, :integer, default: 0
end

# assume all existent jobs has being claimed once
execute """
UPDATE jobs SET claim_count = 1 WHERE claimed_by IS NOT NULL;
""", "" # nothing on down
end
end
14 changes: 14 additions & 0 deletions test/elixir_bench/benchmarks/benchmarks_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,20 @@ defmodule ElixirBench.BenchmarksTest do
assert %Job{id: ^jid, claimed_by: ^rid} = job
end

test "return error when max retries limit is reached" do
%{id: jid, claimed_by: nil} = insert(:job)
%{id: rid} = runner = insert(:runner)

max_retries = Confex.fetch_env!(:elixir_bench, :job_max_retries)

1..max_retries
|> Enum.each(fn _ ->
assert {:ok, job} = Benchmarks.claim_job(runner)
end)

assert {:error, :not_found} = Benchmarks.claim_job(runner)
end

test "return error if there is no pending jobs" do
runner = insert(:runner)

Expand Down