tangled
alpha
login
or
join now
atscan.net
/
plcbundle-go
1
fork
atom
[DEPRECATED] Go implementation of plcbundle
1
fork
atom
overview
issues
pulls
pipelines
update structure (6)
tree.fail
4 months ago
62d58345
b3a1b9d7
+29
-38
1 changed file
expand all
collapse all
unified
split
internal
bundle
manager.go
+29
-38
internal/bundle/manager.go
···
1358
1358
1359
1359
if lastBundle != nil {
1360
1360
nextBundleNum = lastBundle.BundleNumber + 1
1361
1361
+
afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
1361
1362
prevBundleHash = lastBundle.Hash
1362
1363
1363
1363
-
// ✨ FIX: Use mempool's last operation time if available
1364
1364
-
// This prevents re-fetching operations already in mempool
1364
1364
+
prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
1365
1365
+
if err == nil {
1366
1366
+
_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
1367
1367
+
}
1368
1368
+
}
1369
1369
+
1370
1370
+
// ✨ Use mempool's last time if available
1371
1371
+
if m.mempool.Count() > 0 {
1365
1372
mempoolLastTime := m.mempool.GetLastTime()
1366
1373
if mempoolLastTime != "" {
1367
1374
afterTime = mempoolLastTime
1368
1375
if !quiet {
1369
1369
-
m.logger.Printf("Using mempool cursor: %s", afterTime)
1376
1376
+
m.logger.Printf("Continuing from mempool cursor: %s (have %d ops)",
1377
1377
+
afterTime, m.mempool.Count())
1370
1378
}
1371
1371
-
} else {
1372
1372
-
// No mempool operations yet, use last bundle
1373
1373
-
afterTime = lastBundle.EndTime.Format(time.RFC3339Nano)
1374
1374
-
}
1375
1375
-
1376
1376
-
prevBundle, err := m.LoadBundle(ctx, lastBundle.BundleNumber)
1377
1377
-
if err == nil {
1378
1378
-
_, prevBoundaryCIDs = m.operations.GetBoundaryCIDs(prevBundle.Operations)
1379
1379
}
1380
1380
}
1381
1381
···
1383
1383
m.logger.Printf("Preparing bundle %06d (mempool: %d ops)...", nextBundleNum, m.mempool.Count())
1384
1384
}
1385
1385
1386
1386
-
// Fetch in a loop until we have enough OR hit end-of-data
1387
1387
-
maxAttempts := 10
1388
1388
-
attemptCount := 0
1389
1389
-
1390
1390
-
for m.mempool.Count() < types.BUNDLE_SIZE && attemptCount < maxAttempts {
1391
1391
-
attemptCount++
1392
1392
-
1386
1386
+
// ✨ Fetch operations if needed (FetchToMempool loops internally)
1387
1387
+
if m.mempool.Count() < types.BUNDLE_SIZE {
1393
1388
newOps, err := m.syncer.FetchToMempool(
1394
1389
ctx,
1395
1390
afterTime,
···
1399
1394
m.mempool.Count(),
1400
1395
)
1401
1396
1402
1402
-
if err != nil {
1403
1403
-
m.mempool.Save()
1404
1404
-
return nil, err
1397
1397
+
// Add operations if we got any
1398
1398
+
if len(newOps) > 0 {
1399
1399
+
added, addErr := m.mempool.Add(newOps)
1400
1400
+
if addErr != nil {
1401
1401
+
m.mempool.Save()
1402
1402
+
return nil, fmt.Errorf("chronological validation failed: %w", addErr)
1403
1403
+
}
1404
1404
+
1405
1405
+
if !quiet && added > 0 {
1406
1406
+
m.logger.Printf("Added %d new operations (mempool now: %d)", added, m.mempool.Count())
1407
1407
+
}
1405
1408
}
1406
1409
1407
1407
-
// Add to mempool
1408
1408
-
added, err := m.mempool.Add(newOps)
1409
1409
-
if err != nil {
1410
1410
+
// If fetch failed AND we don't have enough, return error
1411
1411
+
if err != nil && m.mempool.Count() < types.BUNDLE_SIZE {
1410
1412
m.mempool.Save()
1411
1411
-
return nil, fmt.Errorf("chronological validation failed: %w", err)
1412
1412
-
}
1413
1413
-
1414
1414
-
if !quiet && added > 0 {
1415
1415
-
m.logger.Printf("Added %d new operations (mempool now: %d)", added, m.mempool.Count())
1416
1416
-
}
1417
1417
-
1418
1418
-
// ✨ Update cursor to last operation in mempool
1419
1419
-
afterTime = m.mempool.GetLastTime()
1420
1420
-
1421
1421
-
// If we got no new operations, we've caught up
1422
1422
-
if len(newOps) == 0 || added == 0 {
1423
1423
-
break
1413
1413
+
return nil, err
1424
1414
}
1425
1415
}
1426
1416
1417
1417
+
// Check if we have enough for a bundle
1427
1418
if m.mempool.Count() < types.BUNDLE_SIZE {
1428
1419
m.mempool.Save()
1429
1429
-
return nil, fmt.Errorf("insufficient operations: have %d, need %d (reached latest data)",
1420
1420
+
return nil, fmt.Errorf("insufficient operations: have %d, need %d (no more available)",
1430
1421
m.mempool.Count(), types.BUNDLE_SIZE)
1431
1422
}
1432
1423