this repo has no description

Add OutlineTagsFeed

+70
+2
feed_manager.py
··· 8 8 from feeds.sevendirtywords import SevenDirtyWordsFeed 9 9 from feeds.ratio import RatioFeed 10 10 from feeds.mostliked import MostLikedFeed 11 + from feeds.outlinetags import OutlineTagsFeed 11 12 12 13 class FeedManager: 13 14 def __init__(self): ··· 52 53 feed_manager.register(SevenDirtyWordsFeed) 53 54 # feed_manager.register(RatioFeed) 54 55 feed_manager.register(MostLikedFeed) 56 + feed_manager.register(OutlineTagsFeed)
+68
feeds/outlinetags.py
··· 1 + import logging 2 + 3 + import apsw 4 + import apsw.ext 5 + 6 + from . import BaseFeed 7 + 8 + class OutlineTagsFeed(BaseFeed): 9 + FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/outline' 10 + SERVE_FEED_QUERY = """ 11 + select uri, create_ts 12 + from posts 13 + order by create_ts desc 14 + limit :limit offset :offset 15 + """ 16 + 17 + def __init__(self): 18 + self.db_cnx = apsw.Connection('db/outlinetags.db') 19 + self.db_cnx.pragma('journal_mode', 'WAL') 20 + self.db_cnx.pragma('wal_autocheckpoint', '0') 21 + 22 + with self.db_cnx: 23 + self.db_cnx.execute(""" 24 + create table if not exists posts (uri text, create_ts timestamp); 25 + create unique index if not exists create_ts_idx on posts(create_ts); 26 + """) 27 + 28 + self.logger = logging.getLogger('feeds.outlinetags') 29 + 30 + def process_commit(self, commit): 31 + if commit['opType'] != 'c': 32 + return 33 + 34 + if commit['collection'] != 'app.bsky.feed.post': 35 + return 36 + 37 + record = commit.get('record') 38 + if record is None: 39 + return 40 + 41 + if not record.get('tags', []): 42 + return 43 + 44 + repo = commit['did'] 45 + rkey = commit['rkey'] 46 + post_uri = f'at://{repo}/app.bsky.feed.post/{rkey}' 47 + ts = self.safe_timestamp(record.get('createdAt')).timestamp() 48 + self.transaction_begin(self.db_cnx) 49 + self.db_cnx.execute( 50 + 'insert into posts (uri, create_ts) values (:uri, :ts)', 51 + dict(uri=post_uri, ts=ts) 52 + ) 53 + 54 + def commit_changes(self): 55 + self.logger.debug('committing changes') 56 + self.transaction_commit(self.db_cnx) 57 + self.wal_checkpoint(self.db_cnx, 'RESTART') 58 + 59 + def serve_feed(self, limit, offset, langs): 60 + cur = self.db_cnx.execute(self.SERVE_FEED_QUERY, dict(limit=limit, offset=offset)) 61 + return [row[0] for row in cur] 62 + 63 + def serve_feed_debug(self, limit, offset, langs): 64 + bindings = dict(limit=limit, offset=offset) 65 + return apsw.ext.format_query_table( 66 + self.db_cnx, self.SERVE_FEED_QUERY, bindings, 67 + string_sanitize=2, text_width=9999, use_unicode=True 68 + )