tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
fix reader feed query
awarm.space
1 week ago
1a3f8052
06fd8c90
+110
-25
4 changed files
expand all
collapse all
unified
split
app
(home-pages)
reader
enrichPost.ts
getReaderFeed.ts
supabase
database.types.ts
migrations
20260303000000_add_get_reader_feed_function.sql
+4
-1
app/(home-pages)/reader/enrichPost.ts
···
34
const handle = await idResolver.did.resolve(uri.host);
35
36
const normalizedData = normalizeDocumentRecord(doc.data, doc.uri);
37
-
if (!normalizedData) return null;
0
0
0
38
39
const normalizedPubRecord = pub
40
? normalizePublicationRecord(pub.record)
···
34
const handle = await idResolver.did.resolve(uri.host);
35
36
const normalizedData = normalizeDocumentRecord(doc.data, doc.uri);
37
+
if (!normalizedData) {
38
+
console.log("[enrichPost] normalizeDocumentRecord returned null for:", doc.uri);
39
+
return null;
40
+
}
41
42
const normalizedPubRecord = pub
43
? normalizePublicationRecord(pub.record)
+38
-24
app/(home-pages)/reader/getReaderFeed.ts
···
6
NormalizedDocument,
7
NormalizedPublication,
8
} from "src/utils/normalizeRecords";
9
-
import { deduplicateByUriOrdered } from "src/utils/deduplicateRecords";
10
import { enrichDocumentToPost } from "./enrichPost";
11
12
export type Cursor = {
···
19
): Promise<{ posts: Post[]; nextCursor: Cursor | null }> {
20
let auth_res = await getIdentityData();
21
if (!auth_res?.atp_did) return { posts: [], nextCursor: null };
22
-
let query = supabaseServerClient
23
-
.from("documents")
24
-
.select(
25
-
`*,
26
-
comments_on_documents(count),
27
-
document_mentions_in_bsky(count),
28
-
recommends_on_documents(count),
29
-
documents_in_publications!inner(publications!inner(*, publication_subscriptions!inner(*)))`,
30
-
)
31
-
.eq(
32
-
"documents_in_publications.publications.publication_subscriptions.identity",
33
-
auth_res.atp_did,
34
-
)
35
-
.order("sort_date", { ascending: false })
36
-
.order("uri", { ascending: false })
37
-
.limit(25);
38
-
if (cursor) {
39
-
query = query.or(
40
-
`sort_date.lt.${cursor.timestamp},and(sort_date.eq.${cursor.timestamp},uri.lt.${cursor.uri})`,
41
-
);
42
}
43
-
let { data: rawFeed, error } = await query;
44
45
-
// Deduplicate records that may exist under both pub.leaflet and site.standard namespaces
46
-
const feed = deduplicateByUriOrdered(rawFeed || []);
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
47
48
let posts = (
49
await Promise.all(feed.map((post) => enrichDocumentToPost(post as any)))
50
).filter((post): post is Post => post !== null);
0
0
0
51
52
const nextCursor =
53
posts.length > 0
···
6
NormalizedDocument,
7
NormalizedPublication,
8
} from "src/utils/normalizeRecords";
0
9
import { enrichDocumentToPost } from "./enrichPost";
10
11
export type Cursor = {
···
18
): Promise<{ posts: Post[]; nextCursor: Cursor | null }> {
19
let auth_res = await getIdentityData();
20
if (!auth_res?.atp_did) return { posts: [], nextCursor: null };
21
+
22
+
const { data: rawFeed, error } = await supabaseServerClient.rpc(
23
+
"get_reader_feed",
24
+
{
25
+
p_identity: auth_res.atp_did,
26
+
p_cursor_timestamp: cursor?.timestamp ?? null,
27
+
p_cursor_uri: cursor?.uri ?? null,
28
+
p_limit: 25,
29
+
},
30
+
);
31
+
if (error) {
32
+
console.error("[getReaderFeed] rpc error:", error);
33
+
return { posts: [], nextCursor: null };
0
0
0
0
0
0
0
34
}
0
35
36
+
if (rawFeed.length === 0) return { posts: [], nextCursor: null };
37
+
38
+
// Reshape rows to match the structure enrichDocumentToPost expects
39
+
const feed = rawFeed.map((row: any) => ({
40
+
uri: row.uri,
41
+
data: row.data,
42
+
sort_date: row.sort_date,
43
+
comments_on_documents: [{ count: Number(row.comments_count) }],
44
+
document_mentions_in_bsky: [{ count: Number(row.mentions_count) }],
45
+
recommends_on_documents: [{ count: Number(row.recommends_count) }],
46
+
documents_in_publications: row.publication_uri
47
+
? [
48
+
{
49
+
publications: {
50
+
uri: row.publication_uri,
51
+
record: row.publication_record,
52
+
name: row.publication_name,
53
+
},
54
+
},
55
+
]
56
+
: [],
57
+
}));
58
59
let posts = (
60
await Promise.all(feed.map((post) => enrichDocumentToPost(post as any)))
61
).filter((post): post is Post => post !== null);
62
+
if (feed.length > 0 && posts.length !== feed.length) {
63
+
console.log(`[getReaderFeed] ${feed.length - posts.length}/${feed.length} posts dropped during enrichment`);
64
+
}
65
66
const nextCursor =
67
posts.length > 0
+19
supabase/database.types.ts
···
1360
like: unknown
1361
}[]
1362
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1363
parse_iso_timestamp: {
1364
Args: {
1365
"": string
···
1360
like: unknown
1361
}[]
1362
}
1363
+
get_reader_feed: {
1364
+
Args: {
1365
+
p_identity: string
1366
+
p_cursor_timestamp?: string
1367
+
p_cursor_uri?: string
1368
+
p_limit?: number
1369
+
}
1370
+
Returns: {
1371
+
uri: string
1372
+
data: Json
1373
+
sort_date: string
1374
+
comments_count: number
1375
+
mentions_count: number
1376
+
recommends_count: number
1377
+
publication_uri: string
1378
+
publication_record: Json
1379
+
publication_name: string
1380
+
}[]
1381
+
}
1382
parse_iso_timestamp: {
1383
Args: {
1384
"": string
+49
supabase/migrations/20260303000000_add_get_reader_feed_function.sql
···
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
···
1
+
CREATE OR REPLACE FUNCTION get_reader_feed(
2
+
p_identity text,
3
+
p_cursor_timestamp timestamptz DEFAULT NULL,
4
+
p_cursor_uri text DEFAULT NULL,
5
+
p_limit int DEFAULT 25
6
+
)
7
+
RETURNS TABLE (
8
+
uri text,
9
+
data jsonb,
10
+
sort_date timestamptz,
11
+
comments_count bigint,
12
+
mentions_count bigint,
13
+
recommends_count bigint,
14
+
publication_uri text,
15
+
publication_record jsonb,
16
+
publication_name text
17
+
)
18
+
LANGUAGE sql STABLE
19
+
AS $$
20
+
SELECT
21
+
d.uri,
22
+
d.data,
23
+
d.sort_date,
24
+
(SELECT count(*) FROM comments_on_documents c WHERE c.document = d.uri),
25
+
(SELECT count(*) FROM document_mentions_in_bsky m WHERE m.document = d.uri),
26
+
(SELECT count(*) FROM recommends_on_documents r WHERE r.document = d.uri),
27
+
pub.uri,
28
+
pub.record,
29
+
pub.name
30
+
FROM documents d
31
+
JOIN documents_in_publications dip ON dip.document = d.uri
32
+
JOIN publication_subscriptions ps ON ps.publication = dip.publication
33
+
LEFT JOIN LATERAL (
34
+
SELECT p.uri, p.record, p.name
35
+
FROM documents_in_publications dip2
36
+
JOIN publications p ON p.uri = dip2.publication
37
+
WHERE dip2.document = d.uri
38
+
LIMIT 1
39
+
) pub ON true
40
+
WHERE ps.identity = p_identity
41
+
AND (
42
+
p_cursor_timestamp IS NULL
43
+
OR d.sort_date < p_cursor_timestamp
44
+
OR (d.sort_date = p_cursor_timestamp AND d.uri < p_cursor_uri)
45
+
)
46
+
GROUP BY d.uri, d.data, d.sort_date, pub.uri, pub.record, pub.name
47
+
ORDER BY d.sort_date DESC, d.uri DESC
48
+
LIMIT p_limit;
49
+
$$;