Monorepo for Tangled tangled.org

tapc: add tap client package

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me bfcae5ee 0dc2c042

verified
+266
+3
tapc/readme.md
···
··· 1 + basic tap client package 2 + 3 + Replace this to official indigo package when <https://github.com/bluesky-social/indigo/pull/1241> gets merged.
+24
tapc/simpleIndexer.go
···
··· 1 + package tapc 2 + 3 + import "context" 4 + 5 + type SimpleIndexer struct { 6 + EventHandler func(ctx context.Context, evt Event) error 7 + ErrorHandler func(ctx context.Context, err error) 8 + } 9 + 10 + var _ Handler = (*SimpleIndexer)(nil) 11 + 12 + func (i *SimpleIndexer) OnEvent(ctx context.Context, evt Event) error { 13 + if i.EventHandler == nil { 14 + return nil 15 + } 16 + return i.EventHandler(ctx, evt) 17 + } 18 + 19 + func (i *SimpleIndexer) OnError(ctx context.Context, err error) { 20 + if i.ErrorHandler == nil { 21 + return 22 + } 23 + i.ErrorHandler(ctx, err) 24 + }
+177
tapc/tap.go
···
··· 1 + /// heavily inspired by <https://github.com/bluesky-social/atproto/blob/c7f5a868837d3e9b3289f988fee2267789327b06/packages/tap/README.md> 2 + 3 + package tapc 4 + 5 + import ( 6 + "bytes" 7 + "context" 8 + "encoding/json" 9 + "fmt" 10 + "net/http" 11 + "net/url" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/gorilla/websocket" 16 + "tangled.org/core/log" 17 + ) 18 + 19 + // type WebsocketOptions struct { 20 + // maxReconnectSeconds int 21 + // heartbeatIntervalMs int 22 + // // onReconnectError 23 + // } 24 + 25 + type Handler interface { 26 + OnEvent(ctx context.Context, evt Event) error 27 + OnError(ctx context.Context, err error) 28 + } 29 + 30 + type Client struct { 31 + Url string 32 + AdminPassword string 33 + HTTPClient *http.Client 34 + } 35 + 36 + func NewClient(url, adminPassword string) Client { 37 + return Client{ 38 + Url: url, 39 + AdminPassword: adminPassword, 40 + HTTPClient: &http.Client{}, 41 + } 42 + } 43 + 44 + func (c *Client) AddRepos(ctx context.Context, dids []syntax.DID) error { 45 + body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 46 + if err != nil { 47 + return err 48 + } 49 + req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/add", bytes.NewReader(body)) 50 + if err != nil { 51 + return err 52 + } 53 + req.SetBasicAuth("admin", c.AdminPassword) 54 + req.Header.Set("Content-Type", "application/json") 55 + 56 + resp, err := c.HTTPClient.Do(req) 57 + if err != nil { 58 + return err 59 + } 60 + defer resp.Body.Close() 61 + if resp.StatusCode != http.StatusOK { 62 + return fmt.Errorf("tap: /repos/add failed with status %d", resp.StatusCode) 63 + } 64 + return nil 65 + } 66 + 67 + func (c *Client) RemoveRepos(ctx context.Context, dids []syntax.DID) error { 68 + body, err := json.Marshal(map[string][]syntax.DID{"dids": dids}) 69 + if err != nil { 70 + return err 71 + } 72 + req, err := http.NewRequestWithContext(ctx, "POST", c.Url+"/repos/remove", bytes.NewReader(body)) 73 + if err != nil { 74 + return err 75 + } 76 + req.SetBasicAuth("admin", c.AdminPassword) 77 + req.Header.Set("Content-Type", "application/json") 78 + 79 + resp, err := c.HTTPClient.Do(req) 80 + if err != nil { 81 + return err 82 + } 83 + defer resp.Body.Close() 84 + if resp.StatusCode != http.StatusOK { 85 + return fmt.Errorf("tap: /repos/remove failed with status %d", resp.StatusCode) 86 + } 87 + return nil 88 + } 89 + 90 + func (c *Client) Connect(ctx context.Context, handler Handler) error { 91 + l := log.FromContext(ctx) 92 + 93 + u, err := url.Parse(c.Url) 94 + if err != nil { 95 + return err 96 + } 97 + if u.Scheme == "https" { 98 + u.Scheme = "wss" 99 + } else { 100 + u.Scheme = "ws" 101 + } 102 + u.Path = "/channel" 103 + 104 + // TODO: set auth on dial 105 + 106 + url := u.String() 107 + 108 + var backoff int 109 + for { 110 + select { 111 + case <-ctx.Done(): 112 + return ctx.Err() 113 + default: 114 + } 115 + 116 + header := http.Header{ 117 + "Authorization": []string{""}, 118 + } 119 + conn, res, err := websocket.DefaultDialer.DialContext(ctx, url, header) 120 + if err != nil { 121 + l.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 122 + time.Sleep(time.Duration(5+backoff) * time.Second) 123 + backoff++ 124 + 125 + continue 126 + } else { 127 + backoff = 0 128 + } 129 + 130 + l.Info("tap event subscription response", "code", res.StatusCode) 131 + 132 + if err = c.handleConnection(ctx, conn, handler); err != nil { 133 + l.Warn("tap connection failed", "err", err, "backoff", backoff) 134 + } 135 + } 136 + } 137 + 138 + func (c *Client) handleConnection(ctx context.Context, conn *websocket.Conn, handler Handler) error { 139 + l := log.FromContext(ctx) 140 + 141 + defer func() { 142 + conn.Close() 143 + l.Warn("closed tap conection") 144 + }() 145 + l.Info("established tap conection") 146 + 147 + for { 148 + select { 149 + case <-ctx.Done(): 150 + return ctx.Err() 151 + default: 152 + } 153 + _, message, err := conn.ReadMessage() 154 + if err != nil { 155 + return err 156 + } 157 + 158 + var ev Event 159 + if err := json.Unmarshal(message, &ev); err != nil { 160 + handler.OnError(ctx, fmt.Errorf("failed to parse message: %w", err)) 161 + continue 162 + } 163 + if err := handler.OnEvent(ctx, ev); err != nil { 164 + handler.OnError(ctx, fmt.Errorf("failed to process event %d: %w", ev.ID, err)) 165 + continue 166 + } 167 + 168 + ack := map[string]any{ 169 + "type": "ack", 170 + "id": ev.ID, 171 + } 172 + if err := conn.WriteJSON(ack); err != nil { 173 + l.Warn("failed to send ack", "err", err) 174 + continue 175 + } 176 + } 177 + }
+62
tapc/types.go
···
··· 1 + package tapc 2 + 3 + import ( 4 + "encoding/json" 5 + "fmt" 6 + 7 + "github.com/bluesky-social/indigo/atproto/syntax" 8 + ) 9 + 10 + type EventType string 11 + 12 + const ( 13 + EvtRecord EventType = "record" 14 + EvtIdentity EventType = "identity" 15 + ) 16 + 17 + type Event struct { 18 + ID int64 `json:"id"` 19 + Type EventType `json:"type"` 20 + Record *RecordEventData `json:"record,omitempty"` 21 + Identity *IdentityEventData `json:"identity,omitempty"` 22 + } 23 + 24 + type RecordEventData struct { 25 + Live bool `json:"live"` 26 + Did syntax.DID `json:"did"` 27 + Rev string `json:"rev"` 28 + Collection syntax.NSID `json:"collection"` 29 + Rkey syntax.RecordKey `json:"rkey"` 30 + Action RecordAction `json:"action"` 31 + Record json.RawMessage `json:"record,omitempty"` 32 + CID *syntax.CID `json:"cid,omitempty"` 33 + } 34 + 35 + func (r *RecordEventData) AtUri() syntax.ATURI { 36 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, r.Collection, r.Rkey)) 37 + } 38 + 39 + type RecordAction string 40 + 41 + const ( 42 + RecordCreateAction RecordAction = "create" 43 + RecordUpdateAction RecordAction = "update" 44 + RecordDeleteAction RecordAction = "delete" 45 + ) 46 + 47 + type IdentityEventData struct { 48 + DID syntax.DID `json:"did"` 49 + Handle string `json:"handle"` 50 + IsActive bool `json:"is_active"` 51 + Status RepoStatus `json:"status"` 52 + } 53 + 54 + type RepoStatus string 55 + 56 + const ( 57 + RepoStatusActive RepoStatus = "active" 58 + RepoStatusTakendown RepoStatus = "takendown" 59 + RepoStatusSuspended RepoStatus = "suspended" 60 + RepoStatusDeactivated RepoStatus = "deactivated" 61 + RepoStatusDeleted RepoStatus = "deleted" 62 + )