// wip
1package storage
2
import (
	"context"
	"database/sql"
	"encoding/json"
	"errors"
	"fmt"
	"time"

	"github.com/atscan/atscand/internal/log"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	_ "github.com/jackc/pgx/v5/stdlib"
	"github.com/lib/pq"
)
16
// PostgresDB is the PostgreSQL-backed storage layer. It holds two handles
// to the same database: a database/sql *sql.DB used by the ordinary query
// methods, and a pgx *pgxpool.Pool intended for COPY-style bulk operations.
// Both are created by NewPostgresDB and released by Close.
type PostgresDB struct {
	db   *sql.DB
	pool *pgxpool.Pool
	// scanRetention is how many historical rows SaveEndpointScan keeps per
	// endpoint in endpoint_scans; configured via SetScanRetention (default 3).
	scanRetention int
}
22
23func NewPostgresDB(connString string) (*PostgresDB, error) {
24 log.Info("Connecting to PostgreSQL database...")
25
26 // Open standard sql.DB (for compatibility)
27 db, err := sql.Open("pgx", connString)
28 if err != nil {
29 return nil, fmt.Errorf("failed to open database: %w", err)
30 }
31
32 // Connection pool settings
33 db.SetMaxOpenConns(50)
34 db.SetMaxIdleConns(25)
35 db.SetConnMaxLifetime(5 * time.Minute)
36 db.SetConnMaxIdleTime(2 * time.Minute)
37
38 log.Verbose(" Max open connections: 50")
39 log.Verbose(" Max idle connections: 25")
40 log.Verbose(" Connection max lifetime: 5m")
41
42 // Test connection
43 log.Info("Testing database connection...")
44 if err := db.Ping(); err != nil {
45 return nil, fmt.Errorf("failed to ping database: %w", err)
46 }
47 log.Info("✓ Database connection successful")
48
49 // Also create pgx pool for COPY operations
50 log.Verbose("Creating pgx connection pool...")
51 pool, err := pgxpool.New(context.Background(), connString)
52 if err != nil {
53 return nil, fmt.Errorf("failed to create pgx pool: %w", err)
54 }
55 log.Verbose("✓ Connection pool created")
56
57 return &PostgresDB{
58 db: db,
59 pool: pool,
60 scanRetention: 3, // Default
61 }, nil
62}
63
64func (p *PostgresDB) Close() error {
65 if p.pool != nil {
66 p.pool.Close()
67 }
68 return p.db.Close()
69}
70
71func (p *PostgresDB) Migrate() error {
72 log.Info("Running database migrations...")
73
74 schema := `
75 -- Endpoints table (with IPv6 support)
76 CREATE TABLE IF NOT EXISTS endpoints (
77 id BIGSERIAL PRIMARY KEY,
78 endpoint_type TEXT NOT NULL DEFAULT 'pds',
79 endpoint TEXT NOT NULL,
80 server_did TEXT,
81 discovered_at TIMESTAMP NOT NULL,
82 last_checked TIMESTAMP,
83 status INTEGER DEFAULT 0,
84 ip TEXT,
85 ipv6 TEXT,
86 ip_resolved_at TIMESTAMP,
87 valid BOOLEAN DEFAULT true,
88 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
89 UNIQUE(endpoint_type, endpoint)
90 );
91
92 CREATE INDEX IF NOT EXISTS idx_endpoints_type_endpoint ON endpoints(endpoint_type, endpoint);
93 CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status);
94 CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type);
95 CREATE INDEX IF NOT EXISTS idx_endpoints_ip ON endpoints(ip);
96 CREATE INDEX IF NOT EXISTS idx_endpoints_ipv6 ON endpoints(ipv6);
97 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did);
98 CREATE INDEX IF NOT EXISTS idx_endpoints_server_did_type_discovered ON endpoints(server_did, endpoint_type, discovered_at);
99 CREATE INDEX IF NOT EXISTS idx_endpoints_valid ON endpoints(valid);
100
101 -- IP infos table (IP as PRIMARY KEY)
102 CREATE TABLE IF NOT EXISTS ip_infos (
103 ip TEXT PRIMARY KEY,
104 city TEXT,
105 country TEXT,
106 country_code TEXT,
107 asn INTEGER,
108 asn_org TEXT,
109 is_datacenter BOOLEAN,
110 is_vpn BOOLEAN,
111 is_crawler BOOLEAN,
112 is_tor BOOLEAN,
113 is_proxy BOOLEAN,
114 latitude REAL,
115 longitude REAL,
116 raw_data JSONB,
117 fetched_at TIMESTAMP NOT NULL,
118 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
119 );
120
121 CREATE INDEX IF NOT EXISTS idx_ip_infos_country_code ON ip_infos(country_code);
122 CREATE INDEX IF NOT EXISTS idx_ip_infos_asn ON ip_infos(asn);
123
124 -- Endpoint scans
125 CREATE TABLE IF NOT EXISTS endpoint_scans (
126 id BIGSERIAL PRIMARY KEY,
127 endpoint_id BIGINT NOT NULL,
128 status INTEGER NOT NULL,
129 response_time DOUBLE PRECISION,
130 user_count BIGINT,
131 version TEXT,
132 used_ip TEXT,
133 scan_data JSONB,
134 scanned_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
135 FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE
136 );
137
138 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_endpoint_status_scanned ON endpoint_scans(endpoint_id, status, scanned_at DESC);
139 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_scanned_at ON endpoint_scans(scanned_at);
140 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_user_count ON endpoint_scans(user_count DESC NULLS LAST);
141 CREATE INDEX IF NOT EXISTS idx_endpoint_scans_used_ip ON endpoint_scans(used_ip);
142
143
144 CREATE TABLE IF NOT EXISTS plc_metrics (
145 id BIGSERIAL PRIMARY KEY,
146 total_dids BIGINT,
147 total_pds BIGINT,
148 unique_pds BIGINT,
149 scan_duration_ms BIGINT,
150 error_count INTEGER,
151 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
152 );
153
154 CREATE TABLE IF NOT EXISTS scan_cursors (
155 source TEXT PRIMARY KEY,
156 last_bundle_number INTEGER DEFAULT 0,
157 last_scan_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
158 records_processed BIGINT DEFAULT 0
159 );
160
161 -- Minimal dids table
162 CREATE TABLE IF NOT EXISTS dids (
163 did TEXT PRIMARY KEY,
164 handle TEXT,
165 pds TEXT,
166 bundle_numbers JSONB NOT NULL DEFAULT '[]'::jsonb,
167 created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
168 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
169 );
170
171 CREATE INDEX IF NOT EXISTS idx_dids_bundle_numbers ON dids USING gin(bundle_numbers);
172 CREATE INDEX IF NOT EXISTS idx_dids_created_at ON dids(created_at);
173 CREATE INDEX IF NOT EXISTS idx_dids_handle ON dids(handle);
174 CREATE INDEX IF NOT EXISTS idx_dids_pds ON dids(pds);
175
176 -- PDS Repositories table
177 CREATE TABLE IF NOT EXISTS pds_repos (
178 id BIGSERIAL PRIMARY KEY,
179 endpoint_id BIGINT NOT NULL,
180 did TEXT NOT NULL,
181 head TEXT,
182 rev TEXT,
183 active BOOLEAN DEFAULT true,
184 status TEXT,
185 first_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
186 last_seen TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
187 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
188 FOREIGN KEY (endpoint_id) REFERENCES endpoints(id) ON DELETE CASCADE,
189 UNIQUE(endpoint_id, did)
190 );
191
192 CREATE INDEX IF NOT EXISTS idx_pds_repos_endpoint ON pds_repos(endpoint_id);
193 CREATE INDEX IF NOT EXISTS idx_pds_repos_endpoint_id_desc ON pds_repos(endpoint_id, id DESC);
194 CREATE INDEX IF NOT EXISTS idx_pds_repos_did ON pds_repos(did);
195 CREATE INDEX IF NOT EXISTS idx_pds_repos_active ON pds_repos(active);
196 CREATE INDEX IF NOT EXISTS idx_pds_repos_status ON pds_repos(status);
197 CREATE INDEX IF NOT EXISTS idx_pds_repos_last_seen ON pds_repos(last_seen DESC);
198 `
199
200 _, err := p.db.Exec(schema)
201 if err != nil {
202 return err
203 }
204
205 log.Info("✓ Database migrations completed successfully")
206 return nil
207}
208
209// ===== ENDPOINT OPERATIONS =====
210
// UpsertEndpoint inserts an endpoint row or, on (endpoint_type, endpoint)
// conflict, refreshes its mutable columns. On success endpoint.ID is
// populated from the RETURNING clause.
//
// Merge rules on conflict:
//   - last_checked, status and valid are always overwritten;
//   - ip / ipv6 are only overwritten when the new value is non-empty, so a
//     failed DNS resolution does not erase a previously known address;
//   - ip_resolved_at is refreshed only when at least one address was
//     actually supplied.
func (p *PostgresDB) UpsertEndpoint(ctx context.Context, endpoint *Endpoint) error {
	query := `
	INSERT INTO endpoints (endpoint_type, endpoint, discovered_at, last_checked, status, ip, ipv6, ip_resolved_at, valid)
	VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
	ON CONFLICT(endpoint_type, endpoint) DO UPDATE SET
		last_checked = EXCLUDED.last_checked,
		status = EXCLUDED.status,
		ip = CASE
			WHEN EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '' THEN EXCLUDED.ip
			ELSE endpoints.ip
		END,
		ipv6 = CASE
			WHEN EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '' THEN EXCLUDED.ipv6
			ELSE endpoints.ipv6
		END,
		ip_resolved_at = CASE
			WHEN (EXCLUDED.ip IS NOT NULL AND EXCLUDED.ip != '') OR (EXCLUDED.ipv6 IS NOT NULL AND EXCLUDED.ipv6 != '') THEN EXCLUDED.ip_resolved_at
			ELSE endpoints.ip_resolved_at
		END,
		valid = EXCLUDED.valid,
		updated_at = CURRENT_TIMESTAMP
	RETURNING id
	`
	// QueryRow (not Exec) so the RETURNING id can be scanned back into the
	// caller's struct.
	err := p.db.QueryRowContext(ctx, query,
		endpoint.EndpointType, endpoint.Endpoint, endpoint.DiscoveredAt,
		endpoint.LastChecked, endpoint.Status, endpoint.IP, endpoint.IPv6, endpoint.IPResolvedAt, endpoint.Valid).Scan(&endpoint.ID)
	return err
}
239
240func (p *PostgresDB) EndpointExists(ctx context.Context, endpoint string, endpointType string) (bool, error) {
241 query := "SELECT EXISTS(SELECT 1 FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2)"
242 var exists bool
243 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&exists)
244 return exists, err
245}
246
247func (p *PostgresDB) GetEndpointIDByEndpoint(ctx context.Context, endpoint string, endpointType string) (int64, error) {
248 query := "SELECT id FROM endpoints WHERE endpoint = $1 AND endpoint_type = $2"
249 var id int64
250 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(&id)
251 return id, err
252}
253
254func (p *PostgresDB) GetEndpoint(ctx context.Context, endpoint string, endpointType string) (*Endpoint, error) {
255 query := `
256 SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status,
257 ip, ipv6, ip_resolved_at, valid, updated_at
258 FROM endpoints
259 WHERE endpoint = $1 AND endpoint_type = $2
260 `
261
262 var ep Endpoint
263 var lastChecked, ipResolvedAt sql.NullTime
264 var ip, ipv6 sql.NullString
265
266 err := p.db.QueryRowContext(ctx, query, endpoint, endpointType).Scan(
267 &ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
268 &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.Valid, &ep.UpdatedAt,
269 )
270 if err != nil {
271 return nil, err
272 }
273
274 if lastChecked.Valid {
275 ep.LastChecked = lastChecked.Time
276 }
277 if ip.Valid {
278 ep.IP = ip.String
279 }
280 if ipv6.Valid {
281 ep.IPv6 = ipv6.String
282 }
283 if ipResolvedAt.Valid {
284 ep.IPResolvedAt = ipResolvedAt.Time
285 }
286
287 return &ep, nil
288}
289
290func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) {
291 query := `
292 SELECT DISTINCT ON (COALESCE(server_did, id::text))
293 id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status,
294 ip, ipv6, ip_resolved_at, valid, updated_at
295 FROM endpoints
296 WHERE 1=1
297 `
298 args := []interface{}{}
299 argIdx := 1
300
301 if filter != nil {
302 if filter.Type != "" {
303 query += fmt.Sprintf(" AND endpoint_type = $%d", argIdx)
304 args = append(args, filter.Type)
305 argIdx++
306 }
307
308 // NEW: Filter by valid flag
309 if filter.OnlyValid {
310 query += fmt.Sprintf(" AND valid = true", argIdx)
311 }
312 if filter.Status != "" {
313 statusInt := EndpointStatusUnknown
314 switch filter.Status {
315 case "online":
316 statusInt = EndpointStatusOnline
317 case "offline":
318 statusInt = EndpointStatusOffline
319 }
320 query += fmt.Sprintf(" AND status = $%d", argIdx)
321 args = append(args, statusInt)
322 argIdx++
323 }
324
325 // Filter for stale endpoints only
326 if filter.OnlyStale && filter.RecheckInterval > 0 {
327 cutoffTime := time.Now().UTC().Add(-filter.RecheckInterval)
328 query += fmt.Sprintf(" AND (last_checked IS NULL OR last_checked < $%d)", argIdx)
329 args = append(args, cutoffTime)
330 argIdx++
331 }
332 }
333
334 // NEW: Choose ordering strategy
335 if filter != nil && filter.Random {
336 // For random selection, we need to wrap in a subquery
337 query = fmt.Sprintf(`
338 WITH filtered_endpoints AS (
339 %s
340 )
341 SELECT * FROM filtered_endpoints
342 ORDER BY RANDOM()
343 `, query)
344 } else {
345 // Original ordering for non-random queries
346 query += " ORDER BY COALESCE(server_did, id::text), discovered_at ASC"
347 }
348
349 if filter != nil && filter.Limit > 0 {
350 query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
351 args = append(args, filter.Limit, filter.Offset)
352 }
353
354 rows, err := p.db.QueryContext(ctx, query, args...)
355 if err != nil {
356 return nil, err
357 }
358 defer rows.Close()
359
360 var endpoints []*Endpoint
361 for rows.Next() {
362 var ep Endpoint
363 var lastChecked, ipResolvedAt sql.NullTime
364 var ip, ipv6, serverDID sql.NullString
365
366 err := rows.Scan(
367 &ep.ID, &ep.EndpointType, &ep.Endpoint, &serverDID, &ep.DiscoveredAt, &lastChecked,
368 &ep.Status, &ip, &ipv6, &ipResolvedAt, &ep.UpdatedAt,
369 )
370 if err != nil {
371 return nil, err
372 }
373
374 if serverDID.Valid {
375 ep.ServerDID = serverDID.String
376 }
377 if lastChecked.Valid {
378 ep.LastChecked = lastChecked.Time
379 }
380 if ip.Valid {
381 ep.IP = ip.String
382 }
383 if ipv6.Valid {
384 ep.IPv6 = ipv6.String
385 }
386 if ipResolvedAt.Valid {
387 ep.IPResolvedAt = ipResolvedAt.Time
388 }
389
390 endpoints = append(endpoints, &ep)
391 }
392
393 return endpoints, rows.Err()
394}
395
396func (p *PostgresDB) UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error {
397 query := `
398 UPDATE endpoints
399 SET status = $1, last_checked = $2, updated_at = $3
400 WHERE id = $4
401 `
402 _, err := p.db.ExecContext(ctx, query, update.Status, update.LastChecked, time.Now().UTC(), endpointID)
403 return err
404}
405
406func (p *PostgresDB) UpdateEndpointIPs(ctx context.Context, endpointID int64, ipv4, ipv6 string, resolvedAt time.Time) error {
407 query := `
408 UPDATE endpoints
409 SET ip = $1, ipv6 = $2, ip_resolved_at = $3, updated_at = $4
410 WHERE id = $5
411 `
412 _, err := p.db.ExecContext(ctx, query, ipv4, ipv6, resolvedAt, time.Now().UTC(), endpointID)
413 return err
414}
415
416func (p *PostgresDB) UpdateEndpointServerDID(ctx context.Context, endpointID int64, serverDID string) error {
417 query := `
418 UPDATE endpoints
419 SET server_did = $1, updated_at = $2
420 WHERE id = $3
421 `
422 _, err := p.db.ExecContext(ctx, query, serverDID, time.Now().UTC(), endpointID)
423 return err
424}
425
426func (p *PostgresDB) GetDuplicateEndpoints(ctx context.Context) (map[string][]string, error) {
427 query := `
428 SELECT server_did, array_agg(endpoint ORDER BY discovered_at ASC) as endpoints
429 FROM endpoints
430 WHERE server_did IS NOT NULL
431 AND server_did != ''
432 AND endpoint_type = 'pds'
433 GROUP BY server_did
434 HAVING COUNT(*) > 1
435 ORDER BY COUNT(*) DESC
436 `
437
438 rows, err := p.db.QueryContext(ctx, query)
439 if err != nil {
440 return nil, err
441 }
442 defer rows.Close()
443
444 duplicates := make(map[string][]string)
445 for rows.Next() {
446 var serverDID string
447 var endpoints []string
448
449 err := rows.Scan(&serverDID, pq.Array(&endpoints))
450 if err != nil {
451 return nil, err
452 }
453
454 duplicates[serverDID] = endpoints
455 }
456
457 return duplicates, rows.Err()
458}
459
460// ===== SCAN OPERATIONS =====
461
// SetScanRetention configures how many historical scan rows are kept per
// endpoint; SaveEndpointScan prunes anything older than the newest
// `retention` rows. The constructor default is 3.
func (p *PostgresDB) SetScanRetention(retention int) {
	p.scanRetention = retention
}
465
466func (p *PostgresDB) SaveEndpointScan(ctx context.Context, scan *EndpointScan) error {
467 var scanDataJSON []byte
468 if scan.ScanData != nil {
469 scanDataJSON, _ = json.Marshal(scan.ScanData)
470 }
471
472 tx, err := p.db.BeginTx(ctx, nil)
473 if err != nil {
474 return err
475 }
476 defer tx.Rollback()
477
478 query := `
479 INSERT INTO endpoint_scans (endpoint_id, status, response_time, user_count, version, used_ip, scan_data, scanned_at)
480 VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
481 `
482 _, err = tx.ExecContext(ctx, query, scan.EndpointID, scan.Status, scan.ResponseTime, scan.UserCount, scan.Version, scan.UsedIP, scanDataJSON, scan.ScannedAt)
483 if err != nil {
484 return err
485 }
486
487 // Use configured retention value
488 cleanupQuery := `
489 DELETE FROM endpoint_scans
490 WHERE endpoint_id = $1
491 AND id NOT IN (
492 SELECT id
493 FROM endpoint_scans
494 WHERE endpoint_id = $1
495 ORDER BY scanned_at DESC
496 LIMIT $2
497 )
498 `
499 _, err = tx.ExecContext(ctx, cleanupQuery, scan.EndpointID, p.scanRetention)
500 if err != nil {
501 return err
502 }
503
504 return tx.Commit()
505}
506
507func (p *PostgresDB) GetEndpointScans(ctx context.Context, endpointID int64, limit int) ([]*EndpointScan, error) {
508 query := `
509 SELECT id, endpoint_id, status, response_time, user_count, version, used_ip, scan_data, scanned_at
510 FROM endpoint_scans
511 WHERE endpoint_id = $1
512 ORDER BY scanned_at DESC
513 LIMIT $2
514 `
515
516 rows, err := p.db.QueryContext(ctx, query, endpointID, limit)
517 if err != nil {
518 return nil, err
519 }
520 defer rows.Close()
521
522 var scans []*EndpointScan
523 for rows.Next() {
524 var scan EndpointScan
525 var responseTime sql.NullFloat64
526 var userCount sql.NullInt64
527 var version, usedIP sql.NullString
528 var scanDataJSON []byte
529
530 err := rows.Scan(&scan.ID, &scan.EndpointID, &scan.Status, &responseTime, &userCount, &version, &usedIP, &scanDataJSON, &scan.ScannedAt)
531 if err != nil {
532 return nil, err
533 }
534
535 if responseTime.Valid {
536 scan.ResponseTime = responseTime.Float64
537 }
538
539 if userCount.Valid {
540 scan.UserCount = userCount.Int64
541 }
542
543 if version.Valid {
544 scan.Version = version.String
545 }
546
547 if usedIP.Valid {
548 scan.UsedIP = usedIP.String
549 }
550
551 if len(scanDataJSON) > 0 {
552 var scanData EndpointScanData
553 if err := json.Unmarshal(scanDataJSON, &scanData); err == nil {
554 scan.ScanData = &scanData
555 }
556 }
557
558 scans = append(scans, &scan)
559 }
560
561 return scans, rows.Err()
562}
563
564// ===== PDS VIRTUAL ENDPOINTS =====
565
566func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) {
567 query := `
568 WITH unique_servers AS (
569 SELECT DISTINCT ON (COALESCE(server_did, id::text))
570 id,
571 endpoint,
572 server_did,
573 discovered_at,
574 last_checked,
575 status,
576 ip,
577 ipv6,
578 valid
579 FROM endpoints
580 WHERE endpoint_type = 'pds'
581 ORDER BY COALESCE(server_did, id::text), discovered_at ASC
582 )
583 SELECT
584 e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip, e.ipv6, e.valid,
585 latest.user_count, latest.response_time, latest.version, latest.scanned_at,
586 i.city, i.country, i.country_code, i.asn, i.asn_org,
587 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy,
588 i.latitude, i.longitude
589 FROM unique_servers e
590 LEFT JOIN LATERAL (
591 SELECT
592 user_count,
593 response_time,
594 version,
595 scanned_at
596 FROM endpoint_scans
597 WHERE endpoint_id = e.id AND status = 1
598 ORDER BY scanned_at DESC
599 LIMIT 1
600 ) latest ON true
601 LEFT JOIN ip_infos i ON e.ip = i.ip
602 WHERE 1=1
603 `
604
605 args := []interface{}{}
606 argIdx := 1
607
608 if filter != nil {
609 if filter.Status != "" {
610 statusInt := EndpointStatusUnknown
611 switch filter.Status {
612 case "online":
613 statusInt = EndpointStatusOnline
614 case "offline":
615 statusInt = EndpointStatusOffline
616 }
617 query += fmt.Sprintf(" AND e.status = $%d", argIdx)
618 args = append(args, statusInt)
619 argIdx++
620 }
621
622 if filter.MinUserCount > 0 {
623 query += fmt.Sprintf(" AND latest.user_count >= $%d", argIdx)
624 args = append(args, filter.MinUserCount)
625 argIdx++
626 }
627 }
628
629 query += " ORDER BY latest.user_count DESC NULLS LAST"
630
631 if filter != nil && filter.Limit > 0 {
632 query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
633 args = append(args, filter.Limit, filter.Offset)
634 }
635
636 rows, err := p.db.QueryContext(ctx, query, args...)
637 if err != nil {
638 return nil, err
639 }
640 defer rows.Close()
641
642 var items []*PDSListItem
643 for rows.Next() {
644 item := &PDSListItem{}
645 var ip, ipv6, serverDID, city, country, countryCode, asnOrg sql.NullString
646 var asn sql.NullInt32
647 var isDatacenter, isVPN, isCrawler, isTor, isProxy sql.NullBool
648 var lat, lon sql.NullFloat64
649 var userCount sql.NullInt32
650 var responseTime sql.NullFloat64
651 var version sql.NullString
652 var scannedAt sql.NullTime
653
654 err := rows.Scan(
655 &item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip, &ipv6, &item.Valid,
656 &userCount, &responseTime, &version, &scannedAt,
657 &city, &country, &countryCode, &asn, &asnOrg,
658 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy,
659 &lat, &lon,
660 )
661 if err != nil {
662 return nil, err
663 }
664
665 if ip.Valid {
666 item.IP = ip.String
667 }
668 if ipv6.Valid {
669 item.IPv6 = ipv6.String
670 }
671 if serverDID.Valid {
672 item.ServerDID = serverDID.String
673 }
674
675 // Add latest scan data if available
676 if userCount.Valid {
677 item.LatestScan = &struct {
678 UserCount int
679 ResponseTime float64
680 Version string
681 ScannedAt time.Time
682 }{
683 UserCount: int(userCount.Int32),
684 ResponseTime: responseTime.Float64,
685 Version: version.String,
686 ScannedAt: scannedAt.Time,
687 }
688 }
689
690 // Add IP info if available
691 if city.Valid || country.Valid {
692 item.IPInfo = &IPInfo{
693 IP: ip.String,
694 City: city.String,
695 Country: country.String,
696 CountryCode: countryCode.String,
697 ASN: int(asn.Int32),
698 ASNOrg: asnOrg.String,
699 IsDatacenter: isDatacenter.Bool,
700 IsVPN: isVPN.Bool,
701 IsCrawler: isCrawler.Bool,
702 IsTor: isTor.Bool,
703 IsProxy: isProxy.Bool,
704 Latitude: float32(lat.Float64),
705 Longitude: float32(lon.Float64),
706 }
707 }
708
709 items = append(items, item)
710 }
711
712 return items, rows.Err()
713}
714
715func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) {
716 query := `
717 WITH target_endpoint AS MATERIALIZED (
718 SELECT
719 e.id,
720 e.endpoint,
721 e.server_did,
722 e.discovered_at,
723 e.last_checked,
724 e.status,
725 e.ip,
726 e.ipv6,
727 e.valid
728 FROM endpoints e
729 WHERE e.endpoint = $1
730 AND e.endpoint_type = 'pds'
731 LIMIT 1
732 )
733 SELECT
734 te.id,
735 te.endpoint,
736 te.server_did,
737 te.discovered_at,
738 te.last_checked,
739 te.status,
740 te.ip,
741 te.ipv6,
742 te.valid,
743 latest.user_count,
744 latest.response_time,
745 latest.version,
746 latest.scan_data->'metadata'->'server_info' as server_info,
747 latest.scanned_at,
748 i.city, i.country, i.country_code, i.asn, i.asn_org,
749 i.is_datacenter, i.is_vpn, i.is_crawler, i.is_tor, i.is_proxy,
750 i.latitude, i.longitude,
751 i.raw_data,
752 COALESCE(
753 ARRAY(
754 SELECT e2.endpoint
755 FROM endpoints e2
756 WHERE e2.server_did = te.server_did
757 AND e2.endpoint_type = 'pds'
758 AND e2.endpoint != te.endpoint
759 AND te.server_did IS NOT NULL
760 ORDER BY e2.discovered_at
761 ),
762 ARRAY[]::text[]
763 ) as aliases,
764 CASE
765 WHEN te.server_did IS NOT NULL THEN (
766 SELECT MIN(e3.discovered_at)
767 FROM endpoints e3
768 WHERE e3.server_did = te.server_did
769 AND e3.endpoint_type = 'pds'
770 )
771 ELSE NULL
772 END as first_discovered_at
773 FROM target_endpoint te
774 LEFT JOIN LATERAL (
775 SELECT
776 es.scan_data,
777 es.response_time,
778 es.version,
779 es.scanned_at,
780 es.user_count
781 FROM endpoint_scans es
782 WHERE es.endpoint_id = te.id
783 ORDER BY es.scanned_at DESC
784 LIMIT 1
785 ) latest ON true
786 LEFT JOIN ip_infos i ON te.ip = i.ip;
787 `
788
789 detail := &PDSDetail{}
790 var ip, ipv6, city, country, countryCode, asnOrg, serverDID sql.NullString
791 var asn sql.NullInt32
792 var isDatacenter, isVPN, isCrawler, isTor, isProxy sql.NullBool
793 var lat, lon sql.NullFloat64
794 var userCount sql.NullInt32
795 var responseTime sql.NullFloat64
796 var version sql.NullString
797 var serverInfoJSON []byte
798 var scannedAt sql.NullTime
799 var rawDataJSON []byte
800 var aliases []string
801 var firstDiscoveredAt sql.NullTime
802
803 err := p.db.QueryRowContext(ctx, query, endpoint).Scan(
804 &detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip, &ipv6, &detail.Valid,
805 &userCount, &responseTime, &version, &serverInfoJSON, &scannedAt,
806 &city, &country, &countryCode, &asn, &asnOrg,
807 &isDatacenter, &isVPN, &isCrawler, &isTor, &isProxy,
808 &lat, &lon,
809 &rawDataJSON,
810 pq.Array(&aliases),
811 &firstDiscoveredAt,
812 )
813 if err != nil {
814 return nil, err
815 }
816
817 if ip.Valid {
818 detail.IP = ip.String
819 }
820 if ipv6.Valid {
821 detail.IPv6 = ipv6.String
822 }
823
824 if serverDID.Valid {
825 detail.ServerDID = serverDID.String
826 }
827
828 // Set aliases and is_primary
829 detail.Aliases = aliases
830 if serverDID.Valid && serverDID.String != "" && firstDiscoveredAt.Valid {
831 detail.IsPrimary = detail.DiscoveredAt.Equal(firstDiscoveredAt.Time) ||
832 detail.DiscoveredAt.Before(firstDiscoveredAt.Time)
833 } else {
834 detail.IsPrimary = true
835 }
836
837 // Parse latest scan data
838 if userCount.Valid {
839 var serverInfo interface{}
840 if len(serverInfoJSON) > 0 {
841 json.Unmarshal(serverInfoJSON, &serverInfo)
842 }
843
844 detail.LatestScan = &struct {
845 UserCount int
846 ResponseTime float64
847 Version string
848 ServerInfo interface{}
849 ScannedAt time.Time
850 }{
851 UserCount: int(userCount.Int32),
852 ResponseTime: responseTime.Float64,
853 Version: version.String,
854 ServerInfo: serverInfo,
855 ScannedAt: scannedAt.Time,
856 }
857 }
858
859 // Parse IP info with all fields
860 if city.Valid || country.Valid {
861 detail.IPInfo = &IPInfo{
862 IP: ip.String,
863 City: city.String,
864 Country: country.String,
865 CountryCode: countryCode.String,
866 ASN: int(asn.Int32),
867 ASNOrg: asnOrg.String,
868 IsDatacenter: isDatacenter.Bool,
869 IsVPN: isVPN.Bool,
870 IsCrawler: isCrawler.Bool,
871 IsTor: isTor.Bool,
872 IsProxy: isProxy.Bool,
873 Latitude: float32(lat.Float64),
874 Longitude: float32(lon.Float64),
875 }
876
877 if len(rawDataJSON) > 0 {
878 json.Unmarshal(rawDataJSON, &detail.IPInfo.RawData)
879 }
880 }
881
882 return detail, nil
883}
884
885func (p *PostgresDB) GetPDSStats(ctx context.Context) (*PDSStats, error) {
886 query := `
887 WITH unique_servers AS (
888 SELECT DISTINCT ON (COALESCE(server_did, id::text))
889 id,
890 COALESCE(server_did, id::text) as server_identity,
891 status
892 FROM endpoints
893 WHERE endpoint_type = 'pds'
894 ORDER BY COALESCE(server_did, id::text), discovered_at ASC
895 ),
896 latest_scans AS (
897 SELECT DISTINCT ON (us.id)
898 us.id,
899 es.user_count,
900 us.status
901 FROM unique_servers us
902 LEFT JOIN endpoint_scans es ON us.id = es.endpoint_id
903 ORDER BY us.id, es.scanned_at DESC
904 )
905 SELECT
906 COUNT(*) as total,
907 SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online,
908 SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline,
909 SUM(COALESCE(user_count, 0)) as total_users
910 FROM latest_scans
911 `
912
913 stats := &PDSStats{}
914 err := p.db.QueryRowContext(ctx, query).Scan(
915 &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints, &stats.TotalDIDs,
916 )
917
918 return stats, err
919}
920
921func (p *PostgresDB) GetEndpointStats(ctx context.Context) (*EndpointStats, error) {
922 query := `
923 SELECT
924 COUNT(*) as total_endpoints,
925 SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online_endpoints,
926 SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline_endpoints
927 FROM endpoints
928 `
929
930 var stats EndpointStats
931 err := p.db.QueryRowContext(ctx, query).Scan(
932 &stats.TotalEndpoints, &stats.OnlineEndpoints, &stats.OfflineEndpoints,
933 )
934 if err != nil {
935 return nil, err
936 }
937
938 // Get average response time from recent scans
939 avgQuery := `
940 SELECT AVG(response_time)
941 FROM endpoint_scans
942 WHERE response_time > 0 AND scanned_at > NOW() - INTERVAL '1 hour'
943 `
944 var avgResponseTime sql.NullFloat64
945 _ = p.db.QueryRowContext(ctx, avgQuery).Scan(&avgResponseTime)
946 if avgResponseTime.Valid {
947 stats.AvgResponseTime = avgResponseTime.Float64
948 }
949
950 // Get counts by type
951 typeQuery := `
952 SELECT endpoint_type, COUNT(*)
953 FROM endpoints
954 GROUP BY endpoint_type
955 `
956 rows, err := p.db.QueryContext(ctx, typeQuery)
957 if err == nil {
958 defer rows.Close()
959 stats.ByType = make(map[string]int64)
960 for rows.Next() {
961 var typ string
962 var count int64
963 if err := rows.Scan(&typ, &count); err == nil {
964 stats.ByType[typ] = count
965 }
966 }
967 }
968
969 // Get total DIDs from latest PDS scans
970 didQuery := `
971 WITH unique_servers AS (
972 SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
973 e.id
974 FROM endpoints e
975 WHERE e.endpoint_type = 'pds'
976 ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
977 ),
978 latest_pds_scans AS (
979 SELECT DISTINCT ON (us.id)
980 us.id,
981 es.user_count
982 FROM unique_servers us
983 LEFT JOIN endpoint_scans es ON us.id = es.endpoint_id
984 ORDER BY us.id, es.scanned_at DESC
985 )
986 SELECT SUM(user_count) FROM latest_pds_scans
987 `
988 var totalDIDs sql.NullInt64
989 _ = p.db.QueryRowContext(ctx, didQuery).Scan(&totalDIDs)
990 if totalDIDs.Valid {
991 stats.TotalDIDs = totalDIDs.Int64
992 }
993
994 return &stats, err
995}
996
997// ===== IP INFO OPERATIONS =====
998
999func (p *PostgresDB) UpsertIPInfo(ctx context.Context, ip string, ipInfo map[string]interface{}) error {
1000 rawDataJSON, _ := json.Marshal(ipInfo)
1001
1002 // Extract fields from ipInfo map
1003 city := extractString(ipInfo, "location", "city")
1004 country := extractString(ipInfo, "location", "country")
1005 countryCode := extractString(ipInfo, "location", "country_code")
1006 asn := extractInt(ipInfo, "asn", "asn")
1007 asnOrg := extractString(ipInfo, "asn", "org")
1008
1009 // Extract top-level boolean flags
1010 isDatacenter := false
1011 if val, ok := ipInfo["is_datacenter"].(bool); ok {
1012 isDatacenter = val
1013 }
1014
1015 isVPN := false
1016 if val, ok := ipInfo["is_vpn"].(bool); ok {
1017 isVPN = val
1018 }
1019
1020 isCrawler := false
1021 if val, ok := ipInfo["is_crawler"].(bool); ok {
1022 isCrawler = val
1023 }
1024
1025 isTor := false
1026 if val, ok := ipInfo["is_tor"].(bool); ok {
1027 isTor = val
1028 }
1029
1030 isProxy := false
1031 if val, ok := ipInfo["is_proxy"].(bool); ok {
1032 isProxy = val
1033 }
1034
1035 lat := extractFloat(ipInfo, "location", "latitude")
1036 lon := extractFloat(ipInfo, "location", "longitude")
1037
1038 query := `
1039 INSERT INTO ip_infos (ip, city, country, country_code, asn, asn_org, is_datacenter, is_vpn, is_crawler, is_tor, is_proxy, latitude, longitude, raw_data, fetched_at)
1040 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15)
1041 ON CONFLICT(ip) DO UPDATE SET
1042 city = EXCLUDED.city,
1043 country = EXCLUDED.country,
1044 country_code = EXCLUDED.country_code,
1045 asn = EXCLUDED.asn,
1046 asn_org = EXCLUDED.asn_org,
1047 is_datacenter = EXCLUDED.is_datacenter,
1048 is_vpn = EXCLUDED.is_vpn,
1049 is_crawler = EXCLUDED.is_crawler,
1050 is_tor = EXCLUDED.is_tor,
1051 is_proxy = EXCLUDED.is_proxy,
1052 latitude = EXCLUDED.latitude,
1053 longitude = EXCLUDED.longitude,
1054 raw_data = EXCLUDED.raw_data,
1055 fetched_at = EXCLUDED.fetched_at,
1056 updated_at = CURRENT_TIMESTAMP
1057 `
1058 _, err := p.db.ExecContext(ctx, query, ip, city, country, countryCode, asn, asnOrg, isDatacenter, isVPN, isCrawler, isTor, isProxy, lat, lon, rawDataJSON, time.Now().UTC())
1059 return err
1060}
1061
1062func (p *PostgresDB) GetIPInfo(ctx context.Context, ip string) (*IPInfo, error) {
1063 query := `
1064 SELECT ip, city, country, country_code, asn, asn_org, is_datacenter, is_vpn, is_crawler, is_tor, is_proxy,
1065 latitude, longitude, raw_data, fetched_at, updated_at
1066 FROM ip_infos
1067 WHERE ip = $1
1068 `
1069
1070 info := &IPInfo{}
1071 var rawDataJSON []byte
1072
1073 err := p.db.QueryRowContext(ctx, query, ip).Scan(
1074 &info.IP, &info.City, &info.Country, &info.CountryCode, &info.ASN, &info.ASNOrg,
1075 &info.IsDatacenter, &info.IsVPN, &info.IsCrawler, &info.IsTor, &info.IsProxy,
1076 &info.Latitude, &info.Longitude,
1077 &rawDataJSON, &info.FetchedAt, &info.UpdatedAt,
1078 )
1079 if err != nil {
1080 return nil, err
1081 }
1082
1083 if len(rawDataJSON) > 0 {
1084 json.Unmarshal(rawDataJSON, &info.RawData)
1085 }
1086
1087 return info, nil
1088}
1089
1090func (p *PostgresDB) ShouldUpdateIPInfo(ctx context.Context, ip string) (bool, bool, error) {
1091 query := `SELECT fetched_at FROM ip_infos WHERE ip = $1`
1092
1093 var fetchedAt time.Time
1094 err := p.db.QueryRowContext(ctx, query, ip).Scan(&fetchedAt)
1095 if err == sql.ErrNoRows {
1096 return false, true, nil // Doesn't exist, needs update
1097 }
1098 if err != nil {
1099 return false, false, err
1100 }
1101
1102 // Check if older than 30 days
1103 needsUpdate := time.Since(fetchedAt) > 30*24*time.Hour
1104 return true, needsUpdate, nil
1105}
1106
1107// ===== HELPER FUNCTIONS =====
1108
// extractString walks nested maps following keys and returns the string found
// at the final key. It returns "" when keys is empty, when any intermediate
// step is missing or not a map, or when the final value is not a string.
func extractString(data map[string]interface{}, keys ...string) string {
	if len(keys) == 0 {
		return ""
	}
	node := data
	for _, key := range keys[:len(keys)-1] {
		next, ok := node[key].(map[string]interface{})
		if !ok {
			return ""
		}
		node = next
	}
	s, _ := node[keys[len(keys)-1]].(string)
	return s
}
1126
// extractInt walks nested maps following keys and returns the integer at the
// final key. JSON-decoded numbers arrive as float64 and are truncated to int;
// native ints are returned as-is. Any miss or type mismatch yields 0.
func extractInt(data map[string]interface{}, keys ...string) int {
	if len(keys) == 0 {
		return 0
	}
	node := data
	for _, key := range keys[:len(keys)-1] {
		next, ok := node[key].(map[string]interface{})
		if !ok {
			return 0
		}
		node = next
	}
	switch v := node[keys[len(keys)-1]].(type) {
	case float64:
		return int(v)
	case int:
		return v
	default:
		return 0
	}
}
1147
// extractFloat walks nested maps following keys and returns the number at the
// final key as a float32 (JSON numbers decode to float64). Any miss or type
// mismatch yields 0.
func extractFloat(data map[string]interface{}, keys ...string) float32 {
	if len(keys) == 0 {
		return 0
	}
	node := data
	for _, key := range keys[:len(keys)-1] {
		next, ok := node[key].(map[string]interface{})
		if !ok {
			return 0
		}
		node = next
	}
	if v, ok := node[keys[len(keys)-1]].(float64); ok {
		return float32(v)
	}
	return 0
}
1165
1166// ===== CURSOR OPERATIONS =====
1167
1168func (p *PostgresDB) GetScanCursor(ctx context.Context, source string) (*ScanCursor, error) {
1169 query := "SELECT source, last_bundle_number, last_scan_time, records_processed FROM scan_cursors WHERE source = $1"
1170
1171 var cursor ScanCursor
1172 err := p.db.QueryRowContext(ctx, query, source).Scan(
1173 &cursor.Source, &cursor.LastBundleNumber, &cursor.LastScanTime, &cursor.RecordsProcessed,
1174 )
1175 if err == sql.ErrNoRows {
1176 return &ScanCursor{
1177 Source: source,
1178 LastBundleNumber: 0,
1179 LastScanTime: time.Time{},
1180 }, nil
1181 }
1182 return &cursor, err
1183}
1184
1185func (p *PostgresDB) UpdateScanCursor(ctx context.Context, cursor *ScanCursor) error {
1186 query := `
1187 INSERT INTO scan_cursors (source, last_bundle_number, last_scan_time, records_processed)
1188 VALUES ($1, $2, $3, $4)
1189 ON CONFLICT(source) DO UPDATE SET
1190 last_bundle_number = EXCLUDED.last_bundle_number,
1191 last_scan_time = EXCLUDED.last_scan_time,
1192 records_processed = EXCLUDED.records_processed
1193 `
1194 _, err := p.db.ExecContext(ctx, query, cursor.Source, cursor.LastBundleNumber, cursor.LastScanTime, cursor.RecordsProcessed)
1195 return err
1196}
1197
1198// ===== METRICS OPERATIONS =====
1199
1200func (p *PostgresDB) StorePLCMetrics(ctx context.Context, metrics *PLCMetrics) error {
1201 query := `
1202 INSERT INTO plc_metrics (total_dids, total_pds, unique_pds, scan_duration_ms, error_count)
1203 VALUES ($1, $2, $3, $4, $5)
1204 `
1205 _, err := p.db.ExecContext(ctx, query, metrics.TotalDIDs, metrics.TotalPDS,
1206 metrics.UniquePDS, metrics.ScanDuration, metrics.ErrorCount)
1207 return err
1208}
1209
1210func (p *PostgresDB) GetPLCMetrics(ctx context.Context, limit int) ([]*PLCMetrics, error) {
1211 query := `
1212 SELECT total_dids, total_pds, unique_pds, scan_duration_ms, error_count, created_at
1213 FROM plc_metrics
1214 ORDER BY created_at DESC
1215 LIMIT $1
1216 `
1217
1218 rows, err := p.db.QueryContext(ctx, query, limit)
1219 if err != nil {
1220 return nil, err
1221 }
1222 defer rows.Close()
1223
1224 var metrics []*PLCMetrics
1225 for rows.Next() {
1226 var m PLCMetrics
1227 if err := rows.Scan(&m.TotalDIDs, &m.TotalPDS, &m.UniquePDS, &m.ScanDuration, &m.ErrorCount, &m.LastScanTime); err != nil {
1228 return nil, err
1229 }
1230 metrics = append(metrics, &m)
1231 }
1232
1233 return metrics, rows.Err()
1234}
1235
1236// ===== DID OPERATIONS =====
1237
// UpsertDID inserts or refreshes a DID row discovered in a PLC bundle.
// On conflict, handle/pds are overwritten and bundleNum is appended to the
// bundle_numbers JSONB array only when not already present — the @>
// containment check in the CASE keeps the array deduplicated.
func (p *PostgresDB) UpsertDID(ctx context.Context, did string, bundleNum int, handle, pds string) error {
	query := `
		INSERT INTO dids (did, handle, pds, bundle_numbers, created_at)
		VALUES ($1, $2, $3, jsonb_build_array($4::integer), CURRENT_TIMESTAMP)
		ON CONFLICT(did) DO UPDATE SET
			handle = EXCLUDED.handle,
			pds = EXCLUDED.pds,
			bundle_numbers = CASE
				WHEN dids.bundle_numbers @> jsonb_build_array($4::integer) THEN dids.bundle_numbers
				ELSE dids.bundle_numbers || jsonb_build_array($4::integer)
			END,
			updated_at = CURRENT_TIMESTAMP
	`
	_, err := p.db.ExecContext(ctx, query, did, handle, pds, bundleNum)
	return err
}
1254
// UpsertDIDFromMempool creates/updates DID record without adding to bundle_numbers.
// Mempool entries are not part of any finalized bundle yet, so a new row starts
// with an empty JSONB array and a conflicting row only has handle/pds refreshed.
func (p *PostgresDB) UpsertDIDFromMempool(ctx context.Context, did string, handle, pds string) error {
	query := `
		INSERT INTO dids (did, handle, pds, bundle_numbers, created_at)
		VALUES ($1, $2, $3, '[]'::jsonb, CURRENT_TIMESTAMP)
		ON CONFLICT(did) DO UPDATE SET
			handle = EXCLUDED.handle,
			pds = EXCLUDED.pds,
			updated_at = CURRENT_TIMESTAMP
	`
	_, err := p.db.ExecContext(ctx, query, did, handle, pds)
	return err
}
1268
1269func (p *PostgresDB) GetDIDRecord(ctx context.Context, did string) (*DIDRecord, error) {
1270 query := `
1271 SELECT did, handle, pds, bundle_numbers, created_at
1272 FROM dids
1273 WHERE did = $1
1274 `
1275
1276 var record DIDRecord
1277 var bundleNumbersJSON []byte
1278 var handle, pds sql.NullString
1279
1280 err := p.db.QueryRowContext(ctx, query, did).Scan(
1281 &record.DID,
1282 &handle,
1283 &pds,
1284 &bundleNumbersJSON,
1285 &record.CreatedAt,
1286 )
1287 if err != nil {
1288 return nil, err
1289 }
1290
1291 if handle.Valid {
1292 record.Handle = handle.String
1293 }
1294 if pds.Valid {
1295 record.CurrentPDS = pds.String
1296 }
1297
1298 if err := json.Unmarshal(bundleNumbersJSON, &record.BundleNumbers); err != nil {
1299 return nil, err
1300 }
1301
1302 return &record, nil
1303}
1304
1305func (p *PostgresDB) GetDIDByHandle(ctx context.Context, handle string) (*DIDRecord, error) {
1306 query := `
1307 SELECT did, handle, pds, bundle_numbers, created_at
1308 FROM dids
1309 WHERE handle = $1
1310 `
1311
1312 var record DIDRecord
1313 var bundleNumbersJSON []byte
1314 var recordHandle, pds sql.NullString
1315
1316 err := p.db.QueryRowContext(ctx, query, handle).Scan(
1317 &record.DID,
1318 &recordHandle,
1319 &pds,
1320 &bundleNumbersJSON,
1321 &record.CreatedAt,
1322 )
1323 if err != nil {
1324 return nil, err
1325 }
1326
1327 if recordHandle.Valid {
1328 record.Handle = recordHandle.String
1329 }
1330 if pds.Valid {
1331 record.CurrentPDS = pds.String
1332 }
1333
1334 if err := json.Unmarshal(bundleNumbersJSON, &record.BundleNumbers); err != nil {
1335 return nil, err
1336 }
1337
1338 return &record, nil
1339}
1340
// GetGlobalDIDInfo retrieves consolidated DID info from 'dids' and 'pds_repos'.
//
// The query aggregates every pds_repos row for the DID into a JSON array
// ("hosting_on"), joined with the hosting endpoint's URL. The
// primary_endpoints CTE picks one endpoint per server (the earliest-discovered
// row per server_did, falling back to the endpoint id when server_did is
// NULL), and the FILTER clause restricts hosting_on to repos on those primary
// endpoints — presumably to hide duplicate/mirror endpoint rows; confirm
// against the endpoints table semantics. Returns sql.ErrNoRows (unwrapped)
// when the DID is unknown.
func (p *PostgresDB) GetGlobalDIDInfo(ctx context.Context, did string) (*GlobalDIDInfo, error) {
	query := `
		WITH primary_endpoints AS (
			SELECT DISTINCT ON (COALESCE(server_did, id::text))
				id
			FROM endpoints
			WHERE endpoint_type = 'pds'
			ORDER BY COALESCE(server_did, id::text), discovered_at ASC
		)
		SELECT
			d.did,
			d.handle,
			d.pds,
			d.bundle_numbers,
			d.created_at,
			COALESCE(
				jsonb_agg(
					jsonb_build_object(
						'id', pr.id,
						'endpoint_id', pr.endpoint_id,
						'endpoint', e.endpoint,
						'did', pr.did,
						'head', pr.head,
						'rev', pr.rev,
						'active', pr.active,
						'status', pr.status,
						'first_seen', pr.first_seen AT TIME ZONE 'UTC',
						'last_seen', pr.last_seen AT TIME ZONE 'UTC',
						'updated_at', pr.updated_at AT TIME ZONE 'UTC'
					)
					ORDER BY pr.last_seen DESC
				) FILTER (
					WHERE pr.id IS NOT NULL AND pe.id IS NOT NULL
				),
				'[]'::jsonb
			) AS hosting_on
		FROM
			dids d
		LEFT JOIN
			pds_repos pr ON d.did = pr.did
		LEFT JOIN
			endpoints e ON pr.endpoint_id = e.id
		LEFT JOIN
			primary_endpoints pe ON pr.endpoint_id = pe.id
		WHERE
			d.did = $1
		GROUP BY
			d.did, d.handle, d.pds, d.bundle_numbers, d.created_at
	`

	var info GlobalDIDInfo
	var bundleNumbersJSON []byte
	var hostingOnJSON []byte
	// handle/pds may be NULL in the dids table.
	var handle, pds sql.NullString

	err := p.db.QueryRowContext(ctx, query, did).Scan(
		&info.DID,
		&handle,
		&pds,
		&bundleNumbersJSON,
		&info.CreatedAt,
		&hostingOnJSON,
	)
	if err != nil {
		return nil, err
	}

	if handle.Valid {
		info.Handle = handle.String
	}
	if pds.Valid {
		info.CurrentPDS = pds.String
	}

	// bundle_numbers is a JSONB integer array; hosting_on is the aggregated
	// array built above (always at least '[]' thanks to COALESCE).
	if err := json.Unmarshal(bundleNumbersJSON, &info.BundleNumbers); err != nil {
		return nil, fmt.Errorf("failed to unmarshal bundle_numbers: %w", err)
	}

	if err := json.Unmarshal(hostingOnJSON, &info.HostingOn); err != nil {
		return nil, fmt.Errorf("failed to unmarshal hosting_on: %w", err)
	}

	return &info, nil
}
1426
// AddBundleDIDs bulk-registers the DIDs contained in one PLC bundle. The list
// is COPY-loaded into a session temp table, then a single INSERT creates rows
// for unseen DIDs and a single UPDATE appends bundleNum to the bundle_numbers
// JSONB array of existing DIDs that do not already contain it. Everything runs
// in one transaction; the temp table is dropped on commit.
func (p *PostgresDB) AddBundleDIDs(ctx context.Context, bundleNum int, dids []string) error {
	if len(dids) == 0 {
		return nil
	}

	// Acquire a connection from the pool (COPY needs a dedicated pgx conn).
	conn, err := p.pool.Acquire(ctx)
	if err != nil {
		return err
	}
	defer conn.Release()

	// Start transaction
	tx, err := conn.Begin(ctx)
	if err != nil {
		return err
	}
	defer tx.Rollback(ctx) // no-op once Commit succeeds

	// Create temporary table
	_, err = tx.Exec(ctx, `
		CREATE TEMP TABLE temp_dids (did TEXT PRIMARY KEY) ON COMMIT DROP
	`)
	if err != nil {
		return err
	}

	// Use COPY for blazing fast bulk insert
	_, err = tx.Conn().CopyFrom(
		ctx,
		pgx.Identifier{"temp_dids"},
		[]string{"did"},
		pgx.CopyFromSlice(len(dids), func(i int) ([]interface{}, error) {
			return []interface{}{dids[i]}, nil
		}),
	)
	if err != nil {
		return err
	}

	// Step 1: Insert new DIDs. fmt.Sprintf builds a one-element JSONB array
	// literal, e.g. "[42]" for bundle 42.
	_, err = tx.Exec(ctx, `
		INSERT INTO dids (did, bundle_numbers, created_at)
		SELECT td.did, $1::jsonb, CURRENT_TIMESTAMP
		FROM temp_dids td
		WHERE NOT EXISTS (SELECT 1 FROM dids WHERE dids.did = td.did)
	`, fmt.Sprintf("[%d]", bundleNum))

	if err != nil {
		return err
	}

	// Step 2: Update existing DIDs, skipping rows whose array already
	// contains this bundle number (the @> containment test).
	_, err = tx.Exec(ctx, `
		UPDATE dids
		SET bundle_numbers = bundle_numbers || $1::jsonb
		FROM temp_dids
		WHERE dids.did = temp_dids.did
		AND NOT (bundle_numbers @> $1::jsonb)
	`, fmt.Sprintf("[%d]", bundleNum))

	if err != nil {
		return err
	}

	return tx.Commit(ctx)
}
1494
1495func (p *PostgresDB) GetTotalDIDCount(ctx context.Context) (int64, error) {
1496 query := "SELECT COUNT(*) FROM dids"
1497 var count int64
1498 err := p.db.QueryRowContext(ctx, query).Scan(&count)
1499 return count, err
1500}
1501
// GetCountryLeaderboard aggregates active PDS hosting by country, most PDS
// first. unique_servers dedupes endpoints to one row per server (earliest
// discovered per server_did, id fallback when server_did is NULL); each
// server's newest endpoint_scans row supplies user_count/response_time via
// LATERAL. Percentages are relative to the global totals; countries without
// IP geo data are excluded.
func (p *PostgresDB) GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error) {
	query := `
		WITH unique_servers AS (
			SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
				e.id,
				e.ip,
				e.status
			FROM endpoints e
			WHERE e.endpoint_type = 'pds'
			ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
		),
		pds_by_country AS (
			SELECT
				i.country,
				i.country_code,
				COUNT(DISTINCT us.id) as active_pds_count,
				SUM(latest.user_count) as total_users,
				AVG(latest.response_time) as avg_response_time
			FROM unique_servers us
			JOIN ip_infos i ON us.ip = i.ip
			LEFT JOIN LATERAL (
				SELECT user_count, response_time
				FROM endpoint_scans
				WHERE endpoint_id = us.id
				ORDER BY scanned_at DESC
				LIMIT 1
			) latest ON true
			WHERE us.status = 1
				AND i.country IS NOT NULL
				AND i.country != ''
			GROUP BY i.country, i.country_code
		),
		totals AS (
			SELECT
				SUM(active_pds_count) as total_pds,
				SUM(total_users) as total_users_global
			FROM pds_by_country
		)
		SELECT
			pbc.country,
			pbc.country_code,
			pbc.active_pds_count,
			ROUND((pbc.active_pds_count * 100.0 / NULLIF(t.total_pds, 0))::numeric, 2) as pds_percentage,
			COALESCE(pbc.total_users, 0) as total_users,
			ROUND((COALESCE(pbc.total_users, 0) * 100.0 / NULLIF(t.total_users_global, 0))::numeric, 2) as users_percentage,
			ROUND(COALESCE(pbc.avg_response_time, 0)::numeric, 2) as avg_response_time_ms
		FROM pds_by_country pbc
		CROSS JOIN totals t
		ORDER BY pbc.active_pds_count DESC
	`

	rows, err := p.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stats []*CountryStats
	for rows.Next() {
		var s CountryStats
		// Percentages are NULL when the global denominators are zero
		// (NULLIF), so scan through NullFloat64 and default to 0.
		var pdsPercentage, usersPercentage sql.NullFloat64

		err := rows.Scan(
			&s.Country,
			&s.CountryCode,
			&s.ActivePDSCount,
			&pdsPercentage,
			&s.TotalUsers,
			&usersPercentage,
			&s.AvgResponseTimeMS,
		)
		if err != nil {
			return nil, err
		}

		if pdsPercentage.Valid {
			s.PDSPercentage = pdsPercentage.Float64
		}
		if usersPercentage.Valid {
			s.UsersPercentage = usersPercentage.Float64
		}

		stats = append(stats, &s)
	}

	return stats, rows.Err()
}
1589
// GetVersionStats groups active, deduplicated PDS servers by the software
// version reported in their most recent endpoint_scans row, most-deployed
// version first. Percentages are relative to all versioned servers / their
// summed user counts; servers whose latest scan has no version are excluded.
func (p *PostgresDB) GetVersionStats(ctx context.Context) ([]*VersionStats, error) {
	query := `
		WITH unique_servers AS (
			SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
				e.id
			FROM endpoints e
			WHERE e.endpoint_type = 'pds'
				AND e.status = 1
			ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
		),
		latest_scans AS (
			SELECT DISTINCT ON (us.id)
				us.id,
				es.version,
				es.user_count,
				es.scanned_at
			FROM unique_servers us
			JOIN endpoint_scans es ON us.id = es.endpoint_id
			WHERE es.version IS NOT NULL
				AND es.version != ''
			ORDER BY us.id, es.scanned_at DESC
		),
		version_groups AS (
			SELECT
				version,
				COUNT(*) as pds_count,
				SUM(user_count) as total_users,
				MIN(scanned_at) as first_seen,
				MAX(scanned_at) as last_seen
			FROM latest_scans
			GROUP BY version
		),
		totals AS (
			SELECT
				SUM(pds_count) as total_pds,
				SUM(total_users) as total_users_global
			FROM version_groups
		)
		SELECT
			vg.version,
			vg.pds_count,
			(vg.pds_count * 100.0 / NULLIF(t.total_pds, 0))::numeric as percentage,
			COALESCE(vg.total_users, 0) as total_users,
			(COALESCE(vg.total_users, 0) * 100.0 / NULLIF(t.total_users_global, 0))::numeric as users_percentage,
			vg.first_seen,
			vg.last_seen
		FROM version_groups vg
		CROSS JOIN totals t
		ORDER BY vg.pds_count DESC
	`

	rows, err := p.db.QueryContext(ctx, query)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var stats []*VersionStats
	for rows.Next() {
		var s VersionStats
		// NULL when the NULLIF denominators are zero; default to 0 below.
		var percentage, usersPercentage sql.NullFloat64

		err := rows.Scan(
			&s.Version,
			&s.PDSCount,
			&percentage,
			&s.TotalUsers,
			&usersPercentage,
			&s.FirstSeen,
			&s.LastSeen,
		)
		if err != nil {
			return nil, err
		}

		if percentage.Valid {
			s.Percentage = percentage.Float64
			// Pre-render a human-friendly string with adaptive precision.
			s.PercentageText = formatPercentage(percentage.Float64)
		}
		if usersPercentage.Valid {
			s.UsersPercentage = usersPercentage.Float64
		}

		stats = append(stats, &s)
	}

	return stats, rows.Err()
}
1678
// formatPercentage renders pct with more decimal places the smaller it is, so
// tiny shares remain distinguishable; non-positive values render as "0%".
func formatPercentage(pct float64) string {
	switch {
	case pct >= 10:
		return fmt.Sprintf("%.2f%%", pct)
	case pct >= 1:
		return fmt.Sprintf("%.3f%%", pct)
	case pct >= 0.01:
		return fmt.Sprintf("%.4f%%", pct)
	case pct > 0:
		return fmt.Sprintf("%.6f%%", pct)
	default:
		return "0%"
	}
}
1692
1693func (p *PostgresDB) UpsertPDSRepos(ctx context.Context, endpointID int64, repos []PDSRepoData) error {
1694 if len(repos) == 0 {
1695 return nil
1696 }
1697
1698 // Step 1: Load all existing repos for this endpoint into memory
1699 query := `
1700 SELECT did, head, rev, active, status
1701 FROM pds_repos
1702 WHERE endpoint_id = $1
1703 `
1704
1705 rows, err := p.db.QueryContext(ctx, query, endpointID)
1706 if err != nil {
1707 return err
1708 }
1709
1710 existingRepos := make(map[string]*PDSRepo)
1711 for rows.Next() {
1712 var repo PDSRepo
1713 var head, rev, status sql.NullString
1714
1715 err := rows.Scan(&repo.DID, &head, &rev, &repo.Active, &status)
1716 if err != nil {
1717 rows.Close()
1718 return err
1719 }
1720
1721 if head.Valid {
1722 repo.Head = head.String
1723 }
1724 if rev.Valid {
1725 repo.Rev = rev.String
1726 }
1727 if status.Valid {
1728 repo.Status = status.String
1729 }
1730
1731 existingRepos[repo.DID] = &repo
1732 }
1733 rows.Close()
1734
1735 if err := rows.Err(); err != nil {
1736 return err
1737 }
1738
1739 // Step 2: Compare and collect changes
1740 var newRepos []PDSRepoData
1741 var changedRepos []PDSRepoData
1742
1743 for _, repo := range repos {
1744 existing, exists := existingRepos[repo.DID]
1745 if !exists {
1746 // New repo
1747 newRepos = append(newRepos, repo)
1748 } else if existing.Head != repo.Head ||
1749 existing.Rev != repo.Rev ||
1750 existing.Active != repo.Active ||
1751 existing.Status != repo.Status {
1752 // Repo changed
1753 changedRepos = append(changedRepos, repo)
1754 }
1755 }
1756
1757 // Log comparison results
1758 log.Verbose("UpsertPDSRepos: endpoint_id=%d, total=%d, existing=%d, new=%d, changed=%d, unchanged=%d",
1759 endpointID, len(repos), len(existingRepos), len(newRepos), len(changedRepos),
1760 len(repos)-len(newRepos)-len(changedRepos))
1761
1762 // If nothing changed, return early
1763 if len(newRepos) == 0 && len(changedRepos) == 0 {
1764 log.Verbose("UpsertPDSRepos: endpoint_id=%d, no changes detected, skipping database operations", endpointID)
1765 return nil
1766 }
1767
1768 // Step 3: Execute batched operations
1769 conn, err := p.pool.Acquire(ctx)
1770 if err != nil {
1771 return err
1772 }
1773 defer conn.Release()
1774
1775 tx, err := conn.Begin(ctx)
1776 if err != nil {
1777 return err
1778 }
1779 defer tx.Rollback(ctx)
1780
1781 // Insert new repos
1782 if len(newRepos) > 0 {
1783 _, err := tx.Exec(ctx, `
1784 CREATE TEMP TABLE temp_new_repos (
1785 did TEXT,
1786 head TEXT,
1787 rev TEXT,
1788 active BOOLEAN,
1789 status TEXT
1790 ) ON COMMIT DROP
1791 `)
1792 if err != nil {
1793 return err
1794 }
1795
1796 _, err = tx.Conn().CopyFrom(
1797 ctx,
1798 pgx.Identifier{"temp_new_repos"},
1799 []string{"did", "head", "rev", "active", "status"},
1800 pgx.CopyFromSlice(len(newRepos), func(i int) ([]interface{}, error) {
1801 repo := newRepos[i]
1802 return []interface{}{repo.DID, repo.Head, repo.Rev, repo.Active, repo.Status}, nil
1803 }),
1804 )
1805 if err != nil {
1806 return err
1807 }
1808
1809 result, err := tx.Exec(ctx, `
1810 INSERT INTO pds_repos (endpoint_id, did, head, rev, active, status, first_seen, last_seen)
1811 SELECT $1, did, head, rev, active, status,
1812 TIMEZONE('UTC', NOW()),
1813 TIMEZONE('UTC', NOW())
1814 FROM temp_new_repos
1815 `, endpointID)
1816 if err != nil {
1817 return err
1818 }
1819
1820 log.Verbose("UpsertPDSRepos: endpoint_id=%d, inserted %d new repos", endpointID, result.RowsAffected())
1821 }
1822
1823 // Update changed repos
1824 if len(changedRepos) > 0 {
1825 _, err := tx.Exec(ctx, `
1826 CREATE TEMP TABLE temp_changed_repos (
1827 did TEXT,
1828 head TEXT,
1829 rev TEXT,
1830 active BOOLEAN,
1831 status TEXT
1832 ) ON COMMIT DROP
1833 `)
1834 if err != nil {
1835 return err
1836 }
1837
1838 _, err = tx.Conn().CopyFrom(
1839 ctx,
1840 pgx.Identifier{"temp_changed_repos"},
1841 []string{"did", "head", "rev", "active", "status"},
1842 pgx.CopyFromSlice(len(changedRepos), func(i int) ([]interface{}, error) {
1843 repo := changedRepos[i]
1844 return []interface{}{repo.DID, repo.Head, repo.Rev, repo.Active, repo.Status}, nil
1845 }),
1846 )
1847 if err != nil {
1848 return err
1849 }
1850
1851 result, err := tx.Exec(ctx, `
1852 UPDATE pds_repos
1853 SET head = t.head,
1854 rev = t.rev,
1855 active = t.active,
1856 status = t.status,
1857 last_seen = TIMEZONE('UTC', NOW()),
1858 updated_at = TIMEZONE('UTC', NOW())
1859 FROM temp_changed_repos t
1860 WHERE pds_repos.endpoint_id = $1
1861 AND pds_repos.did = t.did
1862 `, endpointID)
1863 if err != nil {
1864 return err
1865 }
1866
1867 log.Verbose("UpsertPDSRepos: endpoint_id=%d, updated %d changed repos", endpointID, result.RowsAffected())
1868 }
1869
1870 if err := tx.Commit(ctx); err != nil {
1871 return err
1872 }
1873
1874 log.Verbose("UpsertPDSRepos: endpoint_id=%d, transaction committed successfully", endpointID)
1875 return nil
1876}
1877
1878func (p *PostgresDB) GetPDSRepos(ctx context.Context, endpointID int64, activeOnly bool, limit int, offset int) ([]*PDSRepo, error) {
1879 query := `
1880 SELECT id, endpoint_id, did, head, rev, active, status, first_seen, last_seen, updated_at
1881 FROM pds_repos
1882 WHERE endpoint_id = $1
1883 `
1884
1885 args := []interface{}{endpointID}
1886 argIdx := 2
1887
1888 if activeOnly {
1889 query += " AND active = true"
1890 }
1891
1892 // Order by id (primary key) - fastest
1893 query += " ORDER BY id DESC"
1894
1895 if limit > 0 {
1896 query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
1897 args = append(args, limit, offset)
1898 }
1899
1900 rows, err := p.db.QueryContext(ctx, query, args...)
1901 if err != nil {
1902 return nil, err
1903 }
1904 defer rows.Close()
1905
1906 var repos []*PDSRepo
1907 for rows.Next() {
1908 var repo PDSRepo
1909 var head, rev, status sql.NullString
1910
1911 err := rows.Scan(
1912 &repo.ID, &repo.EndpointID, &repo.DID, &head, &rev,
1913 &repo.Active, &status, &repo.FirstSeen, &repo.LastSeen, &repo.UpdatedAt,
1914 )
1915 if err != nil {
1916 return nil, err
1917 }
1918
1919 if head.Valid {
1920 repo.Head = head.String
1921 }
1922 if rev.Valid {
1923 repo.Rev = rev.String
1924 }
1925 if status.Valid {
1926 repo.Status = status.String
1927 }
1928
1929 repos = append(repos, &repo)
1930 }
1931
1932 return repos, rows.Err()
1933}
1934
1935func (p *PostgresDB) GetReposByDID(ctx context.Context, did string) ([]*PDSRepo, error) {
1936 query := `
1937 SELECT id, endpoint_id, did, head, rev, active, status, first_seen, last_seen, updated_at
1938 FROM pds_repos
1939 WHERE did = $1
1940 ORDER BY last_seen DESC
1941 `
1942
1943 rows, err := p.db.QueryContext(ctx, query, did)
1944 if err != nil {
1945 return nil, err
1946 }
1947 defer rows.Close()
1948
1949 var repos []*PDSRepo
1950 for rows.Next() {
1951 var repo PDSRepo
1952 var head, rev, status sql.NullString
1953
1954 err := rows.Scan(
1955 &repo.ID, &repo.EndpointID, &repo.DID, &head, &rev,
1956 &repo.Active, &status, &repo.FirstSeen, &repo.LastSeen, &repo.UpdatedAt,
1957 )
1958 if err != nil {
1959 return nil, err
1960 }
1961
1962 if head.Valid {
1963 repo.Head = head.String
1964 }
1965 if rev.Valid {
1966 repo.Rev = rev.String
1967 }
1968 if status.Valid {
1969 repo.Status = status.String
1970 }
1971
1972 repos = append(repos, &repo)
1973 }
1974
1975 return repos, rows.Err()
1976}
1977
1978func (p *PostgresDB) GetPDSRepoStats(ctx context.Context, endpointID int64) (map[string]interface{}, error) {
1979 query := `
1980 SELECT
1981 COUNT(*) as total_repos,
1982 COUNT(*) FILTER (WHERE active = true) as active_repos,
1983 COUNT(*) FILTER (WHERE active = false) as inactive_repos,
1984 COUNT(*) FILTER (WHERE status IS NOT NULL AND status != '') as repos_with_status,
1985 COUNT(*) FILTER (WHERE updated_at > CURRENT_TIMESTAMP - INTERVAL '1 hour') as recent_changes
1986 FROM pds_repos
1987 WHERE endpoint_id = $1
1988 `
1989
1990 var totalRepos, activeRepos, inactiveRepos, reposWithStatus, recentChanges int64
1991
1992 err := p.db.QueryRowContext(ctx, query, endpointID).Scan(
1993 &totalRepos, &activeRepos, &inactiveRepos, &reposWithStatus, &recentChanges,
1994 )
1995 if err != nil {
1996 return nil, err
1997 }
1998
1999 return map[string]interface{}{
2000 "total_repos": totalRepos,
2001 "active_repos": activeRepos,
2002 "inactive_repos": inactiveRepos,
2003 "repos_with_status": reposWithStatus,
2004 "recent_changes": recentChanges,
2005 }, nil
2006}
2007
2008// GetTableSizes fetches size information (in bytes) for all tables in the specified schema.
2009func (p *PostgresDB) GetTableSizes(ctx context.Context, schema string) ([]TableSizeInfo, error) {
2010 // Query now selects raw byte values directly
2011 query := `
2012 SELECT
2013 c.relname AS table_name,
2014 pg_total_relation_size(c.oid) AS total_bytes,
2015 pg_relation_size(c.oid) AS table_heap_bytes,
2016 pg_indexes_size(c.oid) AS indexes_bytes
2017 FROM
2018 pg_class c
2019 LEFT JOIN
2020 pg_namespace n ON n.oid = c.relnamespace
2021 WHERE
2022 c.relkind = 'r' -- 'r' = ordinary table
2023 AND n.nspname = $1
2024 ORDER BY
2025 total_bytes DESC;
2026 `
2027 rows, err := p.db.QueryContext(ctx, query, schema)
2028 if err != nil {
2029 return nil, fmt.Errorf("failed to query table sizes: %w", err)
2030 }
2031 defer rows.Close()
2032
2033 var results []TableSizeInfo
2034 for rows.Next() {
2035 var info TableSizeInfo
2036 // Scan directly into int64 fields
2037 if err := rows.Scan(
2038 &info.TableName,
2039 &info.TotalBytes,
2040 &info.TableHeapBytes,
2041 &info.IndexesBytes,
2042 ); err != nil {
2043 return nil, fmt.Errorf("failed to scan table size row: %w", err)
2044 }
2045 results = append(results, info)
2046 }
2047 if err := rows.Err(); err != nil {
2048 return nil, fmt.Errorf("error iterating table size rows: %w", err)
2049 }
2050
2051 return results, nil
2052}
2053
2054// GetIndexSizes fetches size information (in bytes) for all indexes in the specified schema.
2055func (p *PostgresDB) GetIndexSizes(ctx context.Context, schema string) ([]IndexSizeInfo, error) {
2056 // Query now selects raw byte values directly
2057 query := `
2058 SELECT
2059 c.relname AS index_name,
2060 COALESCE(i.indrelid::regclass::text, 'N/A') AS table_name,
2061 pg_relation_size(c.oid) AS index_bytes
2062 FROM
2063 pg_class c
2064 LEFT JOIN
2065 pg_index i ON i.indexrelid = c.oid
2066 LEFT JOIN
2067 pg_namespace n ON n.oid = c.relnamespace
2068 WHERE
2069 c.relkind = 'i' -- 'i' = index
2070 AND n.nspname = $1
2071 ORDER BY
2072 index_bytes DESC;
2073 `
2074 rows, err := p.db.QueryContext(ctx, query, schema)
2075 if err != nil {
2076 return nil, fmt.Errorf("failed to query index sizes: %w", err)
2077 }
2078 defer rows.Close()
2079
2080 var results []IndexSizeInfo
2081 for rows.Next() {
2082 var info IndexSizeInfo
2083 var tableName sql.NullString
2084 // Scan directly into int64 field
2085 if err := rows.Scan(
2086 &info.IndexName,
2087 &tableName,
2088 &info.IndexBytes,
2089 ); err != nil {
2090 return nil, fmt.Errorf("failed to scan index size row: %w", err)
2091 }
2092 if tableName.Valid {
2093 info.TableName = tableName.String
2094 } else {
2095 info.TableName = "N/A"
2096 }
2097 results = append(results, info)
2098 }
2099 if err := rows.Err(); err != nil {
2100 return nil, fmt.Errorf("error iterating index size rows: %w", err)
2101 }
2102
2103 return results, nil
2104}