Write on the margins of the internet. Powered by the AT Protocol.
margin.at
extension
web
atproto
comments
1package sync
2
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"strings"
	"time"

	"margin.at/internal/crypto"
	"margin.at/internal/db"
	"margin.at/internal/xrpc"
)
17
// CIDVerificationEnabled controls whether PerformSync checks each fetched
// record against its CID with crypto.VerifyRecordCID before storing it.
// When enabled, records that fail verification are logged and skipped.
var CIDVerificationEnabled = true
19
20type Service struct {
21 db *db.DB
22}
23
24func NewService(database *db.DB) *Service {
25 return &Service{db: database}
26}
27
28func (s *Service) PerformSync(ctx context.Context, did string, getClient func(context.Context, string) (*xrpc.Client, error)) (map[string]string, error) {
29 collections := []string{
30 xrpc.CollectionAnnotation,
31 xrpc.CollectionHighlight,
32 xrpc.CollectionBookmark,
33 xrpc.CollectionReply,
34 xrpc.CollectionLike,
35 xrpc.CollectionCollection,
36 xrpc.CollectionCollectionItem,
37 xrpc.CollectionSembleCard,
38 xrpc.CollectionSembleCollection,
39 xrpc.CollectionSembleCollectionLink,
40 }
41
42 results := make(map[string]string)
43
44 client, err := getClient(ctx, did)
45 if err != nil {
46 return nil, err
47 }
48
49 for _, collectionNSID := range collections {
50 count := 0
51 cursor := ""
52 fetchedURIs := make(map[string]bool)
53
54 for {
55 url := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=100", client.PDS, did, collectionNSID)
56 if cursor != "" {
57 url += "&cursor=" + cursor
58 }
59
60 req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
61 req.Header.Set("Authorization", "Bearer "+client.AccessToken)
62
63 resp, err := http.DefaultClient.Do(req)
64 if err != nil {
65 return nil, fmt.Errorf("failed to fetch %s: %w", collectionNSID, err)
66 }
67 defer resp.Body.Close()
68
69 if resp.StatusCode != 200 {
70 body, _ := io.ReadAll(resp.Body)
71 results[collectionNSID] = fmt.Sprintf("error: %s", string(body))
72 break
73 }
74
75 var output struct {
76 Records []struct {
77 URI string `json:"uri"`
78 CID string `json:"cid"`
79 Value json.RawMessage `json:"value"`
80 } `json:"records"`
81 Cursor string `json:"cursor"`
82 }
83
84 if err := json.NewDecoder(resp.Body).Decode(&output); err != nil {
85 return nil, err
86 }
87
88 for _, rec := range output.Records {
89 if CIDVerificationEnabled && rec.CID != "" {
90 if err := crypto.VerifyRecordCID(rec.Value, rec.CID, rec.URI); err != nil {
91 log.Printf("CID verification failed for %s: %v (skipping)", rec.URI, err)
92 continue
93 }
94 }
95
96 err := s.upsertRecord(did, collectionNSID, rec.URI, rec.CID, rec.Value)
97 if err != nil {
98 fmt.Printf("Error upserting %s: %v\n", rec.URI, err)
99 } else {
100 count++
101 fetchedURIs[rec.URI] = true
102 }
103 }
104
105 if output.Cursor == "" {
106 break
107 }
108 cursor = output.Cursor
109 }
110
111 deletedCount := 0
112 if results[collectionNSID] == "" {
113 var localURIs []string
114 var err error
115
116 switch collectionNSID {
117 case xrpc.CollectionAnnotation:
118 localURIs, err = s.db.GetAnnotationURIs(did)
119 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionAnnotation)
120 case xrpc.CollectionHighlight:
121 localURIs, err = s.db.GetHighlightURIs(did)
122 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionHighlight)
123 case xrpc.CollectionBookmark:
124 localURIs, err = s.db.GetBookmarkURIs(did)
125 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionBookmark)
126 case xrpc.CollectionCollection:
127 cols, e := s.db.GetCollectionsByAuthor(did)
128 if e == nil {
129 for _, c := range cols {
130 localURIs = append(localURIs, c.URI)
131 }
132 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollection)
133 } else {
134 err = e
135 }
136 case xrpc.CollectionCollectionItem:
137 items, e := s.db.GetCollectionItemsByAuthor(did)
138 if e == nil {
139 for _, item := range items {
140 localURIs = append(localURIs, item.URI)
141 }
142 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionCollectionItem)
143 } else {
144 err = e
145 }
146 case xrpc.CollectionReply:
147 replies, e := s.db.GetRepliesByAuthor(did)
148 if e == nil {
149 for _, r := range replies {
150 localURIs = append(localURIs, r.URI)
151 }
152 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionReply)
153 } else {
154 err = e
155 }
156 case xrpc.CollectionLike:
157 likes, e := s.db.GetLikesByAuthor(did)
158 if e == nil {
159 for _, l := range likes {
160 localURIs = append(localURIs, l.URI)
161 }
162 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionLike)
163 } else {
164 err = e
165 }
166 case xrpc.CollectionSembleCard:
167 annos, e1 := s.db.GetAnnotationURIs(did)
168 books, e2 := s.db.GetBookmarkURIs(did)
169 if e1 != nil {
170 err = e1
171 break
172 }
173 if e2 != nil {
174 err = e2
175 break
176 }
177 localURIs = append(localURIs, annos...)
178 localURIs = append(localURIs, books...)
179 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCard)
180 case xrpc.CollectionSembleCollection:
181 cols, e := s.db.GetCollectionsByAuthor(did)
182 if e == nil {
183 for _, c := range cols {
184 localURIs = append(localURIs, c.URI)
185 }
186 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollection)
187 } else {
188 err = e
189 }
190 case xrpc.CollectionSembleCollectionLink:
191 items, e := s.db.GetCollectionItemsByAuthor(did)
192 if e == nil {
193 for _, item := range items {
194 localURIs = append(localURIs, item.URI)
195 }
196 localURIs = filterURIsByCollection(localURIs, xrpc.CollectionSembleCollectionLink)
197 } else {
198 err = e
199 }
200 }
201
202 if err == nil {
203 for _, uri := range localURIs {
204 if !fetchedURIs[uri] {
205 switch collectionNSID {
206 case xrpc.CollectionAnnotation:
207 _ = s.db.DeleteAnnotation(uri)
208 case xrpc.CollectionHighlight:
209 _ = s.db.DeleteHighlight(uri)
210 case xrpc.CollectionBookmark:
211 _ = s.db.DeleteBookmark(uri)
212 case xrpc.CollectionCollection:
213 _ = s.db.DeleteCollection(uri)
214 case xrpc.CollectionCollectionItem:
215 _ = s.db.RemoveFromCollection(uri)
216 case xrpc.CollectionReply:
217 _ = s.db.DeleteReply(uri)
218 case xrpc.CollectionLike:
219 _ = s.db.DeleteLike(uri)
220 case xrpc.CollectionSembleCard:
221 _ = s.db.DeleteAnnotation(uri)
222 _ = s.db.DeleteBookmark(uri)
223 case xrpc.CollectionSembleCollection:
224 _ = s.db.DeleteCollection(uri)
225 case xrpc.CollectionSembleCollectionLink:
226 _ = s.db.RemoveFromCollection(uri)
227 }
228 deletedCount++
229 }
230 }
231 }
232 }
233
234 if results[collectionNSID] == "" {
235 results[collectionNSID] = fmt.Sprintf("synced %d records, deleted %d stale", count, deletedCount)
236 }
237 }
238 return results, nil
239}
240
// filterURIsByCollection keeps the URIs that contain "/<collectionNSID>/" as
// a path segment. When uris is empty or collectionNSID is blank the input
// slice is returned untouched; otherwise a freshly allocated slice is
// returned, in the original order.
func filterURIsByCollection(uris []string, collectionNSID string) []string {
	if collectionNSID == "" || len(uris) == 0 {
		return uris
	}

	segment := strings.Join([]string{"", collectionNSID, ""}, "/")
	kept := make([]string, 0, len(uris))
	for i := range uris {
		if !strings.Contains(uris[i], segment) {
			continue
		}
		kept = append(kept, uris[i])
	}
	return kept
}
254
// strPtr converts s into an optional string: a pointer to a copy of s when it
// is non-empty, nil otherwise.
func strPtr(s string) *string {
	if len(s) > 0 {
		return &s
	}
	return nil
}
261
262func (s *Service) upsertRecord(did, collection, uri, cid string, value json.RawMessage) error {
263 cidPtr := strPtr(cid)
264 switch collection {
265 case xrpc.CollectionAnnotation:
266 var record xrpc.AnnotationRecord
267 if err := json.Unmarshal(value, &record); err != nil {
268 return err
269 }
270
271 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
272
273 targetSource := record.Target.Source
274 if targetSource == "" {
275
276 }
277
278 targetHash := record.Target.SourceHash
279 if targetHash == "" && targetSource != "" {
280 targetHash = db.HashURL(targetSource)
281 }
282
283 motivation := record.Motivation
284 if motivation == "" {
285 motivation = "commenting"
286 }
287
288 var bodyValuePtr, bodyFormatPtr, bodyURIPtr, targetTitlePtr, selectorJSONPtr, tagsJSONPtr *string
289 if record.Body != nil {
290 if record.Body.Value != "" {
291 val := record.Body.Value
292 bodyValuePtr = &val
293 }
294 if record.Body.Format != "" {
295 fmt := record.Body.Format
296 bodyFormatPtr = &fmt
297 }
298 }
299 if record.Target.Title != "" {
300 t := record.Target.Title
301 targetTitlePtr = &t
302 }
303 if len(record.Target.Selector) > 0 {
304 selectorStr := string(record.Target.Selector)
305 selectorJSONPtr = &selectorStr
306 }
307 if len(record.Tags) > 0 {
308 tagsBytes, _ := json.Marshal(record.Tags)
309 tagsStr := string(tagsBytes)
310 tagsJSONPtr = &tagsStr
311 }
312
313 return s.db.CreateAnnotation(&db.Annotation{
314 URI: uri,
315 AuthorDID: did,
316 Motivation: motivation,
317 BodyValue: bodyValuePtr,
318 BodyFormat: bodyFormatPtr,
319 BodyURI: bodyURIPtr,
320 TargetSource: targetSource,
321 TargetHash: targetHash,
322 TargetTitle: targetTitlePtr,
323 SelectorJSON: selectorJSONPtr,
324 TagsJSON: tagsJSONPtr,
325 CreatedAt: createdAt,
326 IndexedAt: time.Now(),
327 CID: cidPtr,
328 })
329
330 case xrpc.CollectionHighlight:
331 var record xrpc.HighlightRecord
332 if err := json.Unmarshal(value, &record); err != nil {
333 return err
334 }
335
336 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
337 if createdAt.IsZero() {
338 createdAt = time.Now()
339 }
340
341 targetHash := record.Target.SourceHash
342 if targetHash == "" && record.Target.Source != "" {
343 targetHash = db.HashURL(record.Target.Source)
344 }
345
346 var titlePtr, selectorJSONPtr, colorPtr, tagsJSONPtr *string
347 if record.Target.Title != "" {
348 t := record.Target.Title
349 titlePtr = &t
350 }
351 if len(record.Target.Selector) > 0 {
352 selectorStr := string(record.Target.Selector)
353 selectorJSONPtr = &selectorStr
354 }
355 if record.Color != "" {
356 c := record.Color
357 colorPtr = &c
358 }
359 if len(record.Tags) > 0 {
360 tagsBytes, _ := json.Marshal(record.Tags)
361 tagsStr := string(tagsBytes)
362 tagsJSONPtr = &tagsStr
363 }
364
365 return s.db.CreateHighlight(&db.Highlight{
366 URI: uri,
367 AuthorDID: did,
368 TargetSource: record.Target.Source,
369 TargetHash: targetHash,
370 TargetTitle: titlePtr,
371 SelectorJSON: selectorJSONPtr,
372 Color: colorPtr,
373 TagsJSON: tagsJSONPtr,
374 CreatedAt: createdAt,
375 IndexedAt: time.Now(),
376 CID: cidPtr,
377 })
378
379 case xrpc.CollectionBookmark:
380 var record xrpc.BookmarkRecord
381 if err := json.Unmarshal(value, &record); err != nil {
382 return err
383 }
384
385 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
386
387 sourceHash := record.SourceHash
388 if sourceHash == "" && record.Source != "" {
389 sourceHash = db.HashURL(record.Source)
390 }
391
392 var titlePtr, descPtr, tagsJSONPtr *string
393 if record.Title != "" {
394 t := record.Title
395 titlePtr = &t
396 }
397 if record.Description != "" {
398 d := record.Description
399 descPtr = &d
400 }
401 if len(record.Tags) > 0 {
402 tagsBytes, _ := json.Marshal(record.Tags)
403 tagsStr := string(tagsBytes)
404 tagsJSONPtr = &tagsStr
405 }
406
407 return s.db.CreateBookmark(&db.Bookmark{
408 URI: uri,
409 AuthorDID: did,
410 Source: record.Source,
411 SourceHash: sourceHash,
412 Title: titlePtr,
413 Description: descPtr,
414 TagsJSON: tagsJSONPtr,
415 CreatedAt: createdAt,
416 IndexedAt: time.Now(),
417 CID: cidPtr,
418 })
419
420 case xrpc.CollectionCollection:
421 var record xrpc.CollectionRecord
422 if err := json.Unmarshal(value, &record); err != nil {
423 return err
424 }
425 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
426
427 var descPtr, iconPtr *string
428 if record.Description != "" {
429 d := record.Description
430 descPtr = &d
431 }
432 if record.Icon != "" {
433 i := record.Icon
434 iconPtr = &i
435 }
436
437 return s.db.CreateCollection(&db.Collection{
438 URI: uri,
439 AuthorDID: did,
440 Name: record.Name,
441 Description: descPtr,
442 Icon: iconPtr,
443 CreatedAt: createdAt,
444 IndexedAt: time.Now(),
445 })
446
447 case xrpc.CollectionCollectionItem:
448 var record xrpc.CollectionItemRecord
449 if err := json.Unmarshal(value, &record); err != nil {
450 return err
451 }
452 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
453
454 return s.db.AddToCollection(&db.CollectionItem{
455 URI: uri,
456 AuthorDID: did,
457 CollectionURI: record.Collection,
458 AnnotationURI: record.Annotation,
459 Position: record.Position,
460 CreatedAt: createdAt,
461 IndexedAt: time.Now(),
462 })
463
464 case xrpc.CollectionReply:
465 var record xrpc.ReplyRecord
466 if err := json.Unmarshal(value, &record); err != nil {
467 return err
468 }
469 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
470
471 var formatPtr *string
472 if record.Format != "" {
473 f := record.Format
474 formatPtr = &f
475 }
476
477 return s.db.CreateReply(&db.Reply{
478 URI: uri,
479 AuthorDID: did,
480 ParentURI: record.Parent.URI,
481 RootURI: record.Root.URI,
482 Text: record.Text,
483 Format: formatPtr,
484 CreatedAt: createdAt,
485 IndexedAt: time.Now(),
486 CID: cidPtr,
487 })
488
489 case xrpc.CollectionLike:
490 var record xrpc.LikeRecord
491 if err := json.Unmarshal(value, &record); err != nil {
492 return err
493 }
494 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
495
496 return s.db.CreateLike(&db.Like{
497 URI: uri,
498 AuthorDID: did,
499 SubjectURI: record.Subject.URI,
500 CreatedAt: createdAt,
501 IndexedAt: time.Now(),
502 })
503
504 case xrpc.CollectionSembleCard:
505 var card xrpc.SembleCard
506 if err := json.Unmarshal(value, &card); err != nil {
507 return err
508 }
509
510 createdAt := card.GetCreatedAtTime()
511
512 content, err := card.ParseContent()
513 if err != nil {
514 return nil
515 }
516
517 switch card.Type {
518 case "NOTE":
519 note, ok := content.(*xrpc.SembleNoteContent)
520 if !ok {
521 return nil
522 }
523
524 targetSource := card.URL
525 if targetSource == "" {
526 return nil
527 }
528
529 targetHash := db.HashURL(targetSource)
530 motivation := "commenting"
531 bodyValue := note.Text
532
533 return s.db.CreateAnnotation(&db.Annotation{
534 URI: uri,
535 AuthorDID: did,
536 Motivation: motivation,
537 BodyValue: &bodyValue,
538 TargetSource: targetSource,
539 TargetHash: targetHash,
540 CreatedAt: createdAt,
541 IndexedAt: time.Now(),
542 CID: cidPtr,
543 })
544
545 case "URL":
546 urlContent, ok := content.(*xrpc.SembleURLContent)
547 if !ok {
548 return nil
549 }
550
551 source := urlContent.URL
552 if source == "" {
553 return nil
554 }
555 sourceHash := db.HashURL(source)
556
557 var titlePtr *string
558 if urlContent.Metadata != nil && urlContent.Metadata.Title != "" {
559 t := urlContent.Metadata.Title
560 titlePtr = &t
561 }
562
563 return s.db.CreateBookmark(&db.Bookmark{
564 URI: uri,
565 AuthorDID: did,
566 Source: source,
567 SourceHash: sourceHash,
568 Title: titlePtr,
569 CreatedAt: createdAt,
570 IndexedAt: time.Now(),
571 CID: cidPtr,
572 })
573 }
574
575 case xrpc.CollectionSembleCollection:
576 var record xrpc.SembleCollection
577 if err := json.Unmarshal(value, &record); err != nil {
578 return err
579 }
580 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
581
582 var descPtr, iconPtr *string
583 if record.Description != "" {
584 d := record.Description
585 descPtr = &d
586 }
587 icon := "icon:semble"
588 iconPtr = &icon
589
590 return s.db.CreateCollection(&db.Collection{
591 URI: uri,
592 AuthorDID: did,
593 Name: record.Name,
594 Description: descPtr,
595 Icon: iconPtr,
596 CreatedAt: createdAt,
597 IndexedAt: time.Now(),
598 })
599
600 case xrpc.CollectionSembleCollectionLink:
601 var record xrpc.SembleCollectionLink
602 if err := json.Unmarshal(value, &record); err != nil {
603 return err
604 }
605 createdAt, _ := time.Parse(time.RFC3339, record.CreatedAt)
606
607 return s.db.AddToCollection(&db.CollectionItem{
608 URI: uri,
609 AuthorDID: did,
610 CollectionURI: record.Collection.URI,
611 AnnotationURI: record.Card.URI,
612 Position: 0,
613 CreatedAt: createdAt,
614 IndexedAt: time.Now(),
615 })
616 }
617 return nil
618}