Jchris/query-allDocs #601
base: main
… when using for await
Walkthrough
This update removes a redundant blank line from a build script and modifies dependency versions in the package manifest. It refactors several CRDT and helper modules to use asynchronous generator functions for update streaming and enhances callback management. The indexing system and ledger now use consolidated query methods with improved type definitions, while new type interfaces and a utility function support asynchronous iteration. Additionally, a new test suite validates the Streaming API functionality.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant CRDT
participant Internal
Client->>CRDT: allDocs({ waitFor })
CRDT->>Internal: #snapshot() to get initial docs
Internal-->>CRDT: Returns snapshot (async iterable)
CRDT->>Internal: #subscribe() for real-time updates
Internal-->>CRDT: Streams updates via async generator
CRDT-->>Client: Returns QueryResponse (snapshot + live update stream)
sequenceDiagram
participant App
participant CRDTClock
App->>CRDTClock: onTick(callback)
CRDTClock-->>App: Returns unregister function
App->>CRDTClock: onTock(callback)
CRDTClock-->>App: Returns unregister function
Actionable comments posted: 1
🧹 Nitpick comments (9)
src/indexer-helpers.ts (3)
84-95: Consider using `v ?? null` instead of `(v || null)`.
Using `(v || null)` will treat all falsy values (including `0` and `""`) as `null`. If you wish to preserve `0` or the empty string, consider `v ?? null` for clarity.
- value: (v || null) as R,
+ value: (v ?? null) as R,
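To make the difference concrete, here is a minimal TypeScript sketch. The `Emitted` type and the helpers `withOr` and `withNullish` are hypothetical names that only mirror the two expressions in the diff; they are not part of the PR.

```typescript
// Hypothetical stand-in for a value emitted by a map function.
type Emitted = number | string | undefined;

// Mirrors `(v || null)`: every falsy value (0, "", undefined) collapses to null.
function withOr(v: Emitted): number | string | null {
  return v || null;
}

// Mirrors `(v ?? null)`: only null and undefined collapse to null.
function withNullish(v: Emitted): number | string | null {
  return v ?? null;
}

// 0 and "" survive only in the nullish variant.
const samples: Emitted[] = [0, "", 42, undefined];
const comparison = samples.map((v) => ({ or: withOr(v), nullish: withNullish(v) }));
console.log(comparison);
```

If an index legitimately emits `0` or an empty string as its value, only the `??` form keeps it.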
170-185: Avoid spread syntax in array accumulator to improve performance.
Repeatedly using `return [...acc, row]` can cause an O(n^2) time complexity when handling large arrays. Consider pushing directly to `acc`:
- return [...acc, row];
+ acc.push(row);
+ return acc;
Additionally, the line `let result = [...resp.result];` at line 172 is fine for making a single copy, but be mindful of potential overhead if `resp.result` can be very large.
🧰 Tools
🪛 Biome (1.9.4)
[error] 180-180: Avoid the use of spread (`...`) syntax on accumulators.
Spread syntax should be avoided on accumulators (like those in `.reduce`) because it causes a time complexity of O(n^2).
Consider methods such as .splice or .push instead.
(lint/performance/noAccumulatingSpread)
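The two accumulator styles can be compared side by side in a small sketch. The `Row` shape and both function names are assumptions made for illustration; the real reducer in `src/indexer-helpers.ts` is not shown in this review.

```typescript
// Hypothetical row shape used only for this comparison.
interface Row {
  id: string;
  key: number;
}

// Spread version: copies the whole accumulator on every kept row, so total work grows quadratically.
function filterWithSpread(rows: Row[], keep: (r: Row) => boolean): Row[] {
  return rows.reduce<Row[]>((acc, row) => (keep(row) ? [...acc, row] : acc), []);
}

// Push version: mutates the accumulator that reduce already owns, so each step is constant time.
function filterWithPush(rows: Row[], keep: (r: Row) => boolean): Row[] {
  return rows.reduce<Row[]>((acc, row) => {
    if (keep(row)) acc.push(row);
    return acc;
  }, []);
}
```

Both return the same rows in the same order; mutating `acc` is safe here because the initial array is created inside the `reduce` call and is not shared with any caller.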
187-202: Yielding a Promise, or fully awaiting the document retrieval?
You do `yield crdt.get(res.id).then(...)`. This passes a Promise to the consumer. If you want to yield fully resolved docs, consider an `await`:
- yield crdt.get(res.id).then((val) => {
-   if (!val) return undefined;
-   const row: IndexRow<K, T, R> = { ...res, doc: val.doc };
-   return row;
- });
+ const val = await crdt.get(res.id);
+ if (val) yield { ...res, doc: val.doc };
src/crdt-helpers.ts (1)
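Returning to the 187-202 comment on `src/indexer-helpers.ts`: the sketch below assumes the surrounding function is an `async function*`, which this review does not show. In an async generator, `yield` awaits its operand, so a `for await` consumer receives resolved values in both shapes. The difference you can observe is that the `.then(...)` shape still emits `undefined` for missing documents. `getDoc`, `store`, `yieldThen`, `awaitThenYield` and `collect` are hypothetical names.

```typescript
interface Doc {
  _id: string;
  title: string;
}
interface Row {
  id: string;
  doc: Doc;
}

// Hypothetical in-memory lookup standing in for `crdt.get`.
const store = new Map<string, Doc>([["a", { _id: "a", title: "first" }]]);
async function getDoc(id: string): Promise<{ doc: Doc } | undefined> {
  const doc = store.get(id);
  return doc ? { doc } : undefined;
}

// Original shape: yields the promise chain, so misses surface as `undefined` entries.
async function* yieldThen(ids: string[]): AsyncGenerator<Row | undefined> {
  for (const id of ids) {
    yield getDoc(id).then((val) => (val ? { id, doc: val.doc } : undefined));
  }
}

// Suggested shape: await first, then yield only rows whose document exists.
async function* awaitThenYield(ids: string[]): AsyncGenerator<Row> {
  for (const id of ids) {
    const val = await getDoc(id);
    if (val) yield { id, doc: val.doc };
  }
}

// Small helper that gathers everything an async iterable produces.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) out.push(item);
  return out;
}
```

With the suggested shape the element type narrows from `Row | undefined` to `Row`, so callers no longer need to filter out holes.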
242-255: Consider handling missing or invalid `docValue`.
If `getValueFromLink` fails or returns an unexpected result (e.g., missing doc), handling within this generator could provide clearer error feedback.
src/ledger.ts (3)
79-96: Multiple overload signatures might become unwieldy.
Consider consolidating overloads into a single function that takes a parameter object, reducing complexity.
178-198: Repeated overload structure within the shell.
Align with the approach in lines 79-96 or unify them to avoid duplicated code paths.
336-349: Duplicate interface for `select` within the implementation.
Refactoring or sharing a common helper could reduce code duplication and maintain a single source of truth for query logic.
src/indexer.ts (1)
117-117: Complex conditional handling for map function and metadata.
Consider breaking this logic into smaller helper methods for better clarity and maintainability.
tests/fireproof/streaming-api.test.ts (1)
87-87: Consider using fixed values for test data.
Using random values in tests (e.g., `Math.floor(Math.random() * (10 - 1) + 1)` and `Math.floor(Math.random() * (10 - 4) + 4)`) could make tests flaky. Consider using fixed values for better test reliability and reproducibility.
Apply this diff to use fixed values:
- const amountOfNewDocs = Math.floor(Math.random() * (10 - 1) + 1);
+ const amountOfNewDocs = 5;
- const amountOfSnapshotDocs = Math.floor(Math.random() * (10 - 4) + 4);
+ const amountOfSnapshotDocs = 7;
Also applies to: 114-114
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
pnpm-lock.yaml is excluded by `!**/pnpm-lock.yaml`
📒 Files selected for processing (11)
build-docs.sh (0 hunks)
package.json (1 hunks)
src/crdt-clock.ts (2 hunks)
src/crdt-helpers.ts (6 hunks)
src/crdt.ts (4 hunks)
src/indexer-helpers.ts (3 hunks)
src/indexer.ts (10 hunks)
src/ledger.ts (7 hunks)
src/types.ts (2 hunks)
src/utils.ts (1 hunks)
tests/fireproof/streaming-api.test.ts (1 hunks)
💤 Files with no reviewable changes (1)
- build-docs.sh
🔇 Additional comments (44)
src/indexer-helpers.ts (6)
27-29: Imports introduced for 'ClockHead', 'ChangesOptions', and 'IndexRow' look proper.
They seem directly aligned with the new functionality in this file, and there appear to be no issues.
34-34: Import 'clockChangesSince' is correctly placed.
No concerns here; it provides the functionality needed for filtering by clock head.
66-68: New `IndexDoc` interface usage.
Defining a strongly typed `value: R` is an excellent move towards better type safety and clarity.
76-79: `indexEntriesForChanges` function signature looks good.
It’s well-typed with generics, matching your use case for flexible document fragment handling.
164-168: `applyQuery` function signature.
Shifting to an `AsyncGenerator` aligns well with streaming and partial retrieval. The signature is consistent with your new query flow.
205-207: Outer loop for final yielding is straightforward.
This logic is concise and effectively passes along any filtered results from `_apply()`.
src/crdt.ts (10)
18-20: New imports from `crdt-helpers`:
The addition of `docUpdateToDocWithId`, `getAllEntries`, and `clockUpdatesSince` aligns properly with your new doc and clock streaming approach.
33-36: Extended imports for streaming and subscription support
`QueryResponse`, `ListenerFn`, `QueryStreamMarker`, and `ChangesOptions` are well placed for the new reactive features.
41-41: `arrayFromAsyncIterable` and `ensureLogger` imports
Great additions for snapshot generation and consistent logging.
62-65: Subscription-related properties
Declaring `_listening`, `_listeners`, and `_noupdate_listeners` here is clear, enabling subscription logic.
163-182: `allDocs` method returning a `QueryResponse`
This provides a succinct entry point for snapshot and streaming. The design is consistent with your reactive architecture.
183-185: `#currentDocs` method
Delegating to either `changes(...)` or `all(...)` based on `since` is an elegant approach.
187-205: `#snapshot` private generator
Using `waitFor` before yielding docs ensures upstream readiness. Implementation is neat and covers the entire doc set.
206-215: `#subscribe` method
This robust subscription callback pattern will keep external code informed of doc updates in real time.
216-261: `#stream` method
This `ReadableStream` approach elegantly handles back-pressure and cancels. Enabling both preexisting and new docs is a good design choice.
272-279: New `all` and `changes` methods
They provide clear, asynchronous iteration over doc updates in line with your streaming model.
src/types.ts (6)
220-224: New `Row` interface
Defining `id`, `key`, and `value` together in a single typed structure will help unify row handling logic across the codebase.
226-228: New `DocumentRow` interface
Great extension of `Row` to include `doc`. This clarifies which rows also carry full document data.
232-232: Changed `key` type to `IndexKey<K>`
Making `IndexRow` consistent with newly introduced `IndexKey<K>` improves consistency across indexing definitions.
279-285: `InquiryResponse` interface
Encapsulating snapshot, streams, and array conversion ensures a uniform approach to reading rows in different modes.
291-301: `QueryResponse` interface
Mirroring `InquiryResponse` but with full `DocumentRow` ensures strong typing for doc retrieval in queries.
303-305: Updated `EmitFn` and `MapFn` signature
Accepting a generic `R` fosters more powerful mapping logic with typed fragments.
src/crdt-helpers.ts (6)
6-6: Import looks properly aligned with subsequent usage.
32-32: New import for `ClockLink` is consistent with usage.
90-92: Helper function simplifies doc creation logic.
256-269: Asynchronous generator structure is clear and concise.
271-285: Recursively yielding updates is well-structured.
It relies on `didLinks` to prevent revisiting events, which should avoid cycles. Verify that `event.parents` cannot create an unexpected loop in certain edge cases.
393-395: Compaction loop usage of `clockChangesSince`.
src/ledger.ts (3)
21-23: New imports for advanced types.
65-65: Exposing `clock` as a public property.
147-149: Getter delegates to underlying reference.
src/indexer.ts (9)
16-20: New type imports for extended query functionality.
39-39: Utility import for `arrayFromAsyncIterable`.
41-41: Importing `docUpdateToDocWithId` for direct usage.
52-57: Conditional index creation logic.
Potential concurrency concern: multiple calls to `index` with the same `name` at the same time. Confirm if there's a guard or if only a single call is expected.
70-70: Optional map function broadens indexing flexibility.
97-97: Constructor extends CRDT usage with typed mapFn.
158-158: Default map function usage.
395-395: Variable declaration in `_updateIndex`.
398-398: Reliance on `Array.fromAsync`.
Please confirm that `Array.fromAsync` is supported in the target environment or successfully polyfilled, as it can be unavailable in certain older runtimes.
src/crdt-clock.ts (1)
75-75: LGTM! Good memory management.
The addition of cleanup functions in `onTick` and `onTock` is a good practice for memory management, allowing proper removal of callbacks when they are no longer needed.
Also applies to: 80-80
tests/fireproof/streaming-api.test.ts (1)
175-175: Verify if skipped tests are intentional.
The "allDocs" test suite is skipped. Please verify if this is intentional or if these tests should be enabled.
src/utils.ts (1)
357-368: LGTM! Good polyfill implementation.
The polyfill implementation is correct and well-documented. It provides a good fallback for browsers that don't support `Array.fromAsync`.
package.json (1)
97-98: ❓ Verification inconclusive
Verify the idb version downgrade.
The `idb` package has been downgraded from `^8.0.2` to `^8.0.1`. Please verify if this downgrade is intentional and if there are any compatibility concerns.
🌐 Web query:
What are the differences between idb 8.0.1 and 8.0.2?
💡 Result:
The differences between `idb` versions 8.0.1 and 8.0.2 primarily focus on ECMAScript Module (ESM) compatibility and dependency updates, though specific release notes for these versions aren't explicitly documented in public sources[1][2][10]. However, key insights can be inferred from related projects:
Key Changes
| Feature/Aspect | `idb` v8.0.1 | `idb` v8.0.2 |
| --- | --- | --- |
| ESM Support | Likely lacked full ESM compatibility | Added explicit ESM support for modern JavaScript toolchains[7][10] |
| Dependencies | May have used older versions of underlying libraries | Updated dependencies to ensure compatibility with modern browser APIs[7][10] |
| Firebase Integration | Caused compatibility issues requiring workarounds | Became the recommended version for Firebase SDKs due to resolved issues[7] |
Contextual Evidence
- Firebase upgraded to [email protected] in SDK v9.x specifically to resolve ESM-related errors in browsers[7].
- The update addressed issues where older `idb` versions caused failures in projects using strict ESM modules or modern bundlers like Webpack/ESBuild[7][10].
- No breaking API changes were reported between these minor versions[1][2].
For most users, upgrading from v8.0.x to v8.x.y requires no code changes but ensures better compatibility with modern JavaScript ecosystems[7][10].
Citations:
- 1: https://www.jsdocs.io/package/idb
- 2: https://www.npmjs.com/package/idb
- 3: https://www.npmjs.com/package/idb?activeTab=versions
- 4: IDBNext: migration from idb plan pouchdb/pouchdb#7591
- 5: https://wiki.idera.com/display/SQLDMMYSQL88/Previous+Features+and+Fixed+Issues
- 6: https://sprintray.com/desktop-software/release-notes/
- 7: https://firebase.google.com/support/release-notes/js
- 8: https://www.alibabacloud.com/help/en/polardb/polardb-for-mysql/olap-performance-test
- 9: https://deps.dev/npm/@teambit%2Felements/0.0.341/dependencies
- 10: https://security.snyk.io/package/npm/idb
Action Required: Confirm Intentional Downgrade of `idb` Version
We've observed that the `idb` package version has been downgraded from `^8.0.2` to `^8.0.1`. Given that version 8.0.2 includes key improvements—such as enhanced ECMAScript Module (ESM) support and updated dependencies beneficial for modern bundlers and Firebase integration—it’s important to verify that this downgrade is intentional and won't introduce compatibility issues.
- Confirm whether falling back to 8.0.1 aligns with our project’s ESM requirements and overall dependency strategy.
- Assess if the removal of the ESM improvements and dependency updates present in 8.0.2 might affect the codebase or integration with tools that rely on them.
const changes = await Array.fromAsync(clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger));
internalUpdates = changes;
🛠️ Refactor suggestion
Use the polyfill function for better compatibility.
Replace `Array.fromAsync` with the `arrayFromAsyncIterable` polyfill function from `utils.ts` for better browser compatibility.
Apply this diff to use the polyfill:
- const changes = await Array.fromAsync(clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger));
+ const changes = await arrayFromAsyncIterable(clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger));
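For readers unfamiliar with what such a helper does, the following is a minimal sketch of draining an async iterable into an array. It assumes the helper simply collects every yielded value; the actual `arrayFromAsyncIterable` in `src/utils.ts` is not shown in this review, so `collectAsync` and `countTo` are hypothetical names.

```typescript
// Hypothetical equivalent of a fallback for Array.fromAsync: gather every yielded value in order.
async function collectAsync<T>(source: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const item of source) {
    out.push(item);
  }
  return out;
}

// Tiny async generator used only to exercise the helper.
async function* countTo(n: number): AsyncGenerator<number> {
  for (let i = 1; i <= n; i++) {
    yield i;
  }
}

// Usage: the result is a plain array, so existing array-based code keeps working.
collectAsync(countTo(3)).then((values) => console.log(values.length));
```

Either approach buffers the whole stream in memory, so it suits call sites that genuinely need an array rather than incremental processing.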
@@ -58,8 +58,8 @@ export class CRDTClock<T extends DocTypes> {
 async processUpdates(updatesAcc: DocUpdate<T>[], all: boolean, prevHead: ClockHead) {
 let internalUpdates = updatesAcc;
 if (this.watchers.size && !all) {
-const changes = await clockChangesSince<T>(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger);
-internalUpdates = changes.result;
+const changes = await Array.fromAsync(clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger));
i think we should not use a polyfill.
@@ -292,19 +303,17 @@ async function gatherUpdates<T extends DocTypes>(
 }
 for (let i = ops.length - 1; i >= 0; i--) {
 const { key, value } = ops[i];
-if (!keys.has(key)) {
+if (!keys.has(key) && (allowedKeys === undefined || allowedKeys.has(key))) {
use ! instead of ===
@@ -381,7 +390,9 @@ export async function doCompact(blockLog: CompactFetcher, head: ClockHead, logge
 timeEnd("compact root blocks");

 time("compact changes");
-await clockChangesSince(blockLog, head, [], {}, logger);
+for await (const x of clockChangesSince(blockLog, head, [], {}, logger)) {
+void x;
It should be made clearer why we loop through this.
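One way to state the intent is a named helper. Calling an async generator function only creates the generator object; its body does not run until something iterates it, so the old `await clockChangesSince(...)` call would no longer do any work once the function became a generator. The sketch below assumes compaction iterates purely for the generator's side effects, which this review does not confirm. `walk`, `visited` and `drain` are hypothetical names.

```typescript
// Hypothetical generator with a visible side effect, standing in for `clockChangesSince`.
const visited: string[] = [];
async function* walk(ids: string[]): AsyncGenerator<string> {
  for (const id of ids) {
    visited.push(id); // side effect the caller cares about
    yield id;
  }
}

// Names the intent: run the source to completion and discard every value.
async function drain(source: AsyncIterable<unknown>): Promise<void> {
  const iterator = source[Symbol.asyncIterator]();
  while (!(await iterator.next()).done) {
    // Values are intentionally ignored; only the side effects of iteration matter.
  }
}
```

A call such as `await drain(walk(ids))` then reads as "run for side effects", which avoids the unexplained `void x;` inside a `for await` loop.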
@@ -55,6 +59,11 @@ export class CRDT<T extends DocTypes> {
 readonly logger: Logger;
 readonly sthis: SuperThis;

+// Subscriptions
+_listening = false;
+readonly _listeners = new Set<ListenerFn<T>>();
I'm not a fan of using a Set of functions. Please use `Map<string, Fn>` and generate the string key with `sthis.nextid()`.
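A minimal sketch of the Map-based registry requested here follows. `ListenerRegistry` and its counter-based `nextId` are hypothetical; the counter only stands in for whatever id generator the project provides, and `ListenerFn` is redeclared locally so the block is self-contained.

```typescript
// Local redeclaration for the sketch; the PR imports its own ListenerFn type.
type ListenerFn<T> = (docs: T[]) => void;

class ListenerRegistry<T> {
  private readonly listeners = new Map<string, ListenerFn<T>>();
  private counter = 0;

  // Hypothetical id generator; a real implementation would use the project's own.
  private nextId(): string {
    this.counter += 1;
    return `listener-${this.counter}`;
  }

  // Registers a listener and returns a function that removes exactly this registration.
  subscribe(fn: ListenerFn<T>): () => void {
    const id = this.nextId();
    this.listeners.set(id, fn);
    return () => {
      this.listeners.delete(id);
    };
  }

  // Calls every registered listener with the given docs.
  notify(docs: T[]): void {
    this.listeners.forEach((fn) => fn(docs));
  }

  get size(): number {
    return this.listeners.size;
  }
}
```

Unlike a `Set`, which silently deduplicates a function registered twice, each `subscribe` call here gets its own key, so one unsubscribe never removes another caller's registration.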
* Retrieve the current set of documents.
*/
allDocs<T extends DocTypes>({ waitFor }: { waitFor?: Promise<unknown> } = {}): QueryResponse<T> {
const stream = this.#stream.bind(this);
This is exactly where we should make use of a `#stream = () => { }` declaration.
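The point can be illustrated with a small sketch. An arrow function assigned to a class field captures `this` lexically, so the reference can be handed out without `.bind(this)`. `Counter` and its members are hypothetical names; the sketch uses TypeScript's `private` and `readonly` modifiers, and the same pattern applies to a `#`-prefixed private field.

```typescript
class Counter {
  private count = 0;

  // Regular method: `this` depends on the call site, so a detached reference needs `.bind(this)`.
  incrementMethod(): number {
    this.count += 1;
    return this.count;
  }

  // Arrow-function field: `this` is fixed to the instance when the field is initialized.
  readonly incrementArrow = (): number => {
    this.count += 1;
    return this.count;
  };
}

const counter = new Counter();
const detachedArrow = counter.incrementArrow; // safe to pass around as-is
const boundMethod = counter.incrementMethod.bind(counter); // the extra step the arrow field avoids
console.log(typeof detachedArrow, typeof boundMethod);
```

The trade-off is that an arrow field is created per instance rather than shared on the prototype, which is usually acceptable for a handful of callbacks.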
/**
* Retrieve the current set of documents.
*/
allDocs<T extends DocTypes>({ waitFor }: { waitFor?: Promise<unknown> } = {}): QueryResponse<T> {
we decided that allDocs will stay with the current signature.
This should move to select.
let unsubscribe: undefined | (() => void);
let isClosed = false;

return new ReadableStream<{ doc: DocWithId<T>; marker: QueryStreamMarker }>({
we need to test that with the worker runtime of Cloudflare. They protect against the direct use of ReadableStream.
#stream<T extends DocTypes>(
opts: { futureOnly: boolean; since?: ClockHead } & ChangesOptions,
{ waitFor }: { waitFor?: Promise<unknown> } = {},
better use Future
 changes.forEach(({ id: key, value, del }) => {
 if (del || !value) return;
 let mapCalled = false;
-const mapReturn = mapFn({ ...(value as DocWithId<T>), _id: key }, (k: IndexKeyType, v?: DocFragment) => {
+const mapReturn = mapFn({ ...value, _id: key }, (k: IndexKeyType, v?: R) => {
 mapCalled = true;
 if (typeof k === "undefined") return;
 indexEntries.push({
 key: [charwise.encode(k) as K, key],
Do we need charwise? There is sthis.txt.
 });
 });
 if (!mapCalled && mapReturn) {
 indexEntries.push({
 key: [charwise.encode(mapReturn) as K, key],
-value: null,
+value: null as R,
rethink null -> better falsy?
if (query.since) {
const gen = clockChangesSince(crdt.blockstore, crdt.clock.head, query.since, query.sinceOptions || {}, logger);
const ids = await Array.fromAsync(gen)
no polyfill
this.logger.Debug().Msg("post _updateIndex query");
await this._hydrateIndex();
this.logger.Debug().Msg("post _hydrateIndex query");
query(qryOpts: QueryOpts<K> & { excludeDocs: true }, intlOpts: { waitFor?: Promise<unknown> }): InquiryResponse<K, R>;
move to select --- leave query like it was
 if (!this.indexHead || this.indexHead.length === 0) {
-({ result, head } = await this.crdt.allDocs());
 this.logger.Debug().Msg("enter crdt.allDocs");
+result = await Array.fromAsync(this.crdt.all<T>());
- polyfill
 } else {
-({ result, head } = await this.crdt.changes(this.indexHead));
+result = await Array.fromAsync(this.crdt.changes<T>(this.indexHead));
- polyfill
this is before rebase, currently on main
Summary by CodeRabbit
New Features
Refactor
Tests