Live video on the AT Protocol

atproto: successful indexing of labels

+126 -6
+2
go.mod
··· 10 10 11 11 replace github.com/AxisCommunications/go-dpop => github.com/streamplace/go-dpop v0.0.0-20250510031900-c897158a8ad4 12 12 13 + replace github.com/bluesky-social/indigo => ../indigo 14 + 13 15 require ( 14 16 firebase.google.com/go/v4 v4.14.1 15 17 git.stream.place/streamplace/c2pa-go v0.7.0
-4
go.sum
··· 119 119 github.com/bkielbasa/cyclop v1.2.3/go.mod h1:kHTwA9Q0uZqOADdupvcFJQtp/ksSnytRMe8ztxG8Fuo= 120 120 github.com/blizzy78/varnamelen v0.8.0 h1:oqSblyuQvFsW1hbBHh1zfwrKe3kcSj0rnXkKzsQ089M= 121 121 github.com/blizzy78/varnamelen v0.8.0/go.mod h1:V9TzQZ4fLJ1DSrjVDfl89H7aMnTvKkApdHeyESmyR7k= 122 - github.com/bluesky-social/indigo v0.0.0-20250617211950-336ebe49427b h1:qc08nIOSRS3Ue5rqRjI3SgxBPS30QvGkBrMoV2+z2KA= 123 - github.com/bluesky-social/indigo v0.0.0-20250617211950-336ebe49427b/go.mod h1:8FlFpF5cIq3DQG0kEHqyTkPV/5MDQoaWLcVwza5ZPJU= 124 122 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= 125 123 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= 126 124 github.com/bombsimon/wsl/v4 v4.7.0 h1:1Ilm9JBPRczjyUs6hvOPKvd7VL1Q++PL8M0SXBDf+jQ= ··· 492 490 github.com/ipfs/go-block-format v0.2.1/go.mod h1:frtvXHMQhM6zn7HvEQu+Qz5wSTj+04oEH/I+NjDgEjk= 493 491 github.com/ipfs/go-blockservice v0.5.2 h1:in9Bc+QcXwd1apOVM7Un9t8tixPKdaHQFdLSUM1Xgk8= 494 492 github.com/ipfs/go-blockservice v0.5.2/go.mod h1:VpMblFEqG67A/H2sHKAemeH9vlURVavlysbdUI632yk= 495 - github.com/ipfs/go-bs-sqlite3 v0.0.0-20221122195556-bfcee1be620d h1:9V+GGXCuOfDiFpdAHz58q9mKLg447xp0cQKvqQrAwYE= 496 - github.com/ipfs/go-bs-sqlite3 v0.0.0-20221122195556-bfcee1be620d/go.mod h1:pMbnFyNAGjryYCLCe59YDLRv/ujdN+zGJBT1umlvYRM= 497 493 github.com/ipfs/go-cid v0.5.0 h1:goEKKhaGm0ul11IHA7I6p1GmKz8kEYniqFopaB5Otwg= 498 494 github.com/ipfs/go-cid v0.5.0/go.mod h1:0L7vmeNXpQpUS9vt+yEARkJ8rOg43DF3iPgn4GIN0mk= 499 495 github.com/ipfs/go-datastore v0.8.2 h1:Jy3wjqQR6sg/LhyY0NIePZC3Vux19nLtg7dx0TVqr6U=
+39 -2
pkg/atproto/labeler_firehose.go
··· 1 1 package atproto 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 5 6 "fmt" 6 7 "net/http" ··· 15 16 "golang.org/x/sync/errgroup" 16 17 "stream.place/streamplace/pkg/aqhttp" 17 18 "stream.place/streamplace/pkg/log" 19 + "stream.place/streamplace/pkg/model" 18 20 ) 19 21 20 22 func (atsync *ATProtoSynchronizer) StartLabelerFirehose(ctx context.Context, did string) error { ··· 83 85 } else { 84 86 return fmt.Errorf("invalid labeler URI scheme: %s", labeler.URL) 85 87 } 88 + dbLabeler, err := atsync.Model.GetLabeler(did) 89 + if err != nil { 90 + return fmt.Errorf("failed to get labeler %s: %w", did, err) 91 + } 92 + if dbLabeler == nil { 93 + dbLabeler, err = atsync.Model.CreateLabeler(did) 94 + if err != nil { 95 + return fmt.Errorf("failed to create labeler %s: %w", did, err) 96 + } 97 + } 86 98 query := u.Query() 87 - query.Set("cursor", "0") 99 + query.Set("cursor", fmt.Sprintf("%d", dbLabeler.Cursor)) 88 100 u.RawQuery = query.Encode() 89 101 90 102 con, _, err := dialer.Dial(u.String(), http.Header{ ··· 97 109 rsc := &events.RepoStreamCallbacks{ 98 110 LabelLabels: func(evt *comatproto.LabelSubscribeLabels_Labels) error { 99 111 log.Log(ctx, "labeler labels", "labels", evt.Labels, "seq", evt.Seq) 112 + err = atsync.Model.UpdateLabelerCursor(did, evt.Seq) 113 + if err != nil { 114 + log.Error(ctx, "failed to update labeler cursor", "err", err) 115 + } 100 116 for _, labelLex := range evt.Labels { 101 117 l := label.FromLexicon(labelLex) 102 118 err = l.VerifySignature(pub) ··· 109 125 log.Error(ctx, "failed to verify label syntax", "err", err) 110 126 continue 111 127 } 112 - log.Log(ctx, "labeler label", "cid", l.CID, "createdAt", l.CreatedAt, "expiresAt", l.ExpiresAt, "negated", l.Negated, "sourceDID", l.SourceDID, "uri", l.URI, "val", l.Val, "version", l.Version) 128 + bs := bytes.Buffer{} 129 + err = labelLex.MarshalCBOR(&bs) 130 + if err != nil { 131 + log.Error(ctx, "failed to marshal label", "err", err) 132 + continue 133 + } 134 + err = atsync.Model.CreateLabel(&model.Label{ 135 + Cid: l.CID, 136 + Cts: l.CreatedAt, 137 + Exp: l.ExpiresAt, 138 + Neg: l.Negated, 139 + Sig: l.Sig, 140 + Src: l.SourceDID, 141 + Uri: l.URI, 142 + Val: l.Val, 143 + Ver: &l.Version, 144 + Record: bs.Bytes(), 145 + }) 146 + if err != nil { 147 + log.Error(ctx, "failed to create label", "err", err) 148 + continue 149 + } 113 150 } 114 151 return nil 115 152 },
+37
pkg/model/label.go
··· 1 + package model 2 + 3 + import "gorm.io/gorm/clause" 4 + 5 + type Label struct { 6 + // cid: Optionally, CID specifying the specific version of 'uri' resource this label applies to. 7 + Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty" gorm:"column:cid"` 8 + // cts: Timestamp when this label was created. 9 + Cts string `json:"cts" cborgen:"cts" gorm:"column:cts"` 10 + // exp: Timestamp at which this label expires (no longer applies). 11 + Exp *string `json:"exp,omitempty" cborgen:"exp,omitempty" gorm:"column:exp"` 12 + // neg: If true, this is a negation label, overwriting a previous label. 13 + Neg *bool `json:"neg,omitempty" cborgen:"neg,omitempty" gorm:"column:neg"` 14 + // sig: Signature of dag-cbor encoded label. 15 + Sig []byte `json:"sig,omitempty" cborgen:"sig,omitempty" gorm:"column:sig"` 16 + // src: DID of the actor who created this label. 17 + Src string `json:"src" cborgen:"src" gorm:"primaryKey;column:src"` 18 + // uri: AT URI of the record, repository (account), or other resource that this label applies to. 19 + Uri string `json:"uri" cborgen:"uri" gorm:"primaryKey;column:uri;index"` 20 + // val: The short string name of the value or type of this label. 21 + Val string `json:"val" cborgen:"val" gorm:"primaryKey;column:val"` 22 + // ver: The AT Protocol version of the label object. 23 + Ver *int64 `json:"ver,omitempty" cborgen:"ver,omitempty" gorm:"column:ver"` 24 + 25 + Record []byte `json:"record,omitempty" cborgen:"record,omitempty" gorm:"column:record"` 26 + } 27 + 28 + func (m *DBModel) CreateLabel(label *Label) error { 29 + return m.DB.Clauses(clause.OnConflict{ 30 + Columns: []clause.Column{ 31 + {Name: "src"}, 32 + {Name: "uri"}, 33 + {Name: "val"}, 34 + }, 35 + UpdateAll: true, 36 + }).Create(label).Error 37 + }
+40
pkg/model/labeler.go
··· 1 + package model 2 + 3 + import ( 4 + "errors" 5 + 6 + "gorm.io/gorm" 7 + ) 8 + 9 + type Labeler struct { 10 + DID string `gorm:"primaryKey;column:did"` 11 + Cursor int64 `gorm:"column:cursor"` 12 + } 13 + 14 + func (m *DBModel) GetLabeler(did string) (*Labeler, error) { 15 + var labeler Labeler 16 + err := m.DB.Where("did = ?", did).First(&labeler).Error 17 + if errors.Is(err, gorm.ErrRecordNotFound) { 18 + return nil, nil 19 + } 20 + if err != nil { 21 + return nil, err 22 + } 23 + return &labeler, nil 24 + } 25 + 26 + func (m *DBModel) CreateLabeler(did string) (*Labeler, error) { 27 + labeler := &Labeler{ 28 + DID: did, 29 + Cursor: 0, 30 + } 31 + 32 + if err := m.DB.Create(labeler).Error; err != nil { 33 + return nil, err 34 + } 35 + return labeler, nil 36 + } 37 + 38 + func (m *DBModel) UpdateLabelerCursor(did string, cursor int64) error { 39 + return m.DB.Model(&Labeler{}).Where("did = ?", did).Update("cursor", cursor).Error 40 + }
+8
pkg/model/model.go
··· 107 107 GetCommitEventsSince(repoDID string, t time.Time) ([]*XrpcStreamEvent, error) 108 108 GetCommitEventsSinceSeq(repoDID string, seq int64) ([]*XrpcStreamEvent, error) 109 109 GetMostRecentCommitEvent(repoDID string) (*XrpcStreamEvent, error) 110 + 111 + CreateLabeler(did string) (*Labeler, error) 112 + GetLabeler(did string) (*Labeler, error) 113 + UpdateLabelerCursor(did string, cursor int64) error 114 + 115 + CreateLabel(label *Label) error 110 116 } 111 117 112 118 func MakeDB(dbURL string) (Model, error) { ··· 169 175 oatproxy.OAuthSession{}, 170 176 ServerSettings{}, 171 177 XrpcStreamEvent{}, 178 + Labeler{}, 179 + Label{}, 172 180 } { 173 181 err = db.AutoMigrate(model) 174 182 if err != nil {