this repo has no description
1package appview 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "log" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/bluesky-social/jetstream/pkg/models" 12 "github.com/go-git/go-git/v5/plumbing" 13 "github.com/ipfs/go-cid" 14 "tangled.sh/tangled.sh/core/api/tangled" 15 "tangled.sh/tangled.sh/core/appview/db" 16 "tangled.sh/tangled.sh/core/rbac" 17) 18 19type Ingester func(ctx context.Context, e *models.Event) error 20 21func Ingest(d db.DbWrapper, enforcer *rbac.Enforcer) Ingester { 22 return func(ctx context.Context, e *models.Event) error { 23 var err error 24 defer func() { 25 eventTime := e.TimeUS 26 lastTimeUs := eventTime + 1 27 if err := d.SaveLastTimeUs(lastTimeUs); err != nil { 28 err = fmt.Errorf("(deferred) failed to save last time us: %w", err) 29 } 30 }() 31 32 if e.Kind != models.EventKindCommit { 33 return nil 34 } 35 36 switch e.Commit.Collection { 37 case tangled.GraphFollowNSID: 38 ingestFollow(&d, e) 39 case tangled.FeedStarNSID: 40 ingestStar(&d, e) 41 case tangled.PublicKeyNSID: 42 ingestPublicKey(&d, e) 43 case tangled.RepoArtifactNSID: 44 ingestArtifact(&d, e, enforcer) 45 case tangled.ActorProfileNSID: 46 ingestProfile(&d, e) 47 case tangled.SpindleMemberNSID: 48 ingestSpindleMember(&d, e, enforcer) 49 } 50 51 return err 52 } 53} 54 55func ingestStar(d *db.DbWrapper, e *models.Event) error { 56 var err error 57 did := e.Did 58 59 switch e.Commit.Operation { 60 case models.CommitOperationCreate, models.CommitOperationUpdate: 61 var subjectUri syntax.ATURI 62 63 raw := json.RawMessage(e.Commit.Record) 64 record := tangled.FeedStar{} 65 err := json.Unmarshal(raw, &record) 66 if err != nil { 67 log.Println("invalid record") 68 return err 69 } 70 71 subjectUri, err = syntax.ParseATURI(record.Subject) 72 if err != nil { 73 log.Println("invalid record") 74 return err 75 } 76 err = db.AddStar(d, did, subjectUri, e.Commit.RKey) 77 case models.CommitOperationDelete: 78 err = db.DeleteStarByRkey(d, did, e.Commit.RKey) 79 } 80 81 if err != nil { 82 return fmt.Errorf("failed to %s star record: %w", e.Commit.Operation, err) 83 } 84 85 return nil 86} 87 88func ingestFollow(d *db.DbWrapper, e *models.Event) error { 89 var err error 90 did := e.Did 91 92 switch e.Commit.Operation { 93 case models.CommitOperationCreate, models.CommitOperationUpdate: 94 raw := json.RawMessage(e.Commit.Record) 95 record := tangled.GraphFollow{} 96 err = json.Unmarshal(raw, &record) 97 if err != nil { 98 log.Println("invalid record") 99 return err 100 } 101 102 subjectDid := record.Subject 103 err = db.AddFollow(d, did, subjectDid, e.Commit.RKey) 104 case models.CommitOperationDelete: 105 err = db.DeleteFollowByRkey(d, did, e.Commit.RKey) 106 } 107 108 if err != nil { 109 return fmt.Errorf("failed to %s follow record: %w", e.Commit.Operation, err) 110 } 111 112 return nil 113} 114 115func ingestPublicKey(d *db.DbWrapper, e *models.Event) error { 116 did := e.Did 117 var err error 118 119 switch e.Commit.Operation { 120 case models.CommitOperationCreate, models.CommitOperationUpdate: 121 log.Println("processing add of pubkey") 122 raw := json.RawMessage(e.Commit.Record) 123 record := tangled.PublicKey{} 124 err = json.Unmarshal(raw, &record) 125 if err != nil { 126 log.Printf("invalid record: %s", err) 127 return err 128 } 129 130 name := record.Name 131 key := record.Key 132 err = db.AddPublicKey(d, did, name, key, e.Commit.RKey) 133 case models.CommitOperationDelete: 134 log.Println("processing delete of pubkey") 135 err = db.DeletePublicKeyByRkey(d, did, e.Commit.RKey) 136 } 137 138 if err != nil { 139 return fmt.Errorf("failed to %s pubkey record: %w", e.Commit.Operation, err) 140 } 141 142 return nil 143} 144 145func ingestArtifact(d *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 146 did := e.Did 147 var err error 148 149 switch e.Commit.Operation { 150 case models.CommitOperationCreate, models.CommitOperationUpdate: 151 raw := json.RawMessage(e.Commit.Record) 152 record := tangled.RepoArtifact{} 153 err = json.Unmarshal(raw, &record) 154 if err != nil { 155 log.Printf("invalid record: %s", err) 156 return err 157 } 158 159 repoAt, err := syntax.ParseATURI(record.Repo) 160 if err != nil { 161 return err 162 } 163 164 repo, err := db.GetRepoByAtUri(d, repoAt.String()) 165 if err != nil { 166 return err 167 } 168 169 ok, err := enforcer.E.Enforce(did, repo.Knot, repo.DidSlashRepo(), "repo:push") 170 if err != nil || !ok { 171 return err 172 } 173 174 createdAt, err := time.Parse(time.RFC3339, record.CreatedAt) 175 if err != nil { 176 createdAt = time.Now() 177 } 178 179 artifact := db.Artifact{ 180 Did: did, 181 Rkey: e.Commit.RKey, 182 RepoAt: repoAt, 183 Tag: plumbing.Hash(record.Tag), 184 CreatedAt: createdAt, 185 BlobCid: cid.Cid(record.Artifact.Ref), 186 Name: record.Name, 187 Size: uint64(record.Artifact.Size), 188 MimeType: record.Artifact.MimeType, 189 } 190 191 err = db.AddArtifact(d, artifact) 192 case models.CommitOperationDelete: 193 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 194 } 195 196 if err != nil { 197 return fmt.Errorf("failed to %s artifact record: %w", e.Commit.Operation, err) 198 } 199 200 return nil 201} 202 203func ingestProfile(d *db.DbWrapper, e *models.Event) error { 204 did := e.Did 205 var err error 206 207 if e.Commit.RKey != "self" { 208 return fmt.Errorf("ingestProfile only ingests `self` record") 209 } 210 211 switch e.Commit.Operation { 212 case models.CommitOperationCreate, models.CommitOperationUpdate: 213 raw := json.RawMessage(e.Commit.Record) 214 record := tangled.ActorProfile{} 215 err = json.Unmarshal(raw, &record) 216 if err != nil { 217 log.Printf("invalid record: %s", err) 218 return err 219 } 220 221 description := "" 222 if record.Description != nil { 223 description = *record.Description 224 } 225 226 includeBluesky := record.Bluesky 227 228 location := "" 229 if record.Location != nil { 230 location = *record.Location 231 } 232 233 var links [5]string 234 for i, l := range record.Links { 235 if i < 5 { 236 links[i] = l 237 } 238 } 239 240 var stats [2]db.VanityStat 241 for i, s := range record.Stats { 242 if i < 2 { 243 stats[i].Kind = db.VanityStatKind(s) 244 } 245 } 246 247 var pinned [6]syntax.ATURI 248 for i, r := range record.PinnedRepositories { 249 if i < 6 { 250 pinned[i] = syntax.ATURI(r) 251 } 252 } 253 254 profile := db.Profile{ 255 Did: did, 256 Description: description, 257 IncludeBluesky: includeBluesky, 258 Location: location, 259 Links: links, 260 Stats: stats, 261 PinnedRepos: pinned, 262 } 263 264 ddb, ok := d.Execer.(*db.DB) 265 if !ok { 266 return fmt.Errorf("failed to index profile record, invalid db cast") 267 } 268 269 tx, err := ddb.Begin() 270 if err != nil { 271 return fmt.Errorf("failed to start transaction") 272 } 273 274 err = db.ValidateProfile(tx, &profile) 275 if err != nil { 276 return fmt.Errorf("invalid profile record") 277 } 278 279 err = db.UpsertProfile(tx, &profile) 280 case models.CommitOperationDelete: 281 err = db.DeleteArtifact(d, db.FilterEq("did", did), db.FilterEq("rkey", e.Commit.RKey)) 282 } 283 284 if err != nil { 285 return fmt.Errorf("failed to %s profile record: %w", e.Commit.Operation, err) 286 } 287 288 return nil 289} 290 291func ingestSpindleMember(_ *db.DbWrapper, e *models.Event, enforcer *rbac.Enforcer) error { 292 did := e.Did 293 var err error 294 295 switch e.Commit.Operation { 296 case models.CommitOperationCreate: 297 raw := json.RawMessage(e.Commit.Record) 298 record := tangled.SpindleMember{} 299 err = json.Unmarshal(raw, &record) 300 if err != nil { 301 log.Printf("invalid record: %s", err) 302 return err 303 } 304 305 // only spindle owner can invite to spindles 306 ok, err := enforcer.IsSpindleInviteAllowed(did, record.Instance) 307 if err != nil || !ok { 308 return fmt.Errorf("failed to enforce permissions: %w", err) 309 } 310 311 err = enforcer.AddSpindleMember(record.Instance, record.Subject) 312 if err != nil { 313 return fmt.Errorf("failed to add member: %w", err) 314 } 315 } 316 317 return nil 318}