A social knowledge tool for researchers built on ATProto

feat: implement in-memory event system with saga state store

Co-authored-by: aider (anthropic/claude-sonnet-4-20250514) <aider@aider.chat>

+165 -6
+8 -2
docs/architecture/event_system_overview.md
··· 117 117 │ │ │ │ 118 118 │ └──────────────────┘ │ 119 119 │ │ 120 - │ (No external dependencies) │ 120 + │ ┌─────────────────────────────────┐ │ 121 + │ │ InMemorySagaStateStore │ │ 122 + │ │ - Map-based state storage │ │ 123 + │ │ - Timeout-based lock expiry │ │ 124 + │ │ - No external dependencies │ │ 125 + │ └─────────────────────────────────┘ │ 121 126 └─────────────────────────────────────┘ 122 127 ``` 123 128 124 129 **Configuration:** 125 130 - `USE_IN_MEMORY_EVENTS=true` 126 131 - No Redis required 127 - - All processing in-memory 132 + - All processing in-memory with `InMemorySagaStateStore` 133 + - Uses `InMemoryEventWorkerProcess` for event handling 128 134 129 135 ## Event Flow Example: CardAddedToLibrary 130 136
+7 -2
src/index.ts
··· 1 1 import { configService } from './shared/infrastructure/config'; 2 2 import { AppProcess } from './shared/infrastructure/processes/AppProcess'; 3 3 import { FeedWorkerProcess } from './shared/infrastructure/processes/FeedWorkerProcess'; 4 + import { InMemoryEventWorkerProcess } from './shared/infrastructure/processes/InMemoryEventWorkerProcess'; 4 5 5 6 async function main() { 6 7 const appProcess = new AppProcess(configService); ··· 8 9 // Start the app process 9 10 await appProcess.start(); 10 11 11 - // Start feed worker in the same process if using in-memory events 12 + // Start appropriate worker based on event system type 12 13 const useInMemoryEvents = process.env.USE_IN_MEMORY_EVENTS === 'true'; 13 14 if (useInMemoryEvents) { 14 - console.log('Starting feed worker in the same process...'); 15 + console.log('Starting in-memory event worker in the same process...'); 16 + const inMemoryWorkerProcess = new InMemoryEventWorkerProcess(configService); 17 + await inMemoryWorkerProcess.start(); 18 + } else { 19 + console.log('Starting BullMQ feed worker in the same process...'); 15 20 const feedWorkerProcess = new FeedWorkerProcess(configService); 16 21 await feedWorkerProcess.start(); 17 22 }
+67
src/modules/feeds/infrastructure/InMemorySagaStateStore.ts
··· 1 + import { ISagaStateStore } from '../application/sagas/ISagaStateStore'; 2 + 3 + interface StoredItem { 4 + value: string; 5 + expiry: number; 6 + } 7 + 8 + export class InMemorySagaStateStore implements ISagaStateStore { 9 + private store = new Map<string, StoredItem>(); 10 + private locks = new Set<string>(); 11 + 12 + async get(key: string): Promise<string | null> { 13 + const item = this.store.get(key); 14 + if (!item || Date.now() > item.expiry) { 15 + this.store.delete(key); 16 + return null; 17 + } 18 + return item.value; 19 + } 20 + 21 + async setex(key: string, ttlSeconds: number, value: string): Promise<void> { 22 + const expiry = Date.now() + ttlSeconds * 1000; 23 + this.store.set(key, { value, expiry }); 24 + 25 + // Clean up expired items after TTL 26 + setTimeout(() => { 27 + const item = this.store.get(key); 28 + if (item && Date.now() > item.expiry) { 29 + this.store.delete(key); 30 + } 31 + }, ttlSeconds * 1000); 32 + } 33 + 34 + async del(key: string): Promise<void> { 35 + this.store.delete(key); 36 + this.locks.delete(key); 37 + } 38 + 39 + async set( 40 + key: string, 41 + value: string, 42 + mode: 'EX', 43 + ttl: number, 44 + flag: 'NX', 45 + ): Promise<'OK' | null> { 46 + if (flag === 'NX' && this.locks.has(key)) { 47 + return null; // Lock already exists 48 + } 49 + 50 + this.locks.add(key); 51 + 52 + // Auto-expire the lock after TTL 53 + setTimeout(() => { 54 + this.locks.delete(key); 55 + }, ttl * 1000); 56 + 57 + return 'OK'; 58 + } 59 + 60 + /** 61 + * Clear all data (useful for testing) 62 + */ 63 + clear(): void { 64 + this.store.clear(); 65 + this.locks.clear(); 66 + } 67 + }
+19 -1
src/shared/infrastructure/http/factories/ServiceFactory.ts
··· 51 51 import { ATProtoIdentityResolutionService } from '../../../../modules/atproto/infrastructure/services/ATProtoIdentityResolutionService'; 52 52 import { IIdentityResolutionService } from '../../../../modules/atproto/domain/services/IIdentityResolutionService'; 53 53 import { CookieService } from '../services/CookieService'; 54 + import { InMemorySagaStateStore } from '../../../../modules/feeds/infrastructure/InMemorySagaStateStore'; 55 + import { RedisSagaStateStore } from '../../../../modules/feeds/infrastructure/RedisSagaStateStore'; 54 56 55 57 // Shared services needed by both web app and workers 56 58 export interface SharedServices { ··· 222 224 }; 223 225 } 224 226 227 + // Create saga with appropriate state store 228 + let cardCollectionSaga: CardCollectionSaga; 229 + if (useInMemoryEvents) { 230 + const stateStore = new InMemorySagaStateStore(); 231 + cardCollectionSaga = new CardCollectionSaga( 232 + sharedServices.feedService as any, // Will be properly typed when used 233 + stateStore, 234 + ); 235 + } else { 236 + const stateStore = new RedisSagaStateStore(redisConnection!); 237 + cardCollectionSaga = new CardCollectionSaga( 238 + sharedServices.feedService as any, // Will be properly typed when used 239 + stateStore, 240 + ); 241 + } 242 + 225 243 return { 226 244 ...sharedServices, 227 245 redisConnection: redisConnection, 228 246 eventPublisher, 229 247 createEventSubscriber, 230 - cardCollectionSaga: null as any, // Will be created in worker process 248 + cardCollectionSaga, 231 249 }; 232 250 } 233 251
+7 -1
src/shared/infrastructure/processes/FeedWorkerProcess.ts
··· 8 8 import { CardAddedToCollectionEventHandler } from '../../../modules/feeds/application/eventHandlers/CardAddedToCollectionEventHandler'; 9 9 import { CardCollectionSaga } from '../../../modules/feeds/application/sagas/CardCollectionSaga'; 10 10 import { RedisSagaStateStore } from '../../../modules/feeds/infrastructure/RedisSagaStateStore'; 11 + import { InMemorySagaStateStore } from '../../../modules/feeds/infrastructure/InMemorySagaStateStore'; 11 12 import { QueueNames } from '../events/QueueConfig'; 12 13 import { EventNames } from '../events/EventConfig'; 13 14 import { BaseWorkerProcess } from './BaseWorkerProcess'; ··· 39 40 ): Promise<void> { 40 41 const useCases = UseCaseFactory.createForWorker(repositories, services); 41 42 42 - const stateStore = new RedisSagaStateStore(services.redisConnection!); 43 + // Create saga with appropriate state store based on event system type 44 + const useInMemoryEvents = process.env.USE_IN_MEMORY_EVENTS === 'true'; 45 + const stateStore = useInMemoryEvents 46 + ? new InMemorySagaStateStore() 47 + : new RedisSagaStateStore(services.redisConnection!); 48 + 43 49 const cardCollectionSaga = new CardCollectionSaga( 44 50 useCases.addActivityToFeedUseCase, 45 51 stateStore,
+57
src/shared/infrastructure/processes/InMemoryEventWorkerProcess.ts
··· 1 + import { IProcess } from '../../domain/IProcess'; 2 + import { EnvironmentConfigService } from '../config/EnvironmentConfigService'; 3 + import { InMemoryEventSubscriber } from '../events/InMemoryEventSubscriber'; 4 + import { CardAddedToLibraryEventHandler } from '../../../modules/feeds/application/eventHandlers/CardAddedToLibraryEventHandler'; 5 + import { CardAddedToCollectionEventHandler } from '../../../modules/feeds/application/eventHandlers/CardAddedToCollectionEventHandler'; 6 + import { CardCollectionSaga } from '../../../modules/feeds/application/sagas/CardCollectionSaga'; 7 + import { InMemorySagaStateStore } from '../../../modules/feeds/infrastructure/InMemorySagaStateStore'; 8 + import { EventNames } from '../events/EventConfig'; 9 + import { RepositoryFactory } from '../http/factories/RepositoryFactory'; 10 + import { ServiceFactory } from '../http/factories/ServiceFactory'; 11 + import { UseCaseFactory } from '../http/factories/UseCaseFactory'; 12 + 13 + export class InMemoryEventWorkerProcess implements IProcess { 14 + constructor(private configService: EnvironmentConfigService) {} 15 + 16 + async start(): Promise<void> { 17 + console.log('Starting in-memory event worker...'); 18 + 19 + // Create dependencies 20 + const repositories = RepositoryFactory.create(this.configService); 21 + const services = ServiceFactory.createForWebApp(this.configService, repositories); 22 + const useCases = UseCaseFactory.createForWorker(repositories, services); 23 + 24 + // Create in-memory saga state store 25 + const stateStore = new InMemorySagaStateStore(); 26 + const cardCollectionSaga = new CardCollectionSaga( 27 + useCases.addActivityToFeedUseCase, 28 + stateStore, 29 + ); 30 + 31 + // Create event subscriber 32 + const eventSubscriber = new InMemoryEventSubscriber(); 33 + 34 + // Create and register event handlers 35 + const cardAddedToLibraryHandler = new CardAddedToLibraryEventHandler( 36 + cardCollectionSaga, 37 + ); 38 + const cardAddedToCollectionHandler = new CardAddedToCollectionEventHandler( 39 + cardCollectionSaga, 40 + ); 41 + 42 + await eventSubscriber.subscribe( 43 + EventNames.CARD_ADDED_TO_LIBRARY, 44 + cardAddedToLibraryHandler, 45 + ); 46 + 47 + await eventSubscriber.subscribe( 48 + EventNames.CARD_ADDED_TO_COLLECTION, 49 + cardAddedToCollectionHandler, 50 + ); 51 + 52 + // Start the subscriber 53 + await eventSubscriber.start(); 54 + 55 + console.log('In-memory event worker started'); 56 + } 57 + }