Support subquery moves across AND/OR/NOT WHERE clauses#4051
Conversation
c7da8bb to
f33c781
Compare
6669eb8 to
a1e0fc1
Compare
226f9b3 to
203f4fc
Compare
67f228e to
438e6cf
Compare
66d52fe to
fb8a7d9
Compare
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4051 +/- ##
==========================================
+ Coverage 68.29% 68.72% +0.43%
==========================================
Files 127 156 +29
Lines 15988 17244 +1256
Branches 3855 3911 +56
==========================================
+ Hits 10919 11851 +932
- Misses 5065 5389 +324
Partials 4 4
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
413acaf to
4208d42
Compare
alco
left a comment
There was a problem hiding this comment.
Intermediate review while I keep looking through the rest of the changes.
What exactly is a subquery dependency polarity and what does it affect?
| """ | ||
| @callback write_move_in_snapshot!( | ||
| Enumerable.t({key :: String.t(), value :: Querying.json_iodata()}), | ||
| Enumerable.t(row()), |
There was a problem hiding this comment.
The row() type is defined as list(). Can we make its definition reflect the actual value type it represents?
| is_subquery: boolean(), | ||
| negated: boolean(), |
There was a problem hiding this comment.
Weird naming inconsistency. Both are boolean fields, should rather be named subquery? and negated?.
| case :ets.lookup(table(stack_ref), :last_broadcast_lsn) do | ||
| [{:last_broadcast_lsn, lsn}] -> lsn | ||
| [] -> 0 | ||
| end | ||
| rescue | ||
| ArgumentError -> 0 |
There was a problem hiding this comment.
Nit: since we have this rescue clause, we could use :ets.lookup_element:
| case :ets.lookup(table(stack_ref), :last_broadcast_lsn) do | |
| [{:last_broadcast_lsn, lsn}] -> lsn | |
| [] -> 0 | |
| end | |
| rescue | |
| ArgumentError -> 0 | |
| :ets.lookup_element(table(stack_ref), :last_broadcast_lsn, 2) | |
| rescue | |
| ArgumentError -> 0 |
| last_lsn = get_last_broadcast_lsn(stack_ref) | ||
|
|
||
| if last_lsn > 0 do | ||
| send(self(), {:global_last_seen_lsn, last_lsn}) |
There was a problem hiding this comment.
Since this is sending to self(), it should be mentioned in the function doc that callers are expected to process this message after a successful function call.
| test "return 400 if same subquery is used with both positive and negative polarity", %{ | ||
| opts: opts | ||
| } do | ||
| assert %{status: 400} = |
There was a problem hiding this comment.
Does this also return a descriptive message? I think we should have that and assert against that in tests for easy reference.
| tag2 = | ||
| :crypto.hash(:md5, ctx.stack_id <> req.handle <> "v:2") |> Base.encode16(case: :lower) |
There was a problem hiding this comment.
This looks like a good candidate for a helper function, to avoid repeating the same invocation with base encoding throughout the test code.
There was a problem hiding this comment.
| end | ||
|
|
||
| @impl GenServer | ||
| def handle_continue({:init_consumer, config}, state) do |
There was a problem hiding this comment.
This is dead code, we no longer have :init_consumer continuation in this module. It got replaced with the def handle_info({:initialize_shape, shape, opts}, state) clause some time ago.
| {:ok, reduced} <- | ||
| build_bool_chain( | ||
| %{name: "or", impl: &pg_and/2, strict?: false}, | ||
| %{name: "and", impl: &pg_and/2, strict?: false}, |
There was a problem hiding this comment.
Is this a sneaked-in bug fix for BETWEEN?
There was a problem hiding this comment.
Before this PR it was a latent bug: the only consumer that ran was the evaluator, which uses implementation, so BETWEEN gave correct results. This PR introduces the code paths that key off name — DNF decomposition and re-generating WHERE SQL for subquery move-in queries — at which point the mislabel becomes a real correctness bug for any shape whose WHERE clause uses BETWEEN.
| end | ||
| end) | ||
| |> case do | ||
| {:ok, effects} -> {:ok, effects |> Enum.reverse() |> List.flatten()} |
There was a problem hiding this comment.
Effects could alternatively be accumulated in an order-preserving way using Stream.concat() and here converted to a list with Enum.to_list().
There was a problem hiding this comment.
I left this one as-is. Each transaction_to_effects returns 0 or 1 Effects.AppendChanges, so the accumulator is a list of at most single-element lists and Enum.reverse |> List.flatten stays cheap and clear. A Stream.concat chain built up in the reduce would be nested per-txn and, to my eye, less readable here without a measurable win — happy to revisit if you feel strongly.
| error | ||
|
|
||
| converted -> | ||
| {:ok, converted |> Enum.reverse() |> List.flatten() |> mark_last_change()} |
There was a problem hiding this comment.
While this code was still in Consumer and/or ChangeHandling, the last change could be marked before reversing the list of changes. We can preserve that behaviour here as well, no? Instead of doing reverse + rebuild the list just to update its last entry.
There was a problem hiding this comment.
positive:
|
801d714 to
ec39d48
Compare
|
This PR has been released! 🚀 The following packages include changes from this PR:
Thanks for contributing to Electric! |
## Summary Follow-up to the merged subqueries PR #4051, addressing reviewer (@alco) feedback that came in after merge. Each commit maps 1:1 to a review comment so they can be reviewed independently. ## Changes | Commit | Review comment | | --- | --- | | `8827e20` Make storage `row()` type reflect actual value type | [r3130325864](#4051 (comment)) | | `fd7748e` Rename `position_info` boolean fields to `subquery?`/`negated?` | [r3130956496](#4051 (comment)) | | `91f8f17` Use `:ets.lookup_element/3` in `get_last_broadcast_lsn` | [r3130990531](#4051 (comment)) | | `dd8cb5c` Document `self()` message sent by `subscribe_to_global_lsn_updates` | [r3130996392](#4051 (comment)) | | `82ecbc2` Remove dead `:init_consumer` `handle_continue` clause | [r3131087003](#4051 (comment)) | | `1545bb3` Assert mixed-polarity subquery error message in router test | [r3131001659](#4051 (comment)) | | `6aa7581` Mark last converted change before reversing | [r3131488531](#4051 (comment)) | | `2a7fff5` Extract `value_tag/3` helper in router test | [r3131013153](#4051 (comment)) | These are internal-only changes (type docs, field renames, a dead-code removal, an ETS lookup simplification, test assertions/helpers, and a list-building cleanup) with no user-facing behavior change, so no changeset is included. A few comments were discussed and intentionally left unchanged: the `Stream.concat` suggestion ([r3131472813](#4051 (comment)), the accumulated lists are tiny and `reverse |> flatten` is clearer) and the `BETWEEN` question ([r3131209403](#4051 (comment)), confirmed as an intentional latent-bug fix required by this PR's `name`-based DNF decomposition / SQL generation). --- 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Summary
This PR lets shapes with boolean combinations around subqueries stay live when dependency rows move. Previously, shapes that used
AND,OR, orNOTaround a subquery would return409on a move, forcing the client to throw away the shape and resync it from scratch. For large shapes that could mean a very expensive full resync; this PR avoids that by reconciling those changes in-stream.The core of the PR is a rewrite of the subquery foundation. Move-ins are now handled as exact log splices: buffer outer-table transactions, run the move-in query, and insert its results into the shape log at the precise point where they become valid. That gives subqueries a much more reliable ordering model, removes a large class of move-handling edge cases, and gives us a stable base for richer boolean
WHEREsupport over subqueries.Compatibility
This PR changes the wire protocol.
The sync service from this branch is not compatible with
elixir-clientfrom before this PR. For the TanStack DB client, this requires:@tanstack/db >= 0.6.2@tanstack/electric-db-collection >= 0.3.0The protocol change is needed because DNF-based subquery shapes need more than tags alone. Rows can now stay in the shape for multiple independent reasons, so the server sends real
active_conditionsvalues and position-awaremove-inbroadcast control messages. That lets clients update the truth value of the affected DNF positions for rows they already have locally, re-evaluate inclusion correctly, and avoid unnecessary deletes or full resyncs.Architecture
WHEREclause into a DNF plan and use that plan consistently for routing, move planning, SQL generation, tags, andactive_conditionsSubqueryIndexthat each shape consumer seeds and updates from its own dependency views, keeping the reverse index exactly in sync with consumer stateElectric.Shapes.Consumeraround explicit event handlers and ordered effects, with a single move queue and explicit buffering/splice handlingSecondary changes
tags,active_conditions, and move broadcasts