···52525353- **PLC HTTP API:** Full support for GET and POST operations (resolve DID docs, logs, audit logs, and submit new operations), with the detail that only sequence-based export pagination is supported
5454 - didplcbft is already able to work as a decentralized, standard-compliant PLC implementation, independently from plc.directory
5555+ - Supports **Websocket-based Export** equivalent to that of the official plc.directory service (described [here](https://github.com/did-method-plc/did-method-plc/pull/129)).
5556- **Validation:** On-chain validation of PLC operations to ensure history integrity.
5657- **Node-to-node fast syncing:** Support for snapshot-based sync, to quickly bring new replicas online, making use of the facilities offered by CometBFT.
5758 - A custom compact serialization format is used, able to archive/transmit the entire directory using around 30 GB of space/data transfer as of January 2026.
···65666667### Planned
67686868-- **Websocket-based Export** equivalent to that of the official plc.directory service.
6969- **Bi-directional Sync:** submitting operations observed on the didplcbft network back to the official plc.directory, while still deferring to operations served by the latter in case of conflict.
7070- **Spam Prevention:** developing a non-currency-based throttling mechanism.
7171 - For example, by gossiping hashes of IP addresses and AS numbers across the network in order to limit how quickly spammers can create new identities in the PLC. The challenge is that certain entities (e.g. Bluesky's own official PDSs) will naturally need to create many more identities than others... maybe some sort of allowlisting mechanism would need to be implemented.
+3-1
abciapp/range_challenge.go
···186186 }
187187 defer func() {
188188 err := c.nodeEventBus.Unsubscribe(context.Background(), subscriber, cmttypes.EventQueryNewBlockHeader)
189189- _ = err
189189+ if err != nil {
190190+ c.logger.Error("failed to unsubscribe from new blocks", "error", stacktrace.Propagate(err))
191191+ }
190192 }()
191193192194 for {
+193-26
httpapi/server.go
···1414 "strings"
1515 "sync"
1616 "sync/atomic"
1717+ "syscall"
1718 "time"
18191920 "github.com/bluesky-social/indigo/atproto/atcrypto"
2121+ "github.com/coder/websocket"
2222+ "github.com/coder/websocket/wsjson"
2023 "github.com/did-method-plc/go-didplc"
2124 "github.com/gbl08ma/stacktrace"
2525+ "github.com/google/uuid"
2226 cbornode "github.com/ipfs/go-ipld-cbor"
2327 "github.com/rs/cors"
2428 "github.com/samber/lo"
25293030+ cmtlog "github.com/cometbft/cometbft/libs/log"
3131+ "github.com/cometbft/cometbft/rpc/core"
3232+ cmttypes "github.com/cometbft/cometbft/types"
2633 "tangled.org/gbl08ma.com/didplcbft/abciapp"
2734 "tangled.org/gbl08ma.com/didplcbft/plc"
2835 "tangled.org/gbl08ma.com/didplcbft/transaction"
···31383239// Server represents the HTTP server for the PLC directory.
3340type Server struct {
4141+ logger cmtlog.Logger
3442 txFactory *transaction.Factory
3543 plc plc.ReadPLC
3644 router *http.ServeMux
3745 mempoolSubmitter types.MempoolSubmitter
4646+ nodeEventBus *cmttypes.EventBus
3847 srv http.Server
3948 handlerTimeout time.Duration
4049 proto string
···4554}
46554756// NewServer creates a new instance of the Server.
4848-func NewServer(txFactory *transaction.Factory, plc plc.ReadPLC, mempoolSubmitter types.MempoolSubmitter, listenAddr string, handlerTimeout time.Duration) (*Server, error) {
5757+func NewServer(
5858+ logger cmtlog.Logger,
5959+ txFactory *transaction.Factory,
6060+ plc plc.ReadPLC,
6161+ mempoolSubmitter types.MempoolSubmitter,
6262+ nodeEventBus *cmttypes.EventBus,
6363+ listenAddr string,
6464+ handlerTimeout time.Duration) (*Server, error) {
4965 s := &Server{
6666+ logger: logger,
5067 txFactory: txFactory,
5168 plc: plc,
5269 router: http.NewServeMux(),
5370 mempoolSubmitter: mempoolSubmitter,
7171+ nodeEventBus: nodeEventBus,
5472 srv: http.Server{Addr: listenAddr},
5573 handlerTimeout: handlerTimeout,
5674 }
···58765977 handler := cors.Default().Handler(s.router)
60786161- timeoutMsg, _ := json.Marshal(map[string]string{"message": "Internal server timeout"})
6262-6363- handler = http.TimeoutHandler(handler, s.handlerTimeout, string(timeoutMsg))
6464-6579 s.srv.Handler = handler
66806781 parts := strings.SplitN(listenAddr, "://", 2)
···78927993// setupRoutes configures the routes for the server.
8094func (s *Server) setupRoutes() {
8181- s.router.HandleFunc("GET /{did}", s.makeDIDHandler(s.handleResolveDID))
8282- s.router.HandleFunc("POST /{did}", s.makeDIDHandler(s.handleCreatePLC))
8383- s.router.HandleFunc("GET /{did}/log", s.makeDIDHandler(s.handleGetPLCLog))
8484- s.router.HandleFunc("GET /{did}/log/audit", s.makeDIDHandler(s.handleGetPLCAuditLog))
8585- s.router.HandleFunc("GET /{did}/log/last", s.makeDIDHandler(s.handleGetLastOp))
8686- s.router.HandleFunc("GET /{did}/data", s.makeDIDHandler(s.handleGetPLCData))
8787- s.router.HandleFunc("GET /export", s.handleExport)
9595+ wrapInTimeout := func(fn http.HandlerFunc) http.Handler {
9696+ timeoutMsg, _ := json.Marshal(map[string]string{"message": "Internal server timeout"})
9797+9898+ return http.TimeoutHandler(http.HandlerFunc(fn), s.handlerTimeout, string(timeoutMsg))
9999+ }
100100+ s.router.Handle("GET /{did}", wrapInTimeout(s.makeDIDHandler(s.handleResolveDID)))
101101+ s.router.Handle("POST /{did}", wrapInTimeout(s.makeDIDHandler(s.handleCreatePLC)))
102102+ s.router.Handle("GET /{did}/log", wrapInTimeout(s.makeDIDHandler(s.handleGetPLCLog)))
103103+ s.router.Handle("GET /{did}/log/audit", wrapInTimeout(s.makeDIDHandler(s.handleGetPLCAuditLog)))
104104+ s.router.Handle("GET /{did}/log/last", wrapInTimeout(s.makeDIDHandler(s.handleGetLastOp)))
105105+ s.router.Handle("GET /{did}/data", wrapInTimeout(s.makeDIDHandler(s.handleGetPLCData)))
106106+ s.router.Handle("GET /export", wrapInTimeout(s.handleExport))
107107+ s.router.HandleFunc("/export/stream", s.handleExportStream)
8810889109 // TODO expose pprof only if enabled in [plc] settings
90110 s.router.HandleFunc("/debug/pprof/", pprof.Index)
···143163func (s *Server) handleResolveDID(w http.ResponseWriter, r *http.Request, did string) {
144164 ctx := context.Background()
145165 doc, err := s.plc.Resolve(ctx, s.txFactory.ReadCommitted(), did)
146146- if handlePLCError(w, err, did) {
166166+ if s.handlePLCError(w, err, did) {
147167 return
148168 }
149169···221241 }
222242223243 txBytes, err := cbornode.DumpObject(tx)
224224- if handlePLCError(w, err, "") {
244244+ if s.handlePLCError(w, err, "") {
225245 return
226246 }
227247···229249 // in practice we expect operations to be included in about one second
230250 result, err := s.mempoolSubmitter.BroadcastTx(r.Context(), txBytes, true)
231251 // TODO more robust error handling
232232- if handlePLCError(w, err, "") {
252252+ if s.handlePLCError(w, err, "") {
233253 return
234254 }
235255···249269// handleGetPLCLog handles the GET /{did}/log endpoint.
250270func (s *Server) handleGetPLCLog(w http.ResponseWriter, r *http.Request, did string) {
251271 ops, err := s.plc.OperationLog(r.Context(), s.txFactory.ReadCommitted(), did)
252252- if handlePLCError(w, err, did) {
272272+ if s.handlePLCError(w, err, did) {
253273 return
254274 }
255275···260280// handleGetPLCAuditLog handles the GET /{did}/log/audit endpoint.
261281func (s *Server) handleGetPLCAuditLog(w http.ResponseWriter, r *http.Request, did string) {
262282 entries, err := s.plc.AuditLog(r.Context(), s.txFactory.ReadCommitted(), did)
263263- if handlePLCError(w, err, did) {
283283+ if s.handlePLCError(w, err, did) {
264284 return
265285 }
266286···271291// handleGetLastOp handles the GET /{did}/log/last endpoint.
272292func (s *Server) handleGetLastOp(w http.ResponseWriter, r *http.Request, did string) {
273293 op, err := s.plc.LastOperation(r.Context(), s.txFactory.ReadCommitted(), did)
274274- if handlePLCError(w, err, did) {
294294+ if s.handlePLCError(w, err, did) {
275295 return
276296 }
277297···282302// handleGetPLCData handles the GET /{did}/data endpoint.
283303func (s *Server) handleGetPLCData(w http.ResponseWriter, r *http.Request, did string) {
284304 data, err := s.plc.Data(r.Context(), s.txFactory.ReadCommitted(), did)
285285- if handlePLCError(w, err, did) {
305305+ if s.handlePLCError(w, err, did) {
286306 return
287307 }
288308···304324 json.NewEncoder(w).Encode(resp)
305325}
// jsonEntry is the wire representation of a single PLC log entry as emitted
// by both the GET /export endpoint (JSON lines) and the /export/stream
// websocket endpoint, matching the export format of the official
// plc.directory service. Seq is the directory-wide sequence number and Type
// is the event type string (e.g. "sequenced_op"); the embedded
// didplc.LogEntry contributes the remaining fields.
type jsonEntry struct {
	Seq  uint64 `json:"seq"`
	Type string `json:"type"`
	*didplc.LogEntry
}
308334func (s *Server) handleExport(w http.ResponseWriter, r *http.Request) {
309335 query := r.URL.Query()
···332358 }
333359334360 entries, err := s.plc.Export(r.Context(), s.txFactory.ReadCommitted(), after, count)
335335- if handlePLCError(w, err, "") {
361361+ if s.handlePLCError(w, err, "") {
336362 return
337363 }
338364339365 w.Header().Set("Content-Type", "application/jsonlines")
340366341341- type jsonEntry struct {
342342- Seq uint64 `json:"seq"`
343343- Type string `json:"type"`
344344- *didplc.LogEntry
345345- }
346367 for _, entry := range entries {
347368 json.NewEncoder(w).Encode(jsonEntry{
348369 Seq: entry.Seq,
···352373 }
353374}
354375376376+func (s *Server) handleExportStream(w http.ResponseWriter, r *http.Request) {
377377+ c, err := websocket.Accept(w, r, nil)
378378+ if s.handlePLCError(w, err, "") {
379379+ return
380380+ }
381381+ defer c.CloseNow()
382382+383383+ cursorStr := r.URL.Query().Get("cursor")
384384+ cursor := uint64(0)
385385+ operationCount, err := s.txFactory.ReadCommitted().CountOperations()
386386+ if err != nil {
387387+ s.logger.Error("Export stream failed to get operation count", "error", stacktrace.Propagate(err))
388388+ c.Close(websocket.StatusInternalError, "internal error")
389389+ return
390390+ }
391391+392392+ if cursorStr != "" {
393393+ // the official implementation always uses status code 1000 (websocket.StatusNormalClosure) for these errors
394394+ cursor, err = strconv.ParseUint(cursorStr, 10, 64)
395395+ if err != nil {
396396+ c.Close(websocket.StatusNormalClosure, "InvalidCursor") // not specified in the spec, but should be good enough
397397+ return
398398+ }
399399+400400+ // validate cursor against operationCount
401401+ if cursor > operationCount {
402402+ c.Close(websocket.StatusNormalClosure, "FutureCursor") // as in the spec
403403+ return
404404+ }
405405+ } else {
406406+ cursor = operationCount
407407+ }
408408+409409+ uuid, err := uuid.NewRandom()
410410+ if err != nil {
411411+ s.logger.Error("Export stream failed to generate UUID", "error", stacktrace.Propagate(err))
412412+ c.Close(websocket.StatusInternalError, "internal error")
413413+ return
414414+ }
415415+ subscriber := uuid.String()
416416+417417+ ctx, cancelCtx := context.WithCancel(r.Context())
418418+419419+ // We do not expect to read anything from the websocket
420420+ ctx = c.CloseRead(ctx)
421421+422422+ newBlockCh := make(chan struct{}, 1)
423423+ wg := sync.WaitGroup{}
424424+ wg.Go(func() {
425425+ defer close(newBlockCh)
426426+427427+ subCtx, cancel := context.WithTimeout(ctx, core.SubscribeTimeout)
428428+ defer cancel()
429429+430430+ blocksSub, err := s.nodeEventBus.Subscribe(subCtx, subscriber, cmttypes.EventQueryNewBlockHeader)
431431+ if err != nil {
432432+ s.logger.Error("Export stream failed to subscribe to new block headers", "error", stacktrace.Propagate(err))
433433+ c.Close(websocket.StatusInternalError, "internal error")
434434+ return
435435+ }
436436+ defer func() {
437437+ err := s.nodeEventBus.Unsubscribe(context.Background(), subscriber, cmttypes.EventQueryNewBlockHeader)
438438+ if err != nil {
439439+ s.logger.Error("Export stream failed to unsubscribe from new block headers", "error", stacktrace.Propagate(err))
440440+ }
441441+ }()
442442+443443+ for {
444444+ select {
445445+ case <-ctx.Done():
446446+ return
447447+ case <-blocksSub.Out():
448448+ // We can't block here! Otherwise our subscription will be terminated for not consuming messages fast enough
449449+ select {
450450+ case newBlockCh <- struct{}{}:
451451+ default:
452452+ }
453453+ case <-blocksSub.Canceled():
454454+ err := blocksSub.Err()
455455+ if err != nil {
456456+ s.logger.Error("blocksSub was canceled with error", "error", stacktrace.Propagate(err))
457457+ }
458458+ return
459459+ }
460460+ }
461461+ })
462462+463463+ defer wg.Wait()
464464+ defer cancelCtx()
465465+466466+ const numEntriesPerBatch = 100
467467+ for {
468468+ entries, err := s.plc.Export(ctx, s.txFactory.ReadCommitted(), cursor, numEntriesPerBatch)
469469+ if err != nil {
470470+ s.logger.Error("Export stream failed to export entries", "error", stacktrace.Propagate(err))
471471+ c.Close(websocket.StatusInternalError, "internal error")
472472+ return
473473+ }
474474+475475+ for _, entry := range entries {
476476+ // 1 week is the same as what the official implementation appears to allow
477477+ // TODO make configurable (the official implementation has it configurable)
478478+ // TODO consider whether we really need to implement this limitation, as much like what happens with slow consumers,
479479+ // we probably don't have any problems dealing with old entries, unlike the official implementation's "outbox"?
480480+ if time.Since(entry.CreatedAt) > 7*24*time.Hour {
481481+ c.Close(websocket.StatusNormalClosure, "OutdatedCursor") // as in the spec
482482+ return
483483+ }
484484+ err := wsjson.Write(ctx, c, jsonEntry{
485485+ Seq: entry.Seq,
486486+ Type: "sequenced_op",
487487+ LogEntry: lo.ToPtr(entry.ToDIDPLCLogEntry()),
488488+ })
489489+ if err != nil {
490490+ if !errors.Is(err, context.Canceled) && !errors.Is(err, syscall.EPIPE) {
491491+ s.logger.Error("Export stream failed to write entry", "error", stacktrace.Propagate(err))
492492+ }
493493+ c.Close(websocket.StatusInternalError, "internal error")
494494+ return
495495+ }
496496+497497+ cursor = entry.Seq
498498+ }
499499+500500+ if len(entries) == numEntriesPerBatch {
501501+ // there's a chance there's already more available, so immediately try fetching and sending again
502502+ continue
503503+ }
504504+505505+ // TODO periodically check CountOperations and compare against cursor
506506+ // to see if we need to close with reason ConsumerTooSlow
507507+ // (the official implementation needs this because their "outbox" has some buffers, but our buffers are the same size regardless of "where" in the stream we are: do we really need to check for slow consumers?)
508508+ // (does a slow consumer cause us to spend any more resources than a fast one, or one that resorts to using the non-HTTP endpoint? I don't think so)
509509+510510+ select {
511511+ case <-ctx.Done():
512512+ return
513513+ case _, ok := <-newBlockCh:
514514+ if !ok {
515515+ return
516516+ }
517517+ }
518518+ }
519519+}
520520+355521// handlePLCError handles errors from the PLC interface and sends the appropriate HTTP response.
356356-func handlePLCError(w http.ResponseWriter, err error, did string) bool {
522522+func (s *Server) handlePLCError(w http.ResponseWriter, err error, did string) bool {
357523 if err == nil {
358524 return false
359525 }
···363529 case errors.Is(err, plc.ErrDIDGone):
364530 sendErrorResponse(w, http.StatusGone, fmt.Sprintf("DID not available: %s", did))
365531 default:
532532+ s.logger.Error("PLC server returning internal server error", "did", did, "error", stacktrace.Propagate(err))
366533 sendErrorResponse(w, http.StatusInternalServerError, "Internal server error")
367534 }
368535 return true