Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
# Importing Records

Parity includes a comprehensive import system that enables you to sync historical AT Protocol data to your Eloquent models. This complements the real-time sync provided by [ParitySignal](atp-signals-integration.md).

## The Cold Start Problem

When you start consuming the AT Protocol firehose with ParitySignal, you only receive events from that point forward. Any records created before you started listening are not captured.

Importing solves this "cold start" problem by fetching existing records from user repositories via the `com.atproto.repo.listRecords` API.

## Quick Start

### 1. Run the Migration

Publish and run the migration to create the import state tracking table:

```bash
php artisan vendor:publish --tag=parity-migrations
php artisan migrate
```

### 2. Import a User

```bash
# Import all registered collections for a user
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur

# Import a specific collection
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --collection=app.bsky.feed.post

# Show progress
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --progress
```

### 3. Check Status

```bash
# Show all import status
php artisan parity:import-status

# Show status for a specific user
php artisan parity:import-status did:plc:z72i7hdynmk6r22z27h6tvur

# Show only incomplete imports
php artisan parity:import-status --pending
```

## Programmatic Usage

### ImportService

The `ImportService` is the main orchestration class:

```php
use SocialDept\AtpParity\Import\ImportService;

$service = app(ImportService::class);

// Import all registered collections for a user
$result = $service->importUser('did:plc:z72i7hdynmk6r22z27h6tvur');

echo "Synced {$result->recordsSynced} records";

// Import a specific collection
$result = $service->importUserCollection(
    'did:plc:z72i7hdynmk6r22z27h6tvur',
    'app.bsky.feed.post'
);

// With progress callback
$result = $service->importUser('did:plc:z72i7hdynmk6r22z27h6tvur', null, function ($progress) {
    echo "Synced {$progress->recordsSynced} records from {$progress->collection}\n";
});
```

### ImportResult

The `ImportResult` value object provides information about the import operation:

```php
$result = $service->importUser($did);

$result->recordsSynced;   // Number of records successfully synced
$result->recordsSkipped;  // Number of records skipped
$result->recordsFailed;   // Number of records that failed to sync
$result->completed;       // Whether the import completed fully
$result->cursor;          // Cursor for resuming (if incomplete)
$result->error;           // Error message (if failed)

$result->isSuccess();     // True if completed without errors
$result->isPartial();     // True if some records were synced before failure
$result->isFailed();      // True if an error occurred
```

### Checking Status

```php
// Check if a collection has been imported
if ($service->isImported($did, 'app.bsky.feed.post')) {
    echo "Already imported!";
}

// Get detailed status
$state = $service->getStatus($did, 'app.bsky.feed.post');

if ($state) {
    echo "Status: {$state->status}";
    echo "Records synced: {$state->records_synced}";
}

// Get all statuses for a user
$states = $service->getStatusForUser($did);
```

### Resuming Interrupted Imports

If an import is interrupted (network error, timeout, etc.), you can resume it:

```php
// Resume a specific import
$state = $service->getStatus($did, $collection);
if ($state && $state->canResume()) {
    $result = $service->resume($state);
}

// Resume all interrupted imports
$results = $service->resumeAll();
```

### Resetting Import State

To re-import a user or collection:

```php
// Reset a specific collection
$service->reset($did, 'app.bsky.feed.post');

// Reset all collections for a user
$service->resetUser($did);
```

## Queue Integration

For large-scale importing, use the queue system:

### Command Line

```bash
# Queue an import job instead of running synchronously
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --queue

# Queue imports for a list of DIDs
php artisan parity:import --file=dids.txt --queue
```

### Programmatic

```php
use SocialDept\AtpParity\Jobs\ImportUserJob;

// Dispatch a single user import
ImportUserJob::dispatch('did:plc:z72i7hdynmk6r22z27h6tvur');

// Dispatch for a specific collection
ImportUserJob::dispatch('did:plc:z72i7hdynmk6r22z27h6tvur', 'app.bsky.feed.post');
```

## Events

Parity dispatches events during importing that you can listen to:

### ImportStarted

Fired when an import operation begins:

```php
use SocialDept\AtpParity\Events\ImportStarted;

Event::listen(ImportStarted::class, function (ImportStarted $event) {
    Log::info("Starting import", [
        'did' => $event->did,
        'collection' => $event->collection,
    ]);
});
```

### ImportProgress

Fired after each page of records is processed:

```php
use SocialDept\AtpParity\Events\ImportProgress;

Event::listen(ImportProgress::class, function (ImportProgress $event) {
    Log::info("Import progress", [
        'did' => $event->did,
        'collection' => $event->collection,
        'records_synced' => $event->recordsSynced,
    ]);
});
```

### ImportCompleted

Fired when an import operation completes successfully:

```php
use SocialDept\AtpParity\Events\ImportCompleted;

Event::listen(ImportCompleted::class, function (ImportCompleted $event) {
    $result = $event->result;

    Log::info("Import completed", [
        'did' => $result->did,
        'collection' => $result->collection,
        'records_synced' => $result->recordsSynced,
    ]);
});
```

### ImportFailed

Fired when an import operation fails:

```php
use SocialDept\AtpParity\Events\ImportFailed;

Event::listen(ImportFailed::class, function (ImportFailed $event) {
    Log::error("Import failed", [
        'did' => $event->did,
        'collection' => $event->collection,
        'error' => $event->error,
    ]);
});
```

## Configuration

Configure importing in `config/parity.php`:

```php
'import' => [
    // Records per page when listing from PDS (max 100)
    'page_size' => 100,

    // Delay between pages in milliseconds (rate limiting)
    'page_delay' => 100,

    // Queue name for import jobs
    'queue' => 'parity-import',

    // Database table for storing import state
    'state_table' => 'parity_import_states',
],
```

## Batch Importing from File

Create a file with DIDs (one per line):

```text
did:plc:z72i7hdynmk6r22z27h6tvur
did:plc:ewvi7nxzyoun6zhxrhs64oiz
did:plc:ragtjsm2j2vknwkz3zp4oxrd
```

Then run:

```bash
# Synchronous (one at a time)
php artisan parity:import --file=dids.txt --progress

# Queued (parallel via workers)
php artisan parity:import --file=dids.txt --queue
```

## Coordinating with ParitySignal

For a complete sync solution, combine importing with real-time firehose sync:

1. **Start the firehose consumer** - Begin receiving live events
2. **Import historical data** - Fetch existing records
3. **Continue firehose sync** - New events are handled automatically

This ensures no gaps in your data. Records that arrive via firehose while importing will be properly deduplicated by the mapper's `upsert()` method (which uses the AT Protocol URI as the unique key).

```php
// Example: Import a user then subscribe to their updates
$service->importUser($did);

// The firehose consumer (ParitySignal) handles updates automatically
// as long as it's running with signal:consume
```

## Best Practices

### Rate Limiting

The `page_delay` config option helps prevent overwhelming PDS servers.
For bulk importing, consider:

- Using queued jobs to spread load over time
- Increasing the delay between pages
- Running during off-peak hours

### Error Handling

Imports can fail due to:
- Network errors
- PDS rate limiting
- Invalid records

The system automatically tracks progress via cursor, allowing you to resume failed imports:

```bash
# Check for failed imports
php artisan parity:import-status --failed

# Resume all failed/interrupted imports
php artisan parity:import --resume
```

### Monitoring

Use the events to build monitoring:

```php
// Track import metrics
Event::listen(ImportCompleted::class, function (ImportCompleted $event) {
    Metrics::increment('parity.import.completed');
    Metrics::gauge('parity.import.records', $event->result->recordsSynced);
});

Event::listen(ImportFailed::class, function (ImportFailed $event) {
    Metrics::increment('parity.import.failed');
    Alert::send("Import failed for {$event->did}: {$event->error}");
});
```

## Database Schema

The import state table stores progress:

| Column | Type | Description |
|--------|------|-------------|
| id | bigint | Primary key |
| did | string | The DID being imported |
| collection | string | The collection NSID |
| status | string | pending, in_progress, completed, failed |
| cursor | string | Pagination cursor for resuming |
| records_synced | int | Count of successfully synced records |
| records_skipped | int | Count of skipped records |
| records_failed | int | Count of failed records |
| started_at | timestamp | When import started |
| completed_at | timestamp | When import completed |
| error | text | Error message if failed |
| created_at | timestamp | |
| updated_at | timestamp | |

The combination of `did` and `collection` is unique.
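
To illustrate why a stored `cursor` plus one row per `did` and `collection` pair is enough to resume an interrupted import, here is a minimal plain-PHP sketch (PHP 8.1+). It is not Parity's implementation: `ImportStateRow`, `importPages()`, and the fake `$fetchPage` data source are hypothetical names made up for this example, and the real package may set `status` differently when an import is interrupted.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for one row of the import state table.
// Property names mirror a few schema columns; this is not Parity's model.
final class ImportStateRow
{
    public string $status = 'pending';
    public ?string $cursor = null;
    public int $recordsSynced = 0;

    public function __construct(
        public readonly string $did,
        public readonly string $collection,
    ) {}
}

/**
 * Process up to $maxPages pages, saving the cursor after every page
 * so that a later call continues where this one stopped.
 *
 * $fetchPage is an assumed callable:
 * (?string $cursor) => ['records' => array, 'cursor' => ?string]
 */
function importPages(ImportStateRow $state, callable $fetchPage, int $maxPages): ImportStateRow
{
    $state->status = 'in_progress';

    for ($i = 0; $i < $maxPages; $i++) {
        $page = $fetchPage($state->cursor);

        $state->recordsSynced += count($page['records']);
        $state->cursor = $page['cursor'];

        // A null cursor means the listing is exhausted.
        if ($state->cursor === null) {
            $state->status = 'completed';
            break;
        }
    }

    return $state;
}

// Fake data source: five records, two per page, cursor = next offset.
$records = ['a', 'b', 'c', 'd', 'e'];
$fetchPage = function (?string $cursor) use ($records): array {
    $offset = (int) ($cursor ?? '0');
    $next = $offset + 2;

    return [
        'records' => array_slice($records, $offset, 2),
        'cursor' => $next < count($records) ? (string) $next : null,
    ];
};

// One row per (did, collection) pair, matching the unique constraint.
$key = 'did:plc:z72i7hdynmk6r22z27h6tvur|app.bsky.feed.post';
$states = [
    $key => new ImportStateRow('did:plc:z72i7hdynmk6r22z27h6tvur', 'app.bsky.feed.post'),
];

// First run stops after one page, simulating an interruption.
importPages($states[$key], $fetchPage, 1);

// Second run continues from the stored cursor instead of starting over.
importPages($states[$key], $fetchPage, 10);

echo "{$states[$key]->status}: {$states[$key]->recordsSynced} records\n";
```

Because the cursor is written back after every page, the second call fetches only the pages that were still outstanding, and the running count carries over rather than restarting from zero.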