this repo has no description

Copy RapidFireFeed to BattleFeed

+103 -2
+4 -2
feed_manager.py
··· 1 + from feeds.battle import BattleFeed 2 + from feeds.rapidfire import RapidFireFeed 1 3 from feeds.popular import PopularFeed 2 - from feeds.rapidfire import RapidFireFeed 3 4 4 5 class FeedManager: 5 6 def __init__(self): ··· 27 28 feed.commit_changes() 28 29 29 30 feed_manager = FeedManager() 31 + feed_manager.register(BattleFeed) 32 + feed_manager.register(RapidFireFeed) 30 33 feed_manager.register(PopularFeed) 31 - feed_manager.register(RapidFireFeed)
+99
feeds/battle.py
··· 1 + import logging 2 + 3 + import apsw 4 + import apsw.ext 5 + import grapheme 6 + 7 + from . import BaseFeed 8 + 9 + MAX_TEXT_LENGTH = 140 10 + 11 + class RapidFireFeed(BaseFeed): 12 + FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire' 13 + 14 + def __init__(self): 15 + self.db_cnx = apsw.Connection('db/rapidfire.db') 16 + self.db_cnx.pragma('journal_mode', 'WAL') 17 + self.db_cnx.pragma('wal_autocheckpoint', '0') 18 + 19 + with self.db_cnx: 20 + self.db_cnx.execute(""" 21 + create table if not exists posts (uri text, create_ts timestamp, lang text); 22 + create index if not exists create_ts_idx on posts(create_ts); 23 + """) 24 + 25 + self.logger = logging.getLogger('feeds.rapidfire') 26 + 27 + def process_commit(self, commit): 28 + op = commit['op'] 29 + if op['action'] != 'create': 30 + return 31 + 32 + collection, _ = op['path'].split('/') 33 + if collection != 'app.bsky.feed.post': 34 + return 35 + 36 + record = op.get('record') 37 + if record is None: 38 + return 39 + 40 + if all([ 41 + grapheme.length(record['text']) <= MAX_TEXT_LENGTH, 42 + record.get('reply') is None, 43 + record.get('embed') is None, 44 + record.get('facets') is None 45 + ]): 46 + repo = commit['repo'] 47 + path = op['path'] 48 + post_uri = f'at://{repo}/{path}' 49 + ts = self.safe_timestamp(record['createdAt']).timestamp() 50 + 51 + self.transaction_begin(self.db_cnx) 52 + 53 + langs = record.get('langs') or [''] 54 + for lang in langs: 55 + self.db_cnx.execute( 56 + 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)', 57 + dict(uri=post_uri, ts=ts, lang=lang) 58 + ) 59 + 60 + def delete_old_posts(self): 61 + self.db_cnx.execute( 62 + "delete from posts where create_ts < unixepoch('now', '-15 minutes')" 63 + ) 64 + 65 + def commit_changes(self): 66 + self.logger.debug('committing changes') 67 + self.delete_old_posts() 68 + self.transaction_commit(self.db_cnx) 69 + self.wal_checkpoint(self.db_cnx, 'RESTART') 70 + 71 + def serve_feed(self, limit, offset, langs): 72 + if '*' in langs: 73 + cur = self.db_cnx.execute( 74 + "select uri from posts order by create_ts desc limit :limit offset :offset", 75 + dict(limit=limit, offset=offset) 76 + ) 77 + return [uri for (uri,) in cur] 78 + else: 79 + lang_values = list(langs.values()) 80 + lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values) 81 + lang_clause = ' union '.join(lang_selects) 82 + cur = self.db_cnx.execute( 83 + lang_clause + ' order by create_ts desc limit ? offset ?', 84 + [*lang_values, limit, offset] 85 + ) 86 + return [uri for (uri, create_ts) in cur] 87 + 88 + def serve_feed_debug(self, limit, offset, langs): 89 + query = """ 90 + select *, unixepoch('now') - create_ts as age_seconds 91 + from posts 92 + order by create_ts desc 93 + limit :limit offset :offset 94 + """ 95 + bindings = dict(limit=limit, offset=offset) 96 + return apsw.ext.format_query_table( 97 + self.db_cnx, query, bindings, 98 + string_sanitize=2, text_width=9999, use_unicode=True 99 + )