tangled
alpha
login
or
join now
leaflet.pub
/
leaflet
289
fork
atom
a tool for shared writing and social publishing
289
fork
atom
overview
issues
27
pulls
pipelines
add inngest function to track bsky post likes
awarm.space
1 month ago
a4586e5f
42887e83
+75
-1
9 changed files
expand all
collapse all
unified
split
app
api
inngest
client.ts
functions
sync_bsky_likes.ts
route.tsx
rpc
[command]
get_publication_data.ts
lish
[did]
[publication]
dashboard
PublishedPostsLists.tsx
appview
index.ts
drizzle
schema.ts
supabase
database.types.ts
migrations
20260209000000_add_bsky_like_count.sql
+6
app/api/inngest/client.ts
···
51
51
documentUris?: string[];
52
52
};
53
53
};
54
54
+
"appview/sync-bsky-likes": {
55
55
+
data: {
56
56
+
document_uri: string;
57
57
+
bsky_post_uri: string;
58
58
+
};
59
59
+
};
54
60
"user/write-records-to-pds": {
55
61
data: {
56
62
did: string;
+41
app/api/inngest/functions/sync_bsky_likes.ts
···
1
1
+
import { inngest } from "../client";
2
2
+
import { supabaseServerClient } from "supabase/serverClient";
3
3
+
import { AtpAgent } from "@atproto/api";
4
4
+
5
5
+
const TOTAL_ITERATIONS = 144; // 36 hours at 15-minute intervals
6
6
+
7
7
+
export const sync_bsky_likes = inngest.createFunction(
8
8
+
{
9
9
+
id: "sync_bsky_likes",
10
10
+
idempotency: "event.data.bsky_post_uri",
11
11
+
},
12
12
+
{ event: "appview/sync-bsky-likes" },
13
13
+
async ({ event, step }) => {
14
14
+
const { document_uri, bsky_post_uri } = event.data;
15
15
+
16
16
+
const agent = new AtpAgent({ service: "https://public.api.bsky.app" });
17
17
+
18
18
+
const fetchAndUpdate = async () => {
19
19
+
const res = await agent.app.bsky.feed.getPosts({
20
20
+
uris: [bsky_post_uri],
21
21
+
});
22
22
+
const post = res.data.posts[0];
23
23
+
if (!post) return 0;
24
24
+
const likeCount = post.likeCount ?? 0;
25
25
+
await supabaseServerClient
26
26
+
.from("documents")
27
27
+
.update({ bsky_like_count: likeCount })
28
28
+
.eq("uri", document_uri);
29
29
+
return likeCount;
30
30
+
};
31
31
+
32
32
+
let likeCount = await step.run("sync-0", fetchAndUpdate);
33
33
+
34
34
+
for (let i = 1; i < TOTAL_ITERATIONS; i++) {
35
35
+
await step.sleep(`wait-${i}`, "15m");
36
36
+
likeCount = await step.run(`sync-${i}`, fetchAndUpdate);
37
37
+
}
38
38
+
39
39
+
return { likeCount };
40
40
+
},
41
41
+
);
+2
app/api/inngest/route.tsx
···
13
13
check_oauth_session,
14
14
} from "./functions/cleanup_expired_oauth_sessions";
15
15
import { write_records_to_pds } from "./functions/write_records_to_pds";
16
16
+
import { sync_bsky_likes } from "./functions/sync_bsky_likes";
16
17
17
18
export const { GET, POST, PUT } = serve({
18
19
client: inngest,
···
28
29
cleanup_expired_oauth_sessions,
29
30
check_oauth_session,
30
31
write_records_to_pds,
32
32
+
sync_bsky_likes,
31
33
],
32
34
});
+1
app/api/rpc/[command]/get_publication_data.ts
···
86
86
indexed_at: dip.documents.indexed_at,
87
87
sort_date: dip.documents.sort_date,
88
88
data: dip.documents.data,
89
89
+
bsky_like_count: dip.documents.bsky_like_count,
89
90
commentsCount: dip.documents.comments_on_documents[0]?.count || 0,
90
91
mentionsCount: dip.documents.document_mentions_in_bsky[0]?.count || 0,
91
92
recommendsCount:
+1
app/lish/[did]/[publication]/dashboard/PublishedPostsLists.tsx
···
112
112
indexed_at: doc.indexed_at,
113
113
sort_date: doc.sort_date,
114
114
data: doc.data,
115
115
+
bsky_like_count: doc.bsky_like_count ?? 0,
115
116
},
116
117
},
117
118
],
+18
appview/index.ts
···
109
109
data: record.value as Json,
110
110
});
111
111
if (docResult.error) console.log(docResult.error);
112
112
+
if (record.value.postRef?.uri) {
113
113
+
await inngest.send({
114
114
+
name: "appview/sync-bsky-likes",
115
115
+
data: {
116
116
+
document_uri: evt.uri.toString(),
117
117
+
bsky_post_uri: record.value.postRef.uri,
118
118
+
},
119
119
+
});
120
120
+
}
112
121
if (record.value.publication) {
113
122
let publicationURI = new AtUri(record.value.publication);
114
123
···
269
278
data: record.value as Json,
270
279
});
271
280
if (docResult.error) console.log(docResult.error);
281
281
+
if (record.value.bskyPostRef?.uri) {
282
282
+
await inngest.send({
283
283
+
name: "appview/sync-bsky-likes",
284
284
+
data: {
285
285
+
document_uri: evt.uri.toString(),
286
286
+
bsky_post_uri: record.value.bskyPostRef.uri,
287
287
+
},
288
288
+
});
289
289
+
}
272
290
273
291
// site.standard.document uses "site" field to reference the publication
274
292
// For documents in publications, site is an AT-URI (at://did:plc:xxx/site.standard.publication/rkey)
+2
-1
drizzle/schema.ts
···
1
1
-
import { pgTable, pgEnum, text, jsonb, foreignKey, timestamp, boolean, uuid, index, bigint, unique, uniqueIndex, smallint, primaryKey } from "drizzle-orm/pg-core"
1
1
+
import { pgTable, pgEnum, text, jsonb, foreignKey, timestamp, boolean, uuid, index, bigint, unique, uniqueIndex, smallint, primaryKey, integer } from "drizzle-orm/pg-core"
2
2
import { sql } from "drizzle-orm"
3
3
4
4
export const aal_level = pgEnum("aal_level", ['aal1', 'aal2', 'aal3'])
···
225
225
uri: text("uri").primaryKey().notNull(),
226
226
data: jsonb("data").notNull(),
227
227
indexed_at: timestamp("indexed_at", { withTimezone: true, mode: 'string' }).defaultNow().notNull(),
228
228
+
bsky_like_count: integer("bsky_like_count").default(0).notNull(),
228
229
});
229
230
230
231
export const atp_poll_votes = pgTable("atp_poll_votes", {
+3
supabase/database.types.ts
···
335
335
}
336
336
documents: {
337
337
Row: {
338
338
+
bsky_like_count: number
338
339
data: Json
339
340
indexed_at: string
340
341
sort_date: string
341
342
uri: string
342
343
}
343
344
Insert: {
345
345
+
bsky_like_count?: number
344
346
data: Json
345
347
indexed_at?: string
346
348
uri: string
347
349
}
348
350
Update: {
351
351
+
bsky_like_count?: number
349
352
data?: Json
350
353
indexed_at?: string
351
354
uri?: string
+1
supabase/migrations/20260209000000_add_bsky_like_count.sql
···
1
1
+
ALTER TABLE documents ADD COLUMN bsky_like_count integer NOT NULL DEFAULT 0;