Write on the margins of the internet. Powered by the AT Protocol. margin.at
extension web atproto comments
at ui-refactor 618 lines 15 kB view raw
1package sync 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "io" 8 "log" 9 "net/http" 10 "strings" 11 "time" 12 13 "margin.at/internal/crypto" 14 "margin.at/internal/db" 15 "margin.at/internal/xrpc" 16) 17 18var CIDVerificationEnabled = true 19 20type Service struct { 21 db *db.DB 22} 23 24func NewService(database *db.DB) *Service { 25 return &Service{db: database} 26} 27 28func (s *Service) PerformSync(ctx context.Context, did string, getClient func(context.Context, string) (*xrpc.Client, error)) (map[string]string, error) { 29 collections := []string{ 30 xrpc.CollectionAnnotation, 31 xrpc.CollectionHighlight, 32 xrpc.CollectionBookmark, 33 xrpc.CollectionReply, 34 xrpc.CollectionLike, 35 xrpc.CollectionCollection, 36 xrpc.CollectionCollectionItem, 37 xrpc.CollectionSembleCard, 38 xrpc.CollectionSembleCollection, 39 xrpc.CollectionSembleCollectionLink, 40 } 41 42 results := make(map[string]string) 43 44 client, err := getClient(ctx, did) 45 if err != nil { 46 return nil, err 47 } 48 49 for _, collectionNSID := range collections { 50 count := 0 51 cursor := "" 52 fetchedURIs := make(map[string]bool) 53 54 for { 55 url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", client.PDS, did, collectionNSID) 56 if cursor != "" { 57 url += "&cursor=" + cursor 58 } 59 60 req, _ := http.NewRequestWithContext(ctx, "GET", url, nil) 61 req.Header.Set("Authorization", "Bearer "+client.AccessToken) 62 63 resp, err := http.DefaultClient.Do(req) 64 if err != nil { 65 return nil, fmt.Errorf("failed to fetch %s: %w", collectionNSID, err) 66 } 67 defer resp.Body.Close() 68 69 if resp.StatusCode != 200 { 70 body, _ := io.ReadAll(resp.Body) 71 results[collectionNSID] = fmt.Sprintf("error: %s", string(body)) 72 break 73 } 74 75 var output struct { 76 Records []struct { 77 URI string `json:"uri"` 78 CID string `json:"cid"` 79 Value json.RawMessage `json:"value"` 80 } `json:"records"` 81 Cursor string `json:"cursor"` 82 } 83 84 if err := json.NewDecoder(resp.Body).Decode(&output); err != nil { 85 return nil, err 86 } 87 88 for _, rec := range output.Records { 89 if CIDVerificationEnabled && rec.CID != "" { 90 if err := crypto.VerifyRecordCID(rec.Value, rec.CID, rec.URI); err != nil { 91 log.Printf("CID verification failed for %s: %v (skipping)", rec.URI, err) 92 continue 93 } 94 } 95 96 err := s.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value) 97 if err != nil { 98 fmt.Printf("Error upserting %s: %v\n", rec.URI, err) 99 } else { 100 count++ 101 fetchedURIs[rec.URI] = true 102 } 103 } 104 105 if output.Cursor == "" { 106 break 107 } 108 cursor = output.Cursor 109 } 110 111 deletedCount := 0 112 if results[collectionNSID] == "" { 113 var localURIs []string 114 var err error 115 116 switch collectionNSID { 117 case xrpc.CollectionAnnotation: 118 localURIs, err = s.db.GetAnnotationURIs(did) 119 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionAnnotation) 120 case xrpc.CollectionHighlight: 121 localURIs, err = s.db.GetHighlightURIs(did) 122 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionHighlight) 123 case xrpc.CollectionBookmark: 124 localURIs, err = s.db.GetBookmarkURIs(did) 125 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionBookmark) 126 case xrpc.CollectionCollection: 127 cols, e := s.db.GetCollectionsByAuthor(did) 128 if e == nil { 129 for _, c := range cols { 130 localURIs = append(localURIs, c.URI) 131 } 132 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollection) 133 } else { 134 err = e 135 } 136 case xrpc.CollectionCollectionItem: 137 items, e := s.db.GetCollectionItemsByAuthor(did) 138 if e == nil { 139 for _, item := range items { 140 localURIs = append(localURIs, item.URI) 141 } 142 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollectionItem) 143 } else { 144 err = e 145 } 146 case xrpc.CollectionReply: 147 replies, e := s.db.GetRepliesByAuthor(did) 148 if e == nil { 149 for _, r := range replies { 150 localURIs = append(localURIs, r.URI) 151 } 152 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionReply) 153 } else { 154 err = e 155 } 156 case xrpc.CollectionLike: 157 likes, e := s.db.GetLikesByAuthor(did) 158 if e == nil { 159 for _, l := range likes { 160 localURIs = append(localURIs, l.URI) 161 } 162 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionLike) 163 } else { 164 err = e 165 } 166 case xrpc.CollectionSembleCard: 167 annos, e1 := s.db.GetAnnotationURIs(did) 168 books, e2 := s.db.GetBookmarkURIs(did) 169 if e1 != nil { 170 err = e1 171 break 172 } 173 if e2 != nil { 174 err = e2 175 break 176 } 177 localURIs = append(localURIs, annos...) 178 localURIs = append(localURIs, books...) 179 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCard) 180 case xrpc.CollectionSembleCollection: 181 cols, e := s.db.GetCollectionsByAuthor(did) 182 if e == nil { 183 for _, c := range cols { 184 localURIs = append(localURIs, c.URI) 185 } 186 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollection) 187 } else { 188 err = e 189 } 190 case xrpc.CollectionSembleCollectionLink: 191 items, e := s.db.GetCollectionItemsByAuthor(did) 192 if e == nil { 193 for _, item := range items { 194 localURIs = append(localURIs, item.URI) 195 } 196 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollectionLink) 197 } else { 198 err = e 199 } 200 } 201 202 if err == nil { 203 for _, uri := range localURIs { 204 if !fetchedURIs[uri] { 205 switch collectionNSID { 206 case xrpc.CollectionAnnotation: 207 _ = s.db.DeleteAnnotation(uri) 208 case xrpc.CollectionHighlight: 209 _ = s.db.DeleteHighlight(uri) 210 case xrpc.CollectionBookmark: 211 _ = s.db.DeleteBookmark(uri) 212 case xrpc.CollectionCollection: 213 _ = s.db.DeleteCollection(uri) 214 case xrpc.CollectionCollectionItem: 215 _ = s.db.RemoveFromCollection(uri) 216 case xrpc.CollectionReply: 217 _ = s.db.DeleteReply(uri) 218 case xrpc.CollectionLike: 219 _ = s.db.DeleteLike(uri) 220 case xrpc.CollectionSembleCard: 221 _ = s.db.DeleteAnnotation(uri) 222 _ = s.db.DeleteBookmark(uri) 223 case xrpc.CollectionSembleCollection: 224 _ = s.db.DeleteCollection(uri) 225 case xrpc.CollectionSembleCollectionLink: 226 _ = s.db.RemoveFromCollection(uri) 227 } 228 deletedCount++ 229 } 230 } 231 } 232 } 233 234 if results[collectionNSID] == "" { 235 results[collectionNSID] = fmt.Sprintf("synced %d records, deleted %d stale", count, deletedCount) 236 } 237 } 238 return results, nil 239} 240 241func filterURIsByCollection(uris []string, collectionNSID string) []string { 242 if len(uris) == 0 || collectionNSID == "" { 243 return uris 244 } 245 needle := "/" + collectionNSID + "/" 246 out := make([]string, 0, len(uris)) 247 for _, u := range uris { 248 if strings.Contains(u, needle) { 249 out = append(out, u) 250 } 251 } 252 return out 253} 254 255func strPtr(s string) *string { 256 if s == "" { 257 return nil 258 } 259 return &s 260} 261 262func (s *Service) upsertRecord(did, collection, uri, cid string, value json.RawMessage) error { 263 cidPtr := strPtr(cid) 264 switch collection { 265 case xrpc.CollectionAnnotation: 266 var record xrpc.AnnotationRecord 267 if err := json.Unmarshal(value, &record); err != nil { 268 return err 269 } 270 271 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 272 273 targetSource := record.Target.Source 274 if targetSource == "" { 275 276 } 277 278 targetHash := record.Target.SourceHash 279 if targetHash == "" && targetSource != "" { 280 targetHash = db.HashURL(targetSource) 281 } 282 283 motivation := record.Motivation 284 if motivation == "" { 285 motivation = "commenting" 286 } 287 288 var bodyValuePtr, bodyFormatPtr, bodyURIPtr, targetTitlePtr, selectorJSONPtr, tagsJSONPtr *string 289 if record.Body != nil { 290 if record.Body.Value != "" { 291 val := record.Body.Value 292 bodyValuePtr = &val 293 } 294 if record.Body.Format != "" { 295 fmt := record.Body.Format 296 bodyFormatPtr = &fmt 297 } 298 } 299 if record.Target.Title != "" { 300 t := record.Target.Title 301 targetTitlePtr = &t 302 } 303 if len(record.Target.Selector) > 0 { 304 selectorStr := string(record.Target.Selector) 305 selectorJSONPtr = &selectorStr 306 } 307 if len(record.Tags) > 0 { 308 tagsBytes, _ := json.Marshal(record.Tags) 309 tagsStr := string(tagsBytes) 310 tagsJSONPtr = &tagsStr 311 } 312 313 return s.db.CreateAnnotation(&db.Annotation{ 314 URI: uri, 315 AuthorDID: did, 316 Motivation: motivation, 317 BodyValue: bodyValuePtr, 318 BodyFormat: bodyFormatPtr, 319 BodyURI: bodyURIPtr, 320 TargetSource: targetSource, 321 TargetHash: targetHash, 322 TargetTitle: targetTitlePtr, 323 SelectorJSON: selectorJSONPtr, 324 TagsJSON: tagsJSONPtr, 325 CreatedAt: createdAt, 326 IndexedAt: time.Now(), 327 CID: cidPtr, 328 }) 329 330 case xrpc.CollectionHighlight: 331 var record xrpc.HighlightRecord 332 if err := json.Unmarshal(value, &record); err != nil { 333 return err 334 } 335 336 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 337 if createdAt.IsZero() { 338 createdAt = time.Now() 339 } 340 341 targetHash := record.Target.SourceHash 342 if targetHash == "" && record.Target.Source != "" { 343 targetHash = db.HashURL(record.Target.Source) 344 } 345 346 var titlePtr, selectorJSONPtr, colorPtr, tagsJSONPtr *string 347 if record.Target.Title != "" { 348 t := record.Target.Title 349 titlePtr = &t 350 } 351 if len(record.Target.Selector) > 0 { 352 selectorStr := string(record.Target.Selector) 353 selectorJSONPtr = &selectorStr 354 } 355 if record.Color != "" { 356 c := record.Color 357 colorPtr = &c 358 } 359 if len(record.Tags) > 0 { 360 tagsBytes, _ := json.Marshal(record.Tags) 361 tagsStr := string(tagsBytes) 362 tagsJSONPtr = &tagsStr 363 } 364 365 return s.db.CreateHighlight(&db.Highlight{ 366 URI: uri, 367 AuthorDID: did, 368 TargetSource: record.Target.Source, 369 TargetHash: targetHash, 370 TargetTitle: titlePtr, 371 SelectorJSON: selectorJSONPtr, 372 Color: colorPtr, 373 TagsJSON: tagsJSONPtr, 374 CreatedAt: createdAt, 375 IndexedAt: time.Now(), 376 CID: cidPtr, 377 }) 378 379 case xrpc.CollectionBookmark: 380 var record xrpc.BookmarkRecord 381 if err := json.Unmarshal(value, &record); err != nil { 382 return err 383 } 384 385 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 386 387 sourceHash := record.SourceHash 388 if sourceHash == "" && record.Source != "" { 389 sourceHash = db.HashURL(record.Source) 390 } 391 392 var titlePtr, descPtr, tagsJSONPtr *string 393 if record.Title != "" { 394 t := record.Title 395 titlePtr = &t 396 } 397 if record.Description != "" { 398 d := record.Description 399 descPtr = &d 400 } 401 if len(record.Tags) > 0 { 402 tagsBytes, _ := json.Marshal(record.Tags) 403 tagsStr := string(tagsBytes) 404 tagsJSONPtr = &tagsStr 405 } 406 407 return s.db.CreateBookmark(&db.Bookmark{ 408 URI: uri, 409 AuthorDID: did, 410 Source: record.Source, 411 SourceHash: sourceHash, 412 Title: titlePtr, 413 Description: descPtr, 414 TagsJSON: tagsJSONPtr, 415 CreatedAt: createdAt, 416 IndexedAt: time.Now(), 417 CID: cidPtr, 418 }) 419 420 case xrpc.CollectionCollection: 421 var record xrpc.CollectionRecord 422 if err := json.Unmarshal(value, &record); err != nil { 423 return err 424 } 425 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 426 427 var descPtr, iconPtr *string 428 if record.Description != "" { 429 d := record.Description 430 descPtr = &d 431 } 432 if record.Icon != "" { 433 i := record.Icon 434 iconPtr = &i 435 } 436 437 return s.db.CreateCollection(&db.Collection{ 438 URI: uri, 439 AuthorDID: did, 440 Name: record.Name, 441 Description: descPtr, 442 Icon: iconPtr, 443 CreatedAt: createdAt, 444 IndexedAt: time.Now(), 445 }) 446 447 case xrpc.CollectionCollectionItem: 448 var record xrpc.CollectionItemRecord 449 if err := json.Unmarshal(value, &record); err != nil { 450 return err 451 } 452 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 453 454 return s.db.AddToCollection(&db.CollectionItem{ 455 URI: uri, 456 AuthorDID: did, 457 CollectionURI: record.Collection, 458 AnnotationURI: record.Annotation, 459 Position: record.Position, 460 CreatedAt: createdAt, 461 IndexedAt: time.Now(), 462 }) 463 464 case xrpc.CollectionReply: 465 var record xrpc.ReplyRecord 466 if err := json.Unmarshal(value, &record); err != nil { 467 return err 468 } 469 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 470 471 var formatPtr *string 472 if record.Format != "" { 473 f := record.Format 474 formatPtr = &f 475 } 476 477 return s.db.CreateReply(&db.Reply{ 478 URI: uri, 479 AuthorDID: did, 480 ParentURI: record.Parent.URI, 481 RootURI: record.Root.URI, 482 Text: record.Text, 483 Format: formatPtr, 484 CreatedAt: createdAt, 485 IndexedAt: time.Now(), 486 CID: cidPtr, 487 }) 488 489 case xrpc.CollectionLike: 490 var record xrpc.LikeRecord 491 if err := json.Unmarshal(value, &record); err != nil { 492 return err 493 } 494 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 495 496 return s.db.CreateLike(&db.Like{ 497 URI: uri, 498 AuthorDID: did, 499 SubjectURI: record.Subject.URI, 500 CreatedAt: createdAt, 501 IndexedAt: time.Now(), 502 }) 503 504 case xrpc.CollectionSembleCard: 505 var card xrpc.SembleCard 506 if err := json.Unmarshal(value, &card); err != nil { 507 return err 508 } 509 510 createdAt := card.GetCreatedAtTime() 511 512 content, err := card.ParseContent() 513 if err != nil { 514 return nil 515 } 516 517 switch card.Type { 518 case "NOTE": 519 note, ok := content.(*xrpc.SembleNoteContent) 520 if !ok { 521 return nil 522 } 523 524 targetSource := card.URL 525 if targetSource == "" { 526 return nil 527 } 528 529 targetHash := db.HashURL(targetSource) 530 motivation := "commenting" 531 bodyValue := note.Text 532 533 return s.db.CreateAnnotation(&db.Annotation{ 534 URI: uri, 535 AuthorDID: did, 536 Motivation: motivation, 537 BodyValue: &bodyValue, 538 TargetSource: targetSource, 539 TargetHash: targetHash, 540 CreatedAt: createdAt, 541 IndexedAt: time.Now(), 542 CID: cidPtr, 543 }) 544 545 case "URL": 546 urlContent, ok := content.(*xrpc.SembleURLContent) 547 if !ok { 548 return nil 549 } 550 551 source := urlContent.URL 552 if source == "" { 553 return nil 554 } 555 sourceHash := db.HashURL(source) 556 557 var titlePtr *string 558 if urlContent.Metadata != nil && urlContent.Metadata.Title != "" { 559 t := urlContent.Metadata.Title 560 titlePtr = &t 561 } 562 563 return s.db.CreateBookmark(&db.Bookmark{ 564 URI: uri, 565 AuthorDID: did, 566 Source: source, 567 SourceHash: sourceHash, 568 Title: titlePtr, 569 CreatedAt: createdAt, 570 IndexedAt: time.Now(), 571 CID: cidPtr, 572 }) 573 } 574 575 case xrpc.CollectionSembleCollection: 576 var record xrpc.SembleCollection 577 if err := json.Unmarshal(value, &record); err != nil { 578 return err 579 } 580 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 581 582 var descPtr, iconPtr *string 583 if record.Description != "" { 584 d := record.Description 585 descPtr = &d 586 } 587 icon := "icon:semble" 588 iconPtr = &icon 589 590 return s.db.CreateCollection(&db.Collection{ 591 URI: uri, 592 AuthorDID: did, 593 Name: record.Name, 594 Description: descPtr, 595 Icon: iconPtr, 596 CreatedAt: createdAt, 597 IndexedAt: time.Now(), 598 }) 599 600 case xrpc.CollectionSembleCollectionLink: 601 var record xrpc.SembleCollectionLink 602 if err := json.Unmarshal(value, &record); err != nil { 603 return err 604 } 605 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt) 606 607 return s.db.AddToCollection(&db.CollectionItem{ 608 URI: uri, 609 AuthorDID: did, 610 CollectionURI: record.Collection.URI, 611 AnnotationURI: record.Card.URI, 612 Position: 0, 613 CreatedAt: createdAt, 614 IndexedAt: time.Now(), 615 }) 616 } 617 return nil 618}