PLC Bundle V1 Example Implementations
1#!/usr/bin/env ruby
2# frozen_string_literal: true
3
4# plcbundle.rb - Ruby implementation of plcbundle V1 specification
5# Creates compressed, cryptographically-chained bundles of DID PLC operations
6
7require 'json'
8require 'digest'
9require 'net/http'
10require 'uri'
11require 'fileutils'
12require 'time'
13require 'set'
14require 'zstd-ruby'
15
# Configuration constants
BUNDLE_SIZE = 10_000              # Operations per bundle file
INDEX_FILE = 'plc_bundles.json'   # Index filename inside the bundle directory
PLC_URL = 'https://plc.directory' # Base URL of the PLC directory (/export endpoint)
20
class PlcBundle
  # Builds compressed, hash-chained bundles of DID PLC operations
  # fetched from the PLC directory export feed.
  #
  # dir - directory where bundle files and the index are written.
  def initialize(dir)
    @dir = dir
    @pool = [] # Mempool of operations waiting to be bundled
    @seen = Set.new # CID deduplication set (pruned after each bundle)

    FileUtils.mkdir_p(@dir)
    @idx = load_idx
    puts "plcbundle v1 | Dir: #{@dir} | Last: #{@idx[:last_bundle]}\n"

    # Seed deduplication set with boundary CIDs from previous bundle
    seed_boundary if @idx[:bundles].any?
  end

  # Main loop: page through the export feed, bundle every BUNDLE_SIZE
  # operations, and persist the index. Stops when the feed is exhausted
  # or stops making forward progress.
  def run
    cursor = @idx[:bundles].last&.dig(:end_time)

    loop do
      puts "\nFetch: #{cursor || 'start'}"
      ops = fetch(cursor)

      # BUG FIX: the original `fetch(cursor) or (puts('Done.') and break)`
      # never broke out of the loop (`puts` returns nil, so `and break` is
      # skipped) and then crashed calling add_ops(nil). It also crashed on
      # an empty batch via `ops.last[:time]`. Handle both explicitly.
      if ops.nil? || ops.empty?
        puts 'Done.'
        break
      end

      added = add_ops(ops) # Validate and add to mempool
      next_cursor = ops.last[:time]

      # BUG FIX: at the tail of the feed the export can keep returning the
      # same boundary batch (all duplicates, cursor unchanged); the original
      # looped forever in that case.
      if added.zero? && next_cursor == cursor
        puts 'Done.'
        break
      end

      cursor = next_cursor
      create_bundle while @pool.size >= BUNDLE_SIZE # Create bundles when ready

      sleep 0.2 # Rate limiting
    end

    save_idx
    puts "\nBundles: #{@idx[:bundles].size} | Pool: #{@pool.size} | Size: #{'%.1f' % (@idx[:total_size_bytes] / 1e6)}MB"
  rescue => e
    # BUG FIX: the original `puts(...) and save_idx` never saved the index
    # on error (`puts` returns nil, short-circuiting the `and`).
    puts "\nError: #{e.message}"
    save_idx
  end

  private

  # Fetch up to 1000 operations from the PLC directory export endpoint.
  #
  # after - ISO8601 cursor timestamp, or nil to start from the beginning.
  #
  # Returns an array of operation hashes — each keeps the raw JSON line so
  # bundles can be re-serialized byte-exactly (Spec 4.2) — or nil on any
  # HTTP/parse error (the caller treats nil as "stop").
  def fetch(after)
    uri = URI("#{PLC_URL}/export?count=1000#{after ? "&after=#{after}" : ''}")
    res = Net::HTTP.get_response(uri)
    return nil unless res.is_a?(Net::HTTPSuccess)

    # Parse each line once (the original parsed every line twice)
    res.body.strip.split("\n").map do |line|
      op = JSON.parse(line, symbolize_names: true)
      op.merge(raw: line, time: op[:createdAt])
    end
  rescue StandardError
    nil
  end

  # Validate chronological order, drop duplicate CIDs, and append the rest
  # to the mempool. Returns the number of operations actually added.
  def add_ops(ops)
    last_t = @pool.last&.dig(:time) || @idx[:bundles].last&.dig(:end_time) || ''
    added = 0

    ops.each do |op|
      next if @seen.include?(op[:cid]) # Skip duplicates (boundary + within-batch)

      # Spec 3: timestamps must be non-decreasing across the whole stream
      raise "Order fail" if op[:time] < last_t

      @pool << op
      @seen << op[:cid]
      last_t = op[:time]
      added += 1
    end

    puts " +#{added} ops"
    added
  end

  # Pop BUNDLE_SIZE operations off the mempool, write them as a compressed,
  # hash-chained bundle file, and record its metadata in the index.
  def create_bundle
    ops = @pool.shift(BUNDLE_SIZE)
    parent = @idx[:bundles].last&.dig(:hash) || ''

    # Spec 4.2: serialize using raw JSON strings for reproducibility
    jsonl = ops.map { |o| o[:raw] + "\n" }.join

    # Spec 6.3: content hash, then chain hash linking to the parent bundle
    ch = sha(jsonl)
    h = sha(parent.empty? ? "plcbundle:genesis:#{ch}" : "#{parent}:#{ch}")
    zst = Zstd.compress(jsonl)

    # Write bundle file
    num = @idx[:last_bundle] + 1
    file = format('%06d.jsonl.zst', num)
    File.binwrite("#{@dir}/#{file}", zst)

    # Create metadata entry. Note: the hash literal is evaluated before `<<`
    # appends it, so `@idx[:bundles].last` below is still the previous bundle.
    @idx[:bundles] << {
      bundle_number: num,
      start_time: ops[0][:time],
      end_time: ops[-1][:time],
      operation_count: ops.size,
      did_count: ops.map { |o| o[:did] }.uniq.size,
      hash: h,
      content_hash: ch,
      parent: parent,
      compressed_hash: sha(zst),
      compressed_size: zst.bytesize,
      uncompressed_size: jsonl.bytesize,
      cursor: @idx[:bundles].last&.dig(:end_time) || '',
      created_at: Time.now.utc.iso8601
    }

    @idx[:last_bundle] = num
    @idx[:total_size_bytes] += zst.bytesize

    # Prune seen CIDs: only keep boundary + mempool (memory efficient)
    @seen = boundary_cids(ops) | @pool.map { |o| o[:cid] }.to_set

    save_idx
    puts "✓ #{file} | #{h[0..12]}... | seen:#{@seen.size}"
  end

  # Load the index from disk, or return a fresh V1 index structure when the
  # file is missing or unreadable.
  def load_idx
    JSON.parse(File.read("#{@dir}/#{INDEX_FILE}"), symbolize_names: true)
  rescue StandardError
    {version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: []}
  end

  # Atomically save the index via temp file + rename so readers never see a
  # partially-written index.
  def save_idx
    @idx[:updated_at] = Time.now.utc.iso8601
    tmp = "#{@dir}/#{INDEX_FILE}.tmp"
    File.write(tmp, JSON.pretty_generate(@idx))
    File.rename(tmp, "#{@dir}/#{INDEX_FILE}")
  end

  # Seed the deduplication set with CIDs from the last bundle's boundary
  # (operations sharing the final timestamp), so resuming from that cursor
  # doesn't re-add them. Best effort: warns and continues on any failure.
  def seed_boundary
    last = @idx[:bundles].last
    file = format('%06d.jsonl.zst', last[:bundle_number])

    data = Zstd.decompress(File.binread("#{@dir}/#{file}"))
    ops = data.strip.split("\n").map do |line|
      # Parse each line once (the original parsed every line twice)
      op = JSON.parse(line)
      {time: op['createdAt'], cid: op['cid']}
    end

    @seen = boundary_cids(ops)
    puts "Seeded: #{@seen.size} CIDs from bundle #{last[:bundle_number]}"
  rescue StandardError
    puts "Warning: couldn't seed boundary"
  end

  # CIDs of the trailing run of operations that share the last op's
  # timestamp (the bundle's time boundary). Returns a Set; empty for [].
  def boundary_cids(ops)
    return Set.new if ops.empty?

    t = ops[-1][:time]
    ops.reverse.take_while { |o| o[:time] == t }.map { |o| o[:cid] }.to_set
  end

  # SHA-256 hex digest helper
  def sha(data)
    Digest::SHA256.hexdigest(data)
  end
end
180
# Entry point: bundle into the directory given on the command line
# (default ./plc_bundles_rb), but only when executed as a script.
if __FILE__ == $PROGRAM_NAME
  bundle_dir = ARGV[0] || './plc_bundles_rb'
  PlcBundle.new(bundle_dir).run
end