PLC Bundle V1 Example Implementations
at 9a773b02d3e5dcb1c5f7415e1c33efd59b4ea92f 182 lines 5.5 kB view raw
#!/usr/bin/env ruby
# frozen_string_literal: true

# plcbundle.rb - Ruby implementation of the plcbundle V1 specification.
# Creates compressed, cryptographically-chained bundles of DID PLC operations
# streamed from a PLC directory's /export endpoint.

require 'json'
require 'digest'
require 'net/http'
require 'uri'
require 'fileutils'
require 'time'
require 'set'
require 'zstd-ruby'

# Configuration constants
BUNDLE_SIZE = 10_000                  # operations per bundle file
INDEX_FILE  = 'plc_bundles.json'      # index file kept inside the bundle dir
PLC_URL     = 'https://plc.directory' # export source

class PlcBundle
  # @param dir [String] directory where bundle files and the index are stored
  def initialize(dir)
    @dir  = dir
    @pool = []       # Mempool of operations waiting to be bundled
    @seen = Set.new  # CID deduplication set (pruned after each bundle)

    FileUtils.mkdir_p(@dir)
    @idx = load_idx
    puts "plcbundle v1 | Dir: #{@dir} | Last: #{@idx[:last_bundle]}\n"

    # Seed deduplication set with boundary CIDs from the previous bundle so a
    # restarted run skips operations re-served at the cursor timestamp.
    seed_boundary if @idx[:bundles].any?
  end

  # Main loop: page through the export, validate and pool operations, and cut
  # a bundle whenever the mempool holds at least BUNDLE_SIZE of them.
  def run
    cursor = @idx[:bundles].last&.dig(:end_time)

    loop do
      puts "\nFetch: #{cursor || 'start'}"
      ops = fetch(cursor)

      # Stop cleanly on HTTP/parse failure (nil) or an exhausted export ([]).
      # FIX: the original `fetch(cursor) or (puts('Done.') and break)` never
      # broke out -- `puts` returns nil, so `nil and break` is a no-op and the
      # loop instead crashed inside add_ops(nil). An empty page also crashed
      # on `ops.last[:time]`.
      if ops.nil? || ops.empty?
        puts 'Done.'
        break
      end

      add_ops(ops) # Validate and add to mempool
      cursor = ops.last[:time]
      create_bundle while @pool.size >= BUNDLE_SIZE # Create bundles when ready

      sleep 0.2 # Rate limiting
    end

    save_idx
    puts "\nBundles: #{@idx[:bundles].size} | Pool: #{@pool.size} | " \
         "Size: #{'%.1f' % (@idx[:total_size_bytes] / 1e6)}MB"
  rescue => e
    # FIX: `puts "..." and save_idx` never saved the index -- the command call
    # binds tighter than `and` and puts returns nil, short-circuiting save_idx.
    puts "\nError: #{e.message}"
    save_idx
  end

  private

  # Fetch one page of operations from the PLC directory export endpoint.
  #
  # @param after [String, nil] ISO-8601 cursor; nil starts from the beginning
  # @return [Array<Hash>, nil] operations with the raw JSON line preserved
  #   under :raw for byte-exact reproducibility (Spec 4.2); nil on any failure
  def fetch(after)
    query = after ? "&after=#{after}" : ''
    uri = URI("#{PLC_URL}/export?count=1000#{query}")
    res = Net::HTTP.get_response(uri)
    return nil unless res.is_a?(Net::HTTPSuccess)

    # Parse each line exactly once (the original called JSON.parse twice per
    # line, once for the fields and once more just for createdAt).
    res.body.strip.split("\n").map do |line|
      op = JSON.parse(line, symbolize_names: true)
      op.merge(raw: line, time: op[:createdAt])
    end
  rescue
    nil
  end

  # Validate chronological order and append new operations to the mempool,
  # skipping duplicate CIDs (boundary re-fetches and within-batch repeats).
  #
  # @param ops [Array<Hash>] operations produced by #fetch
  # @raise [RuntimeError] "Order fail" if an operation precedes the last
  #   accepted timestamp (Spec 3; ISO-8601 strings compare correctly as text)
  def add_ops(ops)
    last_t = @pool.last&.dig(:time) || @idx[:bundles].last&.dig(:end_time) || ''
    added = 0

    ops.each do |op|
      next if @seen.include?(op[:cid]) # Skip duplicates (boundary + within-batch)

      # Spec 3: Validate chronological order
      raise "Order fail" if op[:time] < last_t

      @pool << op
      @seen << op[:cid]
      last_t = op[:time]
      added += 1
    end

    puts " +#{added} ops"
  end

  # Cut BUNDLE_SIZE operations from the mempool into a compressed, hash-chained
  # bundle file and record its metadata in the index.
  def create_bundle
    ops = @pool.shift(BUNDLE_SIZE)
    parent = @idx[:bundles].last&.dig(:hash) || ''

    # Spec 4.2: Serialize using the raw JSON strings for reproducibility
    jsonl = ops.map { |o| o[:raw] + "\n" }.join

    # Spec 6.3: content hash, then chain hash linking to the parent bundle
    ch = sha(jsonl)
    h = sha(parent.empty? ? "plcbundle:genesis:#{ch}" : "#{parent}:#{ch}")
    zst = Zstd.compress(jsonl)

    # Write bundle file
    num = @idx[:last_bundle] + 1
    file = format('%06d.jsonl.zst', num)
    File.binwrite("#{@dir}/#{file}", zst)

    # Create metadata entry. NOTE: the hash literal is evaluated before the
    # push, so `@idx[:bundles].last` below still refers to the parent bundle.
    @idx[:bundles] << {
      bundle_number: num,
      start_time: ops[0][:time],
      end_time: ops[-1][:time],
      operation_count: ops.size,
      did_count: ops.map { |o| o[:did] }.uniq.size,
      hash: h,
      content_hash: ch,
      parent: parent,
      compressed_hash: sha(zst),
      compressed_size: zst.bytesize,
      uncompressed_size: jsonl.bytesize,
      cursor: @idx[:bundles].last&.dig(:end_time) || '',
      created_at: Time.now.utc.iso8601
    }

    @idx[:last_bundle] = num
    @idx[:total_size_bytes] += zst.bytesize

    # Prune seen CIDs: only keep boundary + mempool (memory efficient)
    @seen = boundary_cids(ops) | @pool.map { |o| o[:cid] }.to_set

    save_idx
    puts "#{file} | #{h[0..12]}... | seen:#{@seen.size}"
  end

  # Load the bundle index from disk, or start a fresh one if absent/corrupt.
  def load_idx
    JSON.parse(File.read("#{@dir}/#{INDEX_FILE}"), symbolize_names: true)
  rescue
    { version: '1.0', last_bundle: 0, updated_at: '', total_size_bytes: 0, bundles: [] }
  end

  # Atomically save the index using temp file + rename (same filesystem).
  def save_idx
    @idx[:updated_at] = Time.now.utc.iso8601
    tmp = "#{@dir}/#{INDEX_FILE}.tmp"
    File.write(tmp, JSON.pretty_generate(@idx))
    File.rename(tmp, "#{@dir}/#{INDEX_FILE}")
  end

  # Re-read the last bundle from disk and seed @seen with its boundary CIDs.
  # Best-effort: any failure only prints a warning (dedup then relies on the
  # chronological-order check).
  def seed_boundary
    last = @idx[:bundles].last
    file = format('%06d.jsonl.zst', last[:bundle_number])

    data = Zstd.decompress(File.binread("#{@dir}/#{file}"))
    ops = data.strip.split("\n").map do |line|
      parsed = JSON.parse(line) # parse once (original parsed each line twice)
      { time: parsed['createdAt'], cid: parsed['cid'] }
    end

    @seen = boundary_cids(ops)
    puts "Seeded: #{@seen.size} CIDs from bundle #{last[:bundle_number]}"
  rescue
    puts "Warning: couldn't seed boundary"
  end

  # CIDs of the trailing run of operations sharing the last operation's
  # timestamp -- the "boundary" the next export page may re-serve.
  #
  # @param ops [Array<Hash>] operations with :time and :cid keys
  # @return [Set] possibly empty set of boundary CIDs
  def boundary_cids(ops)
    return Set.new if ops.empty?

    t = ops[-1][:time]
    # reverse_each avoids allocating a full reversed copy of the array.
    ops.reverse_each.take_while { |o| o[:time] == t }.map { |o| o[:cid] }.to_set
  end

  # SHA-256 hex digest helper (Spec 6.3).
  def sha(data)
    Digest::SHA256.hexdigest(data)
  end
end

# Entry point
PlcBundle.new(ARGV[0] || './plc_bundles_rb').run if __FILE__ == $PROGRAM_NAME