wip
at main 2104 lines 55 kB view raw
1package storage 2 3import ( 4 "context" 5 "database/sql" 6 "encoding/json" 7 "fmt" 8 "time" 9 10 "github.com/atscan/atscand/internal/log" 11 "github.com/jackc/pgx/v5" 12 "github.com/jackc/pgx/v5/pgxpool" 13 _ "github.com/jackc/pgx/v5/stdlib" 14 "github.com/lib/pq" 15) 16 17type PostgresDB struct { 18 db *sql.DB 19 pool *pgxpool.Pool 20 scanRetention int 21} 22 23func NewPostgresDB(connString string) (*PostgresDB, error) { 24 log.Info("Connecting to PostgreSQL database...") 25 26 // Open standard sql.DB (for compatibility) 27 db, err := sql.Open("pgx", connString) 28 if err != nil { 29 return nil, fmt.Errorf("failed to open database: %w", err) 30 } 31 32 // Connection pool settings 33 db.SetMaxOpenConns(50) 34 db.SetMaxIdleConns(25) 35 db.SetConnMaxLifetime(5 * time.Minute) 36 db.SetConnMaxIdleTime(2 * time.Minute) 37 38 log.Verbose(" Max open connections: 50") 39 log.Verbose(" Max idle connections: 25") 40 log.Verbose(" Connection max lifetime: 5m") 41 42 // Test connection 43 log.Info("Testing database connection...") 44 if err := db.Ping(); err != nil { 45 return nil, fmt.Errorf("failed to ping database: %w", err) 46 } 47 log.Info("✓ Database connection successful") 48 49 // Also create pgx pool for COPY operations 50 log.Verbose("Creating pgx connection pool...") 51 pool, err := pgxpool.New(context.Background(), connString) 52 if err != nil { 53 return nil, fmt.Errorf("failed to create pgx pool: %w", err) 54 } 55 log.Verbose("✓ Connection pool created") 56 57 return &PostgresDB{ 58 db: db, 59 pool: pool, 60 scanRetention: 3, // Default 61 }, nil 62} 63 64func (p *PostgresDB) Close() error { 65 if p.pool != nil { 66 p.pool.Close() 67 } 68 return p.db.Close() 69} 70 71func (p *PostgresDB) Migrate() error { 72 log.Info("Running database migrations...") 73 74 schema := ` 75 -- Endpoints table (with IPv6 support) 76 CREATE TABLE IF NOT EXISTS endpoints ( 77 id BIGSERIAL PRIMARY KEY, 78 endpoint_type TEXT NOT NULL DEFAULT 'pds', 79 endpoint TEXT NOT NULL, 80 
server_did TEXT, 81 discovered_at TIMESTAMP NOT NULL, 82 last_checked TIMESTAMP, 83 status INTEGER DEFAULT 0, 84 ip TEXT, 85 ipv6 TEXT, 86 ip_resolved_at TIMESTAMP, 87 valid BOOLEAN DEFAULT true, 88 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 89 UNIQUE(endpoint_type, endpoint) 90 ); 91 92 CREATE INDEX IF NOT EXISTS idx_endpoints_type_endpoint ON endpoints(endpoint_type, endpoint); 93 CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status); 94 CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type); 95 CREATE INDEX IF NOT EXISTS idx_endpoints_ip ON endpoints(ip); 96 CREATE INDEX IF NOT EXISTS idx_endpoints_ipv6 ON endpoints(ipv6); 97 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did); 98 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did_type_discovered ON endpoints(server_did, endpoint_type, discovered_at); 99 CREATE INDEX IF NOT EXISTS idx_endpoints_valid ON endpoints(valid); 100 101 -- IP infos table (IP as PRIMARY KEY) 102 CREATE TABLE IF NOT EXISTS ip_infos ( 103 ip TEXT PRIMARY KEY, 104 city TEXT, 105 country TEXT, 106 country_code TEXT, 107 asn INTEGER, 108 asn_org TEXT, 109 is_datacenter BOOLEAN, 110 is_vpn BOOLEAN, 111 is_crawler BOOLEAN, 112 is_tor BOOLEAN, 113 is_proxy BOOLEAN, 114 latitude REAL, 115 longitude REAL, 116 raw_data JSONB, 117 fetched_at TIMESTAMP NOT NULL, 118 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 119 ); 120 121 CREATE INDEX IF NOT EXISTS idx_ip_infos_country_code ON ip_infos(country_code); 122 CREATE INDEX IF NOT EXISTS idx_ip_infos_asn ON ip_infos(asn); 123 124 -- Endpoint scans 125 CREATE TABLE IF NOT EXISTS endpoint_scans ( 126 id BIGSERIAL PRIMARY KEY, 127 endpoint_id BIGINT NOT NULL, 128 status INTEGER NOT NULL, 129 response_time DOUBLE PRECISION, 130 user_count BIGINT, 131 version TEXT, 132 used_ip TEXT, 133 scan_data JSONB, 134 scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 135 FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE 136 ); 137 138 
CREATE INDEX IF NOT EXISTS idx_endpoint_scans_endpoint_status_scanned ON endpoint_scans(endpoint_id, status, scanned_at DESC); 139 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_scanned_at ON endpoint_scans(scanned_at); 140 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_user_count ON endpoint_scans(user_count DESC NULLS LAST); 141 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_used_ip ON endpoint_scans(used_ip); 142 143 144 CREATE TABLE IF NOT EXISTS plc_metrics ( 145 id BIGSERIAL PRIMARY KEY, 146 total_dids BIGINT, 147 total_pds BIGINT, 148 unique_pds BIGINT, 149 scan_duration_ms BIGINT, 150 error_count INTEGER, 151 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 152 ); 153 154 CREATE TABLE IF NOT EXISTS scan_cursors ( 155 source TEXT PRIMARY KEY, 156 last_bundle_number INTEGER DEFAULT 0, 157 last_scan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 158 records_processed BIGINT DEFAULT 0 159 ); 160 161 -- Minimal dids table 162 CREATE TABLE IF NOT EXISTS dids ( 163 did TEXT PRIMARY KEY, 164 handle TEXT, 165 pds TEXT, 166 bundle_numbers JSONB NOT NULL DEFAULT '[]'::jsonb, 167 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 168 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 169 ); 170 171 CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers); 172 CREATE INDEX IF NOT EXISTS idx_dids_created_at ON dids(created_at); 173 CREATE INDEX IF NOT EXISTS idx_dids_handle ON dids(handle); 174 CREATE INDEX IF NOT EXISTS idx_dids_pds ON dids(pds); 175 176 -- PDS Repositories table 177 CREATE TABLE IF NOT EXISTS pds_repos ( 178 id BIGSERIAL PRIMARY KEY, 179 endpoint_id BIGINT NOT NULL, 180 did TEXT NOT NULL, 181 head TEXT, 182 rev TEXT, 183 active BOOLEAN DEFAULT true, 184 status TEXT, 185 first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 186 last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, 187 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, 188 FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE, 189 UNIQUE(endpoint_id, 
did) 190 ); 191 192 CREATE INDEX IF NOT EXISTS idx_pds_repos_endpoint ON pds_repos(endpoint_id); 193 CREATE INDEX IF NOT EXISTS idx_pds_repos_endpoint_id_desc ON pds_repos(endpoint_id, id DESC); 194 CREATE INDEX IF NOT EXISTS idx_pds_repos_did ON pds_repos(did); 195 CREATE INDEX IF NOT EXISTS idx_pds_repos_active ON pds_repos(active); 196 CREATE INDEX IF NOT EXISTS idx_pds_repos_status ON pds_repos(status); 197 CREATE INDEX IF NOT EXISTS idx_pds_repos_last_seen ON pds_repos(last_seen DESC); 198 ` 199 200 _, err := p.db.Exec(schema) 201 if err != nil { 202 return err 203 } 204 205 log.Info("✓ Database migrations completed successfully") 206 return nil 207} 208 209// ===== ENDPOINT OPERATIONS ===== 210 211func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error { 212 query := ` 213 INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at, valid) 214 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) 215 ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET 216 last_checked = EXCLUDED.last_checked, 217 status = EXCLUDED.status, 218 ip = CASE 219 WHEN EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '' THEN EXCLUDED.ip 220 ELSE endpoints.ip 221 END, 222 ipv6 = CASE 223 WHEN EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '' THEN EXCLUDED.ipv6 224 ELSE endpoints.ipv6 225 END, 226 ip_resolved_at = CASE 227 WHEN (EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '') OR (EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '') THEN EXCLUDED.ip_resolved_at 228 ELSE endpoints.ip_resolved_at 229 END, 230 valid = EXCLUDED.valid, 231 updated_at = CURRENT_TIMESTAMP 232 RETURNING id 233 ` 234 err := p.db.QueryRowContext(ctx, query, 235 endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt, 236 endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt, endpoint.Valid).Scan(&endpoint.ID) 237 return err 238} 239 240func (p *PostgresDB) EndpointExists(ctx context.Context, endpoint string, 
endpointType string) (bool, error) { 241 query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2)" 242 var exists bool 243 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists) 244 return exists, err 245} 246 247func (p *PostgresDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) { 248 query := "SELECT id FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2" 249 var id int64 250 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id) 251 return id, err 252} 253 254func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) { 255 query := ` 256 SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status, 257 ip, ipv6, ip_resolved_at, valid, updated_at 258 FROM endpoints 259 WHERE endpoint = $1 AND endpoint_type = $2 260 ` 261 262 var ep Endpoint 263 var lastChecked, ipResolvedAt sql.NullTime 264 var ip, ipv6 sql.NullString 265 266 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan( 267 &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked, 268 &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.Valid, &ep.UpdatedAt, 269 ) 270 if err != nil { 271 return nil, err 272 } 273 274 if lastChecked.Valid { 275 ep.LastChecked = lastChecked.Time 276 } 277 if ip.Valid { 278 ep.IP = ip.String 279 } 280 if ipv6.Valid { 281 ep.IPv6 = ipv6.String 282 } 283 if ipResolvedAt.Valid { 284 ep.IPResolvedAt = ipResolvedAt.Time 285 } 286 287 return &ep, nil 288} 289 290func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) { 291 query := ` 292 SELECT DISTINCT ON (COALESCE(server_did, id::text)) 293 id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status, 294 ip, ipv6, ip_resolved_at, valid, updated_at 295 FROM endpoints 296 WHERE 1=1 297 ` 298 args := []interface{}{} 299 argIdx := 1 300 301 if filter != nil { 
302 if filter.Type != "" { 303 query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx) 304 args = append(args, filter.Type) 305 argIdx++ 306 } 307 308 // NEW: Filter by valid flag 309 if filter.OnlyValid { 310 query += fmt.Sprintf(" AND valid = true", argIdx) 311 } 312 if filter.Status != "" { 313 statusInt := EndpointStatusUnknown 314 switch filter.Status { 315 case "online": 316 statusInt = EndpointStatusOnline 317 case "offline": 318 statusInt = EndpointStatusOffline 319 } 320 query += fmt.Sprintf(" AND status = $%d", argIdx) 321 args = append(args, statusInt) 322 argIdx++ 323 } 324 325 // Filter for stale endpoints only 326 if filter.OnlyStale && filter.RecheckInterval > 0 { 327 cutoffTime := time.Now().UTC().Add(-filter.RecheckInterval) 328 query += fmt.Sprintf(" AND (last_checked IS NULL OR last_checked < $%d)", argIdx) 329 args = append(args, cutoffTime) 330 argIdx++ 331 } 332 } 333 334 // NEW: Choose ordering strategy 335 if filter != nil && filter.Random { 336 // For random selection, we need to wrap in a subquery 337 query = fmt.Sprintf(` 338 WITH filtered_endpoints AS ( 339 %s 340 ) 341 SELECT * FROM filtered_endpoints 342 ORDER BY RANDOM() 343 `, query) 344 } else { 345 // Original ordering for non-random queries 346 query += " ORDER BY COALESCE(server_did, id::text), discovered_at ASC" 347 } 348 349 if filter != nil && filter.Limit > 0 { 350 query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1) 351 args = append(args, filter.Limit, filter.Offset) 352 } 353 354 rows, err := p.db.QueryContext(ctx, query, args...) 
355 if err != nil { 356 return nil, err 357 } 358 defer rows.Close() 359 360 var endpoints []*Endpoint 361 for rows.Next() { 362 var ep Endpoint 363 var lastChecked, ipResolvedAt sql.NullTime 364 var ip, ipv6, serverDID sql.NullString 365 366 err := rows.Scan( 367 &ep.ID, &ep.EndpointType, &ep.Endpoint, &serverDID, &ep.DiscoveredAt, &lastChecked, 368 &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.UpdatedAt, 369 ) 370 if err != nil { 371 return nil, err 372 } 373 374 if serverDID.Valid { 375 ep.ServerDID = serverDID.String 376 } 377 if lastChecked.Valid { 378 ep.LastChecked = lastChecked.Time 379 } 380 if ip.Valid { 381 ep.IP = ip.String 382 } 383 if ipv6.Valid { 384 ep.IPv6 = ipv6.String 385 } 386 if ipResolvedAt.Valid { 387 ep.IPResolvedAt = ipResolvedAt.Time 388 } 389 390 endpoints = append(endpoints, &ep) 391 } 392 393 return endpoints, rows.Err() 394} 395 396func (p *PostgresDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error { 397 query := ` 398 UPDATE endpoints 399 SET status = $1, last_checked = $2, updated_at = $3 400 WHERE id = $4 401 ` 402 _, err := p.db.ExecContext(ctx, query, update.Status, update.LastChecked, time.Now().UTC(), endpointID) 403 return err 404} 405 406func (p *PostgresDB) UpdateEndpointIPs(ctx context.Context, endpointID int64, ipv4, ipv6 string, resolvedAt time.Time) error { 407 query := ` 408 UPDATE endpoints 409 SET ip = $1, ipv6 = $2, ip_resolved_at = $3, updated_at = $4 410 WHERE id = $5 411 ` 412 _, err := p.db.ExecContext(ctx, query, ipv4, ipv6, resolvedAt, time.Now().UTC(), endpointID) 413 return err 414} 415 416func (p *PostgresDB) UpdateEndpointServerDID(ctx context.Context, endpointID int64, serverDID string) error { 417 query := ` 418 UPDATE endpoints 419 SET server_did = $1, updated_at = $2 420 WHERE id = $3 421 ` 422 _, err := p.db.ExecContext(ctx, query, serverDID, time.Now().UTC(), endpointID) 423 return err 424} 425 426func (p *PostgresDB) GetDuplicateEndpoints(ctx context.Context) 
(map[string][]string, error) { 427 query := ` 428 SELECT server_did, array_agg(endpoint ORDER BY discovered_at ASC) as endpoints 429 FROM endpoints 430 WHERE server_did IS NOT NULL 431 AND server_did != '' 432 AND endpoint_type = 'pds' 433 GROUP BY server_did 434 HAVING COUNT(*) > 1 435 ORDER BY COUNT(*) DESC 436 ` 437 438 rows, err := p.db.QueryContext(ctx, query) 439 if err != nil { 440 return nil, err 441 } 442 defer rows.Close() 443 444 duplicates := make(map[string][]string) 445 for rows.Next() { 446 var serverDID string 447 var endpoints []string 448 449 err := rows.Scan(&serverDID, pq.Array(&endpoints)) 450 if err != nil { 451 return nil, err 452 } 453 454 duplicates[serverDID] = endpoints 455 } 456 457 return duplicates, rows.Err() 458} 459 460// ===== SCAN OPERATIONS ===== 461 462func (p *PostgresDB) SetScanRetention(retention int) { 463 p.scanRetention = retention 464} 465 466func (p *PostgresDB) SaveEndpointScan(ctx context.Context, scan *EndpointScan) error { 467 var scanDataJSON []byte 468 if scan.ScanData != nil { 469 scanDataJSON, _ = json.Marshal(scan.ScanData) 470 } 471 472 tx, err := p.db.BeginTx(ctx, nil) 473 if err != nil { 474 return err 475 } 476 defer tx.Rollback() 477 478 query := ` 479 INSERT INTO endpoint_scans (endpoint_id, status, response_time, user_count, version, used_ip, scan_data, scanned_at) 480 VALUES ($1, $2, $3, $4, $5, $6, $7, $8) 481 ` 482 _, err = tx.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scan.UserCount, scan.Version, scan.UsedIP, scanDataJSON, scan.ScannedAt) 483 if err != nil { 484 return err 485 } 486 487 // Use configured retention value 488 cleanupQuery := ` 489 DELETE FROM endpoint_scans 490 WHERE endpoint_id = $1 491 AND id NOT IN ( 492 SELECT id 493 FROM endpoint_scans 494 WHERE endpoint_id = $1 495 ORDER BY scanned_at DESC 496 LIMIT $2 497 ) 498 ` 499 _, err = tx.ExecContext(ctx, cleanupQuery, scan.EndpointID, p.scanRetention) 500 if err != nil { 501 return err 502 } 503 504 return 
tx.Commit() 505} 506 507func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) { 508 query := ` 509 SELECT id, endpoint_id, status, response_time, user_count, version, used_ip, scan_data, scanned_at 510 FROM endpoint_scans 511 WHERE endpoint_id = $1 512 ORDER BY scanned_at DESC 513 LIMIT $2 514 ` 515 516 rows, err := p.db.QueryContext(ctx, query, endpointID, limit) 517 if err != nil { 518 return nil, err 519 } 520 defer rows.Close() 521 522 var scans []*EndpointScan 523 for rows.Next() { 524 var scan EndpointScan 525 var responseTime sql.NullFloat64 526 var userCount sql.NullInt64 527 var version, usedIP sql.NullString 528 var scanDataJSON []byte 529 530 err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &userCount, &version, &usedIP, &scanDataJSON, &scan.ScannedAt) 531 if err != nil { 532 return nil, err 533 } 534 535 if responseTime.Valid { 536 scan.ResponseTime = responseTime.Float64 537 } 538 539 if userCount.Valid { 540 scan.UserCount = userCount.Int64 541 } 542 543 if version.Valid { 544 scan.Version = version.String 545 } 546 547 if usedIP.Valid { 548 scan.UsedIP = usedIP.String 549 } 550 551 if len(scanDataJSON) > 0 { 552 var scanData EndpointScanData 553 if err := json.Unmarshal(scanDataJSON, &scanData); err == nil { 554 scan.ScanData = &scanData 555 } 556 } 557 558 scans = append(scans, &scan) 559 } 560 561 return scans, rows.Err() 562} 563 564// ===== PDS VIRTUAL ENDPOINTS ===== 565 566func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) { 567 query := ` 568 WITH unique_servers AS ( 569 SELECT DISTINCT ON (COALESCE(server_did, id::text)) 570 id, 571 endpoint, 572 server_did, 573 discovered_at, 574 last_checked, 575 status, 576 ip, 577 ipv6, 578 valid 579 FROM endpoints 580 WHERE endpoint_type = 'pds' 581 ORDER BY COALESCE(server_did, id::text), discovered_at ASC 582 ) 583 SELECT 584 e.id, e.endpoint, e.server_did, 
e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6, e.valid, 585 latest.user_count, latest.response_time, latest.version, latest.scanned_at, 586 i.city, i.country, i.country_code, i.asn, i.asn_org, 587 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, 588 i.latitude, i.longitude 589 FROM unique_servers e 590 LEFT JOIN LATERAL ( 591 SELECT 592 user_count, 593 response_time, 594 version, 595 scanned_at 596 FROM endpoint_scans 597 WHERE endpoint_id = e.id AND status = 1 598 ORDER BY scanned_at DESC 599 LIMIT 1 600 ) latest ON true 601 LEFT JOIN ip_infos i ON e.ip = i.ip 602 WHERE 1=1 603 ` 604 605 args := []interface{}{} 606 argIdx := 1 607 608 if filter != nil { 609 if filter.Status != "" { 610 statusInt := EndpointStatusUnknown 611 switch filter.Status { 612 case "online": 613 statusInt = EndpointStatusOnline 614 case "offline": 615 statusInt = EndpointStatusOffline 616 } 617 query += fmt.Sprintf(" AND e.status = $%d", argIdx) 618 args = append(args, statusInt) 619 argIdx++ 620 } 621 622 if filter.MinUserCount > 0 { 623 query += fmt.Sprintf(" AND latest.user_count >= $%d", argIdx) 624 args = append(args, filter.MinUserCount) 625 argIdx++ 626 } 627 } 628 629 query += " ORDER BY latest.user_count DESC NULLS LAST" 630 631 if filter != nil && filter.Limit > 0 { 632 query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1) 633 args = append(args, filter.Limit, filter.Offset) 634 } 635 636 rows, err := p.db.QueryContext(ctx, query, args...) 
637 if err != nil { 638 return nil, err 639 } 640 defer rows.Close() 641 642 var items []*PDSListItem 643 for rows.Next() { 644 item := &PDSListItem{} 645 var ip, ipv6, serverDID, city, country, countryCode, asnOrg sql.NullString 646 var asn sql.NullInt32 647 var isDatacenter, isVPN, isCrawler, isTor, isProxy sql.NullBool 648 var lat, lon sql.NullFloat64 649 var userCount sql.NullInt32 650 var responseTime sql.NullFloat64 651 var version sql.NullString 652 var scannedAt sql.NullTime 653 654 err := rows.Scan( 655 &item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6, &item.Valid, 656 &userCount, &responseTime, &version, &scannedAt, 657 &city, &country, &countryCode, &asn, &asnOrg, 658 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, 659 &lat, &lon, 660 ) 661 if err != nil { 662 return nil, err 663 } 664 665 if ip.Valid { 666 item.IP = ip.String 667 } 668 if ipv6.Valid { 669 item.IPv6 = ipv6.String 670 } 671 if serverDID.Valid { 672 item.ServerDID = serverDID.String 673 } 674 675 // Add latest scan data if available 676 if userCount.Valid { 677 item.LatestScan = &struct { 678 UserCount int 679 ResponseTime float64 680 Version string 681 ScannedAt time.Time 682 }{ 683 UserCount: int(userCount.Int32), 684 ResponseTime: responseTime.Float64, 685 Version: version.String, 686 ScannedAt: scannedAt.Time, 687 } 688 } 689 690 // Add IP info if available 691 if city.Valid || country.Valid { 692 item.IPInfo = &IPInfo{ 693 IP: ip.String, 694 City: city.String, 695 Country: country.String, 696 CountryCode: countryCode.String, 697 ASN: int(asn.Int32), 698 ASNOrg: asnOrg.String, 699 IsDatacenter: isDatacenter.Bool, 700 IsVPN: isVPN.Bool, 701 IsCrawler: isCrawler.Bool, 702 IsTor: isTor.Bool, 703 IsProxy: isProxy.Bool, 704 Latitude: float32(lat.Float64), 705 Longitude: float32(lon.Float64), 706 } 707 } 708 709 items = append(items, item) 710 } 711 712 return items, rows.Err() 713} 714 715func (p *PostgresDB) GetPDSDetail(ctx 
context.Context, endpoint string) (*PDSDetail, error) { 716 query := ` 717 WITH target_endpoint AS MATERIALIZED ( 718 SELECT 719 e.id, 720 e.endpoint, 721 e.server_did, 722 e.discovered_at, 723 e.last_checked, 724 e.status, 725 e.ip, 726 e.ipv6, 727 e.valid 728 FROM endpoints e 729 WHERE e.endpoint = $1 730 AND e.endpoint_type = 'pds' 731 LIMIT 1 732 ) 733 SELECT 734 te.id, 735 te.endpoint, 736 te.server_did, 737 te.discovered_at, 738 te.last_checked, 739 te.status, 740 te.ip, 741 te.ipv6, 742 te.valid, 743 latest.user_count, 744 latest.response_time, 745 latest.version, 746 latest.scan_data->'metadata'->'server_info' as server_info, 747 latest.scanned_at, 748 i.city, i.country, i.country_code, i.asn, i.asn_org, 749 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy, 750 i.latitude, i.longitude, 751 i.raw_data, 752 COALESCE( 753 ARRAY( 754 SELECT e2.endpoint 755 FROM endpoints e2 756 WHERE e2.server_did = te.server_did 757 AND e2.endpoint_type = 'pds' 758 AND e2.endpoint != te.endpoint 759 AND te.server_did IS NOT NULL 760 ORDER BY e2.discovered_at 761 ), 762 ARRAY[]::text[] 763 ) as aliases, 764 CASE 765 WHEN te.server_did IS NOT NULL THEN ( 766 SELECT MIN(e3.discovered_at) 767 FROM endpoints e3 768 WHERE e3.server_did = te.server_did 769 AND e3.endpoint_type = 'pds' 770 ) 771 ELSE NULL 772 END as first_discovered_at 773 FROM target_endpoint te 774 LEFT JOIN LATERAL ( 775 SELECT 776 es.scan_data, 777 es.response_time, 778 es.version, 779 es.scanned_at, 780 es.user_count 781 FROM endpoint_scans es 782 WHERE es.endpoint_id = te.id 783 ORDER BY es.scanned_at DESC 784 LIMIT 1 785 ) latest ON true 786 LEFT JOIN ip_infos i ON te.ip = i.ip; 787 ` 788 789 detail := &PDSDetail{} 790 var ip, ipv6, city, country, countryCode, asnOrg, serverDID sql.NullString 791 var asn sql.NullInt32 792 var isDatacenter, isVPN, isCrawler, isTor, isProxy sql.NullBool 793 var lat, lon sql.NullFloat64 794 var userCount sql.NullInt32 795 var responseTime sql.NullFloat64 796 var 
version sql.NullString 797 var serverInfoJSON []byte 798 var scannedAt sql.NullTime 799 var rawDataJSON []byte 800 var aliases []string 801 var firstDiscoveredAt sql.NullTime 802 803 err := p.db.QueryRowContext(ctx, query, endpoint).Scan( 804 &detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6, &detail.Valid, 805 &userCount, &responseTime, &version, &serverInfoJSON, &scannedAt, 806 &city, &country, &countryCode, &asn, &asnOrg, 807 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy, 808 &lat, &lon, 809 &rawDataJSON, 810 pq.Array(&aliases), 811 &firstDiscoveredAt, 812 ) 813 if err != nil { 814 return nil, err 815 } 816 817 if ip.Valid { 818 detail.IP = ip.String 819 } 820 if ipv6.Valid { 821 detail.IPv6 = ipv6.String 822 } 823 824 if serverDID.Valid { 825 detail.ServerDID = serverDID.String 826 } 827 828 // Set aliases and is_primary 829 detail.Aliases = aliases 830 if serverDID.Valid && serverDID.String != "" && firstDiscoveredAt.Valid { 831 detail.IsPrimary = detail.DiscoveredAt.Equal(firstDiscoveredAt.Time) || 832 detail.DiscoveredAt.Before(firstDiscoveredAt.Time) 833 } else { 834 detail.IsPrimary = true 835 } 836 837 // Parse latest scan data 838 if userCount.Valid { 839 var serverInfo interface{} 840 if len(serverInfoJSON) > 0 { 841 json.Unmarshal(serverInfoJSON, &serverInfo) 842 } 843 844 detail.LatestScan = &struct { 845 UserCount int 846 ResponseTime float64 847 Version string 848 ServerInfo interface{} 849 ScannedAt time.Time 850 }{ 851 UserCount: int(userCount.Int32), 852 ResponseTime: responseTime.Float64, 853 Version: version.String, 854 ServerInfo: serverInfo, 855 ScannedAt: scannedAt.Time, 856 } 857 } 858 859 // Parse IP info with all fields 860 if city.Valid || country.Valid { 861 detail.IPInfo = &IPInfo{ 862 IP: ip.String, 863 City: city.String, 864 Country: country.String, 865 CountryCode: countryCode.String, 866 ASN: int(asn.Int32), 867 ASNOrg: asnOrg.String, 868 IsDatacenter: 
isDatacenter.Bool, 869 IsVPN: isVPN.Bool, 870 IsCrawler: isCrawler.Bool, 871 IsTor: isTor.Bool, 872 IsProxy: isProxy.Bool, 873 Latitude: float32(lat.Float64), 874 Longitude: float32(lon.Float64), 875 } 876 877 if len(rawDataJSON) > 0 { 878 json.Unmarshal(rawDataJSON, &detail.IPInfo.RawData) 879 } 880 } 881 882 return detail, nil 883} 884 885func (p *PostgresDB) GetPDSStats(ctx context.Context) (*PDSStats, error) { 886 query := ` 887 WITH unique_servers AS ( 888 SELECT DISTINCT ON (COALESCE(server_did, id::text)) 889 id, 890 COALESCE(server_did, id::text) as server_identity, 891 status 892 FROM endpoints 893 WHERE endpoint_type = 'pds' 894 ORDER BY COALESCE(server_did, id::text), discovered_at ASC 895 ), 896 latest_scans AS ( 897 SELECT DISTINCT ON (us.id) 898 us.id, 899 es.user_count, 900 us.status 901 FROM unique_servers us 902 LEFT JOIN endpoint_scans es ON us.id = es.endpoint_id 903 ORDER BY us.id, es.scanned_at DESC 904 ) 905 SELECT 906 COUNT(*) as total, 907 SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online, 908 SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline, 909 SUM(COALESCE(user_count, 0)) as total_users 910 FROM latest_scans 911 ` 912 913 stats := &PDSStats{} 914 err := p.db.QueryRowContext(ctx, query).Scan( 915 &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, &stats.TotalDIDs, 916 ) 917 918 return stats, err 919} 920 921func (p *PostgresDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) { 922 query := ` 923 SELECT 924 COUNT(*) as total_endpoints, 925 SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints, 926 SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints 927 FROM endpoints 928 ` 929 930 var stats EndpointStats 931 err := p.db.QueryRowContext(ctx, query).Scan( 932 &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, 933 ) 934 if err != nil { 935 return nil, err 936 } 937 938 // Get average response time from recent scans 939 avgQuery := ` 940 SELECT 
AVG(response_time) 941 FROM endpoint_scans 942 WHERE response_time > 0 AND scanned_at > NOW() - INTERVAL '1 hour' 943 ` 944 var avgResponseTime sql.NullFloat64 945 _ = p.db.QueryRowContext(ctx, avgQuery).Scan(&avgResponseTime) 946 if avgResponseTime.Valid { 947 stats.AvgResponseTime = avgResponseTime.Float64 948 } 949 950 // Get counts by type 951 typeQuery := ` 952 SELECT endpoint_type, COUNT(*) 953 FROM endpoints 954 GROUP BY endpoint_type 955 ` 956 rows, err := p.db.QueryContext(ctx, typeQuery) 957 if err == nil { 958 defer rows.Close() 959 stats.ByType = make(map[string]int64) 960 for rows.Next() { 961 var typ string 962 var count int64 963 if err := rows.Scan(&typ, &count); err == nil { 964 stats.ByType[typ] = count 965 } 966 } 967 } 968 969 // Get total DIDs from latest PDS scans 970 didQuery := ` 971 WITH unique_servers AS ( 972 SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text)) 973 e.id 974 FROM endpoints e 975 WHERE e.endpoint_type = 'pds' 976 ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC 977 ), 978 latest_pds_scans AS ( 979 SELECT DISTINCT ON (us.id) 980 us.id, 981 es.user_count 982 FROM unique_servers us 983 LEFT JOIN endpoint_scans es ON us.id = es.endpoint_id 984 ORDER BY us.id, es.scanned_at DESC 985 ) 986 SELECT SUM(user_count) FROM latest_pds_scans 987 ` 988 var totalDIDs sql.NullInt64 989 _ = p.db.QueryRowContext(ctx, didQuery).Scan(&totalDIDs) 990 if totalDIDs.Valid { 991 stats.TotalDIDs = totalDIDs.Int64 992 } 993 994 return &stats, err 995} 996 997// ===== IP INFO OPERATIONS ===== 998 999func (p *PostgresDB) UpsertIPInfo(ctx context.Context, ip string, ipInfo map[string]interface{}) error { 1000 rawDataJSON, _ := json.Marshal(ipInfo) 1001 1002 // Extract fields from ipInfo map 1003 city := extractString(ipInfo, "location", "city") 1004 country := extractString(ipInfo, "location", "country") 1005 countryCode := extractString(ipInfo, "location", "country_code") 1006 asn := extractInt(ipInfo, "asn", "asn") 1007 asnOrg := 
extractString(ipInfo, "asn", "org") 1008 1009 // Extract top-level boolean flags 1010 isDatacenter := false 1011 if val, ok := ipInfo["is_datacenter"].(bool); ok { 1012 isDatacenter = val 1013 } 1014 1015 isVPN := false 1016 if val, ok := ipInfo["is_vpn"].(bool); ok { 1017 isVPN = val 1018 } 1019 1020 isCrawler := false 1021 if val, ok := ipInfo["is_crawler"].(bool); ok { 1022 isCrawler = val 1023 } 1024 1025 isTor := false 1026 if val, ok := ipInfo["is_tor"].(bool); ok { 1027 isTor = val 1028 } 1029 1030 isProxy := false 1031 if val, ok := ipInfo["is_proxy"].(bool); ok { 1032 isProxy = val 1033 } 1034 1035 lat := extractFloat(ipInfo, "location", "latitude") 1036 lon := extractFloat(ipInfo, "location", "longitude") 1037 1038 query := ` 1039 INSERT INTO ip_infos (ip, city, country, country_code, asn, asn_org, is_datacenter, is_vpn, is_crawler, is_tor, is_proxy, latitude, longitude, raw_data, fetched_at) 1040 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15) 1041 ON CONFLICT(ip) DO UPDATE SET 1042 city = EXCLUDED.city, 1043 country = EXCLUDED.country, 1044 country_code = EXCLUDED.country_code, 1045 asn = EXCLUDED.asn, 1046 asn_org = EXCLUDED.asn_org, 1047 is_datacenter = EXCLUDED.is_datacenter, 1048 is_vpn = EXCLUDED.is_vpn, 1049 is_crawler = EXCLUDED.is_crawler, 1050 is_tor = EXCLUDED.is_tor, 1051 is_proxy = EXCLUDED.is_proxy, 1052 latitude = EXCLUDED.latitude, 1053 longitude = EXCLUDED.longitude, 1054 raw_data = EXCLUDED.raw_data, 1055 fetched_at = EXCLUDED.fetched_at, 1056 updated_at = CURRENT_TIMESTAMP 1057 ` 1058 _, err := p.db.ExecContext(ctx, query, ip, city, country, countryCode, asn, asnOrg, isDatacenter, isVPN, isCrawler, isTor, isProxy, lat, lon, rawDataJSON, time.Now().UTC()) 1059 return err 1060} 1061 1062func (p *PostgresDB) GetIPInfo(ctx context.Context, ip string) (*IPInfo, error) { 1063 query := ` 1064 SELECT ip, city, country, country_code, asn, asn_org, is_datacenter, is_vpn, is_crawler, is_tor, is_proxy, 1065 latitude, 
longitude, raw_data, fetched_at, updated_at 1066 FROM ip_infos 1067 WHERE ip = $1 1068 ` 1069 1070 info := &IPInfo{} 1071 var rawDataJSON []byte 1072 1073 err := p.db.QueryRowContext(ctx, query, ip).Scan( 1074 &info.IP, &info.City, &info.Country, &info.CountryCode, &info.ASN, &info.ASNOrg, 1075 &info.IsDatacenter, &info.IsVPN, &info.IsCrawler, &info.IsTor, &info.IsProxy, 1076 &info.Latitude, &info.Longitude, 1077 &rawDataJSON, &info.FetchedAt, &info.UpdatedAt, 1078 ) 1079 if err != nil { 1080 return nil, err 1081 } 1082 1083 if len(rawDataJSON) > 0 { 1084 json.Unmarshal(rawDataJSON, &info.RawData) 1085 } 1086 1087 return info, nil 1088} 1089 1090func (p *PostgresDB) ShouldUpdateIPInfo(ctx context.Context, ip string) (bool, bool, error) { 1091 query := `SELECT fetched_at FROM ip_infos WHERE ip = $1` 1092 1093 var fetchedAt time.Time 1094 err := p.db.QueryRowContext(ctx, query, ip).Scan(&fetchedAt) 1095 if err == sql.ErrNoRows { 1096 return false, true, nil // Doesn't exist, needs update 1097 } 1098 if err != nil { 1099 return false, false, err 1100 } 1101 1102 // Check if older than 30 days 1103 needsUpdate := time.Since(fetchedAt) > 30*24*time.Hour 1104 return true, needsUpdate, nil 1105} 1106 1107// ===== HELPER FUNCTIONS ===== 1108 1109func extractString(data map[string]interface{}, keys ...string) string { 1110 current := data 1111 for i, key := range keys { 1112 if i == len(keys)-1 { 1113 if val, ok := current[key].(string); ok { 1114 return val 1115 } 1116 return "" 1117 } 1118 if nested, ok := current[key].(map[string]interface{}); ok { 1119 current = nested 1120 } else { 1121 return "" 1122 } 1123 } 1124 return "" 1125} 1126 1127func extractInt(data map[string]interface{}, keys ...string) int { 1128 current := data 1129 for i, key := range keys { 1130 if i == len(keys)-1 { 1131 if val, ok := current[key].(float64); ok { 1132 return int(val) 1133 } 1134 if val, ok := current[key].(int); ok { 1135 return val 1136 } 1137 return 0 1138 } 1139 if nested, ok := 
current[key].(map[string]interface{}); ok {
			current = nested
		} else {
			return 0
		}
	}
	return 0
}

// extractFloat walks nested maps along the key path and returns the final
// numeric value as float32 (JSON numbers decode as float64). Returns 0 on any
// missing hop or type mismatch.
func extractFloat(data map[string]interface{}, keys ...string) float32 {
	current := data
	for i, key := range keys {
		if i == len(keys)-1 {
			if val, ok := current[key].(float64); ok {
				return float32(val)
			}
			return 0
		}
		if nested, ok := current[key].(map[string]interface{}); ok {
			current = nested
		} else {
			return 0
		}
	}
	return 0
}

// ===== CURSOR OPERATIONS =====

// GetScanCursor returns the persisted scan cursor for a source. When no cursor
// row exists yet, a zero-valued cursor for that source is returned rather than
// an error, so callers can start from the beginning.
func (p *PostgresDB) GetScanCursor(ctx context.Context, source string) (*ScanCursor, error) {
	query := "SELECT source, last_bundle_number, last_scan_time, records_processed FROM scan_cursors WHERE source = $1"

	var cursor ScanCursor
	err := p.db.QueryRowContext(ctx, query, source).Scan(
		&cursor.Source, &cursor.LastBundleNumber, &cursor.LastScanTime, &cursor.RecordsProcessed,
	)
	if err == sql.ErrNoRows {
		return &ScanCursor{
			Source:           source,
			LastBundleNumber: 0,
			LastScanTime:     time.Time{},
		}, nil
	}
	return &cursor, err
}

// UpdateScanCursor upserts the scan cursor row keyed by source.
func (p *PostgresDB) UpdateScanCursor(ctx context.Context, cursor *ScanCursor) error {
	query := `
		INSERT INTO scan_cursors (source, last_bundle_number, last_scan_time, records_processed)
		VALUES ($1, $2, $3, $4)
		ON CONFLICT(source) DO UPDATE SET
			last_bundle_number = EXCLUDED.last_bundle_number,
			last_scan_time = EXCLUDED.last_scan_time,
			records_processed = EXCLUDED.records_processed
	`
	_, err := p.db.ExecContext(ctx, query, cursor.Source, cursor.LastBundleNumber, cursor.LastScanTime, cursor.RecordsProcessed)
	return err
}

// ===== METRICS OPERATIONS =====

// StorePLCMetrics appends one PLC scan metrics row.
func (p *PostgresDB) StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error {
	query := `
		INSERT INTO plc_metrics (total_dids, total_pds, unique_pds,
scan_duration_ms, error_count)
		VALUES ($1, $2, $3, $4, $5)
	`
	_, err := p.db.ExecContext(ctx, query, metrics.TotalDIDs, metrics.TotalPDS,
		metrics.UniquePDS, metrics.ScanDuration, metrics.ErrorCount)
	return err
}

// GetPLCMetrics returns up to limit metrics rows, newest first.
// The created_at column is scanned into the LastScanTime field.
func (p *PostgresDB) GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error) {
	query := `
		SELECT total_dids, total_pds, unique_pds, scan_duration_ms, error_count, created_at
		FROM plc_metrics
		ORDER BY created_at DESC
		LIMIT $1
	`

	rows, err := p.db.QueryContext(ctx, query, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var metrics []*PLCMetrics
	for rows.Next() {
		var m PLCMetrics
		if err := rows.Scan(&m.TotalDIDs, &m.TotalPDS, &m.UniquePDS, &m.ScanDuration, &m.ErrorCount, &m.LastScanTime); err != nil {
			return nil, err
		}
		metrics = append(metrics, &m)
	}

	return metrics, rows.Err()
}

// ===== DID OPERATIONS =====

// UpsertDID inserts or updates one DID row and records bundleNum in the
// bundle_numbers JSONB array; the CASE guard appends it only when not already
// present, keeping the array duplicate-free.
func (p *PostgresDB) UpsertDID(ctx context.Context, did string, bundleNum int, handle, pds string) error {
	query := `
		INSERT INTO dids (did, handle, pds, bundle_numbers, created_at)
		VALUES ($1, $2, $3, jsonb_build_array($4::integer), CURRENT_TIMESTAMP)
		ON CONFLICT(did) DO UPDATE SET
			handle = EXCLUDED.handle,
			pds = EXCLUDED.pds,
			bundle_numbers = CASE
				WHEN dids.bundle_numbers @> jsonb_build_array($4::integer) THEN dids.bundle_numbers
				ELSE dids.bundle_numbers || jsonb_build_array($4::integer)
			END,
			updated_at = CURRENT_TIMESTAMP
	`
	_, err := p.db.ExecContext(ctx, query, did, handle, pds, bundleNum)
	return err
}

// UpsertDIDFromMempool creates/updates DID record without adding to bundle_numbers
// (a new row starts with an empty JSONB array; an existing row's array is untouched).
func (p *PostgresDB) UpsertDIDFromMempool(ctx context.Context, did string, handle, pds string) error {
	query := `
		INSERT INTO dids (did, handle, pds, bundle_numbers, created_at)
VALUES ($1, $2, $3, '[]'::jsonb, CURRENT_TIMESTAMP)
		ON CONFLICT(did) DO UPDATE SET
			handle = EXCLUDED.handle,
			pds = EXCLUDED.pds,
			updated_at = CURRENT_TIMESTAMP
	`
	_, err := p.db.ExecContext(ctx, query, did, handle, pds)
	return err
}

// GetDIDRecord fetches a single DID row by DID. The handle and pds columns may
// be NULL in the database; in that case the returned record keeps "" for them.
func (p *PostgresDB) GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) {
	query := `
		SELECT did, handle, pds, bundle_numbers, created_at
		FROM dids
		WHERE did = $1
	`

	var record DIDRecord
	var bundleNumbersJSON []byte
	var handle, pds sql.NullString

	err := p.db.QueryRowContext(ctx, query, did).Scan(
		&record.DID,
		&handle,
		&pds,
		&bundleNumbersJSON,
		&record.CreatedAt,
	)
	if err != nil {
		return nil, err
	}

	if handle.Valid {
		record.Handle = handle.String
	}
	if pds.Valid {
		record.CurrentPDS = pds.String
	}

	// bundle_numbers is stored as a JSONB array and decoded into the slice field.
	if err := json.Unmarshal(bundleNumbersJSON, &record.BundleNumbers); err != nil {
		return nil, err
	}

	return &record, nil
}

// GetDIDByHandle looks up a DID record by its handle; otherwise identical in
// shape and NULL handling to GetDIDRecord.
func (p *PostgresDB) GetDIDByHandle(ctx context.Context, handle string) (*DIDRecord, error) {
	query := `
		SELECT did, handle, pds, bundle_numbers, created_at
		FROM dids
		WHERE handle = $1
	`

	var record DIDRecord
	var bundleNumbersJSON []byte
	var recordHandle, pds sql.NullString

	err := p.db.QueryRowContext(ctx, query, handle).Scan(
		&record.DID,
		&recordHandle,
		&pds,
		&bundleNumbersJSON,
		&record.CreatedAt,
	)
	if err != nil {
		return nil, err
	}

	if recordHandle.Valid {
		record.Handle = recordHandle.String
	}
	if pds.Valid {
		record.CurrentPDS = pds.String
	}

	if err := json.Unmarshal(bundleNumbersJSON, &record.BundleNumbers); err != nil {
		return nil, err
	}

	return &record, nil
}

// GetGlobalDIDInfo retrieves consolidated DID info from
'dids' and 'pds_repos'
func (p *PostgresDB) GetGlobalDIDInfo(ctx context.Context, did string) (*GlobalDIDInfo, error) {
	// primary_endpoints picks one endpoint row per physical server: the oldest
	// (first discovered) endpoint among those sharing a server_did. hosting_on
	// aggregates only repo rows whose endpoint is such a primary endpoint.
	query := `
		WITH primary_endpoints AS (
			SELECT DISTINCT ON (COALESCE(server_did, id::text))
				id
			FROM endpoints
			WHERE endpoint_type = 'pds'
			ORDER BY COALESCE(server_did, id::text), discovered_at ASC
		)
		SELECT
			d.did,
			d.handle,
			d.pds,
			d.bundle_numbers,
			d.created_at,
			COALESCE(
				jsonb_agg(
					jsonb_build_object(
						'id', pr.id,
						'endpoint_id', pr.endpoint_id,
						'endpoint', e.endpoint,
						'did', pr.did,
						'head', pr.head,
						'rev', pr.rev,
						'active', pr.active,
						'status', pr.status,
						'first_seen', pr.first_seen AT TIME ZONE 'UTC',
						'last_seen', pr.last_seen AT TIME ZONE 'UTC',
						'updated_at', pr.updated_at AT TIME ZONE 'UTC'
					)
					ORDER BY pr.last_seen DESC
				) FILTER (
					WHERE pr.id IS NOT NULL AND pe.id IS NOT NULL
				),
				'[]'::jsonb
			) AS hosting_on
		FROM
			dids d
		LEFT JOIN
			pds_repos pr ON d.did = pr.did
		LEFT JOIN
			endpoints e ON pr.endpoint_id = e.id
		LEFT JOIN
			primary_endpoints pe ON pr.endpoint_id = pe.id
		WHERE
			d.did = $1
		GROUP BY
			d.did, d.handle, d.pds, d.bundle_numbers, d.created_at
	`

	var info GlobalDIDInfo
	var bundleNumbersJSON []byte
	var hostingOnJSON []byte
	var handle, pds sql.NullString

	err := p.db.QueryRowContext(ctx, query, did).Scan(
		&info.DID,
		&handle,
		&pds,
		&bundleNumbersJSON,
		&info.CreatedAt,
		&hostingOnJSON,
	)
	if err != nil {
		return nil, err
	}

	if handle.Valid {
		info.Handle = handle.String
	}
	if pds.Valid {
		info.CurrentPDS = pds.String
	}

	if err := json.Unmarshal(bundleNumbersJSON, &info.BundleNumbers); err != nil {
		return nil, fmt.Errorf("failed to unmarshal bundle_numbers: %w", err)
	}

	if err :=
json.Unmarshal(hostingOnJSON, &info.HostingOn); err != nil {
		return nil, fmt.Errorf("failed to unmarshal hosting_on: %w", err)
	}

	return &info, nil
}

// AddBundleDIDs bulk-registers bundleNum for a batch of DIDs: new DIDs are
// inserted with [bundleNum] as their array; existing DIDs get bundleNum
// appended unless their JSONB array already contains it. The batch is staged
// via COPY into a temp table for speed.
// NOTE(review): temp_dids has a PRIMARY KEY on did, so duplicate entries in
// the input slice would abort the COPY — assumes callers deduplicate.
func (p *PostgresDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error {
	if len(dids) == 0 {
		return nil
	}

	// Acquire a connection from the pool
	conn, err := p.pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	// Start transaction
	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is a harmless no-op.
	defer tx.Rollback(ctx)

	// Create temporary table
	_, err = tx.Exec(ctx, `
		CREATE TEMP TABLE temp_dids (did TEXT PRIMARY KEY) ON COMMIT DROP
	`)
	if err != nil {
		return err
	}

	// Use COPY for blazing fast bulk insert
	_, err = tx.Conn().CopyFrom(
		ctx,
		pgx.Identifier{"temp_dids"},
		[]string{"did"},
		pgx.CopyFromSlice(len(dids), func(i int) ([]interface{}, error) {
			return []interface{}{dids[i]}, nil
		}),
	)
	if err != nil {
		return err
	}

	// Step 1: Insert new DIDs
	_, err = tx.Exec(ctx, `
		INSERT INTO dids (did, bundle_numbers, created_at)
		SELECT td.did, $1::jsonb, CURRENT_TIMESTAMP
		FROM temp_dids td
		WHERE NOT EXISTS (SELECT 1 FROM dids WHERE dids.did = td.did)
	`, fmt.Sprintf("[%d]", bundleNum))

	if err != nil {
		return err
	}

	// Step 2: Update existing DIDs
	_, err = tx.Exec(ctx, `
		UPDATE dids
		SET bundle_numbers = bundle_numbers || $1::jsonb
		FROM temp_dids
		WHERE dids.did = temp_dids.did
		AND NOT (bundle_numbers @> $1::jsonb)
	`, fmt.Sprintf("[%d]", bundleNum))

	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}

// GetTotalDIDCount returns the number of rows in the dids table.
func (p *PostgresDB) GetTotalDIDCount(ctx context.Context) (int64, error) {
	query := "SELECT COUNT(*) FROM dids"
	var count int64
	err
:= p.db.QueryRowContext(ctx, query).Scan(&count)
	return count, err
}

// GetCountryLeaderboard aggregates active, deduplicated PDS endpoints by the
// country of their IP: per-country PDS count, user totals, global share
// percentages, and average response time, ordered by PDS count descending.
// Each server's user_count/response_time comes from its most recent scan
// (LATERAL subquery, newest scanned_at first).
func (p *PostgresDB) GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error) {
	query := `
		WITH unique_servers AS (
			SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
				e.id,
				e.ip,
				e.status
			FROM endpoints e
			WHERE e.endpoint_type = 'pds'
			ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
		),
		pds_by_country AS (
			SELECT
				i.country,
				i.country_code,
				COUNT(DISTINCT us.id) as active_pds_count,
				SUM(latest.user_count) as total_users,
				AVG(latest.response_time) as avg_response_time
			FROM unique_servers us
			JOIN ip_infos i ON us.ip = i.ip
			LEFT JOIN LATERAL (
				SELECT user_count, response_time
				FROM endpoint_scans
				WHERE endpoint_id = us.id
				ORDER BY scanned_at DESC
				LIMIT 1
			) latest ON true
			WHERE us.status = 1
			AND i.country IS NOT NULL
			AND i.country != ''
			GROUP BY i.country, i.country_code
		),
		totals AS (
			SELECT
				SUM(active_pds_count) as total_pds,
				SUM(total_users) as total_users_global
			FROM pds_by_country
		)
		SELECT
			pbc.country,
			pbc.country_code,
			pbc.active_pds_count,
			ROUND((pbc.active_pds_count * 100.0 / NULLIF(t.total_pds, 0))::numeric, 2) as pds_percentage,
			COALESCE(pbc.total_users, 0) as total_users,
			ROUND((COALESCE(pbc.total_users, 0) * 100.0 / NULLIF(t.total_users_global, 0))::numeric, 2) as users_percentage,
			ROUND(COALESCE(pbc.avg_response_time, 0)::numeric, 2) as avg_response_time_ms
		FROM pds_by_country pbc
		CROSS JOIN totals t
		ORDER BY pbc.active_pds_count DESC
	`

	rows, err := p.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stats []*CountryStats
	for rows.Next() {
		var s CountryStats
		// Percentages can be NULL (NULLIF divisor) when global totals are zero.
		var pdsPercentage, usersPercentage sql.NullFloat64
		err := rows.Scan(
			&s.Country,
			&s.CountryCode,
			&s.ActivePDSCount,
			&pdsPercentage,
			&s.TotalUsers,
			&usersPercentage,
			&s.AvgResponseTimeMS,
		)
		if err != nil {
			return nil, err
		}

		// NULL percentages stay at the zero value on the struct.
		if pdsPercentage.Valid {
			s.PDSPercentage = pdsPercentage.Float64
		}
		if usersPercentage.Valid {
			s.UsersPercentage = usersPercentage.Float64
		}

		stats = append(stats, &s)
	}

	return stats, rows.Err()
}

// GetVersionStats groups deduplicated, active PDS servers by the version
// reported in each server's most recent scan, returning per-version counts,
// user totals, global share percentages, and first/last seen timestamps,
// ordered by PDS count descending.
func (p *PostgresDB) GetVersionStats(ctx context.Context) ([]*VersionStats, error) {
	query := `
		WITH unique_servers AS (
			SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
				e.id
			FROM endpoints e
			WHERE e.endpoint_type = 'pds'
			AND e.status = 1
			ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
		),
		latest_scans AS (
			SELECT DISTINCT ON (us.id)
				us.id,
				es.version,
				es.user_count,
				es.scanned_at
			FROM unique_servers us
			JOIN endpoint_scans es ON us.id = es.endpoint_id
			WHERE es.version IS NOT NULL
			AND es.version != ''
			ORDER BY us.id, es.scanned_at DESC
		),
		version_groups AS (
			SELECT
				version,
				COUNT(*) as pds_count,
				SUM(user_count) as total_users,
				MIN(scanned_at) as first_seen,
				MAX(scanned_at) as last_seen
			FROM latest_scans
			GROUP BY version
		),
		totals AS (
			SELECT
				SUM(pds_count) as total_pds,
				SUM(total_users) as total_users_global
			FROM version_groups
		)
		SELECT
			vg.version,
			vg.pds_count,
			(vg.pds_count * 100.0 / NULLIF(t.total_pds, 0))::numeric as percentage,
			COALESCE(vg.total_users, 0) as total_users,
			(COALESCE(vg.total_users, 0) * 100.0 / NULLIF(t.total_users_global, 0))::numeric as users_percentage,
			vg.first_seen,
			vg.last_seen
		FROM version_groups vg
		CROSS JOIN totals t
		ORDER BY vg.pds_count DESC
	`

	rows, err :=
p.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stats []*VersionStats
	for rows.Next() {
		var s VersionStats
		// NULL when the global totals are zero (NULLIF divisor).
		var percentage, usersPercentage sql.NullFloat64

		err := rows.Scan(
			&s.Version,
			&s.PDSCount,
			&percentage,
			&s.TotalUsers,
			&usersPercentage,
			&s.FirstSeen,
			&s.LastSeen,
		)
		if err != nil {
			return nil, err
		}

		if percentage.Valid {
			s.Percentage = percentage.Float64
			s.PercentageText = formatPercentage(percentage.Float64)
		}
		if usersPercentage.Valid {
			s.UsersPercentage = usersPercentage.Float64
		}

		stats = append(stats, &s)
	}

	return stats, rows.Err()
}

// Helper function (add if not already present)
// formatPercentage renders pct with precision scaled to its magnitude — more
// decimal places for smaller values; zero and negative inputs yield "0%".
func formatPercentage(pct float64) string {
	if pct >= 10 {
		return fmt.Sprintf("%.2f%%", pct)
	} else if pct >= 1 {
		return fmt.Sprintf("%.3f%%", pct)
	} else if pct >= 0.01 {
		return fmt.Sprintf("%.4f%%", pct)
	} else if pct > 0 {
		return fmt.Sprintf("%.6f%%", pct)
	}
	return "0%"
}

// UpsertPDSRepos diffs the given repo snapshot against the rows already stored
// for endpointID and applies only the differences: brand-new repos are
// bulk-inserted, repos whose head/rev/active/status changed are bulk-updated,
// and unchanged rows are left untouched (no write at all when nothing differs).
func (p *PostgresDB) UpsertPDSRepos(ctx context.Context, endpointID int64, repos []PDSRepoData) error {
	if len(repos) == 0 {
		return nil
	}

	// Step 1: Load all existing repos for this endpoint into memory
	query := `
		SELECT did, head, rev, active, status
		FROM pds_repos
		WHERE endpoint_id = $1
	`

	rows, err := p.db.QueryContext(ctx, query, endpointID)
	if err != nil {
		return err
	}

	existingRepos := make(map[string]*PDSRepo)
	for rows.Next() {
		var repo PDSRepo
		var head, rev, status sql.NullString

		err := rows.Scan(&repo.DID, &head, &rev, &repo.Active, &status)
		if err != nil {
			rows.Close()
			return err
		}

		if head.Valid {
			repo.Head = head.String
		}
		if rev.Valid {
			repo.Rev =
rev.String
		}
		if status.Valid {
			repo.Status = status.String
		}

		existingRepos[repo.DID] = &repo
	}
	rows.Close()

	if err := rows.Err(); err != nil {
		return err
	}

	// Step 2: Compare and collect changes
	var newRepos []PDSRepoData
	var changedRepos []PDSRepoData

	for _, repo := range repos {
		existing, exists := existingRepos[repo.DID]
		if !exists {
			// New repo
			newRepos = append(newRepos, repo)
		} else if existing.Head != repo.Head ||
			existing.Rev != repo.Rev ||
			existing.Active != repo.Active ||
			existing.Status != repo.Status {
			// Repo changed
			changedRepos = append(changedRepos, repo)
		}
	}

	// Log comparison results
	log.Verbose("UpsertPDSRepos: endpoint_id=%d, total=%d, existing=%d, new=%d, changed=%d, unchanged=%d",
		endpointID, len(repos), len(existingRepos), len(newRepos), len(changedRepos),
		len(repos)-len(newRepos)-len(changedRepos))

	// If nothing changed, return early
	if len(newRepos) == 0 && len(changedRepos) == 0 {
		log.Verbose("UpsertPDSRepos: endpoint_id=%d, no changes detected, skipping database operations", endpointID)
		return nil
	}

	// Step 3: Execute batched operations (single transaction; COPY into temp
	// tables, then set-based INSERT/UPDATE against pds_repos)
	conn, err := p.pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	// Rollback after a successful Commit is a harmless no-op.
	defer tx.Rollback(ctx)

	// Insert new repos
	if len(newRepos) > 0 {
		_, err := tx.Exec(ctx, `
			CREATE TEMP TABLE temp_new_repos (
				did TEXT,
				head TEXT,
				rev TEXT,
				active BOOLEAN,
				status TEXT
			) ON COMMIT DROP
		`)
		if err != nil {
			return err
		}

		_, err = tx.Conn().CopyFrom(
			ctx,
			pgx.Identifier{"temp_new_repos"},
			[]string{"did", "head", "rev", "active", "status"},
			pgx.CopyFromSlice(len(newRepos), func(i int)
([]interface{}, error) {
				repo := newRepos[i]
				return []interface{}{repo.DID, repo.Head, repo.Rev, repo.Active, repo.Status}, nil
			}),
		)
		if err != nil {
			return err
		}

		// first_seen and last_seen both start at the current UTC time.
		result, err := tx.Exec(ctx, `
			INSERT INTO pds_repos (endpoint_id, did, head, rev, active, status, first_seen, last_seen)
			SELECT $1, did, head, rev, active, status,
				TIMEZONE('UTC', NOW()),
				TIMEZONE('UTC', NOW())
			FROM temp_new_repos
		`, endpointID)
		if err != nil {
			return err
		}

		log.Verbose("UpsertPDSRepos: endpoint_id=%d, inserted %d new repos", endpointID, result.RowsAffected())
	}

	// Update changed repos
	if len(changedRepos) > 0 {
		_, err := tx.Exec(ctx, `
			CREATE TEMP TABLE temp_changed_repos (
				did TEXT,
				head TEXT,
				rev TEXT,
				active BOOLEAN,
				status TEXT
			) ON COMMIT DROP
		`)
		if err != nil {
			return err
		}

		_, err = tx.Conn().CopyFrom(
			ctx,
			pgx.Identifier{"temp_changed_repos"},
			[]string{"did", "head", "rev", "active", "status"},
			pgx.CopyFromSlice(len(changedRepos), func(i int) ([]interface{}, error) {
				repo := changedRepos[i]
				return []interface{}{repo.DID, repo.Head, repo.Rev, repo.Active, repo.Status}, nil
			}),
		)
		if err != nil {
			return err
		}

		// last_seen/updated_at refreshed; first_seen is preserved for updates.
		result, err := tx.Exec(ctx, `
			UPDATE pds_repos
			SET head = t.head,
				rev = t.rev,
				active = t.active,
				status = t.status,
				last_seen = TIMEZONE('UTC', NOW()),
				updated_at = TIMEZONE('UTC', NOW())
			FROM temp_changed_repos t
			WHERE pds_repos.endpoint_id = $1
			AND pds_repos.did = t.did
		`, endpointID)
		if err != nil {
			return err
		}

		log.Verbose("UpsertPDSRepos: endpoint_id=%d, updated %d changed repos", endpointID, result.RowsAffected())
	}

	if err := tx.Commit(ctx); err != nil {
		return err
	}

	log.Verbose("UpsertPDSRepos:
endpoint_id=%d, transaction committed successfully", endpointID)
	return nil
}

// GetPDSRepos lists repos for an endpoint, newest id first, optionally
// restricted to active ones; LIMIT/OFFSET are applied only when limit > 0.
// NULLable columns (head/rev/status) become "" on the returned structs.
func (p *PostgresDB) GetPDSRepos(ctx context.Context, endpointID int64, activeOnly bool, limit int, offset int) ([]*PDSRepo, error) {
	query := `
		SELECT id, endpoint_id, did, head, rev, active, status, first_seen, last_seen, updated_at
		FROM pds_repos
		WHERE endpoint_id = $1
	`

	args := []interface{}{endpointID}
	argIdx := 2

	if activeOnly {
		query += " AND active = true"
	}

	// Order by id (primary key) - fastest
	query += " ORDER BY id DESC"

	if limit > 0 {
		query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
		args = append(args, limit, offset)
	}

	rows, err := p.db.QueryContext(ctx, query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var repos []*PDSRepo
	for rows.Next() {
		var repo PDSRepo
		var head, rev, status sql.NullString

		err := rows.Scan(
			&repo.ID, &repo.EndpointID, &repo.DID, &head, &rev,
			&repo.Active, &status, &repo.FirstSeen, &repo.LastSeen, &repo.UpdatedAt,
		)
		if err != nil {
			return nil, err
		}

		if head.Valid {
			repo.Head = head.String
		}
		if rev.Valid {
			repo.Rev = rev.String
		}
		if status.Valid {
			repo.Status = status.String
		}

		repos = append(repos, &repo)
	}

	return repos, rows.Err()
}

// GetReposByDID returns every repo row for a DID across all endpoints,
// most recently seen first.
func (p *PostgresDB) GetReposByDID(ctx context.Context, did string) ([]*PDSRepo, error) {
	query := `
		SELECT id, endpoint_id, did, head, rev, active, status, first_seen, last_seen, updated_at
		FROM pds_repos
		WHERE did = $1
		ORDER BY last_seen DESC
	`

	rows, err := p.db.QueryContext(ctx, query, did)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var repos []*PDSRepo
	for rows.Next() {
		var repo PDSRepo
		var head, rev,
status sql.NullString

		err := rows.Scan(
			&repo.ID, &repo.EndpointID, &repo.DID, &head, &rev,
			&repo.Active, &status, &repo.FirstSeen, &repo.LastSeen, &repo.UpdatedAt,
		)
		if err != nil {
			return nil, err
		}

		// NULLable columns become "" on the returned struct.
		if head.Valid {
			repo.Head = head.String
		}
		if rev.Valid {
			repo.Rev = rev.String
		}
		if status.Valid {
			repo.Status = status.String
		}

		repos = append(repos, &repo)
	}

	return repos, rows.Err()
}

// GetPDSRepoStats returns aggregate repo counters for one endpoint: total,
// active/inactive split, rows with a non-empty status, and rows updated within
// the last hour.
func (p *PostgresDB) GetPDSRepoStats(ctx context.Context, endpointID int64) (map[string]interface{}, error) {
	query := `
		SELECT
			COUNT(*) as total_repos,
			COUNT(*) FILTER (WHERE active = true) as active_repos,
			COUNT(*) FILTER (WHERE active = false) as inactive_repos,
			COUNT(*) FILTER (WHERE status IS NOT NULL AND status != '') as repos_with_status,
			COUNT(*) FILTER (WHERE updated_at > CURRENT_TIMESTAMP - INTERVAL '1 hour') as recent_changes
		FROM pds_repos
		WHERE endpoint_id = $1
	`

	var totalRepos, activeRepos, inactiveRepos, reposWithStatus, recentChanges int64

	err := p.db.QueryRowContext(ctx, query, endpointID).Scan(
		&totalRepos, &activeRepos, &inactiveRepos, &reposWithStatus, &recentChanges,
	)
	if err != nil {
		return nil, err
	}

	return map[string]interface{}{
		"total_repos":       totalRepos,
		"active_repos":      activeRepos,
		"inactive_repos":    inactiveRepos,
		"repos_with_status": reposWithStatus,
		"recent_changes":    recentChanges,
	}, nil
}

// GetTableSizes fetches size information (in bytes) for all tables in the specified schema.
2009func (p *PostgresDB) GetTableSizes(ctx context.Context, schema string) ([]TableSizeInfo, error) { 2010 // Query now selects raw byte values directly 2011 query := ` 2012 SELECT 2013 c.relname AS table_name, 2014 pg_total_relation_size(c.oid) AS total_bytes, 2015 pg_relation_size(c.oid) AS table_heap_bytes, 2016 pg_indexes_size(c.oid) AS indexes_bytes 2017 FROM 2018 pg_class c 2019 LEFT JOIN 2020 pg_namespace n ON n.oid = c.relnamespace 2021 WHERE 2022 c.relkind = 'r' -- 'r' = ordinary table 2023 AND n.nspname = $1 2024 ORDER BY 2025 total_bytes DESC; 2026 ` 2027 rows, err := p.db.QueryContext(ctx, query, schema) 2028 if err != nil { 2029 return nil, fmt.Errorf("failed to query table sizes: %w", err) 2030 } 2031 defer rows.Close() 2032 2033 var results []TableSizeInfo 2034 for rows.Next() { 2035 var info TableSizeInfo 2036 // Scan directly into int64 fields 2037 if err := rows.Scan( 2038 &info.TableName, 2039 &info.TotalBytes, 2040 &info.TableHeapBytes, 2041 &info.IndexesBytes, 2042 ); err != nil { 2043 return nil, fmt.Errorf("failed to scan table size row: %w", err) 2044 } 2045 results = append(results, info) 2046 } 2047 if err := rows.Err(); err != nil { 2048 return nil, fmt.Errorf("error iterating table size rows: %w", err) 2049 } 2050 2051 return results, nil 2052} 2053 2054// GetIndexSizes fetches size information (in bytes) for all indexes in the specified schema. 
2055func (p *PostgresDB) GetIndexSizes(ctx context.Context, schema string) ([]IndexSizeInfo, error) { 2056 // Query now selects raw byte values directly 2057 query := ` 2058 SELECT 2059 c.relname AS index_name, 2060 COALESCE(i.indrelid::regclass::text, 'N/A') AS table_name, 2061 pg_relation_size(c.oid) AS index_bytes 2062 FROM 2063 pg_class c 2064 LEFT JOIN 2065 pg_index i ON i.indexrelid = c.oid 2066 LEFT JOIN 2067 pg_namespace n ON n.oid = c.relnamespace 2068 WHERE 2069 c.relkind = 'i' -- 'i' = index 2070 AND n.nspname = $1 2071 ORDER BY 2072 index_bytes DESC; 2073 ` 2074 rows, err := p.db.QueryContext(ctx, query, schema) 2075 if err != nil { 2076 return nil, fmt.Errorf("failed to query index sizes: %w", err) 2077 } 2078 defer rows.Close() 2079 2080 var results []IndexSizeInfo 2081 for rows.Next() { 2082 var info IndexSizeInfo 2083 var tableName sql.NullString 2084 // Scan directly into int64 field 2085 if err := rows.Scan( 2086 &info.IndexName, 2087 &tableName, 2088 &info.IndexBytes, 2089 ); err != nil { 2090 return nil, fmt.Errorf("failed to scan index size row: %w", err) 2091 } 2092 if tableName.Valid { 2093 info.TableName = tableName.String 2094 } else { 2095 info.TableName = "N/A" 2096 } 2097 results = append(results, info) 2098 } 2099 if err := rows.Err(); err != nil { 2100 return nil, fmt.Errorf("error iterating index size rows: %w", err) 2101 } 2102 2103 return results, nil 2104}