this repo has no description

feeds/popular.py: updates

+53 -30
+1 -1
feed_manager.py
··· 27 27 feed.commit_changes() 28 28 29 29 feed_manager = FeedManager() 30 - # feed_manager.register(PopularFeed) 30 + feed_manager.register(PopularFeed) 31 31 feed_manager.register(RapidFireFeed)
+52 -29
feeds/popular.py
··· 1 1 import logging 2 - import os 3 2 4 3 import apsw 4 + import apsw.ext 5 5 6 6 from . import BaseFeed 7 7 ··· 9 9 FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/popular' 10 10 11 11 def __init__(self): 12 - db_fname = '' 13 - if os.path.isdir('/dev/shm/'): 14 - os.makedirs('/dev/shm/feedgens/', exist_ok=True) 15 - db_fname = '/dev/shm/feedgens/popular.db' 16 - else: 17 - db_fname = 'db/popular.db' 12 + super().__init__() 18 13 19 - self.db_cnx = apsw.Connection(db_fname) 14 + self.db_cnx = apsw.Connection('db/popular.db') 20 15 self.db_cnx.pragma('journal_mode', 'WAL') 21 - self.db_cnx.pragma('synchronous', 'OFF') 22 16 self.db_cnx.pragma('wal_autocheckpoint', '0') 23 17 24 18 with self.db_cnx: ··· 38 32 if collection != 'app.bsky.feed.like': 39 33 return 40 34 41 - ts = commit['time'] 35 + record = op['record'] 36 + if record is None: 37 + return 38 + 39 + ts = self.safe_timestamp(record['createdAt']).timestamp() 42 40 like_subject_uri = op['record']['subject']['uri'] 43 41 44 - with self.db_cnx: 45 - self.db_cnx.execute(( 46 - "insert into posts (uri, create_ts, update_ts, temperature) " 47 - "values (:uri, :ts, :ts, 1) " 48 - "on conflict (uri) do update set temperature = temperature + 1, update_ts = :ts" 49 - ), dict(uri=like_subject_uri, ts=ts)) 42 + self.transaction_begin(self.db_cnx) 50 43 51 - def run_tasks_minute(self): 52 - self.logger.debug('running minute tasks') 44 + self.db_cnx.execute(""" 45 + insert into posts (uri, create_ts, update_ts, temperature) 46 + values (:uri, :ts, :ts, 1) 47 + on conflict (uri) do update set temperature = temperature + 1, update_ts = :ts 48 + """, dict(uri=like_subject_uri, ts=ts)) 53 49 54 - with self.db_cnx: 55 - self.db_cnx.execute( 56 - "delete from posts where temperature * exp( -1 * ( ( strftime( '%s', 'now' ) - strftime( '%s', create_ts ) ) / 1800.0 ) ) < 1.0 and strftime( '%s', create_ts ) < strftime( '%s', 'now', '-15 minutes' )" 57 - ) 50 + def delete_old_posts(self): 51 + self.db_cnx.execute(""" 52 + delete from posts 53 + where 54 + temperature * exp( -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) ) < 1.0 55 + and create_ts < unixepoch('now', '-15 minutes') 56 + """) 58 57 59 - self.db_cnx.pragma('wal_checkpoint(TRUNCATE)') 58 + def commit_changes(self): 59 + self.logger.debug('committing changes') 60 + self.delete_old_posts() 61 + self.transaction_commit(self.db_cnx) 62 + self.wal_checkpoint(self.db_cnx, 'RESTART') 60 63 61 64 def serve_feed(self, limit, offset, langs): 62 - cur = self.db_cnx.execute(( 63 - "select uri from posts " 64 - "order by temperature * exp( " 65 - "-1 * ( ( strftime( '%s', 'now' ) - strftime( '%s', create_ts ) ) / 1800.0 ) " 66 - ") desc limit :limit offset :offset" 67 - ), dict(limit=limit, offset=offset)) 65 + cur = self.db_cnx.execute(""" 66 + select uri from posts 67 + order by temperature * exp( -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) ) 68 + desc limit :limit offset :offset 69 + """, dict(limit=limit, offset=offset)) 68 70 return [uri for (uri,) in cur] 71 + 72 + def serve_feed_debug(self, limit, offset, langs): 73 + query = """ 74 + select 75 + uri, temperature, 76 + unixepoch('now') - create_ts as age_seconds, 77 + exp( 78 + -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) 79 + ) as decay, 80 + temperature * exp( 81 + -1 * ( ( unixepoch('now') - create_ts ) / 1800.0 ) 82 + ) as score 83 + from posts 84 + order by score desc 85 + limit :limit offset :offset 86 + """ 87 + bindings = dict(limit=limit, offset=offset) 88 + return apsw.ext.format_query_table( 89 + self.db_cnx, query, bindings, 90 + string_sanitize=2, text_width=9999, use_unicode=True 91 + )