···9import {
10 Bindings, BskyEmbedWrapper, BskyRecordWrapper, EmbedData, EmbedDataType,
11 LooseObj, Post, PostLabel, AccountStatus,
12- PostRecordResponse, PostStatus, Repost, ScheduledContext
013} from '../types.d';
14import { atpRecordURI } from '../validation/regexCases';
15import { bulkUpdatePostedData, getChildPostsOfThread, isPostAlreadyPosted, setPostNowOffForPost } from './db/data';
···104 return AccountStatus.UnhandledError;
105}
106107-export const makeAgentForUser = async (env: Bindings, userId: string) => {
108- const loginCreds = await getBskyUserPassForId(env, userId);
109 if (loginCreds.valid === false) {
110 console.error(`credentials for user ${userId} were invalid`);
111 return null;
···116117 const loginResponse: AccountStatus = await loginToBsky(agent, username, password);
118 if (loginResponse != AccountStatus.Ok) {
119- const addViolation: boolean = await createViolationForUser(env, userId, loginResponse);
120 if (addViolation)
121 console.error(`Unable to login to ${userId} with violation ${loginResponse}`);
122 return null;
···124 return agent;
125}
126127-export const makePost = async (c: Context|ScheduledContext, content: Post|null, usingAgent: AtpAgent|null=null) => {
128 if (content === null) {
129 console.warn("Dropping invocation of makePost, content was null");
130 return false;
131 }
132133- const env = c.env;
134 // make a check to see if the post has already been posted onto bsky
135 // skip over this check if we are a threaded post, as we could have had a child post that didn't make it.
136- if (!content.isThreadRoot && await isPostAlreadyPosted(env, content.postid)) {
137 console.log(`Dropped handling make post for post ${content.postid}, already posted.`);
138 return true;
139 }
140141- const agent: AtpAgent|null = (usingAgent === null) ? await makeAgentForUser(env, content.user) : usingAgent;
142 if (agent === null) {
143 console.warn(`could not make agent for post ${content.postid}`);
144 return false;
145 }
146147- const newPostRecords: PostStatus|null = await makePostRaw(env, content, agent);
148 if (newPostRecords !== null) {
149- await bulkUpdatePostedData(env, newPostRecords.records, newPostRecords.expected == newPostRecords.got);
150151 // Delete any embeds if they exist.
152 for (const record of newPostRecords.records) {
···164165 // Turn off the post now flag if we failed.
166 if (content.postNow) {
167- c.executionCtx.waitUntil(setPostNowOffForPost(env, content.postid));
168 }
169 return false;
170}
171172-export const makeRepost = async (c: Context|ScheduledContext, content: Repost, usingAgent: AtpAgent|null=null) => {
173- const env = c.env;
174 let bWasSuccess = true;
175- const agent: AtpAgent|null = (usingAgent === null) ? await makeAgentForUser(env, content.userId) : usingAgent;
176 if (agent === null) {
177 console.warn(`could not make agent for repost ${content.postid}`);
178 return false;
···196 return bWasSuccess;
197};
198199-export const makePostRaw = async (env: Bindings, content: Post, agent: AtpAgent): Promise<PostStatus|null> => {
200- const username = await getUsernameForUserId(env, content.user);
201 // incredibly unlikely but we'll handle it
202 if (username === null) {
203 console.warn(`username for post ${content.postid} was invalid`);
···294 // embed thumbnails of any size
295 // it will fail when you try to make the post record, saying the
296 // post record is invalid.
297- const imgTransform = (await env.IMAGES.input(imageBlob.stream())
298 .transform({width: 1280, height: 720, fit: "scale-down"})
299 .output({ format: "image/jpeg", quality: 85 })).response();
300 if (imgTransform.ok) {
···429 }
430431 // Otherwise pull files from storage
432- const file = await env.R2.get(currentEmbed.content);
433 if (!file) {
434 console.warn(`Could not get the file ${currentEmbed.content} from R2 for post!`);
435 return false;
···447 }
448 }
449 // Give violation mediaTooBig if the file is too large.
450- await createViolationForUser(env, postData.user, AccountStatus.MediaTooBig);
451 console.warn(`Unable to upload ${currentEmbed.content} for post ${postData.postid} with err ${err}`);
452 return false;
453 }
···609610 // If this is a post thread root
611 if (content.isThreadRoot) {
612- const childPosts = await getChildPostsOfThread(env, content.postid) || [];
613 expected += childPosts.length;
614 // get the thread children.
615 for (const child of childPosts) {
···9import {
10 Bindings, BskyEmbedWrapper, BskyRecordWrapper, EmbedData, EmbedDataType,
11 LooseObj, Post, PostLabel, AccountStatus,
12+ PostRecordResponse, PostStatus, Repost, ScheduledContext,
13+ AllContext
14} from '../types.d';
15import { atpRecordURI } from '../validation/regexCases';
16import { bulkUpdatePostedData, getChildPostsOfThread, isPostAlreadyPosted, setPostNowOffForPost } from './db/data';
···105 return AccountStatus.UnhandledError;
106}
107108+export const makeAgentForUser = async (c: AllContext, userId: string) => {
109+ const loginCreds = await getBskyUserPassForId(c, userId);
110 if (loginCreds.valid === false) {
111 console.error(`credentials for user ${userId} were invalid`);
112 return null;
···117118 const loginResponse: AccountStatus = await loginToBsky(agent, username, password);
119 if (loginResponse != AccountStatus.Ok) {
120+ const addViolation: boolean = await createViolationForUser(c, userId, loginResponse);
121 if (addViolation)
122 console.error(`Unable to login to ${userId} with violation ${loginResponse}`);
123 return null;
···125 return agent;
126}
127128+export const makePost = async (c: AllContext, content: Post|null, usingAgent: AtpAgent|null=null) => {
129 if (content === null) {
130 console.warn("Dropping invocation of makePost, content was null");
131 return false;
132 }
1330134 // make a check to see if the post has already been posted onto bsky
135 // skip over this check if we are a threaded post, as we could have had a child post that didn't make it.
136+ if (!content.isThreadRoot && await isPostAlreadyPosted(c, content.postid)) {
137 console.log(`Dropped handling make post for post ${content.postid}, already posted.`);
138 return true;
139 }
140141+ const agent: AtpAgent|null = (usingAgent === null) ? await makeAgentForUser(c, content.user) : usingAgent;
142 if (agent === null) {
143 console.warn(`could not make agent for post ${content.postid}`);
144 return false;
145 }
146147+ const newPostRecords: PostStatus|null = await makePostRaw(c, content, agent);
148 if (newPostRecords !== null) {
149+ await bulkUpdatePostedData(c, newPostRecords.records, newPostRecords.expected == newPostRecords.got);
150151 // Delete any embeds if they exist.
152 for (const record of newPostRecords.records) {
···164165 // Turn off the post now flag if we failed.
166 if (content.postNow) {
167+ c.executionCtx.waitUntil(setPostNowOffForPost(c, content.postid));
168 }
169 return false;
170}
171172+export const makeRepost = async (c: AllContext, content: Repost, usingAgent: AtpAgent|null=null) => {
0173 let bWasSuccess = true;
174+ const agent: AtpAgent|null = (usingAgent === null) ? await makeAgentForUser(c, content.userId) : usingAgent;
175 if (agent === null) {
176 console.warn(`could not make agent for repost ${content.postid}`);
177 return false;
···195 return bWasSuccess;
196};
197198+export const makePostRaw = async (c: AllContext, content: Post, agent: AtpAgent): Promise<PostStatus|null> => {
199+ const username = await getUsernameForUserId(c, content.user);
200 // incredibly unlikely but we'll handle it
201 if (username === null) {
202 console.warn(`username for post ${content.postid} was invalid`);
···293 // embed thumbnails of any size
294 // it will fail when you try to make the post record, saying the
295 // post record is invalid.
296+ const imgTransform = (await c.env.IMAGES.input(imageBlob.stream())
297 .transform({width: 1280, height: 720, fit: "scale-down"})
298 .output({ format: "image/jpeg", quality: 85 })).response();
299 if (imgTransform.ok) {
···428 }
429430 // Otherwise pull files from storage
431+ const file = await c.env.R2.get(currentEmbed.content);
432 if (!file) {
433 console.warn(`Could not get the file ${currentEmbed.content} from R2 for post!`);
434 return false;
···446 }
447 }
448 // Give violation mediaTooBig if the file is too large.
449+ await createViolationForUser(c, postData.user, AccountStatus.MediaTooBig);
450 console.warn(`Unable to upload ${currentEmbed.content} for post ${postData.postid} with err ${err}`);
451 return false;
452 }
···608609 // If this is a post thread root
610 if (content.isThreadRoot) {
611+ const childPosts = await getChildPostsOfThread(c, content.postid) || [];
612 expected += childPosts.length;
613 // get the thread children.
614 for (const child of childPosts) {
+3-3
src/utils/bskyPrune.ts
···1import isEmpty from 'just-is-empty';
2import split from 'just-split';
3-import { Bindings } from '../types.d';
4import { getPostRecords } from './bskyApi';
5import { getAllPostedPosts, getAllPostedPostsOfUser } from './db/data';
6···8// are still on the network or not. If they are not, then this prunes the posts from
9// the database. This call is quite expensive and should only be run on a weekly
10// cron job.
11-export const pruneBskyPosts = async (env: Bindings, userId?: string) => {
12- const allPostedPosts = (userId !== undefined) ? await getAllPostedPostsOfUser(env, userId) : await getAllPostedPosts(env);
13 let removePostIds: string[] = [];
14 let postedGroups = split(allPostedPosts, 25);
15 while (!isEmpty(postedGroups)) {
···1import isEmpty from 'just-is-empty';
2import split from 'just-split';
3+import { AllContext } from '../types.d';
4import { getPostRecords } from './bskyApi';
5import { getAllPostedPosts, getAllPostedPostsOfUser } from './db/data';
6···8// are still on the network or not. If they are not, then this prunes the posts from
9// the database. This call is quite expensive and should only be run on a weekly
10// cron job.
11+export const pruneBskyPosts = async (c: AllContext, userId?: string) => {
12+ const allPostedPosts = (userId !== undefined) ? await getAllPostedPostsOfUser(c, userId) : await getAllPostedPosts(c);
13 let removePostIds: string[] = [];
14 let postedGroups = split(allPostedPosts, 25);
15 while (!isEmpty(postedGroups)) {
···1import { and, eq, ne } from "drizzle-orm";
2+import { DrizzleD1Database } from "drizzle-orm/d1";
03import { bannedUsers, violations } from "../../db/enforcement.schema";
4+import { AccountStatus, AllContext, LooseObj, Violation } from "../../types.d";
5import { lookupBskyHandle } from "../bskyApi";
6import { getUsernameForUserId } from "./userinfo";
7···17 }
18}
1920+export const userHasBan = async (c: AllContext, userDid: string): Promise<boolean> => {
21+ const db: DrizzleD1Database = c.get("db");
22+ if (!db) {
23+ console.error("unable to check if user has ban, db was null");
24+ return false;
25+ }
26+ const usersBanned = await db.$count(bannedUsers, eq(bannedUsers.did, userDid));
27+ return (usersBanned > 0);
28};
2930+export const userHandleHasBan = async (c: AllContext, userName: string) => {
31 if (userName !== null) {
32 const didHandle = await lookupBskyHandle(userName);
33 if (didHandle !== null)
34+ return await userHasBan(c, didHandle);
35 }
36 return false;
37};
···60 return valuesUpdate;
61}
6263+export const createViolationForUser = async(c: AllContext, userId: string, violationType: AccountStatus): Promise<boolean> => {
64 const NoHandleState: AccountStatus[] = [AccountStatus.Ok, AccountStatus.PlatformOutage,
65 AccountStatus.None, AccountStatus.UnhandledError];
66 // Don't do anything in these cases
···69 return false;
70 }
7172+ const db: DrizzleD1Database = c.get("db");
73+ if (!db) {
74+ console.error("unable to get database to create violations for");
75+ return false;
76+ }
77 const valuesUpdate:LooseObj = createObjForValuesChange([violationType], true);
78 if (violationType === AccountStatus.TOSViolation) {
79+ const bskyUsername = await getUsernameForUserId(c, userId);
80 if (bskyUsername !== null) {
81 await createBanForUser(db, bskyUsername, "tos violation");
82 } else {
···95 ));
96};
9798+export const removeViolation = async(c: AllContext, userId: string, violationType: AccountStatus) => {
99+ await removeViolations(c, userId, [violationType]);
100};
101102+export const removeViolations = async(c: AllContext, userId: string, violationType: AccountStatus[]) => {
103+ const db: DrizzleD1Database = c.get("db");
104+ if (!db) {
105+ console.warn(`unable to remove violations for user ${userId}, db was null`);
106+ return;
107+ }
108 // Check if they have a violation first
109 if ((await userHasViolations(db, userId)) == false) {
110 return;
···125}
126127export const getViolationsForUser = async(db: DrizzleD1Database, userId: string) => {
128+ const {results} = await db.select().from(violations)
129+ .where(eq(violations.userId, userId)).limit(1).run();
130 if (results.length > 0)
131 return (results[0] as Violation);
132 return null;
133};
134135+export const getViolationsForCurrentUser = async(c: AllContext): Promise<Violation|null> => {
136 const userId = c.get("userId");
137+ const db: DrizzleD1Database = c.get("db");
138+ if (userId && db) {
139 return await getViolationsForUser(db, userId);
140 }
141 return null;
142+};
+51-25
src/utils/dbQuery.ts
···1import { addHours, isAfter, isEqual } from "date-fns";
2import { and, asc, desc, eq, getTableColumns, gt, gte, sql } from "drizzle-orm";
3import { BatchItem } from "drizzle-orm/batch";
4-import { drizzle, DrizzleD1Database } from "drizzle-orm/d1";
5-import { Context } from "hono";
6import has from "just-has";
7import isEmpty from "just-is-empty";
8import { v4 as uuidv4, validate as uuidValid } from 'uuid';
···11import { MAX_POSTS_PER_THREAD, MAX_REPOST_POSTS, MAX_REPOST_RULES_PER_POST } from "../limits";
12import {
13 AccountStatus,
014 BatchQuery,
15 CreateObjectResponse, CreatePostQueryResponse,
16 DeleteResponse,
···25import { createPostObject, createRepostInfo, floorGivenTime } from "./helpers";
26import { deleteEmbedsFromR2 } from "./r2Query";
2728-export const getPostsForUser = async (c: Context): Promise<Post[]|null> => {
29 try {
30 const userId = c.get("userId");
31- if (userId) {
32- const db: DrizzleD1Database = drizzle(c.env.DB);
33 const results = await db.select({
34 ...getTableColumns(posts),
35 repostCount: repostCounts.count
···49 return null;
50};
5152-export const updateUserData = async (c: Context, newData: any): Promise<boolean> => {
53 const userId = c.get("userId");
054 try {
000055 if (userId) {
56- const db: DrizzleD1Database = drizzle(c.env.DB);
57 let queriesToExecute:BatchItem<"sqlite">[] = [];
5859 if (has(newData, "password")) {
···73 // check if the user has violations
74 if (await userHasViolations(db, userId)) {
75 // they do, so clear them out
76- await removeViolations(c.env, userId, [AccountStatus.InvalidAccount, AccountStatus.Deactivated]);
77 }
78 }
79···92 return false;
93};
9495-export const deletePost = async (c: Context, id: string): Promise<DeleteResponse> => {
96 const userId = c.get("userId");
97 const returnObj: DeleteResponse = {success: false};
98 if (!userId) {
99 return returnObj;
100 }
101102- const db: DrizzleD1Database = drizzle(c.env.DB);
00000103 const postObj = await getPostById(c, id);
104 if (postObj !== null) {
105 let queriesToExecute:BatchItem<"sqlite">[] = [];
···109 await deleteEmbedsFromR2(c, postObj.embeds);
110 if (await userHasViolations(db, userId)) {
111 // Remove the media too big violation if it's been given
112- await removeViolation(c.env, userId, AccountStatus.MediaTooBig);
113 }
114 }
115···129130 // We'll need to delete all of the child embeds then, a costly, annoying experience.
131 if (postObj.isThreadRoot) {
132- const childPosts = await getChildPostsOfThread(c.env, postObj.postid);
133 if (childPosts !== null) {
134 for (const childPost of childPosts) {
135 c.executionCtx.waitUntil(deleteEmbedsFromR2(c, childPost.embeds));
···155 return returnObj;
156};
157158-export const createPost = async (c: Context, body: any): Promise<CreatePostQueryResponse> => {
159- const db: DrizzleD1Database = drizzle(c.env.DB);
160-161 const userId = c.get("userId");
162 if (!userId)
163 return { ok: false, msg: "Your user session has expired, please login again"};
16400000165 const validation = PostSchema.safeParse(body);
166 if (!validation.success) {
167 return { ok: false, msg: validation.error.toString() };
···313 return { ok: success, postNow: makePostNow, postId: postUUID, msg: success ? "success" : "fail" };
314};
315316-export const createRepost = async (c: Context, body: any): Promise<CreateObjectResponse> => {
317- const db: DrizzleD1Database = drizzle(c.env.DB);
318319 const userId = c.get("userId");
320 if (!userId)
321 return { ok: false, msg: "Your user session has expired, please login again"};
00000322323 const validation = RepostSchema.safeParse(body);
324 if (!validation.success) {
···430 return { ok: success, msg: success ? "success" : "fail", postId: postUUID };
431};
432433-export const updatePostForUser = async (c: Context, id: string, newData: Object): Promise<boolean> => {
434 const userId = c.get("userId");
435- return await updatePostForGivenUser(c.env, userId, id, newData);
436};
437438-export const getPostById = async(c: Context, id: string): Promise<Post|null> => {
439 const userId = c.get("userId");
440 if (!userId || !uuidValid(id))
441 return null;
442443- const env = c.env;
444- const db: DrizzleD1Database = drizzle(env.DB);
0000445 const result = await db.select().from(posts)
446 .where(and(eq(posts.uuid, id), eq(posts.userId, userId)))
447 .limit(1).all();
···452};
453454// used for post editing, acts very similar to getPostsForUser
455-export const getPostByIdWithReposts = async(c: Context, id: string): Promise<Post|null> => {
456 const userId = c.get("userId");
457 if (!userId || !uuidValid(id))
458 return null;
459460- const env = c.env;
461- const db: DrizzleD1Database = drizzle(env.DB);
0000462 const result = await db.select({
463 ...getTableColumns(posts),
464 repostCount: repostCounts.count,
···1import { addHours, isAfter, isEqual } from "date-fns";
2import { and, asc, desc, eq, getTableColumns, gt, gte, sql } from "drizzle-orm";
3import { BatchItem } from "drizzle-orm/batch";
4+import { DrizzleD1Database } from "drizzle-orm/d1";
05import has from "just-has";
6import isEmpty from "just-is-empty";
7import { v4 as uuidv4, validate as uuidValid } from 'uuid';
···10import { MAX_POSTS_PER_THREAD, MAX_REPOST_POSTS, MAX_REPOST_RULES_PER_POST } from "../limits";
11import {
12 AccountStatus,
13+ AllContext,
14 BatchQuery,
15 CreateObjectResponse, CreatePostQueryResponse,
16 DeleteResponse,
···25import { createPostObject, createRepostInfo, floorGivenTime } from "./helpers";
26import { deleteEmbedsFromR2 } from "./r2Query";
2728+export const getPostsForUser = async (c: AllContext): Promise<Post[]|null> => {
29 try {
30 const userId = c.get("userId");
31+ const db: DrizzleD1Database = c.get("db");
32+ if (userId && db) {
33 const results = await db.select({
34 ...getTableColumns(posts),
35 repostCount: repostCounts.count
···49 return null;
50};
5152+export const updateUserData = async (c: AllContext, newData: any): Promise<boolean> => {
53 const userId = c.get("userId");
54+ const db: DrizzleD1Database = c.get("db");
55 try {
56+ if (!db) {
57+ console.error("Unable to update user data, no database object");
58+ return false;
59+ }
60 if (userId) {
061 let queriesToExecute:BatchItem<"sqlite">[] = [];
6263 if (has(newData, "password")) {
···77 // check if the user has violations
78 if (await userHasViolations(db, userId)) {
79 // they do, so clear them out
80+ await removeViolations(c, userId, [AccountStatus.InvalidAccount, AccountStatus.Deactivated]);
81 }
82 }
83···96 return false;
97};
9899+export const deletePost = async (c: AllContext, id: string): Promise<DeleteResponse> => {
100 const userId = c.get("userId");
101 const returnObj: DeleteResponse = {success: false};
102 if (!userId) {
103 return returnObj;
104 }
105106+ const db: DrizzleD1Database = c.get("db");
107+ if (!db) {
108+ console.error(`unable to delete post ${id}, db was null`);
109+ return returnObj;
110+ }
111+112 const postObj = await getPostById(c, id);
113 if (postObj !== null) {
114 let queriesToExecute:BatchItem<"sqlite">[] = [];
···118 await deleteEmbedsFromR2(c, postObj.embeds);
119 if (await userHasViolations(db, userId)) {
120 // Remove the media too big violation if it's been given
121+ await removeViolation(c, userId, AccountStatus.MediaTooBig);
122 }
123 }
124···138139 // We'll need to delete all of the child embeds then, a costly, annoying experience.
140 if (postObj.isThreadRoot) {
141+ const childPosts = await getChildPostsOfThread(c, postObj.postid);
142 if (childPosts !== null) {
143 for (const childPost of childPosts) {
144 c.executionCtx.waitUntil(deleteEmbedsFromR2(c, childPost.embeds));
···164 return returnObj;
165};
166167+export const createPost = async (c: AllContext, body: any): Promise<CreatePostQueryResponse> => {
168+ const db: DrizzleD1Database = c.get("db");
0169 const userId = c.get("userId");
170 if (!userId)
171 return { ok: false, msg: "Your user session has expired, please login again"};
172173+ if (!db) {
174+ console.error("unable to create post, db became null");
175+ return { ok: false, msg: "An application error has occurred please refresh" };
176+ }
177+178 const validation = PostSchema.safeParse(body);
179 if (!validation.success) {
180 return { ok: false, msg: validation.error.toString() };
···326 return { ok: success, postNow: makePostNow, postId: postUUID, msg: success ? "success" : "fail" };
327};
328329+export const createRepost = async (c: AllContext, body: any): Promise<CreateObjectResponse> => {
330+ const db: DrizzleD1Database = c.get("db");
331332 const userId = c.get("userId");
333 if (!userId)
334 return { ok: false, msg: "Your user session has expired, please login again"};
335+336+ if (!db) {
337+ console.error("unable to create repost db became null");
338+ return {ok: false, msg: "Invalid server operation occurred, please refresh"};
339+ }
340341 const validation = RepostSchema.safeParse(body);
342 if (!validation.success) {
···448 return { ok: success, msg: success ? "success" : "fail", postId: postUUID };
449};
450451+export const updatePostForUser = async (c: AllContext, id: string, newData: Object): Promise<boolean> => {
452 const userId = c.get("userId");
453+ return await updatePostForGivenUser(c, userId, id, newData);
454};
455456+export const getPostById = async(c: AllContext, id: string): Promise<Post|null> => {
457 const userId = c.get("userId");
458 if (!userId || !uuidValid(id))
459 return null;
460461+ const db: DrizzleD1Database = c.get("db");
462+ if (!db) {
463+ console.error(`unable to get post ${id}, db was null`);
464+ return null;
465+ }
466+467 const result = await db.select().from(posts)
468 .where(and(eq(posts.uuid, id), eq(posts.userId, userId)))
469 .limit(1).all();
···474};
475476// used for post editing, acts very similar to getPostsForUser
477+export const getPostByIdWithReposts = async(c: AllContext, id: string): Promise<Post|null> => {
478 const userId = c.get("userId");
479 if (!userId || !uuidValid(id))
480 return null;
481482+ const db: DrizzleD1Database = c.get("db");
483+ if (!db) {
484+ console.error(`unable to get post ${id} with reposts, db was null`);
485+ return null;
486+ }
487+488 const result = await db.select({
489 ...getTableColumns(posts),
490 repostCount: repostCounts.count,
+3-3
src/utils/helpers.ts
···1import { startOfHour, subDays } from "date-fns";
02import has from "just-has";
3import isEmpty from "just-is-empty";
4-import { Bindings, BskyAPILoginCreds, Post, Repost, RepostInfo } from "../types.d";
5-import { Context } from "hono";
67export function createPostObject(data: any) {
8 const postData: Post = (new Object() as Post);
···82 return repostObj;
83}
8485-export function createLoginCredsObj(env: Bindings, data: any) {
86 const loginCreds: BskyAPILoginCreds = (new Object() as BskyAPILoginCreds);
87 if (isEmpty(data)) {
88 loginCreds.password = loginCreds.username = loginCreds.pds = "";
···1import { startOfHour, subDays } from "date-fns";
2+import { Context } from "hono";
3import has from "just-has";
4import isEmpty from "just-is-empty";
5+import { BskyAPILoginCreds, Post, Repost, RepostInfo } from "../types.d";
067export function createPostObject(data: any) {
8 const postData: Post = (new Object() as Post);
···82 return repostObj;
83}
8485+export function createLoginCredsObj(data: any) {
86 const loginCreds: BskyAPILoginCreds = (new Object() as BskyAPILoginCreds);
87 if (isEmpty(data)) {
88 loginCreds.password = loginCreds.username = loginCreds.pds = "";
+5-5
src/utils/inviteKeys.ts
···15 if (inviteKey === undefined)
16 return false;
1718- const value = await c.env.INVITE_POOL.get(inviteKey);
19 // Key does not exist
20 if (value === null)
21 return false;
···41 if (inviteKey === undefined)
42 return;
4344- const value = await c.env.INVITE_POOL.get(inviteKey);
45 if (value === null) {
46 console.error(`attempted to use invite key ${inviteKey} but is invalid`);
47 return;
···62 let newValue: number = amount - 1;
63 // Delete any keys that fall to 0, they should be removed from the db
64 if (newValue <= 0) {
65- await c.env.INVITE_POOL.delete(inviteKey);
66 return;
67 }
6869 // put the new value on the stack
70- await c.env.INVITE_POOL.put(inviteKey, newValue.toString());
71 }
72}
73···80 separator: '-',
81 capitalize: false,
82 });
83- c.executionCtx.waitUntil(c.env.INVITE_POOL.put(newKey, "10"));
84 return newKey;
85}
···15 if (inviteKey === undefined)
16 return false;
1718+ const value = await c.env.INVITE_POOL!.get(inviteKey);
19 // Key does not exist
20 if (value === null)
21 return false;
···41 if (inviteKey === undefined)
42 return;
4344+ const value = await c.env.INVITE_POOL!.get(inviteKey);
45 if (value === null) {
46 console.error(`attempted to use invite key ${inviteKey} but is invalid`);
47 return;
···62 let newValue: number = amount - 1;
63 // Delete any keys that fall to 0, they should be removed from the db
64 if (newValue <= 0) {
65+ await c.env.INVITE_POOL!.delete(inviteKey);
66 return;
67 }
6869 // put the new value on the stack
70+ await c.env.INVITE_POOL!.put(inviteKey, newValue.toString());
71 }
72}
73···80 separator: '-',
81 capitalize: false,
82 });
83+ c.executionCtx.waitUntil(c.env.INVITE_POOL!.put(newKey, "10"));
84 return newKey;
85}
+21-22
src/utils/r2Query.ts
···15 R2_FILE_SIZE_LIMIT,
16 R2_FILE_SIZE_LIMIT_IN_MB
17} from "../limits";
18-import { Bindings, EmbedData, EmbedDataType, R2BucketObject, ScheduledContext } from '../types.d';
19import { addFileListing, deleteFileListings } from './db/file';
2021type FileMetaData = {
···26 qualityLevel?: number;
27};
2829-export const deleteEmbedsFromR2 = async (c: Context|ScheduledContext, embeds: EmbedData[]|undefined, isQueued: boolean=false) => {
30 let itemsToDelete:string[] = [];
3132 if (embeds !== undefined && embeds.length > 0) {
···42 return itemsToDelete;
43};
4445-export const deleteFromR2 = async (c: Context|ScheduledContext, embeds: string[]|string, isQueued: boolean=false) => {
46 if (embeds.length <= 0)
47 return;
4849 console.log(`Deleting ${embeds}`);
50 const killFilesPromise = c.env.R2.delete(embeds);
51- const deleteFileListingPromise = deleteFileListings(c.env, embeds);
52 if (isQueued) {
53 await killFilesPromise;
54 await deleteFileListingPromise;
···58 }
59};
6061-const rawUploadToR2 = async (env: Bindings, buffer: ArrayBuffer|ReadableStream, metaData: FileMetaData) => {
62 const fileExt:string|undefined = metaData.name.split(".").pop();
63 if (fileExt === undefined) {
64 return {"success": false, "error": "unable to upload, file name is invalid"};
65 }
6667 const fileName = `${uuidv4()}.${fileExt.toLowerCase()}`;
68- const R2UploadRes = await env.R2.put(fileName, buffer, {
69 customMetadata: {"user": metaData.user, "type": metaData.type }
70 });
71 if (R2UploadRes) {
72- await addFileListing(env, fileName, metaData.user);
73 return {"success": true, "data": R2UploadRes.key,
74 "originalName": metaData.name, "fileSize": metaData.size,
75 "qualityLevel": metaData.qualityLevel};
···8081const uploadImageToR2 = async(c: Context, file: File, userId: string) => {
82 const originalName = file.name;
83- const env: Bindings = c.env;
84 // The maximum size of CF Image transforms.
85 if (file.size > CF_IMAGES_FILE_SIZE_LIMIT) {
86 return {"success": false, "error": `An image has a maximum file size of ${CF_IMAGES_FILE_SIZE_LIMIT_IN_MB}MB`};
···108 if (file.size > BSKY_IMG_SIZE_LIMIT) {
109 let failedToResize = true;
110111- if (env.IMAGE_SETTINGS.enabled) {
112 const resizeFilename = uuidv4();
113- const resizeBucketPush = await env.R2RESIZE.put(resizeFilename, await file.bytes(), {
114 customMetadata: {"user": userId },
115 httpMetadata: { contentType: file.type }
116 });
···121 }
122123 // TODO: use the image wrangler binding
124- for (var i = 0; i < env.IMAGE_SETTINGS.steps.length; ++i) {
125- const qualityLevel = env.IMAGE_SETTINGS.steps[i];
126- const response = await fetch(new URL(resizeFilename, env.IMAGE_SETTINGS.bucket_url), {
127 headers: {
128- "x-skyscheduler-helper": env.RESIZE_SECRET_HEADER
129 },
130 cf: {
131 image: {
···170 }
171 }
172 // Delete the file from the resize bucket.
173- c.executionCtx.waitUntil(env.R2RESIZE.delete(resizeFilename));
174 }
175176 if (failedToResize) {
···189190 if (fileToProcess === null)
191 fileToProcess = await file.arrayBuffer();
192- return await rawUploadToR2(env, fileToProcess, fileMetaData);
193};
194195-const uploadVideoToR2 = async (env: Bindings, file: File, userId: string) => {
196 // Technically this will never hit because it is greater than our own internal limits
197 if (file.size > BSKY_VIDEO_SIZE_LIMIT) {
198 return {"success": false, "error": `max video size is ${BSKY_VIDEO_SIZE_LIMIT}MB`};
···204 type: file.type,
205 user: userId
206 };
207- return await rawUploadToR2(env, await file.stream(), fileMetaData);
208};
209210export const uploadFileR2 = async (c: Context, file: File|string, userId: string) => {
···227 if (BSKY_IMG_MIME_TYPES.includes(fileType)) {
228 return await uploadImageToR2(c, file, userId);
229 } else if (BSKY_VIDEO_MIME_TYPES.includes(fileType)) {
230- return await uploadVideoToR2(c.env, file, userId);
231 } else if (GIF_UPLOAD_ALLOWED && BSKY_GIF_MIME_TYPES.includes(fileType)) {
232 // TODO: modify this in the future to transform the image to a webm
233 // then push to uploadVideo
234- return await uploadVideoToR2(c.env, file, userId);
235 }
236 return {"success": false, "error": "unable to push to R2"};
237};
238239-export const getAllFilesList = async (env: Bindings) => {
240 let options: R2ListOptions = {
241 limit: 1000,
242 include: ["customMetadata"]
···244 let values:R2BucketObject[] = [];
245246 while (true) {
247- const response = await env.R2.list(options);
248 for (const file of response.objects) {
249 values.push({
250 name: file.key,
···15 R2_FILE_SIZE_LIMIT,
16 R2_FILE_SIZE_LIMIT_IN_MB
17} from "../limits";
18+import { AllContext, EmbedData, EmbedDataType, R2BucketObject } from '../types.d';
19import { addFileListing, deleteFileListings } from './db/file';
2021type FileMetaData = {
···26 qualityLevel?: number;
27};
2829+export const deleteEmbedsFromR2 = async (c: AllContext, embeds: EmbedData[]|undefined, isQueued: boolean=false) => {
30 let itemsToDelete:string[] = [];
3132 if (embeds !== undefined && embeds.length > 0) {
···42 return itemsToDelete;
43};
4445+export const deleteFromR2 = async (c: AllContext, embeds: string[]|string, isQueued: boolean=false) => {
46 if (embeds.length <= 0)
47 return;
4849 console.log(`Deleting ${embeds}`);
50 const killFilesPromise = c.env.R2.delete(embeds);
51+ const deleteFileListingPromise = deleteFileListings(c, embeds);
52 if (isQueued) {
53 await killFilesPromise;
54 await deleteFileListingPromise;
···58 }
59};
6061+const rawUploadToR2 = async (c: AllContext, buffer: ArrayBuffer|ReadableStream, metaData: FileMetaData) => {
62 const fileExt:string|undefined = metaData.name.split(".").pop();
63 if (fileExt === undefined) {
64 return {"success": false, "error": "unable to upload, file name is invalid"};
65 }
6667 const fileName = `${uuidv4()}.${fileExt.toLowerCase()}`;
68+ const R2UploadRes = await c.env.R2.put(fileName, buffer, {
69 customMetadata: {"user": metaData.user, "type": metaData.type }
70 });
71 if (R2UploadRes) {
72+ await addFileListing(c, fileName, metaData.user);
73 return {"success": true, "data": R2UploadRes.key,
74 "originalName": metaData.name, "fileSize": metaData.size,
75 "qualityLevel": metaData.qualityLevel};
···8081const uploadImageToR2 = async(c: Context, file: File, userId: string) => {
82 const originalName = file.name;
083 // The maximum size of CF Image transforms.
84 if (file.size > CF_IMAGES_FILE_SIZE_LIMIT) {
85 return {"success": false, "error": `An image has a maximum file size of ${CF_IMAGES_FILE_SIZE_LIMIT_IN_MB}MB`};
···107 if (file.size > BSKY_IMG_SIZE_LIMIT) {
108 let failedToResize = true;
109110+ if (c.env.IMAGE_SETTINGS.enabled) {
111 const resizeFilename = uuidv4();
112+ const resizeBucketPush = await c.env.R2RESIZE.put(resizeFilename, await file.bytes(), {
113 customMetadata: {"user": userId },
114 httpMetadata: { contentType: file.type }
115 });
···120 }
121122 // TODO: use the image wrangler binding
123+ for (var i = 0; i < c.env.IMAGE_SETTINGS.steps.length; ++i) {
124+ const qualityLevel = c.env.IMAGE_SETTINGS.steps[i];
125+ const response = await fetch(new URL(resizeFilename, c.env.IMAGE_SETTINGS.bucket_url!), {
126 headers: {
127+ "x-skyscheduler-helper": c.env.RESIZE_SECRET_HEADER
128 },
129 cf: {
130 image: {
···169 }
170 }
171 // Delete the file from the resize bucket.
172+ c.executionCtx.waitUntil(c.env.R2RESIZE.delete(resizeFilename));
173 }
174175 if (failedToResize) {
···188189 if (fileToProcess === null)
190 fileToProcess = await file.arrayBuffer();
191+ return await rawUploadToR2(c, fileToProcess, fileMetaData);
192};
193194+const uploadVideoToR2 = async (c: Context, file: File, userId: string) => {
195 // Technically this will never hit because it is greater than our own internal limits
196 if (file.size > BSKY_VIDEO_SIZE_LIMIT) {
197 return {"success": false, "error": `max video size is ${BSKY_VIDEO_SIZE_LIMIT}MB`};
···203 type: file.type,
204 user: userId
205 };
206+ return await rawUploadToR2(c, await file.stream(), fileMetaData);
207};
208209export const uploadFileR2 = async (c: Context, file: File|string, userId: string) => {
···226 if (BSKY_IMG_MIME_TYPES.includes(fileType)) {
227 return await uploadImageToR2(c, file, userId);
228 } else if (BSKY_VIDEO_MIME_TYPES.includes(fileType)) {
229+ return await uploadVideoToR2(c, file, userId);
230 } else if (GIF_UPLOAD_ALLOWED && BSKY_GIF_MIME_TYPES.includes(fileType)) {
231 // TODO: modify this in the future to transform the image to a webm
232 // then push to uploadVideo
233+ return await uploadVideoToR2(c, file, userId);
234 }
235 return {"success": false, "error": "unable to push to R2"};
236};
237238+export const getAllFilesList = async (c: AllContext) => {
239 let options: R2ListOptions = {
240 limit: 1000,
241 include: ["customMetadata"]
···243 let values:R2BucketObject[] = [];
244245 while (true) {
246+ const response = await c.env.R2.list(options);
247 for (const file of response.objects) {
248 values.push({
249 name: file.key,
+27-37
src/utils/scheduler.ts
···1import AtpAgent from '@atproto/api';
2import isEmpty from 'just-is-empty';
3-import { Bindings, Post, Repost, ScheduledContext } from '../types.d';
4import { makeAgentForUser, makePost, makeRepost } from './bskyApi';
5import { pruneBskyPosts } from './bskyPrune';
6import {
···11import { enqueuePost, enqueueRepost, isQueueEnabled, isRepostQueueEnabled, shouldPostThreadQueue } from './queuePublisher';
12import { deleteFromR2 } from './r2Query';
1314-export const handlePostTask = async(runtime: ScheduledContext, postData: Post, agent: AtpAgent|null) => {
15 const madePost = await makePost(runtime, postData, agent);
16 if (madePost) {
17 console.log(`Made post ${postData.postid} successfully`);
···20 }
21 return madePost;
22}
23-export const handleRepostTask = async(runtime: ScheduledContext, postData: Repost, agent: AtpAgent|null) => {
24- const madeRepost = await makeRepost(runtime, postData, agent);
25 if (madeRepost) {
26 console.log(`Reposted ${postData.uri} successfully!`);
27 } else {
···30 return madeRepost;
31};
3233-export const schedulePostTask = async (env: Bindings, ctx: ExecutionContext) => {
34- const scheduledPosts: Post[] = await getAllPostsForCurrentTime(env);
35- const scheduledReposts: Repost[] = await getAllRepostsForCurrentTime(env);
36- const queueEnabled: boolean = isQueueEnabled(env);
37- const repostQueueEnabled: boolean = isRepostQueueEnabled(env);
38- const threadQueueEnabled: boolean = shouldPostThreadQueue(env);
39-40- const runtimeWrapper: ScheduledContext = {
41- executionCtx: ctx,
42- env: env
43- };
44-45 // Temporary cache of agents to make handling actions much better and easier.
46 // The only potential downside is if we run hot on RAM with a lot of users. Before, the agents would
47 // get freed up as a part of exiting their cycle, but this would make that worse...
···49 // TODO: bunching as a part of queues, literally just throw an agent at a queue with instructions and go.
50 // this requires queueing to be working properly.
51 const AgentList = new Map();
52- const usesAgentMap: boolean = (env.SITE_SETTINGS.use_agent_map) || false;
5354 // Push any posts
55 if (!isEmpty(scheduledPosts)) {
56 console.log(`handling ${scheduledPosts.length} posts...`);
57 for (const post of scheduledPosts) {
58 if (queueEnabled || (post.isThreadRoot && threadQueueEnabled)) {
59- await enqueuePost(env, post);
60 } else {
61 let agent = (usesAgentMap) ? AgentList.get(post.user) || null : null;
62 if (agent === null) {
63- agent = await makeAgentForUser(env, post.user);
64 if (usesAgentMap)
65 AgentList.set(post.user, agent);
66 }
67- ctx.waitUntil(handlePostTask(runtimeWrapper, post, agent));
68 }
69 }
70 } else {
···78 if (!repostQueueEnabled) {
79 let agent = (usesAgentMap) ? AgentList.get(repost.userId) || null : null;
80 if (agent === null) {
81- agent = await makeAgentForUser(env, repost.userId);
82 if (usesAgentMap)
83 AgentList.set(repost.userId, agent);
84 }
85- ctx.waitUntil(handleRepostTask(runtimeWrapper, repost, agent));
86 } else {
87- await enqueueRepost(env, repost);
88 }
89 };
90- ctx.waitUntil(deleteAllRepostsBeforeCurrentTime(env));
91 } else {
92 console.log("no reposts scheduled for this time");
93 }
94};
9596-export const cleanUpPostsTask = async(env: Bindings, ctx: ExecutionContext) => {
97- const purgedPosts: number = await purgePostedPosts(env);
98 console.log(`Purged ${purgedPosts} old posts from the database`);
99100- const removedIds: string[] = await pruneBskyPosts(env);
101 if (!isEmpty(removedIds)) {
102- const deletedItems: number = await deletePosts(env, removedIds);
103 console.log(`Deleted ${deletedItems} missing posts from the db`);
104 }
105- if (env.R2_SETTINGS.auto_prune === true)
106- await cleanupAbandonedFiles(env, ctx);
107};
108109-export const cleanupAbandonedFiles = async(env: Bindings, ctx: ExecutionContext) => {
110- const abandonedFiles: string[] = await getAllAbandonedMedia(env);
111- const runtimeWrapper: ScheduledContext = {
112- executionCtx: ctx,
113- env: env
114- };
115 if (!isEmpty(abandonedFiles)) {
116- await deleteFromR2(runtimeWrapper, abandonedFiles);
117 }
118};
···1import AtpAgent from '@atproto/api';
2import isEmpty from 'just-is-empty';
3+import { AllContext, Post, Repost } from '../types.d';
4import { makeAgentForUser, makePost, makeRepost } from './bskyApi';
5import { pruneBskyPosts } from './bskyPrune';
6import {
···11import { enqueuePost, enqueueRepost, isQueueEnabled, isRepostQueueEnabled, shouldPostThreadQueue } from './queuePublisher';
12import { deleteFromR2 } from './r2Query';
1314+export const handlePostTask = async(runtime: AllContext, postData: Post, agent: AtpAgent|null) => {
15 const madePost = await makePost(runtime, postData, agent);
16 if (madePost) {
17 console.log(`Made post ${postData.postid} successfully`);
···20 }
21 return madePost;
22}
23+export const handleRepostTask = async(c: AllContext, postData: Repost, agent: AtpAgent|null) => {
24+ const madeRepost = await makeRepost(c, postData, agent);
25 if (madeRepost) {
26 console.log(`Reposted ${postData.uri} successfully!`);
27 } else {
···30 return madeRepost;
31};
3233+export const schedulePostTask = async (c: AllContext) => {
34+ const scheduledPosts: Post[] = await getAllPostsForCurrentTime(c);
35+ const scheduledReposts: Repost[] = await getAllRepostsForCurrentTime(c);
36+ const queueEnabled: boolean = isQueueEnabled(c.env);
37+ const repostQueueEnabled: boolean = isRepostQueueEnabled(c.env);
38+ const threadQueueEnabled: boolean = shouldPostThreadQueue(c.env);
00000039 // Temporary cache of agents to make handling actions much better and easier.
40 // The only potential downside is if we run hot on RAM with a lot of users. Before, the agents would
41 // get freed up as a part of exiting their cycle, but this would make that worse...
···43 // TODO: bunching as a part of queues, literally just throw an agent at a queue with instructions and go.
44 // this requires queueing to be working properly.
45 const AgentList = new Map();
46+ const usesAgentMap: boolean = (c.env.SITE_SETTINGS.use_agent_map) || false;
4748 // Push any posts
49 if (!isEmpty(scheduledPosts)) {
50 console.log(`handling ${scheduledPosts.length} posts...`);
51 for (const post of scheduledPosts) {
52 if (queueEnabled || (post.isThreadRoot && threadQueueEnabled)) {
53+ await enqueuePost(c, post);
54 } else {
55 let agent = (usesAgentMap) ? AgentList.get(post.user) || null : null;
56 if (agent === null) {
57+ agent = await makeAgentForUser(c, post.user);
58 if (usesAgentMap)
59 AgentList.set(post.user, agent);
60 }
61+ c.ctx.waitUntil(handlePostTask(c, post, agent));
62 }
63 }
64 } else {
···72 if (!repostQueueEnabled) {
73 let agent = (usesAgentMap) ? AgentList.get(repost.userId) || null : null;
74 if (agent === null) {
75+ agent = await makeAgentForUser(c, repost.userId);
76 if (usesAgentMap)
77 AgentList.set(repost.userId, agent);
78 }
79+ c.ctx.waitUntil(handleRepostTask(c, repost, agent));
80 } else {
81+ await enqueueRepost(c, repost);
82 }
83 };
84+ c.ctx.waitUntil(deleteAllRepostsBeforeCurrentTime(c));
85 } else {
86 console.log("no reposts scheduled for this time");
87 }
88};
8990+export const cleanUpPostsTask = async(c: AllContext) => {
91+ const purgedPosts: number = await purgePostedPosts(c);
92 console.log(`Purged ${purgedPosts} old posts from the database`);
9394+ const removedIds: string[] = await pruneBskyPosts(c);
95 if (!isEmpty(removedIds)) {
96+ const deletedItems: number = await deletePosts(c, removedIds);
97 console.log(`Deleted ${deletedItems} missing posts from the db`);
98 }
99+ if (c.env.R2_SETTINGS.auto_prune === true)
100+ await cleanupAbandonedFiles(c);
101};
102103+export const cleanupAbandonedFiles = async(c: AllContext) => {
104+ const abandonedFiles: string[] = await getAllAbandonedMedia(c);
0000105 if (!isEmpty(abandonedFiles)) {
106+ await deleteFromR2(c, abandonedFiles);
107 }
108};