tangled
alpha
login
or
join now
atscan.net
/
atscand
1
fork
atom
wip
1
fork
atom
overview
issues
pulls
pipelines
pds duplicates
tree.fail
4 months ago
f608ec8b
a1069e99
+285
-124
6 changed files
expand all
collapse all
unified
split
internal
api
handlers.go
server.go
pds
scanner.go
storage
db.go
postgres.go
types.go
+43
-5
internal/api/handlers.go
···
235
"status": statusToString(pds.Status),
236
}
237
0
0
0
0
0
238
// Add last_checked if available
239
if !pds.LastChecked.IsZero() {
240
response["last_checked"] = pds.LastChecked
···
244
if pds.LatestScan != nil {
245
response["user_count"] = pds.LatestScan.UserCount
246
response["response_time"] = pds.LatestScan.ResponseTime
247
-
if pds.LatestScan.Version != "" { // NEW: Add this block
248
response["version"] = pds.LatestScan.Version
249
}
250
if !pds.LatestScan.ScannedAt.IsZero() {
···
271
if pds.IPInfo.ASN > 0 {
272
response["asn"] = pds.IPInfo.ASN
273
}
274
-
if pds.IPInfo.IsDatacenter {
275
-
response["is_datacenter"] = pds.IPInfo.IsDatacenter
276
-
}
277
}
278
279
return response
280
}
281
282
func formatPDSDetail(pds *storage.PDSDetail) map[string]interface{} {
283
-
// Start with list item formatting
284
response := formatPDSListItem(&pds.PDSListItem)
0
0
0
0
0
0
0
0
0
285
286
// Add server_info and version from latest scan (PDSDetail's LatestScan takes precedence)
287
if pds.LatestScan != nil {
···
1355
}
1356
1357
resp.json(result)
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
1358
}
1359
1360
// ===== UTILITY FUNCTIONS =====
···
235
"status": statusToString(pds.Status),
236
}
237
238
+
// Add server_did if available
239
+
if pds.ServerDID != "" {
240
+
response["server_did"] = pds.ServerDID
241
+
}
242
+
243
// Add last_checked if available
244
if !pds.LastChecked.IsZero() {
245
response["last_checked"] = pds.LastChecked
···
249
if pds.LatestScan != nil {
250
response["user_count"] = pds.LatestScan.UserCount
251
response["response_time"] = pds.LatestScan.ResponseTime
252
+
if pds.LatestScan.Version != "" {
253
response["version"] = pds.LatestScan.Version
254
}
255
if !pds.LatestScan.ScannedAt.IsZero() {
···
276
if pds.IPInfo.ASN > 0 {
277
response["asn"] = pds.IPInfo.ASN
278
}
0
0
0
279
}
280
281
return response
282
}
283
284
func formatPDSDetail(pds *storage.PDSDetail) map[string]interface{} {
285
+
// Start with list item formatting (includes server_did)
286
response := formatPDSListItem(&pds.PDSListItem)
287
+
288
+
// Add is_primary flag
289
+
response["is_primary"] = pds.IsPrimary
290
+
291
+
// Add aliases if available
292
+
if len(pds.Aliases) > 0 {
293
+
response["aliases"] = pds.Aliases
294
+
response["alias_count"] = len(pds.Aliases)
295
+
}
296
297
// Add server_info and version from latest scan (PDSDetail's LatestScan takes precedence)
298
if pds.LatestScan != nil {
···
1366
}
1367
1368
resp.json(result)
1369
+
}
1370
+
1371
+
func (s *Server) handleGetDuplicateEndpoints(w http.ResponseWriter, r *http.Request) {
1372
+
resp := newResponse(w)
1373
+
1374
+
duplicates, err := s.db.GetDuplicateEndpoints(r.Context())
1375
+
if err != nil {
1376
+
resp.error(err.Error(), http.StatusInternalServerError)
1377
+
return
1378
+
}
1379
+
1380
+
// Format response
1381
+
result := make([]map[string]interface{}, 0)
1382
+
for serverDID, endpoints := range duplicates {
1383
+
result = append(result, map[string]interface{}{
1384
+
"server_did": serverDID,
1385
+
"primary": endpoints[0], // First discovered
1386
+
"aliases": endpoints[1:], // Other domains
1387
+
"alias_count": len(endpoints) - 1,
1388
+
"total_domains": len(endpoints),
1389
+
})
1390
+
}
1391
+
1392
+
resp.json(map[string]interface{}{
1393
+
"duplicates": result,
1394
+
"total_duplicate_servers": len(duplicates),
1395
+
})
1396
}
1397
1398
// ===== UTILITY FUNCTIONS =====
+1
internal/api/server.go
···
65
api.HandleFunc("/pds/stats", s.handleGetPDSStats).Methods("GET")
66
api.HandleFunc("/pds/countries", s.handleGetCountryLeaderboard).Methods("GET")
67
api.HandleFunc("/pds/versions", s.handleGetVersionStats).Methods("GET")
0
68
api.HandleFunc("/pds/{endpoint}", s.handleGetPDSDetail).Methods("GET")
69
70
// PLC Bundle routes
···
65
api.HandleFunc("/pds/stats", s.handleGetPDSStats).Methods("GET")
66
api.HandleFunc("/pds/countries", s.handleGetCountryLeaderboard).Methods("GET")
67
api.HandleFunc("/pds/versions", s.handleGetVersionStats).Methods("GET")
68
+
api.HandleFunc("/pds/duplicates", s.handleGetDuplicateEndpoints).Methods("GET")
69
api.HandleFunc("/pds/{endpoint}", s.handleGetPDSDetail).Methods("GET")
70
71
// PLC Bundle routes
+5
-13
internal/pds/scanner.go
···
123
}
124
}
125
126
-
func (s *Scanner) worker(ctx context.Context, jobs <-chan *storage.Endpoint) {
127
-
for server := range jobs {
128
-
select {
129
-
case <-ctx.Done():
130
-
return
131
-
default:
132
-
s.scanAndSaveEndpoint(ctx, server)
133
-
}
134
-
}
135
-
}
136
-
137
func (s *Scanner) scanAndSaveEndpoint(ctx context.Context, ep *storage.Endpoint) {
138
// STEP 1: Resolve IP (before any network call)
139
ip, err := ipinfo.ExtractIPFromEndpoint(ep.Endpoint)
···
150
s.db.UpdateEndpointIP(ctx, ep.ID, ip, time.Now().UTC())
151
152
// STEP 2: Health check
153
-
available, responseTime, version, err := s.client.CheckHealth(ctx, ep.Endpoint) // CHANGED: receive version
154
if err != nil || !available {
155
errMsg := "health check failed"
156
if err != nil {
···
168
desc, err := s.client.DescribeServer(ctx, ep.Endpoint)
169
if err != nil {
170
log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err)
0
0
0
171
}
172
173
dids, err := s.client.ListRepos(ctx, ep.Endpoint)
···
182
ResponseTime: responseTime,
183
Description: desc,
184
DIDs: dids,
185
-
Version: version, // CHANGED: Pass version
186
})
187
188
// STEP 5: Fetch IP info if needed (async, with backoff)
···
123
}
124
}
125
0
0
0
0
0
0
0
0
0
0
0
126
func (s *Scanner) scanAndSaveEndpoint(ctx context.Context, ep *storage.Endpoint) {
127
// STEP 1: Resolve IP (before any network call)
128
ip, err := ipinfo.ExtractIPFromEndpoint(ep.Endpoint)
···
139
s.db.UpdateEndpointIP(ctx, ep.ID, ip, time.Now().UTC())
140
141
// STEP 2: Health check
142
+
available, responseTime, version, err := s.client.CheckHealth(ctx, ep.Endpoint)
143
if err != nil || !available {
144
errMsg := "health check failed"
145
if err != nil {
···
157
desc, err := s.client.DescribeServer(ctx, ep.Endpoint)
158
if err != nil {
159
log.Verbose("Warning: failed to describe server %s: %v", stripansi.Strip(ep.Endpoint), err)
160
+
} else if desc != nil && desc.DID != "" {
161
+
// NEW: Update server DID
162
+
s.db.UpdateEndpointServerDID(ctx, ep.ID, desc.DID)
163
}
164
165
dids, err := s.client.ListRepos(ctx, ep.Endpoint)
···
174
ResponseTime: responseTime,
175
Description: desc,
176
DIDs: dids,
177
+
Version: version,
178
})
179
180
// STEP 5: Fetch IP info if needed (async, with backoff)
+2
internal/storage/db.go
···
31
SaveEndpointScan(ctx context.Context, scan *EndpointScan) error
32
SetScanRetention(retention int)
33
UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error
0
0
34
35
// PDS virtual endpoints (created via JOINs)
36
GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error)
···
31
SaveEndpointScan(ctx context.Context, scan *EndpointScan) error
32
SetScanRetention(retention int)
33
UpdateEndpointStatus(ctx context.Context, endpointID int64, update *EndpointUpdate) error
34
+
UpdateEndpointServerDID(ctx context.Context, endpointID int64, serverDID string) error
35
+
GetDuplicateEndpoints(ctx context.Context) (map[string][]string, error)
36
37
// PDS virtual endpoints (created via JOINs)
38
GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error)
+226
-104
internal/storage/postgres.go
···
12
"github.com/jackc/pgx/v5"
13
"github.com/jackc/pgx/v5/pgxpool"
14
_ "github.com/jackc/pgx/v5/stdlib"
0
15
)
16
17
type PostgresDB struct {
···
77
id BIGSERIAL PRIMARY KEY,
78
endpoint_type TEXT NOT NULL DEFAULT 'pds',
79
endpoint TEXT NOT NULL,
0
80
discovered_at TIMESTAMP NOT NULL,
81
last_checked TIMESTAMP,
82
status INTEGER DEFAULT 0,
···
90
CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status);
91
CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type);
92
CREATE INDEX IF NOT EXISTS idx_endpoints_ip ON endpoints(ip);
0
93
94
-- IP infos table (IP as PRIMARY KEY)
95
CREATE TABLE IF NOT EXISTS ip_infos (
···
276
277
func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) {
278
query := `
279
-
SELECT id, endpoint_type, endpoint, discovered_at, last_checked, status,
280
-
ip, ip_resolved_at, updated_at
281
-
FROM endpoints
282
-
WHERE 1=1
283
-
`
0
284
args := []interface{}{}
285
argIdx := 1
286
···
303
argIdx++
304
}
305
306
-
// FIXED: Filter for stale endpoints only
307
if filter.OnlyStale && filter.RecheckInterval > 0 {
308
-
// Calculate cutoff time in UTC (Go side, not PostgreSQL side)
309
cutoffTime := time.Now().UTC().Add(-filter.RecheckInterval)
310
query += fmt.Sprintf(" AND (last_checked IS NULL OR last_checked < $%d)", argIdx)
311
args = append(args, cutoffTime)
···
313
}
314
}
315
316
-
query += " ORDER BY id DESC"
0
317
318
if filter != nil && filter.Limit > 0 {
319
query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
···
330
for rows.Next() {
331
var ep Endpoint
332
var lastChecked, ipResolvedAt sql.NullTime
333
-
var ip sql.NullString
334
335
err := rows.Scan(
336
-
&ep.ID, &ep.EndpointType, &ep.Endpoint, &ep.DiscoveredAt, &lastChecked,
337
&ep.Status, &ip, &ipResolvedAt, &ep.UpdatedAt,
338
)
339
if err != nil {
340
return nil, err
341
}
342
0
0
0
343
if lastChecked.Valid {
344
ep.LastChecked = lastChecked.Time
345
}
···
376
return err
377
}
378
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
379
// ===== SCAN OPERATIONS =====
380
381
func (p *PostgresDB) SetScanRetention(retention int) {
···
480
481
func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) {
482
query := `
483
-
SELECT
484
-
e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip,
485
-
latest.user_count, latest.response_time, latest.version, latest.scanned_at,
486
-
i.city, i.country, i.country_code, i.asn, i.asn_org,
487
-
i.is_datacenter, i.is_vpn, i.latitude, i.longitude
488
-
FROM endpoints e
489
-
LEFT JOIN LATERAL (
490
-
SELECT
491
-
user_count,
492
-
response_time,
493
-
version,
494
-
scanned_at
495
-
FROM endpoint_scans
496
-
WHERE endpoint_id = e.id AND status = 1
497
-
ORDER BY scanned_at DESC
498
-
LIMIT 1
499
-
) latest ON true
500
-
LEFT JOIN ip_infos i ON e.ip = i.ip
501
-
WHERE e.endpoint_type = 'pds'
502
-
`
0
0
0
0
0
0
0
0
0
0
0
0
0
503
504
args := []interface{}{}
505
argIdx := 1
···
541
var items []*PDSListItem
542
for rows.Next() {
543
item := &PDSListItem{}
544
-
var ip, city, country, countryCode, asnOrg sql.NullString
545
var asn sql.NullInt32
546
var isDatacenter, isVPN sql.NullBool
547
var lat, lon sql.NullFloat64
548
var userCount sql.NullInt32
549
var responseTime sql.NullFloat64
550
-
var version sql.NullString // ADD THIS LINE
551
var scannedAt sql.NullTime
552
553
err := rows.Scan(
554
-
&item.ID, &item.Endpoint, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip,
555
-
&userCount, &responseTime, &version, &scannedAt, // ADD &version HERE
556
&city, &country, &countryCode, &asn, &asnOrg,
557
&isDatacenter, &isVPN, &lat, &lon,
558
)
···
562
563
if ip.Valid {
564
item.IP = ip.String
0
0
0
565
}
566
567
// Add latest scan data if available
···
603
604
func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) {
605
query := `
606
-
SELECT
607
-
e.id, e.endpoint, e.discovered_at, e.last_checked, e.status, e.ip,
608
-
latest.user_count,
609
-
latest.response_time,
610
-
latest.version, -- ADD THIS LINE
611
-
latest.scan_data->'metadata'->'server_info' as server_info,
612
-
latest.scanned_at,
613
-
i.city, i.country, i.country_code, i.asn, i.asn_org,
614
-
i.is_datacenter, i.is_vpn, i.latitude, i.longitude,
615
-
i.raw_data
616
-
FROM endpoints e
617
-
LEFT JOIN LATERAL (
618
-
SELECT scan_data, response_time, version, scanned_at, user_count -- ADD version HERE
619
-
FROM endpoint_scans
620
-
WHERE endpoint_id = e.id
621
-
ORDER BY scanned_at DESC
622
-
LIMIT 1
623
-
) latest ON true
624
-
LEFT JOIN ip_infos i ON e.ip = i.ip
625
-
WHERE e.endpoint = $1 AND e.endpoint_type = 'pds'
626
-
`
627
628
detail := &PDSDetail{}
629
-
var ip, city, country, countryCode, asnOrg sql.NullString
630
var asn sql.NullInt32
631
var isDatacenter, isVPN sql.NullBool
632
var lat, lon sql.NullFloat64
633
var userCount sql.NullInt32
634
var responseTime sql.NullFloat64
635
-
var version sql.NullString // ADD THIS LINE
636
var serverInfoJSON []byte
637
var scannedAt sql.NullTime
638
var rawDataJSON []byte
639
640
err := p.db.QueryRowContext(ctx, query, endpoint).Scan(
641
-
&detail.ID, &detail.Endpoint, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip,
642
-
&userCount, &responseTime, &version, &serverInfoJSON, &scannedAt, // ADD &version HERE
643
&city, &country, &countryCode, &asn, &asnOrg,
644
&isDatacenter, &isVPN, &lat, &lon,
645
&rawDataJSON,
···
652
detail.IP = ip.String
653
}
654
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
655
// Parse latest scan data
656
if userCount.Valid {
657
var serverInfo interface{}
···
662
detail.LatestScan = &struct {
663
UserCount int
664
ResponseTime float64
665
-
Version string // ADD THIS LINE
666
ServerInfo interface{}
667
ScannedAt time.Time
668
}{
669
UserCount: int(userCount.Int32),
670
ResponseTime: responseTime.Float64,
671
-
Version: version.String, // ADD THIS LINE
672
ServerInfo: serverInfo,
673
ScannedAt: scannedAt.Time,
674
}
···
687
IsVPN: isVPN.Bool,
688
Latitude: float32(lat.Float64),
689
Longitude: float32(lon.Float64),
690
-
// RawData is unmarshaled below
691
}
692
693
-
// NEW: Unmarshal the raw_data JSON
694
if len(rawDataJSON) > 0 {
695
-
if err := json.Unmarshal(rawDataJSON, &detail.IPInfo.RawData); err != nil {
696
-
// Log the error but don't fail the request
697
-
fmt.Printf("Warning: failed to unmarshal raw_data for IP %s: %v\n", ip.String, err)
698
-
}
699
}
700
}
701
···
703
}
704
705
func (p *PostgresDB) GetPDSStats(ctx context.Context) (*PDSStats, error) {
706
-
// PDS stats - aggregate from latest scans
707
query := `
708
-
WITH latest_scans AS (
709
-
SELECT DISTINCT ON (endpoint_id)
710
-
endpoint_id,
711
-
user_count,
712
status
713
-
FROM endpoint_scans
714
-
WHERE endpoint_id IN (SELECT id FROM endpoints WHERE endpoint_type = 'pds')
715
-
ORDER BY endpoint_id, scanned_at DESC
716
-
)
717
-
SELECT
718
-
COUNT(*) as total,
719
-
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online,
720
-
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline,
721
-
SUM(user_count) as total_users
722
-
FROM latest_scans
723
-
`
0
0
0
0
0
0
0
0
0
724
725
stats := &PDSStats{}
726
err := p.db.QueryRowContext(ctx, query).Scan(
···
1534
1535
func (p *PostgresDB) GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error) {
1536
query := `
1537
-
WITH pds_by_country AS (
0
0
0
0
0
0
0
0
0
1538
SELECT
1539
i.country,
1540
i.country_code,
1541
-
COUNT(DISTINCT e.id) as active_pds_count,
1542
SUM(latest.user_count) as total_users,
1543
AVG(latest.response_time) as avg_response_time
1544
-
FROM endpoints e
1545
-
JOIN ip_infos i ON e.ip = i.ip
1546
LEFT JOIN LATERAL (
1547
SELECT user_count, response_time
1548
FROM endpoint_scans
1549
-
WHERE endpoint_id = e.id
1550
ORDER BY scanned_at DESC
1551
LIMIT 1
1552
) latest ON true
1553
-
WHERE e.endpoint_type = 'pds'
1554
-
AND e.status = 1
1555
-
AND i.country IS NOT NULL
1556
-
AND i.country != ''
1557
GROUP BY i.country, i.country_code
1558
),
1559
totals AS (
···
1566
pbc.country,
1567
pbc.country_code,
1568
pbc.active_pds_count,
1569
-
ROUND((pbc.active_pds_count * 100.0 / NULLIF(t.total_pds, 0))::numeric, 2) as pds_percentage,
1570
COALESCE(pbc.total_users, 0) as total_users,
1571
-
ROUND((COALESCE(pbc.total_users, 0) * 100.0 / NULLIF(t.total_users_global, 0))::numeric, 2) as users_percentage,
1572
ROUND(COALESCE(pbc.avg_response_time, 0)::numeric, 2) as avg_response_time_ms
1573
FROM pds_by_country pbc
1574
CROSS JOIN totals t
1575
-
ORDER BY pbc.active_pds_count DESC;
1576
`
1577
1578
rows, err := p.db.QueryContext(ctx, query)
···
1614
1615
func (p *PostgresDB) GetVersionStats(ctx context.Context) ([]*VersionStats, error) {
1616
query := `
1617
-
WITH latest_scans AS (
1618
-
SELECT DISTINCT ON (e.id)
1619
-
e.id,
1620
-
es.version,
1621
-
es.user_count,
1622
-
es.scanned_at
1623
FROM endpoints e
1624
-
JOIN endpoint_scans es ON e.id = es.endpoint_id
1625
WHERE e.endpoint_type = 'pds'
1626
AND e.status = 1
1627
-
AND es.version IS NOT NULL
0
0
0
0
0
0
0
0
0
0
1628
AND es.version != ''
1629
-
ORDER BY e.id, es.scanned_at DESC
1630
),
1631
version_groups AS (
1632
SELECT
···
12
"github.com/jackc/pgx/v5"
13
"github.com/jackc/pgx/v5/pgxpool"
14
_ "github.com/jackc/pgx/v5/stdlib"
15
+
"github.com/lib/pq"
16
)
17
18
type PostgresDB struct {
···
78
id BIGSERIAL PRIMARY KEY,
79
endpoint_type TEXT NOT NULL DEFAULT 'pds',
80
endpoint TEXT NOT NULL,
81
+
server_did TEXT,
82
discovered_at TIMESTAMP NOT NULL,
83
last_checked TIMESTAMP,
84
status INTEGER DEFAULT 0,
···
92
CREATE INDEX IF NOT EXISTS idx_endpoints_status ON endpoints(status);
93
CREATE INDEX IF NOT EXISTS idx_endpoints_type ON endpoints(endpoint_type);
94
CREATE INDEX IF NOT EXISTS idx_endpoints_ip ON endpoints(ip);
95
+
CREATE INDEX IF NOT EXISTS idx_endpoints_server_did ON endpoints(server_did);
96
97
-- IP infos table (IP as PRIMARY KEY)
98
CREATE TABLE IF NOT EXISTS ip_infos (
···
279
280
func (p *PostgresDB) GetEndpoints(ctx context.Context, filter *EndpointFilter) ([]*Endpoint, error) {
281
query := `
282
+
SELECT DISTINCT ON (COALESCE(server_did, id::text))
283
+
id, endpoint_type, endpoint, server_did, discovered_at, last_checked, status,
284
+
ip, ip_resolved_at, updated_at
285
+
FROM endpoints
286
+
WHERE 1=1
287
+
`
288
args := []interface{}{}
289
argIdx := 1
290
···
307
argIdx++
308
}
309
310
+
// Filter for stale endpoints only
311
if filter.OnlyStale && filter.RecheckInterval > 0 {
0
312
cutoffTime := time.Now().UTC().Add(-filter.RecheckInterval)
313
query += fmt.Sprintf(" AND (last_checked IS NULL OR last_checked < $%d)", argIdx)
314
args = append(args, cutoffTime)
···
316
}
317
}
318
319
+
// NEW: Order by server_did and discovered_at to get primary endpoints
320
+
query += " ORDER BY COALESCE(server_did, id::text), discovered_at ASC"
321
322
if filter != nil && filter.Limit > 0 {
323
query += fmt.Sprintf(" LIMIT $%d OFFSET $%d", argIdx, argIdx+1)
···
334
for rows.Next() {
335
var ep Endpoint
336
var lastChecked, ipResolvedAt sql.NullTime
337
+
var ip, serverDID sql.NullString
338
339
err := rows.Scan(
340
+
&ep.ID, &ep.EndpointType, &ep.Endpoint, &serverDID, &ep.DiscoveredAt, &lastChecked,
341
&ep.Status, &ip, &ipResolvedAt, &ep.UpdatedAt,
342
)
343
if err != nil {
344
return nil, err
345
}
346
347
+
if serverDID.Valid {
348
+
ep.ServerDID = serverDID.String
349
+
}
350
if lastChecked.Valid {
351
ep.LastChecked = lastChecked.Time
352
}
···
383
return err
384
}
385
386
+
func (p *PostgresDB) UpdateEndpointServerDID(ctx context.Context, endpointID int64, serverDID string) error {
387
+
query := `
388
+
UPDATE endpoints
389
+
SET server_did = $1, updated_at = $2
390
+
WHERE id = $3
391
+
`
392
+
_, err := p.db.ExecContext(ctx, query, serverDID, time.Now().UTC(), endpointID)
393
+
return err
394
+
}
395
+
396
+
func (p *PostgresDB) GetDuplicateEndpoints(ctx context.Context) (map[string][]string, error) {
397
+
query := `
398
+
SELECT server_did, array_agg(endpoint ORDER BY discovered_at ASC) as endpoints
399
+
FROM endpoints
400
+
WHERE server_did IS NOT NULL
401
+
AND server_did != ''
402
+
AND endpoint_type = 'pds'
403
+
GROUP BY server_did
404
+
HAVING COUNT(*) > 1
405
+
ORDER BY COUNT(*) DESC
406
+
`
407
+
408
+
rows, err := p.db.QueryContext(ctx, query)
409
+
if err != nil {
410
+
return nil, err
411
+
}
412
+
defer rows.Close()
413
+
414
+
duplicates := make(map[string][]string)
415
+
for rows.Next() {
416
+
var serverDID string
417
+
var endpoints []string
418
+
419
+
err := rows.Scan(&serverDID, pq.Array(&endpoints))
420
+
if err != nil {
421
+
return nil, err
422
+
}
423
+
424
+
duplicates[serverDID] = endpoints
425
+
}
426
+
427
+
return duplicates, rows.Err()
428
+
}
429
+
430
// ===== SCAN OPERATIONS =====
431
432
func (p *PostgresDB) SetScanRetention(retention int) {
···
531
532
func (p *PostgresDB) GetPDSList(ctx context.Context, filter *EndpointFilter) ([]*PDSListItem, error) {
533
query := `
534
+
WITH unique_servers AS (
535
+
SELECT DISTINCT ON (COALESCE(server_did, id::text))
536
+
id,
537
+
endpoint,
538
+
server_did,
539
+
discovered_at,
540
+
last_checked,
541
+
status,
542
+
ip
543
+
FROM endpoints
544
+
WHERE endpoint_type = 'pds'
545
+
ORDER BY COALESCE(server_did, id::text), discovered_at ASC
546
+
)
547
+
SELECT
548
+
e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip,
549
+
latest.user_count, latest.response_time, latest.version, latest.scanned_at,
550
+
i.city, i.country, i.country_code, i.asn, i.asn_org,
551
+
i.is_datacenter, i.is_vpn, i.latitude, i.longitude
552
+
FROM unique_servers e
553
+
LEFT JOIN LATERAL (
554
+
SELECT
555
+
user_count,
556
+
response_time,
557
+
version,
558
+
scanned_at
559
+
FROM endpoint_scans
560
+
WHERE endpoint_id = e.id AND status = 1
561
+
ORDER BY scanned_at DESC
562
+
LIMIT 1
563
+
) latest ON true
564
+
LEFT JOIN ip_infos i ON e.ip = i.ip
565
+
WHERE 1=1
566
+
`
567
568
args := []interface{}{}
569
argIdx := 1
···
605
var items []*PDSListItem
606
for rows.Next() {
607
item := &PDSListItem{}
608
+
var ip, serverDID, city, country, countryCode, asnOrg sql.NullString
609
var asn sql.NullInt32
610
var isDatacenter, isVPN sql.NullBool
611
var lat, lon sql.NullFloat64
612
var userCount sql.NullInt32
613
var responseTime sql.NullFloat64
614
+
var version sql.NullString
615
var scannedAt sql.NullTime
616
617
err := rows.Scan(
618
+
&item.ID, &item.Endpoint, &serverDID, &item.DiscoveredAt, &item.LastChecked, &item.Status, &ip,
619
+
&userCount, &responseTime, &version, &scannedAt,
620
&city, &country, &countryCode, &asn, &asnOrg,
621
&isDatacenter, &isVPN, &lat, &lon,
622
)
···
626
627
if ip.Valid {
628
item.IP = ip.String
629
+
}
630
+
if serverDID.Valid {
631
+
item.ServerDID = serverDID.String
632
}
633
634
// Add latest scan data if available
···
670
671
func (p *PostgresDB) GetPDSDetail(ctx context.Context, endpoint string) (*PDSDetail, error) {
672
query := `
673
+
SELECT
674
+
e.id, e.endpoint, e.server_did, e.discovered_at, e.last_checked, e.status, e.ip,
675
+
latest.user_count,
676
+
latest.response_time,
677
+
latest.version,
678
+
latest.scan_data->'metadata'->'server_info' as server_info,
679
+
latest.scanned_at,
680
+
i.city, i.country, i.country_code, i.asn, i.asn_org,
681
+
i.is_datacenter, i.is_vpn, i.latitude, i.longitude,
682
+
i.raw_data
683
+
FROM endpoints e
684
+
LEFT JOIN LATERAL (
685
+
SELECT scan_data, response_time, version, scanned_at, user_count
686
+
FROM endpoint_scans
687
+
WHERE endpoint_id = e.id
688
+
ORDER BY scanned_at DESC
689
+
LIMIT 1
690
+
) latest ON true
691
+
LEFT JOIN ip_infos i ON e.ip = i.ip
692
+
WHERE e.endpoint = $1 AND e.endpoint_type = 'pds'
693
+
`
694
695
detail := &PDSDetail{}
696
+
var ip, city, country, countryCode, asnOrg, serverDID sql.NullString
697
var asn sql.NullInt32
698
var isDatacenter, isVPN sql.NullBool
699
var lat, lon sql.NullFloat64
700
var userCount sql.NullInt32
701
var responseTime sql.NullFloat64
702
+
var version sql.NullString
703
var serverInfoJSON []byte
704
var scannedAt sql.NullTime
705
var rawDataJSON []byte
706
707
err := p.db.QueryRowContext(ctx, query, endpoint).Scan(
708
+
&detail.ID, &detail.Endpoint, &serverDID, &detail.DiscoveredAt, &detail.LastChecked, &detail.Status, &ip,
709
+
&userCount, &responseTime, &version, &serverInfoJSON, &scannedAt,
710
&city, &country, &countryCode, &asn, &asnOrg,
711
&isDatacenter, &isVPN, &lat, &lon,
712
&rawDataJSON,
···
719
detail.IP = ip.String
720
}
721
722
+
// NEW: Get aliases if this endpoint has a server_did
723
+
if serverDID.Valid && serverDID.String != "" {
724
+
aliasQuery := `
725
+
SELECT endpoint, discovered_at
726
+
FROM endpoints
727
+
WHERE server_did = $1
728
+
AND endpoint_type = 'pds'
729
+
AND endpoint != $2
730
+
ORDER BY discovered_at ASC
731
+
`
732
+
733
+
rows, err := p.db.QueryContext(ctx, aliasQuery, serverDID.String, endpoint)
734
+
if err == nil {
735
+
defer rows.Close()
736
+
737
+
var aliases []string
738
+
var primaryDiscoveredAt time.Time
739
+
740
+
for rows.Next() {
741
+
var alias string
742
+
var discoveredAt time.Time
743
+
if err := rows.Scan(&alias, &discoveredAt); err == nil {
744
+
aliases = append(aliases, alias)
745
+
if primaryDiscoveredAt.IsZero() || discoveredAt.Before(detail.DiscoveredAt) {
746
+
primaryDiscoveredAt = discoveredAt
747
+
}
748
+
}
749
+
}
750
+
751
+
detail.Aliases = aliases
752
+
detail.IsPrimary = detail.DiscoveredAt.Equal(primaryDiscoveredAt) ||
753
+
detail.DiscoveredAt.Before(primaryDiscoveredAt)
754
+
}
755
+
} else {
756
+
// No server_did means it's a unique server
757
+
detail.IsPrimary = true
758
+
}
759
+
760
// Parse latest scan data
761
if userCount.Valid {
762
var serverInfo interface{}
···
767
detail.LatestScan = &struct {
768
UserCount int
769
ResponseTime float64
770
+
Version string
771
ServerInfo interface{}
772
ScannedAt time.Time
773
}{
774
UserCount: int(userCount.Int32),
775
ResponseTime: responseTime.Float64,
776
+
Version: version.String,
777
ServerInfo: serverInfo,
778
ScannedAt: scannedAt.Time,
779
}
···
792
IsVPN: isVPN.Bool,
793
Latitude: float32(lat.Float64),
794
Longitude: float32(lon.Float64),
0
795
}
796
0
797
if len(rawDataJSON) > 0 {
798
+
json.Unmarshal(rawDataJSON, &detail.IPInfo.RawData)
0
0
0
799
}
800
}
801
···
803
}
804
805
func (p *PostgresDB) GetPDSStats(ctx context.Context) (*PDSStats, error) {
0
806
query := `
807
+
WITH unique_servers AS (
808
+
SELECT DISTINCT ON (COALESCE(server_did, id::text))
809
+
id,
810
+
COALESCE(server_did, id::text) as server_identity,
811
status
812
+
FROM endpoints
813
+
WHERE endpoint_type = 'pds'
814
+
ORDER BY COALESCE(server_did, id::text), discovered_at ASC
815
+
),
816
+
latest_scans AS (
817
+
SELECT DISTINCT ON (us.id)
818
+
us.id,
819
+
es.user_count,
820
+
us.status
821
+
FROM unique_servers us
822
+
LEFT JOIN endpoint_scans es ON us.id = es.endpoint_id
823
+
ORDER BY us.id, es.scanned_at DESC
824
+
)
825
+
SELECT
826
+
COUNT(*) as total,
827
+
SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as online,
828
+
SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as offline,
829
+
SUM(COALESCE(user_count, 0)) as total_users
830
+
FROM latest_scans
831
+
`
832
833
stats := &PDSStats{}
834
err := p.db.QueryRowContext(ctx, query).Scan(
···
1642
1643
func (p *PostgresDB) GetCountryLeaderboard(ctx context.Context) ([]*CountryStats, error) {
1644
query := `
1645
+
WITH unique_servers AS (
1646
+
SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
1647
+
e.id,
1648
+
e.ip,
1649
+
e.status
1650
+
FROM endpoints e
1651
+
WHERE e.endpoint_type = 'pds'
1652
+
ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
1653
+
),
1654
+
pds_by_country AS (
1655
SELECT
1656
i.country,
1657
i.country_code,
1658
+
COUNT(DISTINCT us.id) as active_pds_count,
1659
SUM(latest.user_count) as total_users,
1660
AVG(latest.response_time) as avg_response_time
1661
+
FROM unique_servers us
1662
+
JOIN ip_infos i ON us.ip = i.ip
1663
LEFT JOIN LATERAL (
1664
SELECT user_count, response_time
1665
FROM endpoint_scans
1666
+
WHERE endpoint_id = us.id
1667
ORDER BY scanned_at DESC
1668
LIMIT 1
1669
) latest ON true
1670
+
WHERE us.status = 1
1671
+
AND i.country IS NOT NULL
1672
+
AND i.country != ''
0
1673
GROUP BY i.country, i.country_code
1674
),
1675
totals AS (
···
1682
pbc.country,
1683
pbc.country_code,
1684
pbc.active_pds_count,
1685
+
ROUND((pbc.active_pds_count * 100.0 / NULLIF(t.total_pds, 0))::numeric, 4) as pds_percentage,
1686
COALESCE(pbc.total_users, 0) as total_users,
1687
+
ROUND((COALESCE(pbc.total_users, 0) * 100.0 / NULLIF(t.total_users_global, 0))::numeric, 4) as users_percentage,
1688
ROUND(COALESCE(pbc.avg_response_time, 0)::numeric, 2) as avg_response_time_ms
1689
FROM pds_by_country pbc
1690
CROSS JOIN totals t
1691
+
ORDER BY pbc.active_pds_count DESC
1692
`
1693
1694
rows, err := p.db.QueryContext(ctx, query)
···
1730
1731
func (p *PostgresDB) GetVersionStats(ctx context.Context) ([]*VersionStats, error) {
1732
query := `
1733
+
WITH unique_servers AS (
1734
+
SELECT DISTINCT ON (COALESCE(e.server_did, e.id::text))
1735
+
e.id
0
0
0
1736
FROM endpoints e
0
1737
WHERE e.endpoint_type = 'pds'
1738
AND e.status = 1
1739
+
ORDER BY COALESCE(e.server_did, e.id::text), e.discovered_at ASC
1740
+
),
1741
+
latest_scans AS (
1742
+
SELECT DISTINCT ON (us.id)
1743
+
us.id,
1744
+
es.version,
1745
+
es.user_count,
1746
+
es.scanned_at
1747
+
FROM unique_servers us
1748
+
JOIN endpoint_scans es ON us.id = es.endpoint_id
1749
+
WHERE es.version IS NOT NULL
1750
AND es.version != ''
1751
+
ORDER BY us.id, es.scanned_at DESC
1752
),
1753
version_groups AS (
1754
SELECT
+8
-2
internal/storage/types.go
···
20
ID int64
21
EndpointType string
22
Endpoint string
0
23
DiscoveredAt time.Time
24
LastChecked time.Time
25
Status int
···
185
// From endpoints table
186
ID int64
187
Endpoint string
0
188
DiscoveredAt time.Time
189
LastChecked time.Time
190
Status int
···
194
LatestScan *struct {
195
UserCount int
196
ResponseTime float64
197
-
Version string // NEW: Add this
198
ScannedAt time.Time
199
}
200
···
210
LatestScan *struct {
211
UserCount int
212
ResponseTime float64
213
-
Version string // ADD THIS LINE
214
ServerInfo interface{} // Full server description
215
ScannedAt time.Time
216
}
0
0
0
0
217
}
218
219
type CountryStats struct {
···
20
ID int64
21
EndpointType string
22
Endpoint string
23
+
ServerDID string
24
DiscoveredAt time.Time
25
LastChecked time.Time
26
Status int
···
186
// From endpoints table
187
ID int64
188
Endpoint string
189
+
ServerDID string // NEW: Add this
190
DiscoveredAt time.Time
191
LastChecked time.Time
192
Status int
···
196
LatestScan *struct {
197
UserCount int
198
ResponseTime float64
199
+
Version string
200
ScannedAt time.Time
201
}
202
···
212
LatestScan *struct {
213
UserCount int
214
ResponseTime float64
215
+
Version string
216
ServerInfo interface{} // Full server description
217
ScannedAt time.Time
218
}
219
+
220
+
// NEW: Aliases (other domains pointing to same server)
221
+
Aliases []string `json:"aliases,omitempty"`
222
+
IsPrimary bool `json:"is_primary"`
223
}
224
225
type CountryStats struct {