Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Improvements #18

Draft
wants to merge 21 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/elasticlunr/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ defmodule Elasticlunr.Application do
def start(_type, _args) do
children = [
{Registry, name: Elasticlunr.IndexRegistry, keys: :unique},
{DynamicSupervisor, name: Elasticlunr.IndexSupervisor, strategy: :one_for_one}
{DynamicSupervisor, name: Elasticlunr.IndexSupervisor, strategy: :one_for_one},
{Registry, name: Elasticlunr.SchedulerRegistry, keys: :unique},
{DynamicSupervisor, name: Elasticlunr.SchedulerSupervisor, strategy: :one_for_one}
# Starts a worker by calling: Elasticlunr.Worker.start_link(arg)
# {Elasticlunr.Worker, arg}
]
Expand Down
91 changes: 53 additions & 38 deletions lib/elasticlunr/core/field.ex
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,22 @@ defmodule Elasticlunr.Field do
%{field | query_pipeline: pipeline}
end

@spec add(t(), list(document()), keyword()) :: t()
# Adds documents' content to this field's inverted index.
#
# For each document: if its id is not yet known, the content is run through
# the analysis pipeline, the id is registered, and term statistics are
# written. If the id already exists, the configured :on_conflict policy
# (from opts) is applied instead.
#
# NOTE(review): the `with` else-branch only matches `true` (the
# DB.member? conflict case). If add_id/2 ever returned a non-true value
# it would raise WithClauseError — confirm add_id/2 always returns true.
def add(%__MODULE__{pipeline: pipeline} = field, documents, opts \\ []) do
  conflict_action = Keyword.get(opts, :on_conflict)

  Enum.each(documents, fn %{id: id, content: content} ->
    with false <- DB.member?(field.db, {:field_ids, field.name, id}),
         tokens <- Pipeline.run(pipeline, content),
         true <- add_id(field, id) do
      update_field_stats(field, id, tokens)
    else
      true ->
        handle_conflict(conflict_action, field, %{id: id, content: content})
    end
  end)

  field
end

@spec length(t(), atom()) :: pos_integer()
Expand Down Expand Up @@ -142,7 +146,7 @@ defmodule Elasticlunr.Field do
true = DB.delete(db, {:field_ids, name, id})
end)

recalculate_idf(field)
field
end

@spec analyze(t(), any(), keyword) :: list(Token.t())
Expand Down Expand Up @@ -214,6 +218,47 @@ defmodule Elasticlunr.Field do
end)
end

@spec calculate_idf(t()) :: t()
# Recomputes the inverse document frequency (IDF) for every unique term in
# the field, plus the field-length normalization factor (flnorm), and
# persists both into the field's DB. Returns the field unchanged.
def calculate_idf(%__MODULE__{} = field) do
  # Materialize the lookup once: unique_terms_lookup/1 returns a stream,
  # and enumerating it twice (Enum.count + Task.async_stream) would re-run
  # the underlying DB scan.
  terms = Enum.to_list(unique_terms_lookup(field))

  terms_length = Enum.count(terms)

  ids_length = length(field, :ids)

  # 1/sqrt(#unique terms); 0.0 for an empty field so both branches yield
  # a number of the same (float) type.
  flnorm =
    if terms_length > 0 do
      1 / :math.sqrt(terms_length)
    else
      0.0
    end

  # Results are discarded (only the DB inserts matter), so completion
  # order is irrelevant — :ordered false avoids head-of-line blocking.
  :ok =
    terms
    |> Task.async_stream(
      fn {term, _id, _attrs} ->
        # +1 smoothing keeps the denominator non-zero.
        count = length(field, :term, term) + 1
        value = 1 + :math.log10(ids_length / count)

        true = DB.insert(field.db, {{:field_idf, field.name, term}, value})
      end,
      ordered: false
    )
    |> Stream.run()

  true = DB.insert(field.db, {{:field_flnorm, field.name}, flnorm})
  field
end

# Applies the configured conflict policy when a document id already exists
# in the field.
#
# :index — re-analyze the incoming content, drop the stale entry for the
# id, then write fresh term statistics.
# NOTE(review): this clause returns the result of update_field_stats/3
# (whose body is not fully visible here), while the :ignore clause returns
# the field — the two clauses likely return different shapes. Callers in
# this file discard the result, but confirm before relying on it.
defp handle_conflict(:index, %{pipeline: pipeline} = field, %{id: id, content: content}) do
tokens = Pipeline.run(pipeline, content)

field
|> remove([id])
|> update_field_stats(id, tokens)
end

# :ignore — leave the existing entry untouched.
defp handle_conflict(:ignore, field, _document), do: field

defp update_field_stats(%{db: db, name: name} = field, id, tokens) do
Enum.each(tokens, fn token ->
%Token{token: term} = token
Expand Down Expand Up @@ -318,36 +363,6 @@ defmodule Elasticlunr.Field do
|> Stream.uniq_by(&elem(&1, 0))
end

# Recomputes per-term inverse document frequency and the field-length
# normalization factor, persisting both into the field's DB.
defp recalculate_idf(field) do
  entries = unique_terms_lookup(field)
  entry_count = Enum.count(entries)
  doc_count = length(field, :ids)

  flnorm = if entry_count > 0, do: 1 / :math.sqrt(entry_count), else: 0

  entries
  |> Task.async_stream(fn {term, _id, _attrs} ->
    # +1 keeps the denominator non-zero even for unseen terms.
    matches = length(field, :term, term) + 1
    idf = 1 + :math.log10(doc_count / matches)

    true = DB.insert(field.db, {{:field_idf, field.name, term}, idf})
  end)
  |> Stream.run()

  true = DB.insert(field.db, {{:field_flnorm, field.name}, flnorm})
  field
end

defp filter_ids(field, ids, term, matching_docs, query) do
docs = Keyword.get(query, :docs)

Expand Down
71 changes: 43 additions & 28 deletions lib/elasticlunr/core/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ defmodule Elasticlunr.Index.IdPipeline do
end

defmodule Elasticlunr.Index do
alias Elasticlunr.{DB, Field, Pipeline}
alias Elasticlunr.{DB, Field, Pipeline, Scheduler}
alias Elasticlunr.Index.IdPipeline
alias Elasticlunr.Dsl.{Query, QueryRepository}
alias Uniq.UUID

@fields ~w[db fields name ref pipeline documents_size store_positions store_documents]a
@fields ~w[db fields name ref pipeline store_positions store_documents on_conflict]a
@enforce_keys @fields
defstruct @fields

Expand All @@ -24,7 +24,6 @@ defmodule Elasticlunr.Index do
@type t :: %__MODULE__{
db: DB.t(),
fields: map(),
documents_size: integer(),
ref: Field.document_ref(),
pipeline: Pipeline.t(),
name: atom() | binary(),
Expand All @@ -49,11 +48,11 @@ defmodule Elasticlunr.Index do

attrs = %{
db: db,
documents_size: 0,
ref: ref,
fields: fields,
pipeline: pipeline,
name: name,
on_conflict: Keyword.get(opts, :on_conflict, :index),
store_documents: Keyword.get(opts, :store_documents, true),
store_positions: Keyword.get(opts, :store_positions, true)
}
Expand Down Expand Up @@ -91,7 +90,7 @@ defmodule Elasticlunr.Index do
raise "Unknown field #{name} in index"
end

update_documents_size(%{index | fields: Map.put(fields, name, field)})
%{index | fields: Map.put(fields, name, field)}
end

@spec get_fields(t()) :: list(Field.document_ref() | document_field())
Expand All @@ -112,18 +111,25 @@ defmodule Elasticlunr.Index do
%{index | fields: fields}
end

@spec add_documents(t(), list(map()), keyword()) :: t()
# Persists documents into every field, then schedules an asynchronous IDF
# recalculation (instead of recomputing inline on every write).
def add_documents(
      %__MODULE__{fields: fields, on_conflict: on_conflict, ref: ref} = index,
      documents,
      opts \\ []
    ) do
  # A per-call :on_conflict wins; otherwise fall back to the index-level
  # policy captured at creation time.
  opts = Keyword.put_new(opts, :on_conflict, on_conflict)
  :ok = persist(fields, ref, documents, &Field.add(&1, &2, opts))
  :ok = Scheduler.push(index, :calculate_idf)

  index
end

@spec update_documents(t(), list(map())) :: t()
# Re-persists existing documents through Field.update/2 and schedules an
# asynchronous IDF recalculation.
def update_documents(%__MODULE__{ref: ref, fields: fields} = index, documents) do
  :ok = persist(fields, ref, documents, &Field.update/2)
  :ok = Scheduler.push(index, :calculate_idf)

  index
end

@spec remove_documents(t(), list(Field.document_ref())) :: t()
Expand All @@ -132,7 +138,9 @@ defmodule Elasticlunr.Index do
Field.remove(field, document_ids)
end)

update_documents_size(index)
:ok = Scheduler.push(index, :calculate_idf)

index
end

@spec analyze(t(), document_field(), any(), keyword()) :: Enumerable.t()
Expand All @@ -158,20 +166,17 @@ defmodule Elasticlunr.Index do
|> Field.documents()
end

@spec update_documents_size(t()) :: t()
def update_documents_size(%__MODULE__{fields: fields} = index) do
size =
Enum.reduce(fields, 0, fn {_, field}, acc ->
size = Field.length(field, :ids)

if size > acc do
size
else
acc
end
end)
# Number of documents in the index, computed on demand as the maximum
# id-count across all fields (a document appears in each field it
# populates, so the largest field covers the whole index).
#
# Fixed @spec: the function returns an integer, not t().
@spec documents_size(t()) :: non_neg_integer()
def documents_size(%__MODULE__{fields: fields}) do
  Enum.reduce(fields, 0, fn {_name, field}, acc ->
    max(Field.length(field, :ids), acc)
  end)
end

@spec search(t(), search_query(), map() | nil) :: list(search_result())
Expand Down Expand Up @@ -281,10 +286,20 @@ defmodule Elasticlunr.Index do
end

# Flattens and saves each document concurrently via persist_fn.
# A document missing its ref key gets one generated lazily (FlakeId).
# Results are consumed unordered — only completion matters. Returns :ok.
defp persist(fields, ref, documents, persist_fn) do
  documents
  |> Task.async_stream(
    fn document ->
      document =
        document
        |> flatten_document()
        |> Map.put_new_lazy(ref, &FlakeId.get/0)

      save(fields, ref, document, persist_fn)
    end,
    ordered: false
  )
  |> Stream.run()
end

Expand Down
7 changes: 5 additions & 2 deletions lib/elasticlunr/deserializer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ end
defmodule Elasticlunr.Deserializer.Parser do
alias Elasticlunr.{Index, Pipeline}

@spec process(Enum.t()) :: Index.t()
@spec process(Enumerable.t()) :: Index.t()
def process(data) do
Enum.reduce(data, nil, fn line, acc ->
[command | opts] =
Expand Down Expand Up @@ -42,7 +42,10 @@ defmodule Elasticlunr.Deserializer.Parser do
{index + 1, Map.put(map, to_string(index), String.to_atom(callback))}
end)

opts = Keyword.replace(opts, :pipeline, parse_pipeline(opts[:pipeline]))
opts =
opts
|> Keyword.replace(:pipeline, parse_pipeline(opts[:pipeline]))
|> Keyword.replace(:on_conflict, String.to_atom(opts[:on_conflict]))

{Index.new(opts), %{pipeline: pipeline_map}}
end
Expand Down
13 changes: 13 additions & 0 deletions lib/elasticlunr/logger.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Elasticlunr.Logger do
  @moduledoc false
  # Thin wrapper over Logger that tags every message with the library name.

  require Logger

  @doc "Logs `msg` at debug level, prefixed with `[elasticlunr]`."
  @spec debug(binary() | iodata(), keyword()) :: :ok
  def debug(msg, opts \\ []) do
    Logger.debug("[elasticlunr] " <> to_string(msg), opts)
  end

  @doc "Logs `msg` at error level, prefixed with `[elasticlunr]`."
  @spec error(binary() | iodata(), keyword()) :: :ok
  def error(msg, opts \\ []) do
    Logger.error("[elasticlunr] " <> to_string(msg), opts)
  end
end
15 changes: 13 additions & 2 deletions lib/elasticlunr/protocol_implementations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,17 @@ end
defimpl Elasticlunr.Serializer, for: Elasticlunr.Index do
alias Elasticlunr.{Index, Serializer}

def serialize(%Index{db: db, fields: fields, name: name, pipeline: pipeline, ref: ref}, _opts) do
def serialize(
%Index{
db: db,
fields: fields,
name: name,
on_conflict: on_conflict,
pipeline: pipeline,
ref: ref
},
_opts
) do
pipeline_opt = Serializer.serialize(pipeline)
db_settings = Serializer.serialize(db)

Expand All @@ -48,7 +58,8 @@ defimpl Elasticlunr.Serializer, for: Elasticlunr.Index do
{index + 1, Map.put(map, callback, index)}
end)

settings = "settings#name:#{name}|ref:#{ref}|pipeline:#{pipeline_opt}"
settings =
"settings#name:#{name}|ref:#{ref}|pipeline:#{pipeline_opt}|on_conflict:#{on_conflict}"

fields_settings =
Stream.map(fields, fn {name, field} ->
Expand Down
18 changes: 18 additions & 0 deletions lib/elasticlunr/scheduler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Elasticlunr.Scheduler do
  @moduledoc false
  # Dispatches background work to the configured scheduler provider.

  alias Elasticlunr.Index

  # Closed set of actions a scheduler may run; the guard rejects typos.
  @actions ~w[calculate_idf]a
  @default_provider Elasticlunr.Scheduler.Async

  @doc "Enqueues `action` for `index` on the configured provider."
  @spec push(Index.t(), atom()) :: :ok
  def push(index, action) when action in @actions do
    provider().push(index, action)
  end

  # Provider is read at runtime so tests/apps can swap it via config.
  defp provider do
    Application.get_env(:elasticlunr, :scheduler, @default_provider)
  end

  defmacro __using__(_opts) do
    quote location: :keep do
      @behaviour Elasticlunr.Scheduler.Behaviour
    end
  end
end
Loading