···741 return orphaned, totalBlobs, nil
742}
7430000744// reconcileMissingRecords creates layer records for manifest+layer pairs that are missing.
0745func (gc *GarbageCollector) reconcileMissingRecords(ctx context.Context, missing []MissingRecordDetail, result *GCResult) {
746- for i, m := range missing {
747- record := atproto.NewLayerRecord(
748- m.Digest,
749- m.Size,
750- m.MediaType,
751- m.UserDID,
752- m.ManifestURI,
753- )
754- if _, _, err := gc.pds.CreateLayerRecord(ctx, record); err != nil {
755- gc.logger.Error("Failed to create reconciled layer record",
756- "digest", m.Digest,
757- "manifest", m.ManifestURI,
00000000000758 "error", err)
759 continue
760 }
761- result.RecordsReconciled++
762- if result.RecordsReconciled%100 == 0 {
763- gc.logger.Info("Reconciliation progress",
764- "created", result.RecordsReconciled,
765- "total", len(missing))
766- }
767768- // Throttle: ramp delay based on record index to avoid flooding relays
769- delay := max(10*time.Millisecond, time.Duration(i)*100*time.Microsecond)
770- time.Sleep(delay)
00000771 }
772773 if result.RecordsReconciled > 0 {
···741 return orphaned, totalBlobs, nil
742}
743744+// reconcileBatchSize is the maximum number of layer records sent per repo commit.
745+// Batching reduces firehose events from N to about N/reconcileBatchSize (one per batch).
746+const reconcileBatchSize = 200
747+748// reconcileMissingRecords creates layer records for manifest+layer pairs that are missing.
749+// Records are batched into single commits to avoid flooding relays with firehose events.
750func (gc *GarbageCollector) reconcileMissingRecords(ctx context.Context, missing []MissingRecordDetail, result *GCResult) {
751+ for i := 0; i < len(missing); i += reconcileBatchSize {
752+ end := i + reconcileBatchSize
753+ if end > len(missing) {
754+ end = len(missing)
755+ }
756+ chunk := missing[i:end]
757+758+ records := make([]*atproto.LayerRecord, 0, len(chunk))
759+ for _, m := range chunk {
760+ records = append(records, atproto.NewLayerRecord(
761+ m.Digest,
762+ m.Size,
763+ m.MediaType,
764+ m.UserDID,
765+ m.ManifestURI,
766+ ))
767+ }
768+769+ created, err := gc.pds.BatchCreateLayerRecords(ctx, records)
770+ if err != nil {
771+ gc.logger.Error("Failed to create reconciled layer batch",
772+ "batchStart", i,
773+ "batchSize", len(chunk),
774 "error", err)
775 continue
776 }
000000777778+ result.RecordsReconciled += int64(created)
779+ gc.logger.Info("Reconciliation progress",
780+ "created", result.RecordsReconciled,
781+ "total", len(missing),
782+ "batch", len(chunk))
783+784+ // Small delay between batches as a courtesy to relays
785+ time.Sleep(100 * time.Millisecond)
786 }
787788 if result.RecordsReconciled > 0 {
+41
pkg/hold/pds/layer.go
···3import (
4 "context"
5 "fmt"
067 "atcr.io/pkg/atproto"
8 "atcr.io/pkg/hold/quota"
09 lexutil "github.com/bluesky-social/indigo/lex/util"
10 "github.com/bluesky-social/indigo/repo"
11)
···39 }
4041 return rkey, recordCID.String(), nil
00000000000000000000000000000000000000042}
4344// GetLayerRecord retrieves a specific layer record by rkey
···3import (
4 "context"
5 "fmt"
6+ "log/slog"
78 "atcr.io/pkg/atproto"
9 "atcr.io/pkg/hold/quota"
10+ indigoatproto "github.com/bluesky-social/indigo/api/atproto"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12 "github.com/bluesky-social/indigo/repo"
13)
···41 }
4243 return rkey, recordCID.String(), nil
44+}
45+46+// BatchCreateLayerRecords creates multiple layer records in a single repo commit.
47+// This produces one firehose event instead of one per record.
48+// Invalid records are skipped with a warning. Returns the number of records created.
49+func (p *HoldPDS) BatchCreateLayerRecords(ctx context.Context, records []*atproto.LayerRecord) (int, error) {
50+ var writes []*indigoatproto.RepoApplyWrites_Input_Writes_Elem
51+52+ for _, record := range records {
53+ if record.Type != atproto.LayerCollection {
54+ slog.Warn("Skipping invalid record type in batch", "type", record.Type)
55+ continue
56+ }
57+ if record.Digest == "" {
58+ slog.Warn("Skipping record with empty digest in batch")
59+ continue
60+ }
61+ if record.Size <= 0 {
62+ slog.Warn("Skipping record with non-positive size in batch", "size", record.Size)
63+ continue
64+ }
65+66+ writes = append(writes, &indigoatproto.RepoApplyWrites_Input_Writes_Elem{
67+ RepoApplyWrites_Create: &indigoatproto.RepoApplyWrites_Create{
68+ Collection: atproto.LayerCollection,
69+ Value: &lexutil.LexiconTypeDecoder{Val: record},
70+ },
71+ })
72+ }
73+74+ if len(writes) == 0 {
75+ return 0, nil
76+ }
77+78+ if err := p.repomgr.BatchWrite(ctx, p.uid, writes); err != nil {
79+ return 0, fmt.Errorf("batch write failed: %w", err)
80+ }
81+82+ return len(writes), nil
83}
8485// GetLayerRecord retrieves a specific layer record by rkey