this repo has no description

feeds/battle.py: add functionality

+48 -48
+48 -48
feeds/battle.py
··· 6 6 7 7 from . import BaseFeed 8 8 9 - MAX_TEXT_LENGTH = 140 10 - 11 - class RapidFireFeed(BaseFeed): 12 - FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/rapidfire' 9 + class BattleFeed(BaseFeed): 10 + FEED_URI = 'at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.generator/battle' 13 11 14 12 def __init__(self): 15 - self.db_cnx = apsw.Connection('db/rapidfire.db') 13 + self.db_cnx = apsw.Connection('db/battle.db') 16 14 self.db_cnx.pragma('journal_mode', 'WAL') 17 15 self.db_cnx.pragma('wal_autocheckpoint', '0') 18 16 19 17 with self.db_cnx: 20 18 self.db_cnx.execute(""" 21 - create table if not exists posts (uri text, create_ts timestamp, lang text); 22 - create index if not exists create_ts_idx on posts(create_ts); 19 + create table if not exists posts ( 20 + uri text, 21 + grapheme_length integer unique, 22 + create_ts timestamp, 23 + lang text 24 + ); 23 25 """) 24 26 25 - self.logger = logging.getLogger('feeds.rapidfire') 27 + self.logger = logging.getLogger('feeds.battle') 26 28 27 29 def process_commit(self, commit): 28 30 op = commit['op'] ··· 37 39 if record is None: 38 40 return 39 41 40 - if all([ 41 - grapheme.length(record['text']) <= MAX_TEXT_LENGTH, 42 - record.get('reply') is None, 43 - record.get('embed') is None, 44 - record.get('facets') is None 45 - ]): 46 - repo = commit['repo'] 47 - path = op['path'] 48 - post_uri = f'at://{repo}/{path}' 49 - ts = self.safe_timestamp(record['createdAt']).timestamp() 50 - 51 - self.transaction_begin(self.db_cnx) 42 + repo = commit['repo'] 43 + path = op['path'] 44 + post_uri = f'at://{repo}/{path}' 45 + l = grapheme.length(record['text']) 46 + ts = self.safe_timestamp(record['createdAt']).timestamp() 52 47 53 - langs = record.get('langs') or [''] 54 - for lang in langs: 55 - self.db_cnx.execute( 56 - 'insert into posts (uri, create_ts, lang) values (:uri, :ts, :lang)', 57 - dict(uri=post_uri, ts=ts, lang=lang) 58 - ) 48 + self.transaction_begin(self.db_cnx) 59 49 60 - def delete_old_posts(self): 61 - self.db_cnx.execute( 62 - "delete from posts where create_ts < unixepoch('now', '-15 minutes')" 63 - ) 50 + langs = record.get('langs') or [''] 51 + for lang in langs: 52 + self.db_cnx.execute(""" 53 + insert into posts(uri, grapheme_length, create_ts, lang) 54 + values(:uri, :length, :ts, :lang) 55 + on conflict(grapheme_length) do update set uri = :uri 56 + """, dict(uri=post_uri, length=l, ts=ts, lang=lang)) 64 57 65 58 def commit_changes(self): 66 59 self.logger.debug('committing changes') 67 - self.delete_old_posts() 68 60 self.transaction_commit(self.db_cnx) 69 61 self.wal_checkpoint(self.db_cnx, 'RESTART') 70 62 71 63 def serve_feed(self, limit, offset, langs): 72 - if '*' in langs: 73 - cur = self.db_cnx.execute( 74 - "select uri from posts order by create_ts desc limit :limit offset :offset", 75 - dict(limit=limit, offset=offset) 76 - ) 77 - return [uri for (uri,) in cur] 78 - else: 79 - lang_values = list(langs.values()) 80 - lang_selects = ['select uri, create_ts from posts where lang = ?'] * len(lang_values) 81 - lang_clause = ' union '.join(lang_selects) 82 - cur = self.db_cnx.execute( 83 - lang_clause + ' order by create_ts desc limit ? offset ?', 84 - [*lang_values, limit, offset] 85 - ) 86 - return [uri for (uri, create_ts) in cur] 64 + # if '*' in langs: 65 + # cur = self.db_cnx.execute( 66 + # "select uri from posts order by grapheme_length asc limit :limit offset :offset", 67 + # dict(limit=limit, offset=offset) 68 + # ) 69 + # return [uri for (uri,) in cur] 70 + # else: 71 + # lang_values = list(langs.values()) 72 + # lang_selects = ['select uri, grapheme_length from posts where lang = ?'] * len(lang_values) 73 + # lang_clause = ' union '.join(lang_selects) 74 + # cur = self.db_cnx.execute( 75 + # lang_clause + ' order by grapheme_length asc limit ? offset ?', 76 + # [*lang_values, limit, offset] 77 + # ) 78 + # return [uri for (uri, create_ts) in cur] 79 + 80 + cur = self.db_cnx.execute(""" 81 + select uri 82 + from posts 83 + order by grapheme_length asc 84 + limit :limit offset :offset 85 + """, dict(limit=limit, offset=offset)) 86 + return [uri for (uri,) in cur] 87 87 88 88 def serve_feed_debug(self, limit, offset, langs): 89 89 query = """ 90 - select *, unixepoch('now') - create_ts as age_seconds 90 + select * 91 91 from posts 92 - order by create_ts desc 92 + order by grapheme_length asc 93 93 limit :limit offset :offset 94 94 """ 95 95 bindings = dict(limit=limit, offset=offset)