···65 // Handling posting right now.
66 const postInfo: Post|null = await getPostById(c, response.postId);
67 if (!isEmpty(postInfo)) {
68- const env: Bindings = c.env;
69- if (shouldPostNowQueue(env)) {
70- try
71- {
72- await enqueuePost(env, postInfo!);
73 } catch(err) {
74 console.error(err);
75 return c.json({message: 'Failed to post content, will retry again soon'}, 406);
···65 // Handling posting right now.
66 const postInfo: Post|null = await getPostById(c, response.postId);
67 if (!isEmpty(postInfo)) {
68+ if (shouldPostNowQueue(c.env)) {
69+ try {
70+ await enqueuePost(c, postInfo!);
0071 } catch(err) {
72 console.error(err);
73 return c.json({message: 'Failed to post content, will retry again soon'}, 406);
+8-8
src/utils/queuePublisher.ts
···1import isEmpty from 'just-is-empty';
2import random from 'just-random';
3import get from 'just-safe-get';
4-import { Bindings, Post, QueueTaskData, QueueTaskType, Repost } from "../types.d";
56const queueContentType = 'v8';
7···23export const shouldPostNowQueue = (env: Bindings) => env.QUEUE_SETTINGS.postNowEnabled && isQueueEnabled(env);
24export const shouldPostThreadQueue = (env: Bindings) => env.QUEUE_SETTINGS.threadEnabled && (hasPostQueue(env) || isQueueEnabled(env));
2526-export async function enqueuePost(env: Bindings, post: Post) {
27 if (post.isThreadRoot) {
28- if (!shouldPostThreadQueue(env))
29 return;
30- } else if (!isQueueEnabled(env))
31 return;
3233 // Pick a random consumer to handle this post
34- const queueConsumer: Queue|null = getRandomQueue(env, "post_queues");
3536 if (queueConsumer !== null) {
37 await queueConsumer.send({type: QueueTaskType.Post, post: post} as QueueTaskData, { contentType: queueContentType });
38 }
39}
4041-export async function enqueueRepost(env: Bindings, post: Repost) {
42- if (!isRepostQueueEnabled(env))
43 return;
4445 // Pick a random consumer to handle this repost
46- const queueConsumer: Queue|null = getRandomQueue(env, "repost_queues");
47 if (queueConsumer !== null)
48 await queueConsumer.send({type: QueueTaskType.Repost, repost: post} as QueueTaskData, { contentType: queueContentType });
49}
···1import isEmpty from 'just-is-empty';
2import random from 'just-random';
3import get from 'just-safe-get';
4+import { AllContext, Bindings, Post, QueueTaskData, QueueTaskType, Repost } from "../types.d";
56const queueContentType = 'v8';
7···23export const shouldPostNowQueue = (env: Bindings) => env.QUEUE_SETTINGS.postNowEnabled && isQueueEnabled(env);
24export const shouldPostThreadQueue = (env: Bindings) => env.QUEUE_SETTINGS.threadEnabled && (hasPostQueue(env) || isQueueEnabled(env));
2526+export async function enqueuePost(c: AllContext, post: Post) {
27 if (post.isThreadRoot) {
28+ if (!shouldPostThreadQueue(c.env))
29 return;
30+ } else if (!isQueueEnabled(c.env))
31 return;
3233 // Pick a random consumer to handle this post
34+ const queueConsumer: Queue|null = getRandomQueue(c.env, "post_queues");
3536 if (queueConsumer !== null) {
37 await queueConsumer.send({type: QueueTaskType.Post, post: post} as QueueTaskData, { contentType: queueContentType });
38 }
39}
4041+export async function enqueueRepost(c: AllContext, post: Repost) {
42+ if (!isRepostQueueEnabled(c.env))
43 return;
4445 // Pick a random consumer to handle this repost
46+ const queueConsumer: Queue|null = getRandomQueue(c.env, "repost_queues");
47 if (queueConsumer !== null)
48 await queueConsumer.send({type: QueueTaskType.Repost, repost: post} as QueueTaskData, { contentType: queueContentType });
49}
+4-4
src/utils/scheduler.ts
···43 // TODO: bunching as a part of queues, literally just throw an agent at a queue with instructions and go.
44 // this requires queueing to be working properly.
45 const AgentList = new Map();
46- const usesAgentMap: boolean = (c.env.SITE_SETTINGS.use_agent_map) || false;
4748 // Push any posts
49 if (!isEmpty(scheduledPosts)) {
···58 if (usesAgentMap)
59 AgentList.set(post.user, agent);
60 }
61- c.ctx.waitUntil(handlePostTask(c, post, agent));
62 }
63 }
64 } else {
···76 if (usesAgentMap)
77 AgentList.set(repost.userId, agent);
78 }
79- c.ctx.waitUntil(handleRepostTask(c, repost, agent));
80 } else {
81 await enqueueRepost(c, repost);
82 }
83 };
84- c.ctx.waitUntil(deleteAllRepostsBeforeCurrentTime(c));
85 } else {
86 console.log("no reposts scheduled for this time");
87 }
···43 // TODO: bunching as a part of queues, literally just throw an agent at a queue with instructions and go.
44 // this requires queueing to be working properly.
45 const AgentList = new Map();
46+ const usesAgentMap: boolean = c.env.SITE_SETTINGS.use_agent_map;
4748 // Push any posts
49 if (!isEmpty(scheduledPosts)) {
···58 if (usesAgentMap)
59 AgentList.set(post.user, agent);
60 }
61+ c.executionCtx.waitUntil(handlePostTask(c, post, agent));
62 }
63 }
64 } else {
···76 if (usesAgentMap)
77 AgentList.set(repost.userId, agent);
78 }
79+ c.executionCtx.waitUntil(handleRepostTask(c, repost, agent));
80 } else {
81 await enqueueRepost(c, repost);
82 }
83 };
84+ c.executionCtx.waitUntil(deleteAllRepostsBeforeCurrentTime(c));
85 } else {
86 console.log("no reposts scheduled for this time");
87 }