this repo has no description
at main 414 lines 10 kB view raw
1package server 2 3import ( 4 "context" 5 "errors" 6 "fmt" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/syntax" 11 "github.com/gocql/gocql" 12 vyletdatabase "github.com/vylet-app/go/database/proto" 13 "github.com/vylet-app/go/internal/helpers" 14 "google.golang.org/protobuf/types/known/timestamppb" 15) 16 17func (s *Server) getPostImages(ctx context.Context, postUri string) ([]*vyletdatabase.Image, error) { 18 query := ` 19 SELECT image_index, cid, alt, width, height, size, mime 20 FROM images_by_post 21 WHERE post_uri = ? 22 ORDER BY image_index ASC 23 ` 24 25 iter := s.cqlSession.Query(query, postUri).WithContext(ctx).Iter() 26 defer iter.Close() 27 28 var images []*vyletdatabase.Image 29 30 for { 31 img := &vyletdatabase.Image{} 32 var imageIndex int 33 34 if !iter.Scan( 35 &imageIndex, 36 &img.Cid, 37 &img.Alt, 38 &img.Width, 39 &img.Height, 40 &img.Size, 41 &img.Mime, 42 ) { 43 break 44 } 45 46 images = append(images, img) 47 } 48 49 if err := iter.Close(); err != nil { 50 return nil, fmt.Errorf("failed to iterate images: %w", err) 51 } 52 53 return images, nil 54} 55 56func (s *Server) CreatePost(ctx context.Context, req *vyletdatabase.CreatePostRequest) (*vyletdatabase.CreatePostResponse, error) { 57 logger := s.logger.With("name", "CreatePost") 58 59 aturi, err := syntax.ParseATURI(req.Post.Uri) 60 if err != nil { 61 return nil, fmt.Errorf("failed to parse aturi: %w", err) 62 } 63 64 did := aturi.Authority().String() 65 now := time.Now().UTC() 66 67 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx) 68 69 postArgs := []any{ 70 req.Post.Uri, 71 req.Post.Cid, 72 did, 73 req.Post.Caption, 74 req.Post.Facets, 75 req.Post.CreatedAt.AsTime(), 76 now, 77 } 78 79 postQuery := ` 80 INSERT INTO %s 81 (uri, cid, author_did, caption, facets, created_at, indexed_at) 82 VALUES 83 (?, ?, ?, ?, ?, ?, ?) 84 ` 85 86 batch.Query(fmt.Sprintf(postQuery, "posts_by_uri"), postArgs...) 87 batch.Query(fmt.Sprintf(postQuery, "posts_by_actor"), postArgs...) 88 89 for idx, img := range req.Post.Images { 90 batch.Query( 91 `INSERT INTO images_by_post 92 (post_uri, image_index, cid, alt, width, height, size, mime) 93 VALUES 94 (?, ?, ?, ?, ?, ?, ?, ?)`, 95 req.Post.Uri, 96 idx, 97 img.Cid, 98 img.Alt, 99 img.Width, 100 img.Height, 101 img.Size, 102 img.Mime, 103 ) 104 } 105 106 if err := s.cqlSession.ExecuteBatch(batch); err != nil { 107 logger.Error("failed to create post", "uri", req.Post.Uri, "err", err) 108 return &vyletdatabase.CreatePostResponse{ 109 Error: helpers.ToStringPtr(err.Error()), 110 }, nil 111 } 112 113 return &vyletdatabase.CreatePostResponse{}, nil 114} 115 116func (s *Server) DeletePost(ctx context.Context, req *vyletdatabase.DeletePostRequest) (*vyletdatabase.DeletePostResponse, error) { 117 logger := s.logger.With("name", "DeletePost", "uri", req.Uri) 118 119 aturi, err := syntax.ParseATURI(req.Uri) 120 if err != nil { 121 return nil, fmt.Errorf("failed to parse aturi: %w", err) 122 } 123 did := aturi.Authority().String() 124 125 var createdAt time.Time 126 query := ` 127 SELECT created_at 128 FROM posts_by_uri 129 WHERE uri = ? 130 ` 131 if err := s.cqlSession.Query(query, req.Uri).WithContext(ctx).Scan(&createdAt); err != nil { 132 if err == gocql.ErrNotFound { 133 logger.Warn("post not found", "uri", req.Uri) 134 return &vyletdatabase.DeletePostResponse{ 135 Error: helpers.ToStringPtr("post not found"), 136 }, nil 137 } 138 logger.Error("failed to fetch post", "uri", req.Uri, "err", err) 139 return &vyletdatabase.DeletePostResponse{ 140 Error: helpers.ToStringPtr(err.Error()), 141 }, nil 142 } 143 144 batch := s.cqlSession.NewBatch(gocql.LoggedBatch).WithContext(ctx) 145 146 batch.Query(` 147 DELETE FROM posts_by_uri 148 WHERE uri = ? 149 `, req.Uri) 150 151 batch.Query(` 152 DELETE FROM posts_by_actor 153 WHERE author_did = ? AND created_at = ? AND uri = ? 154 `, did, createdAt, req.Uri) 155 156 batch.Query(` 157 DELETE FROM images_by_post 158 WHERE post_uri = ? 159 `, req.Uri) 160 161 if err := s.cqlSession.ExecuteBatch(batch); err != nil { 162 logger.Error("failed to delete post", "uri", req.Uri, "err", err) 163 return &vyletdatabase.DeletePostResponse{ 164 Error: helpers.ToStringPtr(err.Error()), 165 }, nil 166 } 167 168 return &vyletdatabase.DeletePostResponse{}, nil 169} 170 171func (s *Server) GetPosts(ctx context.Context, req *vyletdatabase.GetPostsRequest) (*vyletdatabase.GetPostsResponse, error) { 172 logger := s.logger.With("name", "GetPosts", "uris", req.Uris) 173 174 if len(req.Uris) == 0 { 175 return nil, fmt.Errorf("at least one URI must be specified") 176 } 177 178 query := ` 179 SELECT uri, cid, author_did, caption, facets, created_at, indexed_at 180 FROM posts_by_uri 181 WHERE uri IN ? 182 ` 183 184 iter := s.cqlSession.Query(query, req.Uris).WithContext(ctx).Iter() 185 defer iter.Close() 186 187 posts := make(map[string]*vyletdatabase.Post) 188 for { 189 post := &vyletdatabase.Post{} 190 var createdAt, indexedAt time.Time 191 192 if !iter.Scan( 193 &post.Uri, 194 &post.Cid, 195 &post.AuthorDid, 196 &post.Caption, 197 &post.Facets, 198 &createdAt, 199 &indexedAt, 200 ) { 201 break 202 } 203 204 post.CreatedAt = timestamppb.New(createdAt) 205 post.IndexedAt = timestamppb.New(indexedAt) 206 207 images, err := s.getPostImages(ctx, post.Uri) 208 if err != nil { 209 logger.Warn("failed to fetch images for post", "uri", post.Uri, "err", err) 210 } else { 211 post.Images = images 212 } 213 214 posts[post.Uri] = post 215 } 216 217 if err := iter.Close(); err != nil { 218 logger.Error("failed to iterate posts", "err", err) 219 return &vyletdatabase.GetPostsResponse{ 220 Error: helpers.ToStringPtr(err.Error()), 221 }, nil 222 } 223 224 return &vyletdatabase.GetPostsResponse{ 225 Posts: posts, 226 }, nil 227} 228 229func (s *Server) GetPostsByActor(ctx context.Context, req *vyletdatabase.GetPostsByActorRequest) (*vyletdatabase.GetPostsByActorResponse, error) { 230 logger := s.logger.With("name", "GetPostsByActor", "did", req.Did) 231 232 if req.Limit <= 0 { 233 return nil, fmt.Errorf("limit must be greater than 0") 234 } 235 236 var ( 237 query string 238 args []any 239 ) 240 241 if req.Cursor != nil && *req.Cursor != "" { 242 cursorParts := strings.SplitN(*req.Cursor, "|", 2) 243 if len(cursorParts) != 2 { 244 logger.Error("invalid cursor format", "cursor", *req.Cursor) 245 return &vyletdatabase.GetPostsByActorResponse{ 246 Error: helpers.ToStringPtr("invalid cursor format"), 247 }, nil 248 } 249 250 cursorTime, err := time.Parse(time.RFC3339Nano, cursorParts[0]) 251 if err != nil { 252 logger.Error("failed to parse cursor timestamp", "cursor", *req.Cursor, "err", err) 253 return &vyletdatabase.GetPostsByActorResponse{ 254 Error: helpers.ToStringPtr("invalid cursor format"), 255 }, nil 256 } 257 cursorUri := cursorParts[1] 258 259 query = ` 260 SELECT uri, cid, author_did, caption, facets, created_at, indexed_at 261 FROM posts_by_actor 262 WHERE author_did = ? AND (created_at, uri) < (?, ?) 263 ORDER BY created_at DESC, uri ASC 264 LIMIT ? 265 ` 266 args = []any{req.Did, cursorTime, cursorUri, req.Limit + 1} 267 } else { 268 query = ` 269 SELECT uri, cid, author_did, caption, facets, created_at, indexed_at 270 FROM posts_by_actor 271 WHERE author_did = ? 272 ORDER BY created_at DESC, uri ASC 273 LIMIT ? 274 ` 275 args = []any{req.Did, req.Limit + 1} 276 } 277 278 iter := s.cqlSession.Query(query, args...).WithContext(ctx).Iter() 279 defer iter.Close() 280 281 var postsList []*vyletdatabase.Post 282 for { 283 post := &vyletdatabase.Post{} 284 var createdAt, indexedAt time.Time 285 286 if !iter.Scan( 287 &post.Uri, 288 &post.Cid, 289 &post.AuthorDid, 290 &post.Caption, 291 &post.Facets, 292 &createdAt, 293 &indexedAt, 294 ) { 295 break 296 } 297 298 post.CreatedAt = timestamppb.New(createdAt) 299 post.IndexedAt = timestamppb.New(indexedAt) 300 postsList = append(postsList, post) 301 } 302 303 if err := iter.Close(); err != nil { 304 logger.Error("failed to iterate posts", "err", err) 305 return &vyletdatabase.GetPostsByActorResponse{ 306 Error: helpers.ToStringPtr(err.Error()), 307 }, nil 308 } 309 310 var nextCursor *string 311 if len(postsList) > int(req.Limit) { 312 postsList = postsList[:req.Limit] 313 lastPost := postsList[len(postsList)-1] 314 cursorStr := fmt.Sprintf("%s|%s", 315 lastPost.CreatedAt.AsTime().Format(time.RFC3339Nano), 316 lastPost.Uri) 317 nextCursor = &cursorStr 318 } 319 320 posts := make(map[string]*vyletdatabase.Post) 321 for _, post := range postsList { 322 images, err := s.getPostImages(ctx, post.Uri) 323 if err != nil { 324 logger.Warn("failed to fetch images for post", "uri", post.Uri, "err", err) 325 } else { 326 post.Images = images 327 } 328 posts[post.Uri] = post 329 } 330 331 return &vyletdatabase.GetPostsByActorResponse{ 332 Posts: posts, 333 Cursor: nextCursor, 334 }, nil 335} 336 337func (s *Server) GetPostInteractionCounts(ctx context.Context, req *vyletdatabase.GetPostInteractionCountsRequest) (*vyletdatabase.GetPostInteractionCountsResponse, error) { 338 logger := s.logger.With("name", "GetPostInteractionCounts", "uri", req.Uri) 339 340 var likeCount, replyCount int64 341 342 query := ` 343 SELECT like_count 344 FROM post_interaction_counts 345 WHERE post_uri = ? 346 ` 347 348 err := s.cqlSession.Query(query, req.Uri).WithContext(ctx).Scan(&likeCount) 349 if err != nil { 350 if errors.Is(err, gocql.ErrNotFound) { 351 return &vyletdatabase.GetPostInteractionCountsResponse{ 352 Counts: &vyletdatabase.PostInteractionCounts{ 353 Likes: 0, 354 Replies: 0, 355 }, 356 }, nil 357 } 358 logger.Error("failed to fetch interaction counts", "uri", req.Uri, "err", err) 359 return &vyletdatabase.GetPostInteractionCountsResponse{ 360 Error: helpers.ToStringPtr(err.Error()), 361 }, nil 362 } 363 364 return &vyletdatabase.GetPostInteractionCountsResponse{ 365 Counts: &vyletdatabase.PostInteractionCounts{ 366 Likes: likeCount, 367 Replies: replyCount, 368 }, 369 }, nil 370} 371 372func (s *Server) GetPostsInteractionCounts(ctx context.Context, req *vyletdatabase.GetPostsInteractionCountsRequest) (*vyletdatabase.GetPostsInteractionCountsResponse, error) { 373 logger := s.logger.With("name", "GetPostsInteractionCounts") 374 375 query := ` 376 SELECT post_uri, like_count 377 FROM post_interaction_counts 378 WHERE post_uri IN ? 379 ` 380 381 iter := s.cqlSession.Query(query, req.Uris).WithContext(ctx).Iter() 382 defer iter.Close() 383 384 counts := make(map[string]*vyletdatabase.PostInteractionCounts) 385 386 var uri string 387 var likeCount, replyCount int64 388 for iter.Scan(&uri, &likeCount) { 389 counts[uri] = &vyletdatabase.PostInteractionCounts{ 390 Likes: likeCount, 391 Replies: replyCount, 392 } 393 } 394 395 if err := iter.Close(); err != nil { 396 logger.Error("failed to iterate interaction counts", "err", err) 397 return &vyletdatabase.GetPostsInteractionCountsResponse{ 398 Error: helpers.ToStringPtr(err.Error()), 399 }, nil 400 } 401 402 for _, uri := range req.Uris { 403 if _, exists := counts[uri]; !exists { 404 counts[uri] = &vyletdatabase.PostInteractionCounts{ 405 Likes: 0, 406 Replies: 0, 407 } 408 } 409 } 410 411 return &vyletdatabase.GetPostsInteractionCountsResponse{ 412 Counts: counts, 413 }, nil 414}