A from-scratch atproto PDS implementation in Python (mirrors https://github.com/DavidBuchanan314/millipds)

fix cache expiry, write db migration, integrate did resolver into service proxying

+35 -17
+8
src/millipds/app_util.py
··· 5 5 from aiohttp import web 6 6 7 7 from . import database 8 + from .did import DIDResolver 8 9 9 10 MILLIPDS_DB = web.AppKey("MILLIPDS_DB", database.Database) 10 11 MILLIPDS_AIOHTTP_CLIENT = web.AppKey( ··· 16 17 MILLIPDS_FIREHOSE_QUEUES_LOCK = web.AppKey( 17 18 "MILLIPDS_FIREHOSE_QUEUES_LOCK", asyncio.Lock 18 19 ) 20 + MILLIPDS_DID_RESOLVER = web.AppKey("MILLIPDS_DID_RESOLVER", DIDResolver) 19 21 20 22 21 23 # these helpers are useful for conciseness and type hinting ··· 35 37 return req.app[MILLIPDS_FIREHOSE_QUEUES_LOCK] 36 38 37 39 40 + def get_did_resolver(req: web.Request): 41 + return req.app[MILLIPDS_DID_RESOLVER] 42 + 43 + 38 44 __all__ = [ 39 45 "MILLIPDS_DB", 40 46 "MILLIPDS_AIOHTTP_CLIENT", 41 47 "MILLIPDS_FIREHOSE_QUEUES", 42 48 "MILLIPDS_FIREHOSE_QUEUES_LOCK", 49 + "MILLIPDS_DID_RESOLVER", 43 50 "get_db", 44 51 "get_client", 45 52 "get_firehose_queues", 46 53 "get_firehose_queues_lock", 54 + "get_did_resolver", 47 55 ]
+15 -12
src/millipds/appview_proxy.py
··· 12 12 logger = logging.getLogger(__name__) 13 13 14 14 15 - # TODO: this should be done via actual DID resolution, not hardcoded! 16 - SERVICE_ROUTES = { 17 - "did:web:api.bsky.chat#bsky_chat": "https://api.bsky.chat", 18 - "did:web:discover.bsky.app#bsky_fg": "https://discover.bsky.app", 19 - "did:plc:ar7c4by46qjdydhdevvrndac#atproto_labeler": "https://mod.bsky.app", 20 - } 21 - 22 - 23 15 @authenticated 24 16 async def service_proxy(request: web.Request, service: Optional[str] = None): 25 17 """ ··· 30 22 logger.info(f"proxying lxm {lxm}") 31 23 db = get_db(request) 32 24 if service: 33 - service_did = service.partition("#")[0] 34 - service_route = SERVICE_ROUTES.get(service) 35 - if service_route is None: 25 + service_did, _, fragment = service.partition("#") 26 + fragment = "#" + fragment 27 + did_doc = await get_did_resolver(request).resolve_with_db_cache( 28 + db, service_did 29 + ) 30 + if did_doc is None: 31 + return web.HTTPInternalServerError( 32 + f"unable to resolve service {service!r}" 33 + ) 34 + for service in did_doc.get("service", []): 35 + if service.get("id") == fragment: 36 + service_route = service["serviceEndpoint"] 37 + break 38 + else: 36 39 return web.HTTPBadRequest(f"unable to resolve service {service!r}") 37 - else: 40 + else: # fall thru to assuming bsky appview 38 41 service_did = db.config["bsky_appview_did"] 39 42 service_route = db.config["bsky_appview_pfx"] 40 43
+3 -2
src/millipds/did.py
··· 55 55 # try the db first 56 56 now = int(time.time()) 57 57 row = db.con.execute( 58 - "SELECT doc FROM did_cache WHERE did=? AND expires_at<?", (did, now) 58 + "SELECT doc FROM did_cache WHERE did=? AND ?<expires_at", (did, now) 59 59 ).fetchone() 60 60 61 61 # cache hit ··· 71 71 ) 72 72 try: 73 73 doc = await self.resolve_uncached(did) 74 + logger.info(f"Successfully resolved {did}") 74 75 except Exception as e: 75 - logger.exception(f"Error resolving DID {did}: {e}") 76 + logger.exception(f"Error resolving {did}: {e}") 76 77 doc = None 77 78 78 79 # update "now" because resolution might've taken a while
+4
src/millipds/service.py
··· 26 26 from .appview_proxy import service_proxy 27 27 from .auth_bearer import authenticated 28 28 from .app_util import * 29 + from .did import DIDResolver 29 30 30 31 logger = logging.getLogger(__name__) 31 32 ··· 402 403 {"User-Agent": importlib.metadata.version("millipds")} 403 404 ) 404 405 406 + did_resolver = DIDResolver(client, static_config.PLC_DIRECTORY_HOST) 407 + 405 408 app = web.Application(middlewares=[cors, atproto_service_proxy_middleware]) 406 409 app[MILLIPDS_DB] = db 407 410 app[MILLIPDS_AIOHTTP_CLIENT] = client 408 411 app[MILLIPDS_FIREHOSE_QUEUES] = set() 409 412 app[MILLIPDS_FIREHOSE_QUEUES_LOCK] = asyncio.Lock() 413 + app[MILLIPDS_DID_RESOLVER] = did_resolver 410 414 411 415 app.add_routes(routes) 412 416 app.add_routes(auth_oauth.routes)
+5 -3
src/millipds/static_config.py
··· 10 10 11 11 GROUPNAME = "millipds-sock" 12 12 13 - MILLIPDS_DB_VERSION = ( 14 - 1 # this gets bumped if we make breaking changes to the db schema 15 - ) 13 + # this gets bumped if we make breaking changes to the db schema 14 + MILLIPDS_DB_VERSION = 2 15 + 16 16 ATPROTO_REPO_VERSION_3 = 3 # might get bumped if the atproto spec changes 17 17 CAR_VERSION_1 = 1 18 18 ··· 27 27 28 28 DID_CACHE_TTL = 60 * 60 # 1 hour 29 29 DID_CACHE_ERROR_TTL = 60 * 5 # 5 mins 30 + 31 + PLC_DIRECTORY_HOST = "https://plc.directory"