Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ coverage/
# TypeScript build info
*.tsbuildinfo

....
# Lock files — project uses pnpm; npm lock file is not committed
package-lock.json
71 changes: 71 additions & 0 deletions docs/architecture/ownership-read-model.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Ownership Read Model

The ownership read model is the authoritative source of truth for wallet-level key holdings and per-creator holder lists. It is maintained by the indexer and consumed by API endpoints that return holder counts, balances, and holder lists.

## Table Schema

The ownership read model is stored in the `KeyOwnership` table.

| Field | Type | Description |
|----------------|------------|-----------------------------------------------------------------------------|
| `id` | `String` | Unique record identifier (cuid). |
| `ownerAddress` | `String` | Stellar wallet address of the key holder. |
| `creatorId` | `String` | ID of the creator whose keys are held. |
| `balance` | `Decimal` | Number of keys currently held. Defaults to `0`. Never goes below `0`. |
| `createdAt` | `DateTime` | Timestamp when this ownership record was first created. |
| `updatedAt` | `DateTime` | Timestamp of the most recent balance update (auto-managed by Prisma). |

**Uniqueness constraint:** `(ownerAddress, creatorId)` — one record per wallet per creator.

**Indexes:** `ownerAddress`, `creatorId` — both indexed for efficient lookups by wallet or by creator.

## Update Triggers

The indexer updates the ownership read model in response to three on-chain trade event types:

### Buy

When a wallet purchases keys from a creator:

1. An `upsert` is performed on `(ownerAddress, creatorId)`.
2. `balance` is incremented by the purchased amount.
3. If no record exists, one is created with `balance = purchased amount`.

### Sell

When a wallet sells keys back to a creator:

1. The existing `KeyOwnership` record for `(ownerAddress, creatorId)` is located.
2. `balance` is decremented by the sold amount.
3. If `balance` reaches `0`, the record is retained at `0` (not deleted) to preserve audit history and simplify replay logic.

### Peer-to-Peer Transfer

When a wallet transfers keys directly to another wallet (without going through the bonding curve):

1. The sender's `KeyOwnership` record is decremented by the transferred amount.
2. The recipient's `KeyOwnership` record is incremented by the same amount (upserted if it does not exist).
3. Both updates are applied atomically where possible to prevent intermediate inconsistent states.

## Balance Conservation Invariant

At any point in time, the sum of all `balance` values across every `KeyOwnership` record for a given `creatorId` must equal that creator's total key supply as recorded on-chain:

```
∑ balance(ownerAddress, creatorId) = creatorTotalSupply(creatorId)
```

This invariant must hold after every trade event is processed. Any discrepancy indicates a missed or double-processed event and should trigger a reconciliation replay.

## Replay and Consistency Recovery

If the indexer misses one or more on-chain events (due to a crash, network gap, or RPC timeout), the ownership read model can fall out of sync with the chain state.

**Replay procedure:**

1. The admin replay endpoint (`POST /api/v1/admin/replay`) re-fetches the affected ledger range from the Stellar RPC and re-emits all trade events in order.
2. Each event is processed with idempotency guards: an event with a ledger sequence already recorded is skipped without modifying the read model.
3. After replay completes, the balance conservation invariant is re-validated. If the sum of balances still does not match the on-chain supply, the replay window is widened and the process repeats.
4. Replay is safe to run at any time because all write paths are idempotent — re-processing a seen event produces no side effects.

Gaps detected by the ledger gap detection service (`LedgerGapDetectionService`) are automatically flagged and can trigger a targeted replay without requiring a full historical re-index.
25 changes: 25 additions & 0 deletions src/modules/indexer/price-snapshot.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<voi
lastTradeAt: tradeAt,
},
});
logger.debug(
{
creator_id: creatorId,
new_price: price.toString(),
previous_price: null,
ledger_sequence: null,
written_at: tradeAt.toISOString(),
},
'price-snapshot: written (first trade)'
);
return;
}

Expand All @@ -52,6 +62,11 @@ export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<voi
return;
}

// Skip write when price is unchanged.
if (existing.currentPrice.toString() === price.toString()) {
return;
}

// Promote currentPrice → price24hAgo when the snapshot is older than 24 h.
const twentyFourHoursAgo = new Date(Date.now() - 24 * 60 * 60 * 1000);
const shouldRotate24h =
Expand All @@ -65,6 +80,16 @@ export async function upsertPriceSnapshot(event: TradeEventPayload): Promise<voi
lastTradeAt: tradeAt,
},
});
logger.debug(
{
creator_id: creatorId,
new_price: price.toString(),
previous_price: existing.currentPrice.toString(),
ledger_sequence: null,
written_at: tradeAt.toISOString(),
},
'price-snapshot: written'
);
} catch (err) {
logger.error({ err, creatorId }, 'price-snapshot: failed to upsert');
throw err;
Expand Down
18 changes: 15 additions & 3 deletions src/modules/webhooks/webhook.integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,26 +122,38 @@ describe('POST /api/v1/creators/:id/webhooks', () => {
expect(res.status).toBe(400);
});

it('returns 409 when max webhooks reached', async () => {
it('returns 422 when max webhooks reached', async () => {
const existingCount = await prisma.webhook.count({
where: { creatorId, isActive: true },
});

const remaining = envConfig.WEBHOOK_MAX_PER_CREATOR - existingCount;
for (let i = 0; i < remaining; i++) {
await supertest(app)
const res = await supertest(app)
.post(basePath)
.set(authHeaders('POST', basePath, creatorId))
.send({ callback_url: `https://example.com/hook-${i}`, events: ['buy'] });
expect(res.status).toBe(201);
}

const countAtLimit = await prisma.webhook.count({
where: { creatorId, isActive: true },
});
expect(countAtLimit).toBe(envConfig.WEBHOOK_MAX_PER_CREATOR);

const res = await supertest(app)
.post(basePath)
.set(authHeaders('POST', basePath, creatorId))
.send({ callback_url: 'https://example.com/too-many', events: ['buy'] });

expect(res.status).toBe(409);
expect(res.status).toBe(422);
expect(res.body.error.code).toBe('MAX_WEBHOOKS_REACHED');
expect(res.body.error.message).toMatch(/maximum/i);

const countAfter = await prisma.webhook.count({
where: { creatorId, isActive: true },
});
expect(countAfter).toBe(envConfig.WEBHOOK_MAX_PER_CREATOR);
});
});

Expand Down
2 changes: 1 addition & 1 deletion src/modules/webhooks/webhook.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ export async function createWebhook(
new Error(
`Maximum of ${envConfig.WEBHOOK_MAX_PER_CREATOR} active webhooks per creator reached`
),
{ statusCode: 409, code: 'MAX_WEBHOOKS_REACHED' }
{ statusCode: 422, code: 'MAX_WEBHOOKS_REACHED' }
);
}

Expand Down
5 changes: 3 additions & 2 deletions src/utils/api-response.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Response } from 'express';
import { ZodIssue } from 'zod';
import { ErrorCode, ErrorCodeType } from '../constants/error.constants';
import { requestContextStorage } from './als.utils';
import { serializeBigInt } from './bigint-serializer.utils';

/**
* Standard API error response shape.
Expand Down Expand Up @@ -128,7 +129,7 @@ export function sendSuccess<T>(
): void {
const body: ApiSuccessResponse<T> = {
success: true,
data,
data: serializeBigInt(data) as T,
...(message ? { message } : {}),
};
res.setHeader('Content-Type', 'application/json');
Expand All @@ -147,7 +148,7 @@ export function sendPaginatedSuccess<T>(
): void {
const body: PaginatedResponse<T> = {
success: true,
data,
data: serializeBigInt(data) as T[],
meta,
...(message ? { message } : {}),
};
Expand Down
17 changes: 16 additions & 1 deletion src/utils/bigint-serializer.utils.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { strict as assert } from 'assert';
import { bigIntReplacer, safeJsonStringify, sanitizeBigInts } from './bigint-serializer.utils';
import { bigIntReplacer, safeJsonStringify, sanitizeBigInts, serializeBigInt } from './bigint-serializer.utils';

function run() {
// bigIntReplacer converts BigInt to string
Expand Down Expand Up @@ -36,6 +36,21 @@ function run() {
assert.equal(sanitizeBigInts(42), 42);
assert.equal(sanitizeBigInts('str'), 'str');

// serializeBigInt – top-level BigInt converts to string
assert.equal(serializeBigInt(9007199254740993n), '9007199254740993');

// serializeBigInt – nested BigInt in object converts correctly
const serializedObj = serializeBigInt({ id: 1n, nested: { amount: 500n }, label: 'ok' });
assert.deepEqual(serializedObj, { id: '1', nested: { amount: '500' }, label: 'ok' });

// serializeBigInt – BigInt inside an array converts correctly
assert.deepEqual(serializeBigInt([1n, 2n, 3n]), ['1', '2', '3']);

// serializeBigInt – non-BigInt values pass through unchanged
assert.equal(serializeBigInt(42), 42);
assert.equal(serializeBigInt('hello'), 'hello');
assert.deepEqual(serializeBigInt({ x: 1, y: 'str' }), { x: 1, y: 'str' });

console.log('bigint-serializer.utils tests passed');
}

Expand Down
11 changes: 11 additions & 0 deletions src/utils/bigint-serializer.utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,14 @@ export function sanitizeBigInts(value: unknown): unknown {
}
return value;
}

/**
* Recursively converts BigInt values to their decimal string representation.
* Alias for `sanitizeBigInts` — use this name when the intent is to prepare
* a value for JSON serialization in API responses.
*
* @example
* serializeBigInt({ amount: 1000000000000000000n });
* // → { amount: "1000000000000000000" }
*/
export const serializeBigInt = sanitizeBigInts;
Loading