A community based topic aggregation platform built on atproto

test: add E2E tests for comments and community updates

Add true E2E integration tests that verify the full write-forward flow
through real PDS and Jetstream infrastructure:

- Comment E2E tests: create, update, delete with real Jetstream indexing
- Comment authorization tests: verify users cannot modify others' comments
- Comment validation tests: verify proper error handling
- Community update E2E tests: single and multiple updates via Jetstream

These tests require dev infrastructure (make dev-up) and are skipped
in short mode. Fixed race condition where Jetstream subscription started
after create event was emitted.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

+1569
+1188
tests/integration/comment_e2e_test.go
··· 1 + package integration 2 + 3 + import ( 4 + "Coves/internal/atproto/jetstream" 5 + "Coves/internal/atproto/pds" 6 + "Coves/internal/atproto/utils" 7 + "Coves/internal/core/comments" 8 + "Coves/internal/db/postgres" 9 + "context" 10 + "database/sql" 11 + "encoding/json" 12 + "fmt" 13 + "io" 14 + "net" 15 + "net/http" 16 + "os" 17 + "strings" 18 + "testing" 19 + "time" 20 + 21 + oauthlib "github.com/bluesky-social/indigo/atproto/auth/oauth" 22 + "github.com/bluesky-social/indigo/atproto/syntax" 23 + "github.com/gorilla/websocket" 24 + _ "github.com/lib/pq" 25 + "github.com/pressly/goose/v3" 26 + ) 27 + 28 + // TestCommentE2E_CreateWithJetstream tests the full comment creation flow with real Jetstream 29 + // Flow: Client → Service → PDS Write → Jetstream Firehose → Consumer → AppView 30 + func TestCommentE2E_CreateWithJetstream(t *testing.T) { 31 + if testing.Short() { 32 + t.Skip("Skipping E2E test in short mode") 33 + } 34 + 35 + // Setup test database 36 + dbURL := os.Getenv("TEST_DATABASE_URL") 37 + if dbURL == "" { 38 + dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 39 + } 40 + 41 + db, err := sql.Open("postgres", dbURL) 42 + if err != nil { 43 + t.Fatalf("Failed to connect to test database: %v", err) 44 + } 45 + defer func() { _ = db.Close() }() 46 + 47 + // Run migrations 48 + if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 49 + t.Fatalf("Failed to set goose dialect: %v", dialectErr) 50 + } 51 + if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 52 + t.Fatalf("Failed to run migrations: %v", migrateErr) 53 + } 54 + 55 + // Check if PDS is running 56 + pdsURL := getTestPDSURL() 57 + healthResp, err := http.Get(pdsURL + "/xrpc/_health") 58 + if err != nil { 59 + t.Skipf("PDS not running at %s: %v", pdsURL, err) 60 + } 61 + _ = healthResp.Body.Close() 62 + 63 + // Check if Jetstream is running 64 + pdsHostname := strings.TrimPrefix(pdsURL, "http://") 65 + pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 66 + pdsHostname = strings.Split(pdsHostname, ":")[0] 67 + jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.comment", pdsHostname) 68 + 69 + testConn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 70 + if err != nil { 71 + t.Skipf("Jetstream not running at %s: %v. Run 'docker-compose --profile jetstream up' to start.", jetstreamURL, err) 72 + } 73 + _ = testConn.Close() 74 + 75 + ctx := context.Background() 76 + 77 + // Setup repositories 78 + commentRepo := postgres.NewCommentRepository(db) 79 + postRepo := postgres.NewPostRepository(db) 80 + 81 + // Create test user on PDS 82 + // Use shorter handle to avoid PDS length limits (max 20 chars for label) 83 + testUserHandle := fmt.Sprintf("cmt%d.local.coves.dev", time.Now().UnixNano()%1000000) 84 + testUserEmail := fmt.Sprintf("cmt%d@test.local", time.Now().UnixNano()%1000000) 85 + testUserPassword := "test-password-123" 86 + 87 + t.Logf("Creating test user on PDS: %s", testUserHandle) 88 + pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 89 + if err != nil { 90 + t.Fatalf("Failed to create test user on PDS: %v", err) 91 + } 92 + t.Logf("Test user created: DID=%s", userDID) 93 + 94 + // Index user in AppView 95 + testUser := createTestUser(t, db, testUserHandle, userDID) 96 + 97 + // Create test community and post to comment on 98 + testCommunityDID, err := createFeedTestCommunity(db, ctx, "comment-e2e-community", "owner.test") 99 + if err != nil { 100 + t.Fatalf("Failed to create test community: %v", err) 101 + } 102 + 103 + postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post for Comments", 0, time.Now()) 104 + postCID := "bafyposte2etest" 105 + 106 + // Setup comment service with PDS factory 107 + commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 108 + if session.AccessToken == "" { 109 + return nil, fmt.Errorf("session has no access token") 110 + } 111 + if session.HostURL == "" { 112 + return nil, fmt.Errorf("session has no host URL") 113 + } 114 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 115 + } 116 + 117 + commentService := comments.NewCommentServiceWithPDSFactory( 118 + commentRepo, 119 + nil, 120 + postRepo, 121 + nil, 122 + nil, 123 + commentPDSFactory, 124 + ) 125 + 126 + // Create mock OAuth session 127 + mockStore := NewMockOAuthStore() 128 + mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 129 + 130 + t.Run("create comment with real Jetstream indexing", func(t *testing.T) { 131 + // Setup Jetstream consumer 132 + commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 133 + 134 + // Channels for event communication 135 + eventChan := make(chan *jetstream.JetstreamEvent, 10) 136 + errorChan := make(chan error, 1) 137 + done := make(chan bool) 138 + 139 + // Start Jetstream consumer in background BEFORE writing to PDS 140 + t.Logf("\n🔄 Starting Jetstream consumer for comments...") 141 + go func() { 142 + subscribeErr := subscribeToJetstreamForComment(ctx, jetstreamURL, userDID, commentConsumer, eventChan, done) 143 + if subscribeErr != nil { 144 + errorChan <- subscribeErr 145 + } 146 + }() 147 + 148 + // Give Jetstream a moment to connect 149 + time.Sleep(500 * time.Millisecond) 150 + 151 + // Create comment via service (writes to PDS) 152 + t.Logf("\n📝 Creating comment via service (writes to PDS)...") 153 + 154 + commentReq := comments.CreateCommentRequest{ 155 + Reply: comments.ReplyRef{ 156 + Root: comments.StrongRef{ 157 + URI: postURI, 158 + CID: postCID, 159 + }, 160 + Parent: comments.StrongRef{ 161 + URI: postURI, 162 + CID: postCID, 163 + }, 164 + }, 165 + Content: "This is a TRUE E2E test comment via Jetstream!", 166 + Langs: []string{"en"}, 167 + } 168 + 169 + parsedDID, parseErr := syntax.ParseDID(userDID) 170 + if parseErr != nil { 171 + t.Fatalf("Failed to parse DID: %v", parseErr) 172 + } 173 + session, sessionErr := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 174 + if sessionErr != nil { 175 + t.Fatalf("Failed to get session: %v", sessionErr) 176 + } 177 + 178 + commentResp, err := commentService.CreateComment(ctx, session, commentReq) 179 + if err != nil { 180 + t.Fatalf("Failed to create comment: %v", err) 181 + } 182 + 183 + t.Logf("✅ Comment written to PDS:") 184 + t.Logf(" URI: %s", commentResp.URI) 185 + t.Logf(" CID: %s", commentResp.CID) 186 + 187 + // Wait for Jetstream event 188 + t.Logf("\n⏳ Waiting for Jetstream event (max 30 seconds)...") 189 + 190 + select { 191 + case event := <-eventChan: 192 + t.Logf("✅ Received real Jetstream event!") 193 + t.Logf(" Event DID: %s", event.Did) 194 + t.Logf(" Collection: %s", event.Commit.Collection) 195 + t.Logf(" Operation: %s", event.Commit.Operation) 196 + t.Logf(" RKey: %s", event.Commit.RKey) 197 + 198 + // Verify it's our comment 199 + if event.Did != userDID { 200 + t.Errorf("Expected DID %s, got %s", userDID, event.Did) 201 + } 202 + if event.Commit.Collection != "social.coves.community.comment" { 203 + t.Errorf("Expected collection social.coves.community.comment, got %s", event.Commit.Collection) 204 + } 205 + if event.Commit.Operation != "create" { 206 + t.Errorf("Expected operation create, got %s", event.Commit.Operation) 207 + } 208 + 209 + // Verify indexed in AppView database 210 + t.Logf("\n🔍 Querying AppView database...") 211 + indexedComment, err := commentRepo.GetByURI(ctx, commentResp.URI) 212 + if err != nil { 213 + t.Fatalf("Comment not indexed in AppView: %v", err) 214 + } 215 + 216 + t.Logf("✅ Comment indexed in AppView:") 217 + t.Logf(" CommenterDID: %s", indexedComment.CommenterDID) 218 + t.Logf(" Content: %s", indexedComment.Content) 219 + t.Logf(" RootURI: %s", indexedComment.RootURI) 220 + t.Logf(" ParentURI: %s", indexedComment.ParentURI) 221 + 222 + // Verify comment details 223 + if indexedComment.CommenterDID != userDID { 224 + t.Errorf("Expected commenter_did %s, got %s", userDID, indexedComment.CommenterDID) 225 + } 226 + if indexedComment.Content != "This is a TRUE E2E test comment via Jetstream!" { 227 + t.Errorf("Expected content mismatch, got %s", indexedComment.Content) 228 + } 229 + 230 + close(done) 231 + 232 + case err := <-errorChan: 233 + t.Fatalf("Jetstream error: %v", err) 234 + 235 + case <-time.After(30 * time.Second): 236 + t.Fatalf("Timeout: No Jetstream event received within 30 seconds") 237 + } 238 + 239 + t.Logf("\n✅ TRUE E2E COMMENT CREATE FLOW COMPLETE:") 240 + t.Logf(" Client → Service → PDS → Jetstream → Consumer → AppView ✓") 241 + }) 242 + } 243 + 244 + // TestCommentE2E_UpdateWithJetstream tests comment update with real Jetstream indexing 245 + func TestCommentE2E_UpdateWithJetstream(t *testing.T) { 246 + if testing.Short() { 247 + t.Skip("Skipping E2E test in short mode") 248 + } 249 + 250 + // Setup test database 251 + dbURL := os.Getenv("TEST_DATABASE_URL") 252 + if dbURL == "" { 253 + dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 254 + } 255 + 256 + db, err := sql.Open("postgres", dbURL) 257 + if err != nil { 258 + t.Fatalf("Failed to connect to test database: %v", err) 259 + } 260 + defer func() { _ = db.Close() }() 261 + 262 + // Run migrations 263 + if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 264 + t.Fatalf("Failed to set goose dialect: %v", dialectErr) 265 + } 266 + if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 267 + t.Fatalf("Failed to run migrations: %v", migrateErr) 268 + } 269 + 270 + // Check if PDS is running 271 + pdsURL := getTestPDSURL() 272 + healthResp, err := http.Get(pdsURL + "/xrpc/_health") 273 + if err != nil { 274 + t.Skipf("PDS not running at %s: %v", pdsURL, err) 275 + } 276 + _ = healthResp.Body.Close() 277 + 278 + // Check if Jetstream is running 279 + pdsHostname := strings.TrimPrefix(pdsURL, "http://") 280 + pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 281 + pdsHostname = strings.Split(pdsHostname, ":")[0] 282 + jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.comment", pdsHostname) 283 + 284 + testConn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 285 + if err != nil { 286 + t.Skipf("Jetstream not running at %s: %v", jetstreamURL, err) 287 + } 288 + _ = testConn.Close() 289 + 290 + ctx := context.Background() 291 + 292 + // Setup repositories 293 + commentRepo := postgres.NewCommentRepository(db) 294 + postRepo := postgres.NewPostRepository(db) 295 + 296 + // Create test user on PDS 297 + testUserHandle := fmt.Sprintf("cmtup%d.local.coves.dev", time.Now().UnixNano()%1000000) 298 + testUserEmail := fmt.Sprintf("cmtup%d@test.local", time.Now().UnixNano()%1000000) 299 + testUserPassword := "test-password-123" 300 + 301 + pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 302 + if err != nil { 303 + t.Fatalf("Failed to create test user on PDS: %v", err) 304 + } 305 + 306 + testUser := createTestUser(t, db, testUserHandle, userDID) 307 + 308 + testCommunityDID, err := createFeedTestCommunity(db, ctx, "comment-upd-community", "owner.test") 309 + if err != nil { 310 + t.Fatalf("Failed to create test community: %v", err) 311 + } 312 + 313 + postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post for Update", 0, time.Now()) 314 + postCID := "bafypostupdate" 315 + 316 + commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 317 + if session.AccessToken == "" { 318 + return nil, fmt.Errorf("session has no access token") 319 + } 320 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 321 + } 322 + 323 + commentService := comments.NewCommentServiceWithPDSFactory( 324 + commentRepo, 325 + nil, 326 + postRepo, 327 + nil, 328 + nil, 329 + commentPDSFactory, 330 + ) 331 + 332 + mockStore := NewMockOAuthStore() 333 + mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 334 + 335 + t.Run("update comment with real Jetstream indexing", func(t *testing.T) { 336 + commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 337 + 338 + // First, create a comment and wait for it to be indexed 339 + eventChan := make(chan *jetstream.JetstreamEvent, 10) 340 + errorChan := make(chan error, 1) 341 + done := make(chan bool) 342 + 343 + go func() { 344 + subscribeErr := subscribeToJetstreamForComment(ctx, jetstreamURL, userDID, commentConsumer, eventChan, done) 345 + if subscribeErr != nil { 346 + errorChan <- subscribeErr 347 + } 348 + }() 349 + 350 + time.Sleep(500 * time.Millisecond) 351 + 352 + // Create initial comment 353 + t.Logf("\n📝 Creating initial comment...") 354 + commentReq := comments.CreateCommentRequest{ 355 + Reply: comments.ReplyRef{ 356 + Root: comments.StrongRef{URI: postURI, CID: postCID}, 357 + Parent: comments.StrongRef{URI: postURI, CID: postCID}, 358 + }, 359 + Content: "Original comment content", 360 + Langs: []string{"en"}, 361 + } 362 + 363 + parsedDID, parseErr := syntax.ParseDID(userDID) 364 + if parseErr != nil { 365 + t.Fatalf("Failed to parse DID: %v", parseErr) 366 + } 367 + session, sessionErr := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 368 + if sessionErr != nil { 369 + t.Fatalf("Failed to get session: %v", sessionErr) 370 + } 371 + commentResp, err := commentService.CreateComment(ctx, session, commentReq) 372 + if err != nil { 373 + t.Fatalf("Failed to create comment: %v", err) 374 + } 375 + 376 + // Wait for create event 377 + select { 378 + case <-eventChan: 379 + t.Logf("✅ Create event received and indexed") 380 + case err := <-errorChan: 381 + t.Fatalf("Jetstream error: %v", err) 382 + case <-time.After(30 * time.Second): 383 + t.Fatalf("Timeout waiting for create event") 384 + } 385 + close(done) 386 + 387 + // Now update the comment 388 + t.Logf("\n📝 Updating comment via service...") 389 + 390 + // Start new Jetstream subscription for update event 391 + updateEventChan := make(chan *jetstream.JetstreamEvent, 10) 392 + updateErrorChan := make(chan error, 1) 393 + updateDone := make(chan bool) 394 + 395 + go func() { 396 + subscribeErr := subscribeToJetstreamForCommentUpdate(ctx, jetstreamURL, userDID, commentConsumer, updateEventChan, updateDone) 397 + if subscribeErr != nil { 398 + updateErrorChan <- subscribeErr 399 + } 400 + }() 401 + 402 + time.Sleep(500 * time.Millisecond) 403 + 404 + // Get existing comment CID from PDS for optimistic locking 405 + rkey := utils.ExtractRKeyFromURI(commentResp.URI) 406 + pdsResp, httpErr := http.Get(fmt.Sprintf("%s/xrpc/com.atproto.repo.getRecord?repo=%s&collection=social.coves.community.comment&rkey=%s", 407 + pdsURL, userDID, rkey)) 408 + if httpErr != nil { 409 + t.Fatalf("Failed to get record from PDS: %v", httpErr) 410 + } 411 + defer func() { _ = pdsResp.Body.Close() }() 412 + if pdsResp.StatusCode != http.StatusOK { 413 + body, _ := io.ReadAll(pdsResp.Body) 414 + t.Fatalf("Failed to get record from PDS: status=%d body=%s", pdsResp.StatusCode, string(body)) 415 + } 416 + var pdsRecord struct { 417 + CID string `json:"cid"` 418 + } 419 + if decodeErr := json.NewDecoder(pdsResp.Body).Decode(&pdsRecord); decodeErr != nil { 420 + t.Fatalf("Failed to decode PDS response: %v", decodeErr) 421 + } 422 + 423 + updateReq := comments.UpdateCommentRequest{ 424 + URI: commentResp.URI, 425 + Content: "Updated comment content via E2E test!", 426 + Langs: []string{"en"}, 427 + } 428 + 429 + updatedComment, err := commentService.UpdateComment(ctx, session, updateReq) 430 + if err != nil { 431 + t.Fatalf("Failed to update comment: %v", err) 432 + } 433 + 434 + t.Logf("✅ Comment updated on PDS:") 435 + t.Logf(" URI: %s", updatedComment.URI) 436 + t.Logf(" CID: %s", updatedComment.CID) 437 + 438 + // Wait for update event from Jetstream 439 + t.Logf("\n⏳ Waiting for update event from Jetstream...") 440 + 441 + select { 442 + case event := <-updateEventChan: 443 + t.Logf("✅ Received update event from Jetstream!") 444 + t.Logf(" Operation: %s", event.Commit.Operation) 445 + 446 + if event.Commit.Operation != "update" { 447 + t.Errorf("Expected operation 'update', got '%s'", event.Commit.Operation) 448 + } 449 + 450 + // Verify updated content in AppView 451 + indexedComment, err := commentRepo.GetByURI(ctx, commentResp.URI) 452 + if err != nil { 453 + t.Fatalf("Failed to get updated comment: %v", err) 454 + } 455 + 456 + if indexedComment.Content != "Updated comment content via E2E test!" { 457 + t.Errorf("Expected updated content, got: %s", indexedComment.Content) 458 + } 459 + 460 + t.Logf("✅ Comment updated in AppView:") 461 + t.Logf(" Content: %s", indexedComment.Content) 462 + 463 + close(updateDone) 464 + 465 + case err := <-updateErrorChan: 466 + t.Fatalf("Jetstream error: %v", err) 467 + 468 + case <-time.After(30 * time.Second): 469 + t.Fatalf("Timeout: No update event received within 30 seconds") 470 + } 471 + 472 + t.Logf("\n✅ TRUE E2E COMMENT UPDATE FLOW COMPLETE:") 473 + t.Logf(" Client → Service → PDS PutRecord → Jetstream → Consumer → AppView ✓") 474 + }) 475 + } 476 + 477 + // TestCommentE2E_DeleteWithJetstream tests comment deletion with real Jetstream indexing 478 + func TestCommentE2E_DeleteWithJetstream(t *testing.T) { 479 + if testing.Short() { 480 + t.Skip("Skipping E2E test in short mode") 481 + } 482 + 483 + // Setup test database 484 + dbURL := os.Getenv("TEST_DATABASE_URL") 485 + if dbURL == "" { 486 + dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 487 + } 488 + 489 + db, err := sql.Open("postgres", dbURL) 490 + if err != nil { 491 + t.Fatalf("Failed to connect to test database: %v", err) 492 + } 493 + defer func() { _ = db.Close() }() 494 + 495 + if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 496 + t.Fatalf("Failed to set goose dialect: %v", dialectErr) 497 + } 498 + if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 499 + t.Fatalf("Failed to run migrations: %v", migrateErr) 500 + } 501 + 502 + pdsURL := getTestPDSURL() 503 + healthResp, err := http.Get(pdsURL + "/xrpc/_health") 504 + if err != nil { 505 + t.Skipf("PDS not running at %s: %v", pdsURL, err) 506 + } 507 + _ = healthResp.Body.Close() 508 + 509 + pdsHostname := strings.TrimPrefix(pdsURL, "http://") 510 + pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 511 + pdsHostname = strings.Split(pdsHostname, ":")[0] 512 + jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.comment", pdsHostname) 513 + 514 + testConn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 515 + if err != nil { 516 + t.Skipf("Jetstream not running at %s: %v", jetstreamURL, err) 517 + } 518 + _ = testConn.Close() 519 + 520 + ctx := context.Background() 521 + 522 + commentRepo := postgres.NewCommentRepository(db) 523 + postRepo := postgres.NewPostRepository(db) 524 + 525 + testUserHandle := fmt.Sprintf("cmtdl%d.local.coves.dev", time.Now().UnixNano()%1000000) 526 + testUserEmail := fmt.Sprintf("cmtdl%d@test.local", time.Now().UnixNano()%1000000) 527 + testUserPassword := "test-password-123" 528 + 529 + pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 530 + if err != nil { 531 + t.Fatalf("Failed to create test user on PDS: %v", err) 532 + } 533 + 534 + testUser := createTestUser(t, db, testUserHandle, userDID) 535 + 536 + testCommunityDID, err := createFeedTestCommunity(db, ctx, "comment-del-community", "owner.test") 537 + if err != nil { 538 + t.Fatalf("Failed to create test community: %v", err) 539 + } 540 + 541 + postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Test Post for Delete", 0, time.Now()) 542 + postCID := "bafypostdelete" 543 + 544 + commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 545 + if session.AccessToken == "" { 546 + return nil, fmt.Errorf("session has no access token") 547 + } 548 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 549 + } 550 + 551 + commentService := comments.NewCommentServiceWithPDSFactory( 552 + commentRepo, 553 + nil, 554 + postRepo, 555 + nil, 556 + nil, 557 + commentPDSFactory, 558 + ) 559 + 560 + mockStore := NewMockOAuthStore() 561 + mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 562 + 563 + t.Run("delete comment with real Jetstream indexing", func(t *testing.T) { 564 + commentConsumer := jetstream.NewCommentEventConsumer(commentRepo, db) 565 + 566 + // First, create a comment 567 + eventChan := make(chan *jetstream.JetstreamEvent, 10) 568 + errorChan := make(chan error, 1) 569 + done := make(chan bool) 570 + 571 + go func() { 572 + subscribeErr := subscribeToJetstreamForComment(ctx, jetstreamURL, userDID, commentConsumer, eventChan, done) 573 + if subscribeErr != nil { 574 + errorChan <- subscribeErr 575 + } 576 + }() 577 + 578 + time.Sleep(500 * time.Millisecond) 579 + 580 + t.Logf("\n📝 Creating comment to delete...") 581 + commentReq := comments.CreateCommentRequest{ 582 + Reply: comments.ReplyRef{ 583 + Root: comments.StrongRef{URI: postURI, CID: postCID}, 584 + Parent: comments.StrongRef{URI: postURI, CID: postCID}, 585 + }, 586 + Content: "This comment will be deleted", 587 + Langs: []string{"en"}, 588 + } 589 + 590 + parsedDID, parseErr := syntax.ParseDID(userDID) 591 + if parseErr != nil { 592 + t.Fatalf("Failed to parse DID: %v", parseErr) 593 + } 594 + session, sessionErr := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 595 + if sessionErr != nil { 596 + t.Fatalf("Failed to get session: %v", sessionErr) 597 + } 598 + commentResp, err := commentService.CreateComment(ctx, session, commentReq) 599 + if err != nil { 600 + t.Fatalf("Failed to create comment: %v", err) 601 + } 602 + 603 + // Wait for create event 604 + select { 605 + case <-eventChan: 606 + t.Logf("✅ Create event received") 607 + case err := <-errorChan: 608 + t.Fatalf("Jetstream error: %v", err) 609 + case <-time.After(30 * time.Second): 610 + t.Fatalf("Timeout waiting for create event") 611 + } 612 + close(done) 613 + 614 + // Verify comment exists 615 + _, err = commentRepo.GetByURI(ctx, commentResp.URI) 616 + if err != nil { 617 + t.Fatalf("Comment should exist before delete: %v", err) 618 + } 619 + 620 + // Now delete the comment 621 + t.Logf("\n🗑️ Deleting comment via service...") 622 + 623 + deleteEventChan := make(chan *jetstream.JetstreamEvent, 10) 624 + deleteErrorChan := make(chan error, 1) 625 + deleteDone := make(chan bool) 626 + 627 + go func() { 628 + subscribeErr := subscribeToJetstreamForCommentDelete(ctx, jetstreamURL, userDID, commentConsumer, deleteEventChan, deleteDone) 629 + if subscribeErr != nil { 630 + deleteErrorChan <- subscribeErr 631 + } 632 + }() 633 + 634 + time.Sleep(500 * time.Millisecond) 635 + 636 + err = commentService.DeleteComment(ctx, session, comments.DeleteCommentRequest{URI: commentResp.URI}) 637 + if err != nil { 638 + t.Fatalf("Failed to delete comment: %v", err) 639 + } 640 + 641 + t.Logf("✅ Comment delete request sent to PDS") 642 + 643 + // Wait for delete event from Jetstream 644 + t.Logf("\n⏳ Waiting for delete event from Jetstream...") 645 + 646 + select { 647 + case event := <-deleteEventChan: 648 + t.Logf("✅ Received delete event from Jetstream!") 649 + t.Logf(" Operation: %s", event.Commit.Operation) 650 + 651 + if event.Commit.Operation != "delete" { 652 + t.Errorf("Expected operation 'delete', got '%s'", event.Commit.Operation) 653 + } 654 + 655 + // Verify comment is soft-deleted in AppView 656 + deletedComment, err := commentRepo.GetByURI(ctx, commentResp.URI) 657 + if err != nil { 658 + t.Fatalf("Failed to get deleted comment: %v", err) 659 + } 660 + 661 + if deletedComment.DeletedAt == nil { 662 + t.Errorf("Expected comment to be soft-deleted (deleted_at should be set)") 663 + } else { 664 + t.Logf("✅ Comment soft-deleted in AppView at: %v", *deletedComment.DeletedAt) 665 + } 666 + 667 + close(deleteDone) 668 + 669 + case err := <-deleteErrorChan: 670 + t.Fatalf("Jetstream error: %v", err) 671 + 672 + case <-time.After(30 * time.Second): 673 + t.Fatalf("Timeout: No delete event received within 30 seconds") 674 + } 675 + 676 + t.Logf("\n✅ TRUE E2E COMMENT DELETE FLOW COMPLETE:") 677 + t.Logf(" Client → Service → PDS DeleteRecord → Jetstream → Consumer → AppView ✓") 678 + }) 679 + } 680 + 681 + // subscribeToJetstreamForComment subscribes to real Jetstream firehose for comment create events 682 + func subscribeToJetstreamForComment( 683 + ctx context.Context, 684 + jetstreamURL string, 685 + targetDID string, 686 + consumer *jetstream.CommentEventConsumer, 687 + eventChan chan<- *jetstream.JetstreamEvent, 688 + done <-chan bool, 689 + ) error { 690 + conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 691 + if err != nil { 692 + return fmt.Errorf("failed to connect to Jetstream: %w", err) 693 + } 694 + defer func() { _ = conn.Close() }() 695 + 696 + for { 697 + select { 698 + case <-done: 699 + return nil 700 + case <-ctx.Done(): 701 + return ctx.Err() 702 + default: 703 + if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 704 + return fmt.Errorf("failed to set read deadline: %w", err) 705 + } 706 + 707 + var event jetstream.JetstreamEvent 708 + err := conn.ReadJSON(&event) 709 + if err != nil { 710 + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 711 + return nil 712 + } 713 + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 714 + continue 715 + } 716 + return fmt.Errorf("failed to read Jetstream message: %w", err) 717 + } 718 + 719 + // Check if this is a comment create event for the target DID 720 + if event.Did == targetDID && event.Kind == "commit" && 721 + event.Commit != nil && event.Commit.Collection == "social.coves.community.comment" && 722 + event.Commit.Operation == "create" { 723 + 724 + if err := consumer.HandleEvent(ctx, &event); err != nil { 725 + return fmt.Errorf("failed to process event: %w", err) 726 + } 727 + 728 + select { 729 + case eventChan <- &event: 730 + return nil 731 + case <-time.After(1 * time.Second): 732 + return fmt.Errorf("timeout sending event to channel") 733 + } 734 + } 735 + } 736 + } 737 + } 738 + 739 + // subscribeToJetstreamForCommentUpdate subscribes for comment update events 740 + func subscribeToJetstreamForCommentUpdate( 741 + ctx context.Context, 742 + jetstreamURL string, 743 + targetDID string, 744 + consumer *jetstream.CommentEventConsumer, 745 + eventChan chan<- *jetstream.JetstreamEvent, 746 + done <-chan bool, 747 + ) error { 748 + conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 749 + if err != nil { 750 + return fmt.Errorf("failed to connect to Jetstream: %w", err) 751 + } 752 + defer func() { _ = conn.Close() }() 753 + 754 + for { 755 + select { 756 + case <-done: 757 + return nil 758 + case <-ctx.Done(): 759 + return ctx.Err() 760 + default: 761 + if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 762 + return fmt.Errorf("failed to set read deadline: %w", err) 763 + } 764 + 765 + var event jetstream.JetstreamEvent 766 + err := conn.ReadJSON(&event) 767 + if err != nil { 768 + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 769 + return nil 770 + } 771 + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 772 + continue 773 + } 774 + return fmt.Errorf("failed to read Jetstream message: %w", err) 775 + } 776 + 777 + if event.Did == targetDID && event.Kind == "commit" && 778 + event.Commit != nil && event.Commit.Collection == "social.coves.community.comment" && 779 + event.Commit.Operation == "update" { 780 + 781 + if err := consumer.HandleEvent(ctx, &event); err != nil { 782 + return fmt.Errorf("failed to process event: %w", err) 783 + } 784 + 785 + select { 786 + case eventChan <- &event: 787 + return nil 788 + case <-time.After(1 * time.Second): 789 + return fmt.Errorf("timeout sending event to channel") 790 + } 791 + } 792 + } 793 + } 794 + } 795 + 796 + // subscribeToJetstreamForCommentDelete subscribes for comment delete events 797 + func subscribeToJetstreamForCommentDelete( 798 + ctx context.Context, 799 + jetstreamURL string, 800 + targetDID string, 801 + consumer *jetstream.CommentEventConsumer, 802 + eventChan chan<- *jetstream.JetstreamEvent, 803 + done <-chan bool, 804 + ) error { 805 + conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 806 + if err != nil { 807 + return fmt.Errorf("failed to connect to Jetstream: %w", err) 808 + } 809 + defer func() { _ = conn.Close() }() 810 + 811 + for { 812 + select { 813 + case <-done: 814 + return nil 815 + case <-ctx.Done(): 816 + return ctx.Err() 817 + default: 818 + if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 819 + return fmt.Errorf("failed to set read deadline: %w", err) 820 + } 821 + 822 + var event jetstream.JetstreamEvent 823 + err := conn.ReadJSON(&event) 824 + if err != nil { 825 + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 826 + return nil 827 + } 828 + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 829 + continue 830 + } 831 + return fmt.Errorf("failed to read Jetstream message: %w", err) 832 + } 833 + 834 + if event.Did == targetDID && event.Kind == "commit" && 835 + event.Commit != nil && event.Commit.Collection == "social.coves.community.comment" && 836 + event.Commit.Operation == "delete" { 837 + 838 + if err := consumer.HandleEvent(ctx, &event); err != nil { 839 + return fmt.Errorf("failed to process event: %w", err) 840 + } 841 + 842 + select { 843 + case eventChan <- &event: 844 + return nil 845 + case <-time.After(1 * time.Second): 846 + return fmt.Errorf("timeout sending event to channel") 847 + } 848 + } 849 + } 850 + } 851 + } 852 + 853 + // TestCommentE2E_Authorization tests that users cannot modify other users' comments 854 + func TestCommentE2E_Authorization(t *testing.T) { 855 + if testing.Short() { 856 + t.Skip("Skipping E2E test in short mode") 857 + } 858 + 859 + // Setup test database 860 + dbURL := os.Getenv("TEST_DATABASE_URL") 861 + if dbURL == "" { 862 + dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 863 + } 864 + 865 + db, err := sql.Open("postgres", dbURL) 866 + if err != nil { 867 + t.Fatalf("Failed to connect to test database: %v", err) 868 + } 869 + defer func() { _ = db.Close() }() 870 + 871 + if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 872 + t.Fatalf("Failed to set goose dialect: %v", dialectErr) 873 + } 874 + if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 875 + t.Fatalf("Failed to run migrations: %v", migrateErr) 876 + } 877 + 878 + pdsURL := getTestPDSURL() 879 + healthResp, err := http.Get(pdsURL + "/xrpc/_health") 880 + if err != nil { 881 + t.Skipf("PDS not running at %s: %v", pdsURL, err) 882 + } 883 + _ = healthResp.Body.Close() 884 + 885 + ctx := context.Background() 886 + 887 + commentRepo := postgres.NewCommentRepository(db) 888 + postRepo := postgres.NewPostRepository(db) 889 + 890 + // Create two test users on PDS 891 + userAHandle := fmt.Sprintf("usera%d.local.coves.dev", time.Now().UnixNano()%1000000) 892 + userAEmail := fmt.Sprintf("usera%d@test.local", time.Now().UnixNano()%1000000) 893 + userAPassword := "test-password-123" 894 + 895 + userBHandle := fmt.Sprintf("userb%d.local.coves.dev", time.Now().UnixNano()%1000000) 896 + userBEmail := fmt.Sprintf("userb%d@test.local", time.Now().UnixNano()%1000000) 897 + userBPassword := "test-password-123" 898 + 899 + pdsAccessTokenA, userADID, err := createPDSAccount(pdsURL, userAHandle, userAEmail, userAPassword) 900 + if err != nil { 901 + t.Fatalf("Failed to create test user A on PDS: %v", err) 902 + } 903 + 904 + pdsAccessTokenB, userBDID, err := createPDSAccount(pdsURL, userBHandle, userBEmail, userBPassword) 905 + if err != nil { 906 + t.Fatalf("Failed to create test user B on PDS: %v", err) 907 + } 908 + 909 + testUserA := createTestUser(t, db, userAHandle, userADID) 910 + _ = createTestUser(t, db, userBHandle, userBDID) 911 + 912 + testCommunityDID, err := createFeedTestCommunity(db, ctx, "auth-test-community", "owner.test") 913 + if err != nil { 914 + t.Fatalf("Failed to create test community: %v", err) 915 + } 916 + 917 + postURI := createTestPost(t, db, testCommunityDID, testUserA.DID, "Auth Test Post", 0, time.Now()) 918 + postCID := "bafypostauthtest" 919 + 920 + commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 921 + if session.AccessToken == "" { 922 + return nil, fmt.Errorf("session has no access token") 923 + } 924 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 925 + } 926 + 927 + commentService := comments.NewCommentServiceWithPDSFactory( 928 + commentRepo, 929 + nil, 930 + postRepo, 931 + nil, 932 + nil, 933 + commentPDSFactory, 934 + ) 935 + 936 + mockStore := NewMockOAuthStore() 937 + mockStore.AddSessionWithPDS(userADID, "session-"+userADID, pdsAccessTokenA, pdsURL) 938 + mockStore.AddSessionWithPDS(userBDID, "session-"+userBDID, pdsAccessTokenB, pdsURL) 939 + 940 + t.Run("user cannot update another user's comment", func(t *testing.T) { 941 + // User A creates a comment 942 + parsedDIDA, parseErr := syntax.ParseDID(userADID) 943 + if parseErr != nil { 944 + t.Fatalf("Failed to parse DID A: %v", parseErr) 945 + } 946 + sessionA, sessionErr := mockStore.GetSession(ctx, parsedDIDA, "session-"+userADID) 947 + if sessionErr != nil { 948 + t.Fatalf("Failed to get session A: %v", sessionErr) 949 + } 950 + 951 + commentReq := comments.CreateCommentRequest{ 952 + Reply: comments.ReplyRef{ 953 + Root: comments.StrongRef{URI: postURI, CID: postCID}, 954 + Parent: comments.StrongRef{URI: postURI, CID: postCID}, 955 + }, 956 + Content: "User A's comment", 957 + Langs: []string{"en"}, 958 + } 959 + 960 + commentResp, err := commentService.CreateComment(ctx, sessionA, commentReq) 961 + if err != nil { 962 + t.Fatalf("User A failed to create comment: %v", err) 963 + } 964 + t.Logf("User A created comment: %s", commentResp.URI) 965 + 966 + // User B tries to update User A's comment 967 + parsedDIDB, parseErr := syntax.ParseDID(userBDID) 968 + if parseErr != nil { 969 + t.Fatalf("Failed to parse DID B: %v", parseErr) 970 + } 971 + sessionB, sessionErr := mockStore.GetSession(ctx, parsedDIDB, "session-"+userBDID) 972 + if sessionErr != nil { 973 + t.Fatalf("Failed to get session B: %v", sessionErr) 974 + } 975 + 976 + updateReq := comments.UpdateCommentRequest{ 977 + URI: commentResp.URI, 978 + Content: "User B trying to update User A's comment", 979 + Langs: []string{"en"}, 980 + } 981 + 982 + _, err = commentService.UpdateComment(ctx, sessionB, updateReq) 983 + if err == nil { 984 + t.Errorf("Expected error when User B tries to update User A's comment, got nil") 985 + } else if err != comments.ErrNotAuthorized { 986 + t.Errorf("Expected ErrNotAuthorized, got: %v", err) 987 + } else { 988 + t.Logf("✅ Correctly rejected: User B cannot update User A's comment") 989 + } 990 + }) 991 + 992 + t.Run("user cannot delete another user's comment", func(t *testing.T) { 993 + // User A creates a comment 994 + parsedDIDA, parseErr := syntax.ParseDID(userADID) 995 + if parseErr != nil { 996 + t.Fatalf("Failed to parse DID A: %v", parseErr) 997 + } 998 + sessionA, sessionErr := mockStore.GetSession(ctx, parsedDIDA, "session-"+userADID) 999 + if sessionErr != nil { 1000 + t.Fatalf("Failed to get session A: %v", sessionErr) 1001 + } 1002 + 1003 + commentReq := comments.CreateCommentRequest{ 1004 + Reply: comments.ReplyRef{ 1005 + Root: comments.StrongRef{URI: postURI, CID: postCID}, 1006 + Parent: comments.StrongRef{URI: postURI, CID: postCID}, 1007 + }, 1008 + Content: "User A's comment for delete test", 1009 + Langs: []string{"en"}, 1010 + } 1011 + 1012 + commentResp, err := commentService.CreateComment(ctx, sessionA, commentReq) 1013 + if err != nil { 1014 + t.Fatalf("User A failed to create comment: %v", err) 1015 + } 1016 + t.Logf("User A created comment: %s", commentResp.URI) 1017 + 1018 + // User B tries to delete User A's comment 1019 + parsedDIDB, parseErr := syntax.ParseDID(userBDID) 1020 + if parseErr != nil { 1021 + t.Fatalf("Failed to parse DID B: %v", parseErr) 1022 + } 1023 + sessionB, sessionErr := mockStore.GetSession(ctx, parsedDIDB, "session-"+userBDID) 1024 + if sessionErr != nil { 1025 + t.Fatalf("Failed to get session B: %v", sessionErr) 1026 + } 1027 + 1028 + deleteReq := comments.DeleteCommentRequest{ 1029 + URI: commentResp.URI, 1030 + } 1031 + 1032 + err = commentService.DeleteComment(ctx, sessionB, deleteReq) 1033 + if err == nil { 1034 + t.Errorf("Expected error when User B tries to delete User A's comment, got nil") 1035 + } else if err != comments.ErrNotAuthorized { 1036 + t.Errorf("Expected ErrNotAuthorized, got: %v", err) 1037 + } else { 1038 + t.Logf("✅ Correctly rejected: User B cannot delete User A's comment") 1039 + } 1040 + }) 1041 + } 1042 + 1043 + // TestCommentE2E_ValidationErrors tests that validation errors are properly returned 1044 + func TestCommentE2E_ValidationErrors(t *testing.T) { 1045 + if testing.Short() { 1046 + t.Skip("Skipping E2E test in short mode") 1047 + } 1048 + 1049 + // Setup test database 1050 + dbURL := os.Getenv("TEST_DATABASE_URL") 1051 + if dbURL == "" { 1052 + dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 1053 + } 1054 + 1055 + db, err := sql.Open("postgres", dbURL) 1056 + if err != nil { 1057 + t.Fatalf("Failed to connect to test database: %v", err) 1058 + } 1059 + defer func() { _ = db.Close() }() 1060 + 1061 + if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 1062 + t.Fatalf("Failed to set goose dialect: %v", dialectErr) 1063 + } 1064 + if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 1065 + t.Fatalf("Failed to run migrations: %v", migrateErr) 1066 + } 1067 + 1068 + pdsURL := getTestPDSURL() 1069 + healthResp, err := http.Get(pdsURL + "/xrpc/_health") 1070 + if err != nil { 1071 + t.Skipf("PDS not running at %s: %v", pdsURL, err) 1072 + } 1073 + _ = healthResp.Body.Close() 1074 + 1075 + ctx := context.Background() 1076 + 1077 + commentRepo := postgres.NewCommentRepository(db) 1078 + postRepo := postgres.NewPostRepository(db) 1079 + 1080 + // Create test user on PDS 1081 + testUserHandle := fmt.Sprintf("valtest%d.local.coves.dev", time.Now().UnixNano()%1000000) 1082 + testUserEmail := fmt.Sprintf("valtest%d@test.local", time.Now().UnixNano()%1000000) 1083 + testUserPassword := "test-password-123" 1084 + 1085 + pdsAccessToken, userDID, err := createPDSAccount(pdsURL, testUserHandle, testUserEmail, testUserPassword) 1086 + if err != nil { 1087 + t.Fatalf("Failed to create test user on PDS: %v", err) 1088 + } 1089 + 1090 + testUser := createTestUser(t, db, testUserHandle, userDID) 1091 + 1092 + testCommunityDID, err := createFeedTestCommunity(db, ctx, "val-test-community", "owner.test") 1093 + if err != nil { 1094 + t.Fatalf("Failed to create test community: %v", err) 1095 + } 1096 + 1097 + postURI := createTestPost(t, db, testCommunityDID, testUser.DID, "Validation Test Post", 0, time.Now()) 1098 + postCID := "bafypostvaltest" 1099 + 1100 + commentPDSFactory := func(ctx context.Context, session *oauthlib.ClientSessionData) (pds.Client, error) { 1101 + if session.AccessToken == "" { 1102 + return nil, fmt.Errorf("session has no access token") 1103 + } 1104 + return pds.NewFromAccessToken(session.HostURL, session.AccountDID.String(), session.AccessToken) 1105 + } 1106 + 1107 + commentService := comments.NewCommentServiceWithPDSFactory( 1108 + commentRepo, 1109 + nil, 1110 + postRepo, 1111 + nil, 1112 + nil, 1113 + commentPDSFactory, 1114 + ) 1115 + 1116 + mockStore := NewMockOAuthStore() 1117 + mockStore.AddSessionWithPDS(userDID, "session-"+userDID, pdsAccessToken, pdsURL) 1118 + 1119 + parsedDID, parseErr := syntax.ParseDID(userDID) 1120 + if parseErr != nil { 1121 + t.Fatalf("Failed to parse DID: %v", parseErr) 1122 + } 1123 + session, sessionErr := mockStore.GetSession(ctx, parsedDID, "session-"+userDID) 1124 + if sessionErr != nil { 1125 + t.Fatalf("Failed to get session: %v", sessionErr) 1126 + } 1127 + 1128 + t.Run("empty content returns ErrContentEmpty", func(t *testing.T) { 1129 + commentReq := comments.CreateCommentRequest{ 1130 + Reply: comments.ReplyRef{ 1131 + Root: comments.StrongRef{URI: postURI, CID: postCID}, 1132 + Parent: comments.StrongRef{URI: postURI, CID: postCID}, 1133 + }, 1134 + Content: "", 1135 + Langs: []string{"en"}, 1136 + } 1137 + 1138 + _, err := commentService.CreateComment(ctx, session, commentReq) 1139 + if err == nil { 1140 + t.Errorf("Expected error for empty content, got nil") 1141 + } else if err != comments.ErrContentEmpty { 1142 + t.Errorf("Expected ErrContentEmpty, got: %v", err) 1143 + } else { 1144 + t.Logf("✅ Correctly rejected: empty content returns ErrContentEmpty") 1145 + } 1146 + }) 1147 + 1148 + t.Run("whitespace-only content returns ErrContentEmpty", func(t *testing.T) { 1149 + commentReq := comments.CreateCommentRequest{ 1150 + Reply: comments.ReplyRef{ 1151 + Root: comments.StrongRef{URI: postURI, CID: postCID}, 1152 + Parent: comments.StrongRef{URI: postURI, CID: postCID}, 1153 + }, 1154 + Content: " \t\n ", 1155 + Langs: []string{"en"}, 1156 + } 1157 + 1158 + _, err := commentService.CreateComment(ctx, session, commentReq) 1159 + if err == nil { 1160 + t.Errorf("Expected error for whitespace-only content, got nil") 1161 + } else if err != comments.ErrContentEmpty { 1162 + t.Errorf("Expected ErrContentEmpty, got: %v", err) 1163 + } else { 1164 + t.Logf("✅ Correctly rejected: whitespace-only content returns ErrContentEmpty") 1165 + } 1166 + }) 1167 + 1168 + t.Run("invalid reply reference returns ErrInvalidReply", func(t *testing.T) { 1169 + commentReq := comments.CreateCommentRequest{ 1170 + Reply: comments.ReplyRef{ 1171 + Root: comments.StrongRef{URI: "", CID: ""}, 1172 + Parent: comments.StrongRef{URI: "", CID: ""}, 1173 + }, 1174 + Content: "Valid content", 1175 + Langs: []string{"en"}, 1176 + } 1177 + 1178 + _, err := commentService.CreateComment(ctx, session, commentReq) 1179 + if err == nil { 1180 + t.Errorf("Expected error for invalid reply, got nil") 1181 + } else if err != comments.ErrInvalidReply { 1182 + t.Errorf("Expected ErrInvalidReply, got: %v", err) 1183 + } else { 1184 + t.Logf("✅ Correctly rejected: invalid reply returns ErrInvalidReply") 1185 + } 1186 + }) 1187 + } 1188 +
+381
tests/integration/community_update_e2e_test.go
··· 1 + package integration 2 + 3 + import ( 4 + "Coves/internal/atproto/identity" 5 + "Coves/internal/atproto/jetstream" 6 + "Coves/internal/core/communities" 7 + "Coves/internal/db/postgres" 8 + "context" 9 + "database/sql" 10 + "fmt" 11 + "net" 12 + "net/http" 13 + "os" 14 + "strings" 15 + "testing" 16 + "time" 17 + 18 + "github.com/gorilla/websocket" 19 + _ "github.com/lib/pq" 20 + "github.com/pressly/goose/v3" 21 + ) 22 + 23 + // TestCommunityUpdateE2E_WithJetstream tests the FULL community update flow with REAL Jetstream 24 + // Flow: Service.UpdateCommunity() → PDS putRecord → REAL Jetstream Firehose → Consumer → AppView DB 25 + // 26 + // This is a TRUE E2E test - no simulated Jetstream events! 27 + func TestCommunityUpdateE2E_WithJetstream(t *testing.T) { 28 + if testing.Short() { 29 + t.Skip("Skipping E2E test in short mode") 30 + } 31 + 32 + // Setup test database 33 + dbURL := os.Getenv("TEST_DATABASE_URL") 34 + if dbURL == "" { 35 + dbURL = "postgres://test_user:test_password@localhost:5434/coves_test?sslmode=disable" 36 + } 37 + 38 + db, err := sql.Open("postgres", dbURL) 39 + if err != nil { 40 + t.Fatalf("Failed to connect to test database: %v", err) 41 + } 42 + defer func() { _ = db.Close() }() 43 + 44 + // Run migrations 45 + if dialectErr := goose.SetDialect("postgres"); dialectErr != nil { 46 + t.Fatalf("Failed to set goose dialect: %v", dialectErr) 47 + } 48 + if migrateErr := goose.Up(db, "../../internal/db/migrations"); migrateErr != nil { 49 + t.Fatalf("Failed to run migrations: %v", migrateErr) 50 + } 51 + 52 + // Check if PDS is running 53 + pdsURL := os.Getenv("PDS_URL") 54 + if pdsURL == "" { 55 + pdsURL = "http://localhost:3001" 56 + } 57 + 58 + healthResp, err := http.Get(pdsURL + "/xrpc/_health") 59 + if err != nil { 60 + t.Skipf("PDS not running at %s: %v. Run 'make dev-up' to start.", pdsURL, err) 61 + } 62 + _ = healthResp.Body.Close() 63 + 64 + // Check if Jetstream is running 65 + pdsHostname := strings.TrimPrefix(pdsURL, "http://") 66 + pdsHostname = strings.TrimPrefix(pdsHostname, "https://") 67 + pdsHostname = strings.Split(pdsHostname, ":")[0] 68 + jetstreamURL := fmt.Sprintf("ws://%s:6008/subscribe?wantedCollections=social.coves.community.profile", pdsHostname) 69 + 70 + testConn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 71 + if err != nil { 72 + t.Skipf("Jetstream not running at %s: %v. Run 'docker-compose --profile jetstream up' to start.", jetstreamURL, err) 73 + } 74 + _ = testConn.Close() 75 + 76 + ctx := context.Background() 77 + instanceDID := "did:web:coves.social" 78 + 79 + // Setup identity resolver with local PLC 80 + plcURL := os.Getenv("PLC_DIRECTORY_URL") 81 + if plcURL == "" { 82 + plcURL = "http://localhost:3002" // Local PLC directory 83 + } 84 + identityConfig := identity.DefaultConfig() 85 + identityConfig.PLCURL = plcURL 86 + identityResolver := identity.NewResolver(db, identityConfig) 87 + 88 + // Setup services 89 + communityRepo := postgres.NewCommunityRepository(db) 90 + provisioner := communities.NewPDSAccountProvisioner("coves.social", pdsURL) 91 + communityService := communities.NewCommunityService( 92 + communityRepo, 93 + pdsURL, 94 + instanceDID, 95 + "coves.social", 96 + provisioner, 97 + ) 98 + 99 + consumer := jetstream.NewCommunityEventConsumer(communityRepo, instanceDID, true, identityResolver) 100 + 101 + t.Run("update community with real Jetstream indexing", func(t *testing.T) { 102 + // First, create a community 103 + uniqueName := fmt.Sprintf("upd%d", time.Now().UnixNano()%1000000) 104 + creatorDID := "did:plc:jetstream-update-test" 105 + 106 + t.Logf("\n📝 Creating community on PDS...") 107 + community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 108 + Name: uniqueName, 109 + DisplayName: "Original Display Name", 110 + Description: "Original description before update", 111 + Visibility: "public", 112 + CreatedByDID: creatorDID, 113 + HostedByDID: instanceDID, 114 + AllowExternalDiscovery: true, 115 + }) 116 + if err != nil { 117 + t.Fatalf("Failed to create community: %v", err) 118 + } 119 + 120 + t.Logf("✅ Community created on PDS:") 121 + t.Logf(" DID: %s", community.DID) 122 + t.Logf(" RecordCID: %s", community.RecordCID) 123 + 124 + // Verify community is indexed (the service indexes it synchronously on create) 125 + t.Logf("\n🔄 Checking community is indexed...") 126 + indexed, err := communityService.GetCommunity(ctx, community.DID) 127 + if err != nil { 128 + t.Fatalf("Community not indexed: %v", err) 129 + } 130 + t.Logf("✅ Community indexed in AppView: %s", indexed.DisplayName) 131 + 132 + // Now update the community 133 + t.Logf("\n📝 Updating community via service...") 134 + 135 + // Start Jetstream subscription for update event BEFORE calling update 136 + updateEventChan := make(chan *jetstream.JetstreamEvent, 10) 137 + updateErrorChan := make(chan error, 1) 138 + updateDone := make(chan bool) 139 + 140 + go func() { 141 + subscribeErr := subscribeToJetstreamForCommunityEvent(ctx, jetstreamURL, community.DID, "update", consumer, updateEventChan, updateDone) 142 + if subscribeErr != nil { 143 + updateErrorChan <- subscribeErr 144 + } 145 + }() 146 + 147 + // Give Jetstream a moment to connect 148 + time.Sleep(500 * time.Millisecond) 149 + 150 + // Perform the update 151 + newDisplayName := "Updated via TRUE E2E Test!" 152 + newDescription := "This description was updated and indexed via real Jetstream firehose" 153 + newVisibility := "unlisted" 154 + 155 + updated, err := communityService.UpdateCommunity(ctx, communities.UpdateCommunityRequest{ 156 + CommunityDID: community.DID, 157 + UpdatedByDID: creatorDID, 158 + DisplayName: &newDisplayName, 159 + Description: &newDescription, 160 + Visibility: &newVisibility, 161 + AllowExternalDiscovery: nil, 162 + }) 163 + if err != nil { 164 + t.Fatalf("Failed to update community: %v", err) 165 + } 166 + 167 + t.Logf("✅ Community update written to PDS:") 168 + t.Logf(" New RecordCID: %s (was: %s)", updated.RecordCID, community.RecordCID) 169 + 170 + // Wait for update event from real Jetstream 171 + t.Logf("\n⏳ Waiting for update event from Jetstream (max 30 seconds)...") 172 + 173 + select { 174 + case event := <-updateEventChan: 175 + t.Logf("✅ Received REAL update event from Jetstream!") 176 + t.Logf(" Event DID: %s", event.Did) 177 + t.Logf(" Collection: %s", event.Commit.Collection) 178 + t.Logf(" Operation: %s", event.Commit.Operation) 179 + t.Logf(" RKey: %s", event.Commit.RKey) 180 + 181 + // Verify operation type 182 + if event.Commit.Operation != "update" { 183 + t.Errorf("Expected operation 'update', got '%s'", event.Commit.Operation) 184 + } 185 + 186 + // Verify the update was indexed in AppView 187 + t.Logf("\n🔍 Verifying update indexed in AppView...") 188 + indexedUpdated, err := communityService.GetCommunity(ctx, community.DID) 189 + if err != nil { 190 + t.Fatalf("Failed to get updated community: %v", err) 191 + } 192 + 193 + t.Logf("✅ Update indexed in AppView:") 194 + t.Logf(" DisplayName: %s", indexedUpdated.DisplayName) 195 + t.Logf(" Description: %s", indexedUpdated.Description) 196 + t.Logf(" Visibility: %s", indexedUpdated.Visibility) 197 + 198 + // Verify the changes 199 + if indexedUpdated.DisplayName != newDisplayName { 200 + t.Errorf("Expected display name '%s', got '%s'", newDisplayName, indexedUpdated.DisplayName) 201 + } 202 + if indexedUpdated.Description != newDescription { 203 + t.Errorf("Expected description '%s', got '%s'", newDescription, indexedUpdated.Description) 204 + } 205 + if indexedUpdated.Visibility != newVisibility { 206 + t.Errorf("Expected visibility '%s', got '%s'", newVisibility, indexedUpdated.Visibility) 207 + } 208 + 209 + close(updateDone) 210 + 211 + case err := <-updateErrorChan: 212 + t.Fatalf("Jetstream error: %v", err) 213 + 214 + case <-time.After(30 * time.Second): 215 + t.Fatalf("Timeout: No update event received from Jetstream within 30 seconds") 216 + } 217 + 218 + t.Logf("\n✅ TRUE E2E COMMUNITY UPDATE FLOW COMPLETE:") 219 + t.Logf(" Service → PDS putRecord → Jetstream Firehose → Consumer → AppView ✓") 220 + }) 221 + 222 + t.Run("multiple updates with real Jetstream", func(t *testing.T) { 223 + // This tests that consecutive updates all flow through Jetstream correctly 224 + uniqueName := fmt.Sprintf("multi%d", time.Now().UnixNano()%1000000) 225 + creatorDID := "did:plc:multi-update-test" 226 + 227 + t.Logf("\n📝 Creating community for multi-update test...") 228 + community, err := communityService.CreateCommunity(ctx, communities.CreateCommunityRequest{ 229 + Name: uniqueName, 230 + DisplayName: "Multi-Update Test", 231 + Description: "Testing multiple updates", 232 + Visibility: "public", 233 + CreatedByDID: creatorDID, 234 + HostedByDID: instanceDID, 235 + AllowExternalDiscovery: true, 236 + }) 237 + if err != nil { 238 + t.Fatalf("Failed to create community: %v", err) 239 + } 240 + 241 + // Verify create is indexed (service indexes synchronously on create) 242 + indexed, err := communityService.GetCommunity(ctx, community.DID) 243 + if err != nil { 244 + t.Fatalf("Community not indexed after create: %v", err) 245 + } 246 + t.Logf("✅ Create indexed: %s", indexed.DisplayName) 247 + 248 + // Perform 3 consecutive updates 249 + for i := 1; i <= 3; i++ { 250 + t.Logf("\n📝 Update %d of 3...", i) 251 + 252 + updateEventChan := make(chan *jetstream.JetstreamEvent, 10) 253 + updateErrorChan := make(chan error, 1) 254 + updateDone := make(chan bool) 255 + 256 + go func() { 257 + subscribeErr := subscribeToJetstreamForCommunityEvent(ctx, jetstreamURL, community.DID, "update", consumer, updateEventChan, updateDone) 258 + if subscribeErr != nil { 259 + updateErrorChan <- subscribeErr 260 + } 261 + }() 262 + 263 + time.Sleep(300 * time.Millisecond) 264 + 265 + newDesc := fmt.Sprintf("Update #%d at %s", i, time.Now().Format(time.RFC3339)) 266 + _, err := communityService.UpdateCommunity(ctx, communities.UpdateCommunityRequest{ 267 + CommunityDID: community.DID, 268 + UpdatedByDID: creatorDID, 269 + Description: &newDesc, 270 + }) 271 + if err != nil { 272 + t.Fatalf("Update %d failed: %v", i, err) 273 + } 274 + 275 + select { 276 + case event := <-updateEventChan: 277 + if event.Commit.Operation != "update" { 278 + t.Errorf("Expected update operation, got %s", event.Commit.Operation) 279 + } 280 + t.Logf("✅ Update %d received via Jetstream", i) 281 + case err := <-updateErrorChan: 282 + t.Fatalf("Jetstream error on update %d: %v", i, err) 283 + case <-time.After(30 * time.Second): 284 + t.Fatalf("Timeout on update %d", i) 285 + } 286 + close(updateDone) 287 + 288 + // Verify in AppView 289 + indexed, getErr := communityService.GetCommunity(ctx, community.DID) 290 + if getErr != nil { 291 + t.Fatalf("Update %d: failed to get community: %v", i, getErr) 292 + } 293 + if indexed.Description != newDesc { 294 + t.Errorf("Update %d: expected description '%s', got '%s'", i, newDesc, indexed.Description) 295 + } 296 + } 297 + 298 + t.Logf("\n✅ MULTIPLE UPDATES TEST COMPLETE:") 299 + t.Logf(" 3 consecutive updates all indexed via real Jetstream ✓") 300 + }) 301 + } 302 + 303 + // subscribeToJetstreamForCommunityEvent subscribes to real Jetstream for specific community events 304 + func subscribeToJetstreamForCommunityEvent( 305 + ctx context.Context, 306 + jetstreamURL string, 307 + targetDID string, 308 + operation string, // "create", "update", or "delete" 309 + consumer *jetstream.CommunityEventConsumer, 310 + eventChan chan<- *jetstream.JetstreamEvent, 311 + done <-chan bool, 312 + ) (returnErr error) { 313 + // Recover from websocket panics during shutdown 314 + defer func() { 315 + if r := recover(); r != nil { 316 + // Panic during shutdown is expected, return nil 317 + returnErr = nil 318 + } 319 + }() 320 + 321 + conn, _, err := websocket.DefaultDialer.Dial(jetstreamURL, nil) 322 + if err != nil { 323 + return fmt.Errorf("failed to connect to Jetstream: %w", err) 324 + } 325 + defer func() { _ = conn.Close() }() 326 + 327 + for { 328 + select { 329 + case <-done: 330 + return nil 331 + case <-ctx.Done(): 332 + return ctx.Err() 333 + default: 334 + if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { 335 + return fmt.Errorf("failed to set read deadline: %w", err) 336 + } 337 + 338 + var event jetstream.JetstreamEvent 339 + err := conn.ReadJSON(&event) 340 + if err != nil { 341 + // Check done channel first to handle clean shutdown 342 + select { 343 + case <-done: 344 + return nil 345 + default: 346 + } 347 + if websocket.IsCloseError(err, websocket.CloseNormalClosure) { 348 + return nil 349 + } 350 + if netErr, ok := err.(net.Error); ok && netErr.Timeout() { 351 + continue 352 + } 353 + // Check for connection closed errors (happens during shutdown) 354 + if strings.Contains(err.Error(), "use of closed network connection") || 355 + strings.Contains(err.Error(), "failed websocket connection") || 356 + strings.Contains(err.Error(), "repeated read on failed websocket") { 357 + return nil 358 + } 359 + return fmt.Errorf("failed to read Jetstream message: %w", err) 360 + } 361 + 362 + // Check if this is the event we're looking for 363 + if event.Did == targetDID && event.Kind == "commit" && 364 + event.Commit != nil && event.Commit.Collection == "social.coves.community.profile" && 365 + event.Commit.Operation == operation { 366 + 367 + // Process through consumer to index in AppView 368 + if err := consumer.HandleEvent(ctx, &event); err != nil { 369 + return fmt.Errorf("failed to process event: %w", err) 370 + } 371 + 372 + select { 373 + case eventChan <- &event: 374 + return nil 375 + case <-time.After(1 * time.Second): 376 + return fmt.Errorf("timeout sending event to channel") 377 + } 378 + } 379 + } 380 + } 381 + }