diff --git a/config/config.exs b/config/config.exs index e7de949..0dbf7f0 100644 --- a/config/config.exs +++ b/config/config.exs @@ -32,6 +32,8 @@ config :elixir_bench, :supported_erlang_versions, {:system, :list, "SUPPORTED_ERLANG_VERSIONS", ["20.1.2"]} +config :elixir_bench, :job_max_retries, {:system, :integer, "JOB_MAX_RETRIES", 3} + # Import environment specific config. This must remain at the bottom # of this file so it overrides the configuration defined above. import_config "#{Mix.env()}.exs" diff --git a/lib/elixir_bench/benchmarks/benchmarks.ex b/lib/elixir_bench/benchmarks/benchmarks.ex index 9b6bba3..6a2139d 100644 --- a/lib/elixir_bench/benchmarks/benchmarks.ex +++ b/lib/elixir_bench/benchmarks/benchmarks.ex @@ -146,17 +146,17 @@ defmodule ElixirBench.Benchmarks do end defp fetch_unclaimed_job(runner) do - # Unclaimed or claimed by this runner but not completed - Repo.fetch( - from( - j in Job, - where: is_nil(j.claimed_by) and is_nil(j.claimed_at) and is_nil(j.completed_at), - or_where: - j.claimed_by == ^runner.id and not is_nil(j.claimed_at) and is_nil(j.completed_at), - lock: "FOR UPDATE SKIP LOCKED", - order_by: j.inserted_at, - limit: 1 - ) + # Unclaimed or claimed by this runner but not completed and claim_count < MAX_RETRIES + from( + j in Job, + where: is_nil(j.claimed_by) and is_nil(j.claimed_at) and is_nil(j.completed_at), + or_where: + j.claimed_by == ^runner.id and not is_nil(j.claimed_at) and is_nil(j.completed_at), + lock: "FOR UPDATE SKIP LOCKED", + order_by: j.inserted_at, + limit: 1 ) + |> Job.claimable() + |> Repo.fetch() end end diff --git a/lib/elixir_bench/benchmarks/job.ex b/lib/elixir_bench/benchmarks/job.ex index a7542d6..97f6f6d 100644 --- a/lib/elixir_bench/benchmarks/job.ex +++ b/lib/elixir_bench/benchmarks/job.ex @@ -2,9 +2,10 @@ defmodule ElixirBench.Benchmarks.Job do use Ecto.Schema import Ecto.Changeset - import Ecto.Query, only: [from: 2] + import Ecto.Query, only: [from: 2, where: 2] alias ElixirBench.Repos + alias ElixirBench.Repo alias ElixirBench.Benchmarks.{Runner, Job, Config} schema "jobs" do @@ -28,12 +29,15 @@ defmodule ElixirBench.Benchmarks.Job do field :elixir_version, :string field :erlang_version, :string field :memory_mb, :integer + field :claim_count, :integer, default: 0 embeds_one(:config, Config) timestamps() end + @max_retries Confex.fetch_env!(:elixir_bench, :job_max_retries) + @submit_fields [ :elixir_version, :erlang_version, @@ -54,6 +58,9 @@ defmodule ElixirBench.Benchmarks.Job do def claim_changeset(%Job{} = job, claimed_by) do job |> change(claimed_by: claimed_by, claimed_at: DateTime.utc_now()) + |> change(claim_count: job.claim_count + 1) + + # |> prepare_changes(&increment_claim_count/1) end def create_changeset(%Job{} = job, attrs) do @@ -73,4 +80,16 @@ defmodule ElixirBench.Benchmarks.Job do def filter_by_repo(query, repo_id) do from(j in query, where: j.repo_id == ^repo_id) end + + def claimable(query) do + from(j in query, where: j.claim_count < @max_retries) + end + + defp increment_claim_count(changeset) do + Job + |> where(id: ^changeset.data.id) + |> Repo.update_all(inc: [claim_count: 1]) + + changeset + end end diff --git a/priv/repo/migrations/20180815140601_adds_claims_count_to_jobs.exs b/priv/repo/migrations/20180815140601_adds_claims_count_to_jobs.exs new file mode 100644 index 0000000..ef28042 --- /dev/null +++ b/priv/repo/migrations/20180815140601_adds_claims_count_to_jobs.exs @@ -0,0 +1,14 @@ +defmodule ElixirBench.Repo.Migrations.AddsClaimsCountToJobs do + use Ecto.Migration + + def change do + alter table(:jobs) do + add :claim_count, :integer, default: 0 + end + + # assume all existent jobs has being claimed once + execute """ + UPDATE jobs SET claim_count = 1 WHERE claimed_by IS NOT NULL; + """, "" # nothing on down + end +end diff --git a/test/elixir_bench/benchmarks/benchmarks_test.exs b/test/elixir_bench/benchmarks/benchmarks_test.exs index 1b7e373..e95a4a6 100644 --- a/test/elixir_bench/benchmarks/benchmarks_test.exs +++ b/test/elixir_bench/benchmarks/benchmarks_test.exs @@ -199,6 +199,20 @@ defmodule ElixirBench.BenchmarksTest do assert %Job{id: ^jid, claimed_by: ^rid} = job end + test "return error when max retries limit is reached" do + %{id: jid, claimed_by: nil} = insert(:job) + %{id: rid} = runner = insert(:runner) + + max_retries = Confex.fetch_env!(:elixir_bench, :job_max_retries) + + 1..max_retries + |> Enum.each(fn _ -> + assert {:ok, job} = Benchmarks.claim_job(runner) + end) + + assert {:error, :not_found} = Benchmarks.claim_job(runner) + end + test "return error if there is no pending jobs" do runner = insert(:runner)