tangled
alpha
login
or
join now
why.bsky.team
/
konbini
25
fork
atom
A locally focused bluesky appview
25
fork
atom
overview
issues
1
pulls
pipelines
missing post fetcher
whyrusleeping
5 months ago
54a74e9e
066dd3fa
+68
3 changed files
expand all
collapse all
unified
split
handlers.go
main.go
missing.go
+1
handlers.go
···
351
351
352
352
uri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", r.Did, p.Rkey)
353
353
if len(p.Raw) == 0 || p.NotFound {
354
354
+
s.addMissingPost(ctx, uri)
354
355
posts[ix] = postResponse{
355
356
Uri: uri,
356
357
Missing: true,
+3
main.go
···
159
159
dir: dir,
160
160
161
161
missingProfiles: make(chan string, 1024),
162
162
+
missingPosts: make(chan string, 1024),
162
163
}
163
164
164
165
pgb := &PostgresBackend{
···
193
194
}()
194
195
195
196
go s.missingProfileFetcher()
197
197
+
go s.missingPostFetcher()
196
198
197
199
seqno, err := loadLastSeq("sequence.txt")
198
200
if err != nil {
···
219
221
220
222
mpLk sync.Mutex
221
223
missingProfiles chan string
224
224
+
missingPosts chan string
222
225
}
223
226
224
227
func (s *Server) getXrpcClient() (*xrpc.Client, error) {
+64
missing.go
···
4
4
"bytes"
5
5
"context"
6
6
"fmt"
7
7
+
"strings"
7
8
8
9
"github.com/bluesky-social/indigo/api/atproto"
9
10
"github.com/bluesky-social/indigo/api/bsky"
···
65
66
66
67
return s.backend.HandleUpdateProfile(ctx, repo, "self", "", buf.Bytes(), cc)
67
68
}
69
69
+
70
70
+
func (s *Server) addMissingPost(ctx context.Context, uri string) {
71
71
+
select {
72
72
+
case s.missingPosts <- uri:
73
73
+
case <-ctx.Done():
74
74
+
}
75
75
+
}
76
76
+
77
77
+
func (s *Server) missingPostFetcher() {
78
78
+
for uri := range s.missingPosts {
79
79
+
if err := s.fetchMissingPost(context.TODO(), uri); err != nil {
80
80
+
log.Warn("failed to fetch missing post", "uri", uri, "error", err)
81
81
+
}
82
82
+
}
83
83
+
}
84
84
+
85
85
+
func (s *Server) fetchMissingPost(ctx context.Context, uri string) error {
86
86
+
// Parse AT URI: at://did:plc:xxx/app.bsky.feed.post/rkey
87
87
+
parts := strings.Split(uri, "/")
88
88
+
if len(parts) < 5 || !strings.HasPrefix(parts[2], "did:") {
89
89
+
return fmt.Errorf("invalid AT URI: %s", uri)
90
90
+
}
91
91
+
92
92
+
did := parts[2]
93
93
+
collection := parts[3]
94
94
+
rkey := parts[4]
95
95
+
96
96
+
repo, err := s.backend.getOrCreateRepo(ctx, did)
97
97
+
if err != nil {
98
98
+
return err
99
99
+
}
100
100
+
101
101
+
resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
102
102
+
if err != nil {
103
103
+
return err
104
104
+
}
105
105
+
106
106
+
c := &xrpc.Client{
107
107
+
Host: resp.PDSEndpoint(),
108
108
+
}
109
109
+
110
110
+
rec, err := atproto.RepoGetRecord(ctx, c, "", collection, did, rkey)
111
111
+
if err != nil {
112
112
+
return err
113
113
+
}
114
114
+
115
115
+
post, ok := rec.Value.Val.(*bsky.FeedPost)
116
116
+
if !ok {
117
117
+
return fmt.Errorf("record we got back wasn't a post somehow")
118
118
+
}
119
119
+
120
120
+
buf := new(bytes.Buffer)
121
121
+
if err := post.MarshalCBOR(buf); err != nil {
122
122
+
return err
123
123
+
}
124
124
+
125
125
+
cc, err := cid.Decode(*rec.Cid)
126
126
+
if err != nil {
127
127
+
return err
128
128
+
}
129
129
+
130
130
+
return s.backend.HandleCreatePost(ctx, repo, rkey, buf.Bytes(), cc)
131
131
+
}