this repo has no description
1package server
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/syntax"
11 "github.com/gocql/gocql"
12 vyletdatabase "github.com/vylet-app/go/database/proto"
13 "github.com/vylet-app/go/internal/helpers"
14 "google.golang.org/protobuf/types/known/timestamppb"
15)
16
17func (s *Server) getPostImages(ctx context.Context, postUri string) ([]*vyletdatabase.Image, error) {
18 query := `
19 SELECT image_index, cid, alt, width, height, size, mime
20 FROM images_by_post
21 WHERE post_uri = ?
22 ORDER BY image_index ASC
23 `
24
25 iter := s.cqlSession.Query(query, postUri).WithContext(ctx).Iter()
26 defer iter.Close()
27
28 var images []*vyletdatabase.Image
29
30 for {
31 img := &vyletdatabase.Image{}
32 var imageIndex int
33
34 if !iter.Scan(
35 &imageIndex,
36 &img.Cid,
37 &img.Alt,
38 &img.Width,
39 &img.Height,
40 &img.Size,
41 &img.Mime,
42 ) {
43 break
44 }
45
46 images = append(images, img)
47 }
48
49 if err := iter.Close(); err != nil {
50 return nil, fmt.Errorf("failed to iterate images: %w", err)
51 }
52
53 return images, nil
54}
55
56func (s *Server) CreatePost(ctx context.Context, req *vyletdatabase.CreatePostRequest) (*vyletdatabase.CreatePostResponse, error) {
57 logger := s.logger.With("name", "CreatePost")
58
59 aturi, err := syntax.ParseATURI(req.Post.Uri)
60 if err != nil {
61 return nil, fmt.Errorf("failed to parse aturi: %w", err)
62 }
63
64 did := aturi.Authority().String()
65 now := time.Now().UTC()
66
67 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx)
68
69 postArgs := []any{
70 req.Post.Uri,
71 req.Post.Cid,
72 did,
73 req.Post.Caption,
74 req.Post.Facets,
75 req.Post.CreatedAt.AsTime(),
76 now,
77 }
78
79 postQuery := `
80 INSERT INTO %s
81 (uri, cid, author_did, caption, facets, created_at, indexed_at)
82 VALUES
83 (?, ?, ?, ?, ?, ?, ?)
84 `
85
86 batch.Query(fmt.Sprintf(postQuery, "posts_by_uri"), postArgs...)
87 batch.Query(fmt.Sprintf(postQuery, "posts_by_actor"), postArgs...)
88
89 for idx, img := range req.Post.Images {
90 batch.Query(
91 `INSERT INTO images_by_post
92 (post_uri, image_index, cid, alt, width, height, size, mime)
93 VALUES
94 (?, ?, ?, ?, ?, ?, ?, ?)`,
95 req.Post.Uri,
96 idx,
97 img.Cid,
98 img.Alt,
99 img.Width,
100 img.Height,
101 img.Size,
102 img.Mime,
103 )
104 }
105
106 if err := s.cqlSession.ExecuteBatch(batch); err != nil {
107 logger.Error("failed to create post", "uri", req.Post.Uri, "err", err)
108 return &vyletdatabase.CreatePostResponse{
109 Error: helpers.ToStringPtr(err.Error()),
110 }, nil
111 }
112
113 return &vyletdatabase.CreatePostResponse{}, nil
114}
115
116func (s *Server) DeletePost(ctx context.Context, req *vyletdatabase.DeletePostRequest) (*vyletdatabase.DeletePostResponse, error) {
117 logger := s.logger.With("name", "DeletePost", "uri", req.Uri)
118
119 aturi, err := syntax.ParseATURI(req.Uri)
120 if err != nil {
121 return nil, fmt.Errorf("failed to parse aturi: %w", err)
122 }
123 did := aturi.Authority().String()
124
125 var createdAt time.Time
126 query := `
127 SELECT created_at
128 FROM posts_by_uri
129 WHERE uri = ?
130 `
131 if err := s.cqlSession.Query(query, req.Uri).WithContext(ctx).Scan(&createdAt); err != nil {
132 if err == gocql.ErrNotFound {
133 logger.Warn("post not found", "uri", req.Uri)
134 return &vyletdatabase.DeletePostResponse{
135 Error: helpers.ToStringPtr("post not found"),
136 }, nil
137 }
138 logger.Error("failed to fetch post", "uri", req.Uri, "err", err)
139 return &vyletdatabase.DeletePostResponse{
140 Error: helpers.ToStringPtr(err.Error()),
141 }, nil
142 }
143
144 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx)
145
146 batch.Query(`
147 DELETE FROM posts_by_uri
148 WHERE uri = ?
149 `, req.Uri)
150
151 batch.Query(`
152 DELETE FROM posts_by_actor
153 WHERE author_did = ? AND created_at = ? AND uri = ?
154 `, did, createdAt, req.Uri)
155
156 batch.Query(`
157 DELETE FROM images_by_post
158 WHERE post_uri = ?
159 `, req.Uri)
160
161 if err := s.cqlSession.ExecuteBatch(batch); err != nil {
162 logger.Error("failed to delete post", "uri", req.Uri, "err", err)
163 return &vyletdatabase.DeletePostResponse{
164 Error: helpers.ToStringPtr(err.Error()),
165 }, nil
166 }
167
168 return &vyletdatabase.DeletePostResponse{}, nil
169}
170
171func (s *Server) GetPosts(ctx context.Context, req *vyletdatabase.GetPostsRequest) (*vyletdatabase.GetPostsResponse, error) {
172 logger := s.logger.With("name", "GetPosts", "uris", req.Uris)
173
174 if len(req.Uris) == 0 {
175 return nil, fmt.Errorf("at least one URI must be specified")
176 }
177
178 query := `
179 SELECT uri, cid, author_did, caption, facets, created_at, indexed_at
180 FROM posts_by_uri
181 WHERE uri IN ?
182 `
183
184 iter := s.cqlSession.Query(query, req.Uris).WithContext(ctx).Iter()
185 defer iter.Close()
186
187 posts := make(map[string]*vyletdatabase.Post)
188 for {
189 post := &vyletdatabase.Post{}
190 var createdAt, indexedAt time.Time
191
192 if !iter.Scan(
193 &post.Uri,
194 &post.Cid,
195 &post.AuthorDid,
196 &post.Caption,
197 &post.Facets,
198 &createdAt,
199 &indexedAt,
200 ) {
201 break
202 }
203
204 post.CreatedAt = timestamppb.New(createdAt)
205 post.IndexedAt = timestamppb.New(indexedAt)
206
207 images, err := s.getPostImages(ctx, post.Uri)
208 if err != nil {
209 logger.Warn("failed to fetch images for post", "uri", post.Uri, "err", err)
210 } else {
211 post.Images = images
212 }
213
214 posts[post.Uri] = post
215 }
216
217 if err := iter.Close(); err != nil {
218 logger.Error("failed to iterate posts", "err", err)
219 return &vyletdatabase.GetPostsResponse{
220 Error: helpers.ToStringPtr(err.Error()),
221 }, nil
222 }
223
224 return &vyletdatabase.GetPostsResponse{
225 Posts: posts,
226 }, nil
227}
228
229func (s *Server) GetPostsByActor(ctx context.Context, req *vyletdatabase.GetPostsByActorRequest) (*vyletdatabase.GetPostsByActorResponse, error) {
230 logger := s.logger.With("name", "GetPostsByActor", "did", req.Did)
231
232 if req.Limit <= 0 {
233 return nil, fmt.Errorf("limit must be greater than 0")
234 }
235
236 var (
237 query string
238 args []any
239 )
240
241 if req.Cursor != nil && *req.Cursor != "" {
242 cursorParts := strings.SplitN(*req.Cursor, "|", 2)
243 if len(cursorParts) != 2 {
244 logger.Error("invalid cursor format", "cursor", *req.Cursor)
245 return &vyletdatabase.GetPostsByActorResponse{
246 Error: helpers.ToStringPtr("invalid cursor format"),
247 }, nil
248 }
249
250 cursorTime, err := time.Parse(time.RFC3339Nano, cursorParts[0])
251 if err != nil {
252 logger.Error("failed to parse cursor timestamp", "cursor", *req.Cursor, "err", err)
253 return &vyletdatabase.GetPostsByActorResponse{
254 Error: helpers.ToStringPtr("invalid cursor format"),
255 }, nil
256 }
257 cursorUri := cursorParts[1]
258
259 query = `
260 SELECT uri, cid, author_did, caption, facets, created_at, indexed_at
261 FROM posts_by_actor
262 WHERE author_did = ? AND (created_at, uri) < (?, ?)
263 ORDER BY created_at DESC, uri ASC
264 LIMIT ?
265 `
266 args = []any{req.Did, cursorTime, cursorUri, req.Limit + 1}
267 } else {
268 query = `
269 SELECT uri, cid, author_did, caption, facets, created_at, indexed_at
270 FROM posts_by_actor
271 WHERE author_did = ?
272 ORDER BY created_at DESC, uri ASC
273 LIMIT ?
274 `
275 args = []any{req.Did, req.Limit + 1}
276 }
277
278 iter := s.cqlSession.Query(query, args...).WithContext(ctx).Iter()
279 defer iter.Close()
280
281 var postsList []*vyletdatabase.Post
282 for {
283 post := &vyletdatabase.Post{}
284 var createdAt, indexedAt time.Time
285
286 if !iter.Scan(
287 &post.Uri,
288 &post.Cid,
289 &post.AuthorDid,
290 &post.Caption,
291 &post.Facets,
292 &createdAt,
293 &indexedAt,
294 ) {
295 break
296 }
297
298 post.CreatedAt = timestamppb.New(createdAt)
299 post.IndexedAt = timestamppb.New(indexedAt)
300 postsList = append(postsList, post)
301 }
302
303 if err := iter.Close(); err != nil {
304 logger.Error("failed to iterate posts", "err", err)
305 return &vyletdatabase.GetPostsByActorResponse{
306 Error: helpers.ToStringPtr(err.Error()),
307 }, nil
308 }
309
310 var nextCursor *string
311 if len(postsList) > int(req.Limit) {
312 postsList = postsList[:req.Limit]
313 lastPost := postsList[len(postsList)-1]
314 cursorStr := fmt.Sprintf("%s|%s",
315 lastPost.CreatedAt.AsTime().Format(time.RFC3339Nano),
316 lastPost.Uri)
317 nextCursor = &cursorStr
318 }
319
320 posts := make(map[string]*vyletdatabase.Post)
321 for _, post := range postsList {
322 images, err := s.getPostImages(ctx, post.Uri)
323 if err != nil {
324 logger.Warn("failed to fetch images for post", "uri", post.Uri, "err", err)
325 } else {
326 post.Images = images
327 }
328 posts[post.Uri] = post
329 }
330
331 return &vyletdatabase.GetPostsByActorResponse{
332 Posts: posts,
333 Cursor: nextCursor,
334 }, nil
335}
336
337func (s *Server) GetPostInteractionCounts(ctx context.Context, req *vyletdatabase.GetPostInteractionCountsRequest) (*vyletdatabase.GetPostInteractionCountsResponse, error) {
338 logger := s.logger.With("name", "GetPostInteractionCounts", "uri", req.Uri)
339
340 var likeCount, replyCount int64
341
342 query := `
343 SELECT like_count
344 FROM post_interaction_counts
345 WHERE post_uri = ?
346 `
347
348 err := s.cqlSession.Query(query, req.Uri).WithContext(ctx).Scan(&likeCount)
349 if err != nil {
350 if errors.Is(err, gocql.ErrNotFound) {
351 return &vyletdatabase.GetPostInteractionCountsResponse{
352 Counts: &vyletdatabase.PostInteractionCounts{
353 Likes: 0,
354 Replies: 0,
355 },
356 }, nil
357 }
358 logger.Error("failed to fetch interaction counts", "uri", req.Uri, "err", err)
359 return &vyletdatabase.GetPostInteractionCountsResponse{
360 Error: helpers.ToStringPtr(err.Error()),
361 }, nil
362 }
363
364 return &vyletdatabase.GetPostInteractionCountsResponse{
365 Counts: &vyletdatabase.PostInteractionCounts{
366 Likes: likeCount,
367 Replies: replyCount,
368 },
369 }, nil
370}
371
372func (s *Server) GetPostsInteractionCounts(ctx context.Context, req *vyletdatabase.GetPostsInteractionCountsRequest) (*vyletdatabase.GetPostsInteractionCountsResponse, error) {
373 logger := s.logger.With("name", "GetPostsInteractionCounts")
374
375 query := `
376 SELECT post_uri, like_count
377 FROM post_interaction_counts
378 WHERE post_uri IN ?
379 `
380
381 iter := s.cqlSession.Query(query, req.Uris).WithContext(ctx).Iter()
382 defer iter.Close()
383
384 counts := make(map[string]*vyletdatabase.PostInteractionCounts)
385
386 var uri string
387 var likeCount, replyCount int64
388 for iter.Scan(&uri, &likeCount) {
389 counts[uri] = &vyletdatabase.PostInteractionCounts{
390 Likes: likeCount,
391 Replies: replyCount,
392 }
393 }
394
395 if err := iter.Close(); err != nil {
396 logger.Error("failed to iterate interaction counts", "err", err)
397 return &vyletdatabase.GetPostsInteractionCountsResponse{
398 Error: helpers.ToStringPtr(err.Error()),
399 }, nil
400 }
401
402 for _, uri := range req.Uris {
403 if _, exists := counts[uri]; !exists {
404 counts[uri] = &vyletdatabase.PostInteractionCounts{
405 Likes: 0,
406 Replies: 0,
407 }
408 }
409 }
410
411 return &vyletdatabase.GetPostsInteractionCountsResponse{
412 Counts: counts,
413 }, nil
414}