Maintain local ⭤ remote in sync with automatic AT Protocol parity for Laravel (alpha & unstable)
# Importing Records

Parity includes an import system for syncing historical AT Protocol records into your Eloquent models. It complements the real-time sync provided by [ParitySignal](atp-signals-integration.md).

## The Cold Start Problem

When you start consuming the AT Protocol firehose with ParitySignal, you only receive events from that point forward. Records created before you started listening are not captured.

Importing solves this "cold start" problem by fetching existing records from user repositories via the `com.atproto.repo.listRecords` API.
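To illustrate the cursor-based pagination behind `com.atproto.repo.listRecords`, here is a minimal plain-PHP sketch. The endpoint takes `repo`, `collection`, `limit` (up to 100), and an optional `cursor`, and returns a `records` array plus a `cursor` while more pages may remain. `listAllRecords()` and the fake page fetcher are hypothetical stand-ins for the HTTP call; this is not Parity's implementation.

```php
<?php

declare(strict_types=1);

/**
 * Collect every record from a listRecords-style endpoint by following the cursor.
 * $fetchPage is a hypothetical stand-in for GET /xrpc/com.atproto.repo.listRecords
 * and must return ['records' => [...], 'cursor' => ?string].
 */
function listAllRecords(callable $fetchPage, string $repo, string $collection, int $limit = 100): array
{
    $records = [];
    $cursor = null;

    do {
        $page = $fetchPage($repo, $collection, $limit, $cursor);
        foreach ($page['records'] as $record) {
            $records[] = $record;
        }
        $cursor = $page['cursor'] ?? null;
        // Stop when no cursor comes back, or on an empty page as a safeguard.
    } while ($cursor !== null && $page['records'] !== []);

    return $records;
}

// Fake PDS holding five posts, served $limit at a time; the cursor is just an offset here.
$fakePds = function (string $repo, string $collection, int $limit, ?string $cursor): array {
    $all = [];
    for ($i = 1; $i <= 5; $i++) {
        $all[] = ['uri' => "at://{$repo}/{$collection}/rkey{$i}", 'value' => ['text' => "post {$i}"]];
    }
    $offset = $cursor === null ? 0 : (int) $cursor;
    $next = $offset + $limit;

    return [
        'records' => array_slice($all, $offset, $limit),
        'cursor' => $next < count($all) ? (string) $next : null,
    ];
};

$records = listAllRecords($fakePds, 'did:plc:z72i7hdynmk6r22z27h6tvur', 'app.bsky.feed.post', 2);
echo count($records), "\n"; // 5
```

A real PDS cursor is an opaque string, so client code should pass it back unchanged rather than interpret it the way this fake does.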

## Quick Start

### 1. Run the Migration

Publish and run the migration to create the import state tracking table:

```bash
php artisan vendor:publish --tag=parity-migrations
php artisan migrate
```

### 2. Import a User

```bash
# Import all registered collections for a user
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur

# Import a specific collection
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --collection=app.bsky.feed.post

# Show progress
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --progress
```

### 3. Check Status

```bash
# Show the status of all imports
php artisan parity:import-status

# Show status for a specific user
php artisan parity:import-status did:plc:z72i7hdynmk6r22z27h6tvur

# Show only incomplete imports
php artisan parity:import-status --pending
```

## Programmatic Usage

### ImportService

The `ImportService` is the main orchestration class:

```php
use SocialDept\AtpParity\Import\ImportService;

$service = app(ImportService::class);

// Import all registered collections for a user
$result = $service->importUser('did:plc:z72i7hdynmk6r22z27h6tvur');

echo "Synced {$result->recordsSynced} records";

// Import a specific collection
$result = $service->importUserCollection(
    'did:plc:z72i7hdynmk6r22z27h6tvur',
    'app.bsky.feed.post'
);

// With progress callback
$result = $service->importUser('did:plc:z72i7hdynmk6r22z27h6tvur', null, function ($progress) {
    echo "Synced {$progress->recordsSynced} records from {$progress->collection}\n";
});
```

### ImportResult

The `ImportResult` value object provides information about the import operation:

```php
$result = $service->importUser($did);

$result->recordsSynced;  // Number of records successfully synced
$result->recordsSkipped; // Number of records skipped
$result->recordsFailed;  // Number of records that failed to sync
$result->completed;      // Whether the import completed fully
$result->cursor;         // Cursor for resuming (if incomplete)
$result->error;          // Error message (if failed)

$result->isSuccess(); // True if completed without errors
$result->isPartial(); // True if some records were synced before failure
$result->isFailed();  // True if an error occurred
```
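To make the three predicates concrete, here is a hypothetical value object that mirrors the documented fields. `ImportResultSketch` is not the package's `ImportResult`; its predicate logic is an assumption based only on the one-line descriptions above, so check the real class before relying on edge cases. It uses readonly promoted properties, which need PHP 8.1 or later.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in mirroring the documented fields; not Parity's ImportResult class.
final class ImportResultSketch
{
    public function __construct(
        public readonly int $recordsSynced,
        public readonly int $recordsSkipped,
        public readonly int $recordsFailed,
        public readonly bool $completed,
        public readonly ?string $cursor = null,
        public readonly ?string $error = null,
    ) {
    }

    // Completed, with no error recorded.
    public function isSuccess(): bool
    {
        return $this->completed && $this->error === null;
    }

    // An error occurred, but some records were synced before it.
    public function isPartial(): bool
    {
        return $this->error !== null && $this->recordsSynced > 0;
    }

    // Any recorded error counts as a failure.
    public function isFailed(): bool
    {
        return $this->error !== null;
    }
}

// 250 records synced before a timeout; the cursor allows a later resume.
$partial = new ImportResultSketch(250, 0, 0, false, 'cursor-abc', 'Connection timed out');
var_dump($partial->isSuccess(), $partial->isPartial(), $partial->isFailed()); // bool(false) bool(true) bool(true)
```

Under these assumptions a partial import is also a failed one; `isPartial()` only adds the information that some data made it in.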

### Checking Status

```php
// Check if a collection has been imported
if ($service->isImported($did, 'app.bsky.feed.post')) {
    echo "Already imported!";
}

// Get detailed status
$state = $service->getStatus($did, 'app.bsky.feed.post');

if ($state) {
    echo "Status: {$state->status}\n";
    echo "Records synced: {$state->records_synced}\n";
}

// Get all statuses for a user
$states = $service->getStatusForUser($did);
```

### Resuming Interrupted Imports

If an import is interrupted (network error, timeout, etc.), you can resume it:

```php
// Resume a specific import
$state = $service->getStatus($did, $collection);
if ($state && $state->canResume()) {
    $result = $service->resume($state);
}

// Resume all interrupted imports
$results = $service->resumeAll();
```
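Resuming is possible because the cursor is saved after each page. A minimal sketch, assuming a state array shaped like a few columns of the import state table and a stand-in `$fetchPage` callable in place of a real `listRecords` request; `resumeImport()` is hypothetical and not part of Parity's API.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical resume loop. $state mimics a few columns of the import state table;
 * $fetchPage stands in for a listRecords request and returns ['records' => [...], 'cursor' => ?string].
 */
function resumeImport(array $state, callable $fetchPage): array
{
    $cursor = $state['cursor'];

    do {
        $page = $fetchPage($cursor);
        $state['records_synced'] += count($page['records']);
        $cursor = $page['cursor'] ?? null;
        // Saving the cursor after every page is what lets a later run pick up from here.
        $state['cursor'] = $cursor;
    } while ($cursor !== null && $page['records'] !== []);

    $state['status'] = 'completed';

    return $state;
}

// Ten records served three per page. A previous run failed after the first page (cursor "3").
$fetchPage = function (?string $cursor): array {
    $offset = (int) ($cursor ?? '0');
    $next = $offset + 3;

    return [
        'records' => array_slice(range(1, 10), $offset, 3),
        'cursor' => $next < 10 ? (string) $next : null,
    ];
};

$state = ['status' => 'failed', 'cursor' => '3', 'records_synced' => 3];
$state = resumeImport($state, $fetchPage);
echo "{$state['status']}: {$state['records_synced']} records\n"; // completed: 10 records
```

Only the seven records after the saved cursor are fetched again; the first page is not re-requested.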

### Resetting Import State

To re-import a user or collection:

```php
// Reset a specific collection
$service->reset($did, 'app.bsky.feed.post');

// Reset all collections for a user
$service->resetUser($did);
```

## Queue Integration

For large-scale importing, use the queue system:

### Command Line

```bash
# Queue an import job instead of running synchronously
php artisan parity:import did:plc:z72i7hdynmk6r22z27h6tvur --queue

# Queue imports for a list of DIDs
php artisan parity:import --file=dids.txt --queue
```

### Programmatic

```php
use SocialDept\AtpParity\Jobs\ImportUserJob;

// Dispatch a single user import
ImportUserJob::dispatch('did:plc:z72i7hdynmk6r22z27h6tvur');

// Dispatch for a specific collection
ImportUserJob::dispatch('did:plc:z72i7hdynmk6r22z27h6tvur', 'app.bsky.feed.post');
```

## Events

Parity dispatches the following events during an import, which you can listen for:

### ImportStarted

Fired when an import operation begins:

```php
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;
use SocialDept\AtpParity\Events\ImportStarted;

Event::listen(ImportStarted::class, function (ImportStarted $event) {
    Log::info("Starting import", [
        'did' => $event->did,
        'collection' => $event->collection,
    ]);
});
```

### ImportProgress

Fired after each page of records is processed:

```php
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;
use SocialDept\AtpParity\Events\ImportProgress;

Event::listen(ImportProgress::class, function (ImportProgress $event) {
    Log::info("Import progress", [
        'did' => $event->did,
        'collection' => $event->collection,
        'records_synced' => $event->recordsSynced,
    ]);
});
```

### ImportCompleted

Fired when an import operation completes successfully:

```php
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;
use SocialDept\AtpParity\Events\ImportCompleted;

Event::listen(ImportCompleted::class, function (ImportCompleted $event) {
    $result = $event->result;

    Log::info("Import completed", [
        'did' => $result->did,
        'collection' => $result->collection,
        'records_synced' => $result->recordsSynced,
    ]);
});
```

### ImportFailed

Fired when an import operation fails:

```php
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Log;
use SocialDept\AtpParity\Events\ImportFailed;

Event::listen(ImportFailed::class, function (ImportFailed $event) {
    Log::error("Import failed", [
        'did' => $event->did,
        'collection' => $event->collection,
        'error' => $event->error,
    ]);
});
```

## Configuration

Configure importing in `config/parity.php`:

```php
'import' => [
    // Records per page when listing from PDS (max 100)
    'page_size' => 100,

    // Delay between pages in milliseconds (rate limiting)
    'page_delay' => 100,

    // Queue name for import jobs
    'queue' => 'parity-import',

    // Database table for storing import state
    'state_table' => 'parity_import_states',
],
```
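The following sketch shows how `page_size` and `page_delay` might interact in a paging loop. It is an illustration under assumptions, not Parity's internals: `importWithDelay()` is hypothetical, the sleep function is injected so the pauses can be counted (in production it would wrap `usleep($ms * 1000)`), and `page_size` is clamped to 100 because that is the maximum `listRecords` accepts.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical page loop that honours the 'page_size' and 'page_delay' config keys.
 * $fetchPage stands in for a listRecords request; $sleepMs is injected so pauses can be observed.
 */
function importWithDelay(array $config, callable $fetchPage, callable $sleepMs): int
{
    // listRecords accepts at most 100 records per request.
    $limit = max(1, min(100, (int) $config['page_size']));
    $delay = (int) $config['page_delay'];
    $synced = 0;
    $cursor = null;
    $firstPage = true;

    do {
        if (!$firstPage) {
            $sleepMs($delay); // pause between pages, never before the first one
        }
        $firstPage = false;

        $page = $fetchPage($limit, $cursor);
        $synced += count($page['records']);
        $cursor = $page['cursor'] ?? null;
    } while ($cursor !== null && $page['records'] !== []);

    return $synced;
}

// Record requested pauses instead of sleeping; production code would call usleep($ms * 1000).
$sleeps = [];
$recordSleep = function (int $ms) use (&$sleeps): void {
    $sleeps[] = $ms;
};

// 250 fake records. A page_size of 500 is clamped to 100, giving 3 pages and 2 pauses.
$fetchPage = function (int $limit, ?string $cursor): array {
    $offset = (int) ($cursor ?? '0');
    $next = $offset + $limit;

    return [
        'records' => array_slice(range(1, 250), $offset, $limit),
        'cursor' => $next < 250 ? (string) $next : null,
    ];
};

$synced = importWithDelay(['page_size' => 500, 'page_delay' => 100], $fetchPage, $recordSleep);
echo "{$synced} records, " . count($sleeps) . " pauses\n"; // 250 records, 2 pauses
```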

## Batch Importing from File

Create a file with DIDs (one per line):

```text
did:plc:z72i7hdynmk6r22z27h6tvur
did:plc:ewvi7nxzyoun6zhxrhs64oiz
did:plc:ragtjsm2j2vknwkz3zp4oxrd
```

Then run:

```bash
# Synchronous (one at a time)
php artisan parity:import --file=dids.txt --progress

# Queued (parallel via workers)
php artisan parity:import --file=dids.txt --queue
```
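If you generate `dids.txt` from another system, it can help to clean it first. The hypothetical helper below drops blank lines, duplicates, and lines that do not look like a DID; these rules are assumptions for preparing the file and do not describe how `parity:import --file` parses its input.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: read DIDs from a file, one per line, skipping blank lines,
 * duplicates, and anything that does not match a simple did:method:id shape.
 */
function readDidFile(string $path): array
{
    $lines = file($path, FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
    if ($lines === false) {
        throw new RuntimeException("Cannot read {$path}");
    }

    $dids = [];
    foreach ($lines as $line) {
        $line = trim($line);
        // Loose check that accepts did:plc:... and did:web:... style identifiers.
        if (preg_match('/^did:[a-z0-9]+:[A-Za-z0-9._:%-]+$/', $line) === 1) {
            $dids[$line] = true; // array keys remove duplicates while keeping first-seen order
        }
    }

    return array_keys($dids);
}

$path = tempnam(sys_get_temp_dir(), 'dids');
file_put_contents($path, "did:plc:z72i7hdynmk6r22z27h6tvur\n\n  did:plc:ewvi7nxzyoun6zhxrhs64oiz  \nnot-a-did\ndid:plc:z72i7hdynmk6r22z27h6tvur\n");

$dids = readDidFile($path);
unlink($path);
print_r($dids); // the two unique DIDs, in file order
```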

## Coordinating with ParitySignal

For a complete sync solution, combine importing with real-time firehose sync:

1. **Start the firehose consumer** - Begin receiving live events
2. **Import historical data** - Fetch existing records
3. **Continue firehose sync** - New events are handled automatically

Starting the consumer before the import ensures there are no gaps in your data. Records that arrive via the firehose while the import is running are deduplicated by the mapper's `upsert()` method, which uses the AT Protocol URI as the unique key.

```php
// Example: import a user's existing records
$service->importUser($did);

// The firehose consumer (ParitySignal) handles subsequent updates automatically
// as long as it's running via signal:consume
```
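The firehose and the import can overlap safely because an upsert keyed on the AT URI is idempotent: writing the same record twice leaves one row. This in-memory sketch uses a PHP array in place of the database's unique index; `UriKeyedStore` is hypothetical and only illustrates the idea, not the mapper's actual `upsert()`.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory store; the array keyed by AT URI plays the role of a unique index.
final class UriKeyedStore
{
    /** @var array<string, array{cid: string, value: array}> */
    private array $rows = [];

    public function upsert(string $uri, string $cid, array $value): void
    {
        // Insert when the URI is new, otherwise overwrite the existing row.
        $this->rows[$uri] = ['cid' => $cid, 'value' => $value];
    }

    public function count(): int
    {
        return count($this->rows);
    }

    public function get(string $uri): ?array
    {
        return $this->rows[$uri] ?? null;
    }
}

$store = new UriKeyedStore();
$uri = 'at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3kabc';

// The firehose delivers the post while the import is running...
$store->upsert($uri, 'cid-v1', ['text' => 'hello']);
// ...and the import then fetches the same record again.
$store->upsert($uri, 'cid-v1', ['text' => 'hello']);

echo $store->count(), "\n"; // 1
```

In this sketch the last write wins, so whichever copy of a record is written second determines the stored value.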

## Best Practices

### Rate Limiting

The `page_delay` config option helps prevent overwhelming PDS servers. For bulk importing, consider:

- Using queued jobs to spread load over time
- Increasing the delay between pages
- Running during off-peak hours
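As a rough worked example of what `page_delay` costs, the pauses alone add about `(pages - 1) × page_delay` per collection, ignoring request time. The helper below is hypothetical and simply does that arithmetic.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: lower bound on time spent in page_delay for one collection,
// with one pause between each pair of consecutive pages.
function minimumDelaySeconds(int $recordCount, int $pageSize, int $pageDelayMs): float
{
    $pages = (int) ceil($recordCount / $pageSize);

    return max(0, $pages - 1) * $pageDelayMs / 1000;
}

echo minimumDelaySeconds(10_000, 100, 100), "\n"; // 9.9  (100 pages, 99 pauses of 100 ms)
echo minimumDelaySeconds(10_000, 100, 500), "\n"; // 49.5 (same pages, 500 ms pauses)
```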

### Error Handling

Imports can fail due to:

- Network errors
- PDS rate limiting
- Invalid records

The system tracks progress with a pagination cursor, so failed imports can be resumed from where they stopped:

```bash
# Check for failed imports
php artisan parity:import-status --failed

# Resume all failed/interrupted imports
php artisan parity:import --resume
```

### Monitoring

You can use the events to build monitoring:

```php
use Illuminate\Support\Facades\Event;
use SocialDept\AtpParity\Events\ImportCompleted;
use SocialDept\AtpParity\Events\ImportFailed;

// Metrics and Alert are placeholders for your own metrics and alerting services.

// Track import metrics
Event::listen(ImportCompleted::class, function (ImportCompleted $event) {
    Metrics::increment('parity.import.completed');
    Metrics::gauge('parity.import.records', $event->result->recordsSynced);
});

Event::listen(ImportFailed::class, function (ImportFailed $event) {
    Metrics::increment('parity.import.failed');
    Alert::send("Import failed for {$event->did}: {$event->error}");
});
```

## Database Schema

The import state table stores progress:

| Column | Type | Description |
|--------|------|-------------|
| id | bigint | Primary key |
| did | string | The DID being imported |
| collection | string | The collection NSID |
| status | string | `pending`, `in_progress`, `completed`, or `failed` |
| cursor | string | Pagination cursor for resuming |
| records_synced | int | Count of successfully synced records |
| records_skipped | int | Count of skipped records |
| records_failed | int | Count of failed records |
| started_at | timestamp | When the import started |
| completed_at | timestamp | When the import completed |
| error | text | Error message if failed |
| created_at | timestamp | |
| updated_at | timestamp | |

The combination of `did` and `collection` is unique.
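For reference, here is one way to express this schema in SQLite and see the unique `(did, collection)` pair at work. The DDL is reconstructed from the column table, not taken from Parity's published migration (column types and indexes there may differ), and the example assumes PHP with the `pdo_sqlite` extension and SQLite 3.24 or newer for the `ON CONFLICT ... DO UPDATE` upsert syntax.

```php
<?php

declare(strict_types=1);

// Requires pdo_sqlite. DDL reconstructed from the schema table; not Parity's migration.
$pdo = new PDO('sqlite::memory:');
$pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);

$pdo->exec(<<<'SQL'
    CREATE TABLE parity_import_states (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        did TEXT NOT NULL,
        collection TEXT NOT NULL,
        status TEXT NOT NULL DEFAULT 'pending',
        cursor TEXT,
        records_synced INTEGER NOT NULL DEFAULT 0,
        records_skipped INTEGER NOT NULL DEFAULT 0,
        records_failed INTEGER NOT NULL DEFAULT 0,
        started_at TEXT,
        completed_at TEXT,
        error TEXT,
        created_at TEXT,
        updated_at TEXT,
        UNIQUE (did, collection)
    )
    SQL);

// Writing the same (did, collection) pair twice updates one row instead of adding another.
$upsert = $pdo->prepare(<<<'SQL'
    INSERT INTO parity_import_states (did, collection, status, cursor, records_synced)
    VALUES (:did, :collection, :status, :cursor, :synced)
    ON CONFLICT (did, collection) DO UPDATE SET
        status = excluded.status,
        cursor = excluded.cursor,
        records_synced = excluded.records_synced
    SQL);

$did = 'did:plc:z72i7hdynmk6r22z27h6tvur';
$upsert->execute(['did' => $did, 'collection' => 'app.bsky.feed.post', 'status' => 'in_progress', 'cursor' => 'page-2', 'synced' => 100]);
$upsert->execute(['did' => $did, 'collection' => 'app.bsky.feed.post', 'status' => 'completed', 'cursor' => null, 'synced' => 250]);

$rows = $pdo->query('SELECT status, records_synced FROM parity_import_states')->fetchAll(PDO::FETCH_ASSOC);
print_r($rows); // a single row with status "completed" and records_synced 250
```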