Skip to content

Commit 5c0529d

Browse files
committed
feat: retrying package fetching
Signed-off-by: Guillaume Hivert <[email protected]>
1 parent f50c2bf commit 5c0529d

File tree

3 files changed

+118
-24
lines changed

3 files changed

+118
-24
lines changed

apps/backend/src/backend.gleam

+14-7
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import gleam/otp/supervisor
66
import mist
77
import periodic
88
import setup
9-
import wisp
109
import tasks/hex
10+
import wisp
1111

1212
pub fn main() {
13+
dot_env.load()
1314
setup.radiate()
1415
wisp.configure_logger()
15-
dot_env.load()
1616

1717
let secret_key_base = config.get_secret_key_base()
1818
let cnf = config.read_config()
@@ -24,7 +24,11 @@ pub fn main() {
2424
|> mist.port(3000)
2525
|> mist.start_http()
2626

27-
let _ = start_hex_sync(cnf)
27+
let assert Ok(_) =
28+
supervisor.start(fn(children) {
29+
let assert Ok(_) = start_hex_sync(cnf, children)
30+
children
31+
})
2832

2933
process.sleep_forever()
3034
}
@@ -37,11 +41,14 @@ fn supervise(start: fn() -> _) {
3741
})
3842
}
3943

40-
fn sync_hex(cnf: Config) {
41-
hex.sync_new_gleam_releases(cnf)
44+
fn sync_hex(cnf: Config, children: supervisor.Children(Nil)) {
45+
hex.sync_new_gleam_releases(cnf, children)
4246
}
4347

44-
fn start_hex_sync(cnf: Config) {
48+
fn start_hex_sync(cnf: Config, children: supervisor.Children(Nil)) {
4549
use <- supervise()
46-
periodic.periodically(do: fn() { sync_hex(cnf) }, waiting: 60 * 1000)
50+
periodic.periodically(
51+
do: fn() { sync_hex(cnf, children) },
52+
waiting: 60 * 1000,
53+
)
4754
}

apps/backend/src/retrier.gleam

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import backend/index/error.{type Error}
2+
import gleam/erlang/process.{type Subject}
3+
import gleam/function
4+
import gleam/io
5+
import gleam/otp/actor
6+
7+
pub opaque type Message {
8+
Rerun
9+
}
10+
11+
type State(a) {
12+
State(self: Subject(Message), work: fn() -> Result(a, Error), interval: Int)
13+
}
14+
15+
fn enqueue_next_rerun(state: State(a)) {
16+
process.send_after(state.self, state.interval, Rerun)
17+
}
18+
19+
/// Repeatedly call a function, leaving `interval` milliseconds between each call.
20+
/// When the `work` function returns an error it is printed.
21+
pub fn retry(
22+
do work: fn() -> Result(a, Error),
23+
) -> Result(Subject(Message), actor.StartError) {
24+
fn() { init(60_000, work) }
25+
|> actor.Spec(loop: loop, init_timeout: 100)
26+
|> actor.start_spec()
27+
}
28+
29+
fn init(
30+
interval: Int,
31+
work: fn() -> Result(a, Error),
32+
) -> actor.InitResult(State(a), Message) {
33+
let subject = process.new_subject()
34+
let state = State(subject, work, interval)
35+
process.new_selector()
36+
|> process.selecting(subject, function.identity)
37+
|> actor.Ready(state, _)
38+
|> function.tap(fn(_) { enqueue_next_rerun(state) })
39+
}
40+
41+
fn loop(message: Message, state: State(a)) -> actor.Next(Message, State(a)) {
42+
case message {
43+
Rerun -> {
44+
case state.work() {
45+
Ok(_) -> actor.Stop(process.Normal)
46+
Error(e) -> {
47+
io.debug(e)
48+
enqueue_next_rerun(state)
49+
actor.continue(state)
50+
}
51+
}
52+
}
53+
}
54+
}

apps/backend/src/tasks/hex.gleam

+50-17
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import api/hex as api
2+
import api/hex_repo
3+
import api/signatures
24
import backend/config.{type Config}
35
import backend/data/hex_read.{type HexRead}
46
import backend/index/connect as postgres
@@ -9,9 +11,11 @@ import birl/duration
911
import gleam/hexpm.{type Package}
1012
import gleam/list
1113
import gleam/order
14+
import gleam/otp/supervisor
1215
import gleam/pgo
1316
import gleam/result
1417
import gleam/string
18+
import retrier
1519
import wisp
1620

1721
type State {
@@ -25,20 +29,24 @@ type State {
2529
)
2630
}
2731

28-
pub fn sync_new_gleam_releases(cnf: Config) -> Result(HexRead, Error) {
32+
pub fn sync_new_gleam_releases(
33+
cnf: Config,
34+
children: supervisor.Children(Nil),
35+
) -> Result(HexRead, Error) {
2936
let ctx = postgres.connect(cnf)
3037
wisp.log_info("Syncing new releases from Hex")
3138
use limit <- result.try(index.get_last_hex_date(ctx.connection))
32-
use latest <- result.try(
33-
sync_packages(State(
39+
use latest <- result.try(sync_packages(
40+
State(
3441
page: 1,
3542
limit: limit,
3643
newest: limit,
3744
hex_api_key: cnf.hex_api_key,
3845
last_logged: birl.now(),
3946
db: ctx.connection,
40-
)),
41-
)
47+
),
48+
children,
49+
))
4250
let latest = index.upsert_most_recent_hex_timestamp(ctx.connection, latest)
4351
wisp.log_info("\nUp to date!")
4452
latest
@@ -58,28 +66,42 @@ fn first_timestamp(packages: List(hexpm.Package), state: State) -> Time {
5866
|> result.unwrap(state.newest)
5967
}
6068

61-
fn sync_packages(state: State) -> Result(Time, Error) {
69+
fn sync_packages(
70+
state: State,
71+
children: supervisor.Children(Nil),
72+
) -> Result(Time, Error) {
6273
let page = state.page
6374
let api_key = state.hex_api_key
6475
use all_packages <- result.try(api.get_api_packages_page(page, api_key))
6576
let state = State(..state, newest: first_timestamp(all_packages, state))
6677
let new_packages = take_fresh_packages(all_packages, state.limit)
67-
use state <- result.try(list.try_fold(new_packages, state, sync_package))
78+
use state <- result.try(list.try_fold(
79+
new_packages,
80+
state,
81+
sync_package(children),
82+
))
6883
case list.length(all_packages) == list.length(new_packages) {
6984
_ if all_packages == [] -> Ok(state.newest)
7085
False -> Ok(state.newest)
71-
True -> sync_packages(State(..state, page: state.page + 1))
86+
True -> sync_packages(State(..state, page: state.page + 1), children)
7287
}
7388
}
7489

75-
fn sync_package(state: State, package: hexpm.Package) -> Result(State, Error) {
76-
let secret = state.hex_api_key
77-
use releases <- result.try(lookup_gleam_releases(package, secret: secret))
78-
case releases {
79-
[] -> Ok(log_if_needed(state, package.updated_at))
80-
_ -> {
81-
use _ <- result.map(insert_package_and_releases(package, releases, state))
82-
State(..state, last_logged: birl.now())
90+
fn sync_package(children: supervisor.Children(Nil)) {
91+
fn(state: State, package: hexpm.Package) -> Result(State, Error) {
92+
let secret = state.hex_api_key
93+
use releases <- result.try(lookup_gleam_releases(package, secret: secret))
94+
case releases {
95+
[] -> Ok(log_if_needed(state, package.updated_at))
96+
_ -> {
97+
use _ <- result.map(insert_package_and_releases(
98+
package,
99+
releases,
100+
state,
101+
children,
102+
))
103+
State(..state, last_logged: birl.now())
104+
}
83105
}
84106
}
85107
}
@@ -88,6 +110,7 @@ fn insert_package_and_releases(
88110
package: hexpm.Package,
89111
releases: List(hexpm.Release),
90112
state: State,
113+
children: supervisor.Children(Nil),
91114
) {
92115
let secret = state.hex_api_key
93116
let versions =
@@ -100,7 +123,17 @@ fn insert_package_and_releases(
100123
use owners <- result.try(api.get_package_owners(package.name, secret: secret))
101124
use _ <- result.try(index.sync_package_owners(state.db, id, owners))
102125
wisp.log_info("Saving releases for " <> package.name)
103-
list.try_each(releases, fn(r) { index.upsert_release(state.db, id, r) })
126+
list.try_each(releases, fn(r) {
127+
use _ <- result.map(index.upsert_release(state.db, id, r))
128+
supervisor.add(children, {
129+
use _ <- supervisor.worker()
130+
retrier.retry(fn() {
131+
let infos = hex_repo.get_package_infos(package.name, r.version)
132+
use #(package, gleam_toml) <- result.try(infos)
133+
signatures.extract_signatures(state.db, package, gleam_toml)
134+
})
135+
})
136+
})
104137
}
105138

106139
fn lookup_gleam_releases(

0 commit comments

Comments
 (0)