tangled
alpha
login
or
join now
slices.network
/
slices
137
fork
atom
Highly ambitious ATProtocol AppView service and sdks
137
fork
atom
overview
issues
10
pulls
3
pipelines
add jetstream logs page, update fly configs
chadtmiller.com
6 months ago
5f7fafc5
23084c40
+708
-62
21 changed files
expand all
collapse all
unified
split
api
.sqlx
query-101a30ddcb0ebab2b4ee2777ebe1e5520bea1950c1e3523db8faae3550aa60c5.json
query-503cd19e89b58ac44820ba3e827ee8dcf0a28d7e1ab5b8e3499abef898b0b7ac.json
fly.toml
scripts
generate_typescript.ts
prod_sync.sh
src
database.rs
handler_logs.rs
jetstream.rs
logging.rs
main.rs
frontend
fly.toml
src
client.ts
components
JetstreamLogs.tsx
JetstreamStatus.tsx
JetstreamStatusCompact.tsx
SyncJobLogs.tsx
pages
JetstreamLogsPage.tsx
SlicePage.tsx
routes
pages.tsx
slices.tsx
utils
time.ts
+71
api/.sqlx/query-101a30ddcb0ebab2b4ee2777ebe1e5520bea1950c1e3523db8faae3550aa60c5.json
···
1
1
+
{
2
2
+
"db_name": "PostgreSQL",
3
3
+
"query": "\n SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata\n FROM logs\n WHERE log_type = 'jetstream'\n AND (slice_uri = $1 OR slice_uri IS NULL)\n ORDER BY created_at DESC\n LIMIT $2\n ",
4
4
+
"describe": {
5
5
+
"columns": [
6
6
+
{
7
7
+
"ordinal": 0,
8
8
+
"name": "id",
9
9
+
"type_info": "Int8"
10
10
+
},
11
11
+
{
12
12
+
"ordinal": 1,
13
13
+
"name": "created_at",
14
14
+
"type_info": "Timestamptz"
15
15
+
},
16
16
+
{
17
17
+
"ordinal": 2,
18
18
+
"name": "log_type",
19
19
+
"type_info": "Varchar"
20
20
+
},
21
21
+
{
22
22
+
"ordinal": 3,
23
23
+
"name": "job_id",
24
24
+
"type_info": "Uuid"
25
25
+
},
26
26
+
{
27
27
+
"ordinal": 4,
28
28
+
"name": "user_did",
29
29
+
"type_info": "Text"
30
30
+
},
31
31
+
{
32
32
+
"ordinal": 5,
33
33
+
"name": "slice_uri",
34
34
+
"type_info": "Text"
35
35
+
},
36
36
+
{
37
37
+
"ordinal": 6,
38
38
+
"name": "level",
39
39
+
"type_info": "Varchar"
40
40
+
},
41
41
+
{
42
42
+
"ordinal": 7,
43
43
+
"name": "message",
44
44
+
"type_info": "Text"
45
45
+
},
46
46
+
{
47
47
+
"ordinal": 8,
48
48
+
"name": "metadata",
49
49
+
"type_info": "Jsonb"
50
50
+
}
51
51
+
],
52
52
+
"parameters": {
53
53
+
"Left": [
54
54
+
"Text",
55
55
+
"Int8"
56
56
+
]
57
57
+
},
58
58
+
"nullable": [
59
59
+
false,
60
60
+
false,
61
61
+
false,
62
62
+
true,
63
63
+
true,
64
64
+
true,
65
65
+
false,
66
66
+
false,
67
67
+
true
68
68
+
]
69
69
+
},
70
70
+
"hash": "101a30ddcb0ebab2b4ee2777ebe1e5520bea1950c1e3523db8faae3550aa60c5"
71
71
+
}
+2
-2
api/.sqlx/query-eaf55dfbc38c83a834edcd368adbba7593a2e91f7a65765027c3139c42cdaec2.json
api/.sqlx/query-503cd19e89b58ac44820ba3e827ee8dcf0a28d7e1ab5b8e3499abef898b0b7ac.json
···
1
1
{
2
2
"db_name": "PostgreSQL",
3
3
-
"query": "\n SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata\n FROM logs\n WHERE log_type = 'jetstream'\n ORDER BY created_at DESC\n LIMIT $1\n ",
3
3
+
"query": "\n SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata\n FROM logs\n WHERE log_type = 'jetstream'\n ORDER BY created_at DESC\n LIMIT $1\n ",
4
4
"describe": {
5
5
"columns": [
6
6
{
···
66
66
true
67
67
]
68
68
},
69
69
-
"hash": "eaf55dfbc38c83a834edcd368adbba7593a2e91f7a65765027c3139c42cdaec2"
69
69
+
"hash": "503cd19e89b58ac44820ba3e827ee8dcf0a28d7e1ab5b8e3499abef898b0b7ac"
70
70
}
+1
-1
api/fly.toml
···
23
23
processes = ['app']
24
24
25
25
[[vm]]
26
26
-
memory = '512mb'
26
26
+
memory = '1gb'
27
27
cpu_kind = 'shared'
28
28
cpus = 1
+26
api/scripts/generate_typescript.ts
···
411
411
});
412
412
413
413
sourceFile.addInterface({
414
414
+
name: "GetJetstreamLogsParams",
415
415
+
isExported: true,
416
416
+
properties: [
417
417
+
{ name: "limit", type: "number", hasQuestionToken: true },
418
418
+
],
419
419
+
});
420
420
+
421
421
+
sourceFile.addInterface({
422
422
+
name: "GetJetstreamLogsResponse",
423
423
+
isExported: true,
424
424
+
properties: [
425
425
+
{ name: "logs", type: "LogEntry[]" },
426
426
+
],
427
427
+
});
428
428
+
429
429
+
sourceFile.addInterface({
414
430
name: "LogEntry",
415
431
isExported: true,
416
432
properties: [
···
1659
1675
isAsync: true,
1660
1676
statements: [
1661
1677
`return await this.makeRequest<JetstreamStatusResponse>('social.slices.slice.getJetstreamStatus', 'GET');`,
1678
1678
+
],
1679
1679
+
});
1680
1680
+
1681
1681
+
classDeclaration.addMethod({
1682
1682
+
name: "getJetstreamLogs",
1683
1683
+
parameters: [{ name: "params", type: "GetJetstreamLogsParams" }],
1684
1684
+
returnType: "Promise<GetJetstreamLogsResponse>",
1685
1685
+
isAsync: true,
1686
1686
+
statements: [
1687
1687
+
`return await this.makeRequest<GetJetstreamLogsResponse>('social.slices.slice.getJetstreamLogs', 'GET', params);`,
1662
1688
],
1663
1689
});
1664
1690
+5
-1
api/scripts/prod_sync.sh
···
20
20
"social.slices.lexicon",
21
21
"social.slices.actor.profile"
22
22
],
23
23
+
"externalCollections": [
24
24
+
"app.bsky.actor.profile"
25
25
+
],
23
26
"repos": [
24
27
"did:plc:bcgltzqazw5tb6k2g3ttenbj"
25
25
-
]
28
28
+
],
29
29
+
"skipValidation": true
26
30
}' | jq '.'
27
31
28
32
echo ""
+6
-4
api/src/database.rs
···
1008
1008
}
1009
1009
1010
1010
1011
1011
-
pub async fn upsert_record(&self, record: &Record) -> Result<(), DatabaseError> {
1012
1012
-
sqlx::query(r#"
1011
1011
+
pub async fn upsert_record(&self, record: &Record) -> Result<bool, DatabaseError> {
1012
1012
+
// Returns true if inserted, false if updated
1013
1013
+
let result = sqlx::query_scalar::<_, bool>(r#"
1013
1014
INSERT INTO record (uri, cid, did, collection, json, indexed_at, slice_uri)
1014
1015
VALUES ($1, $2, $3, $4, $5, $6, $7)
1015
1016
ON CONFLICT ON CONSTRAINT record_pkey DO UPDATE
1016
1017
SET cid = EXCLUDED.cid,
1017
1018
json = EXCLUDED.json,
1018
1019
indexed_at = EXCLUDED.indexed_at
1020
1020
+
RETURNING (xmax = 0)
1019
1021
"#)
1020
1022
.bind(&record.uri)
1021
1023
.bind(&record.cid)
···
1024
1026
.bind(&record.json)
1025
1027
.bind(&record.indexed_at)
1026
1028
.bind(&record.slice_uri)
1027
1027
-
.execute(&self.pool)
1029
1029
+
.fetch_one(&self.pool)
1028
1030
.await?;
1029
1029
-
Ok(())
1031
1031
+
Ok(result)
1030
1032
}
1031
1033
1032
1034
pub async fn get_all_slices(&self) -> Result<Vec<String>, DatabaseError> {
+2
-1
api/src/handler_logs.rs
···
11
11
#[derive(Debug, Deserialize)]
12
12
pub struct LogsQuery {
13
13
pub limit: Option<i64>,
14
14
+
pub slice: Option<String>,
14
15
}
15
16
16
17
#[derive(Debug, Serialize)]
···
44
45
State(state): State<AppState>,
45
46
Query(params): Query<LogsQuery>,
46
47
) -> Result<Json<LogsResponse>, StatusCode> {
47
47
-
match get_jetstream_logs(&state.database_pool, params.limit).await {
48
48
+
match get_jetstream_logs(&state.database_pool, params.slice.as_deref(), params.limit).await {
48
49
Ok(logs) => Ok(Json(LogsResponse { logs })),
49
50
Err(e) => {
50
51
tracing::error!("Failed to get jetstream logs: {}", e);
+90
-11
api/src/jetstream.rs
···
11
11
use crate::models::Record;
12
12
use crate::errors::SliceError;
13
13
use crate::lexicon::validator::LexiconValidator;
14
14
+
use crate::logging::{Logger, LogLevel};
14
15
15
16
pub struct JetstreamConsumer {
16
17
consumer: Consumer,
···
45
46
// Increment event counter
46
47
let count = self.event_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
47
48
48
48
-
// Log every 10000 events to show activity
49
49
+
// Log every 10000 events to show activity (console only, not in DB)
49
50
if count % 10000 == 0 {
50
51
info!("Jetstream consumer has processed {} events", count);
51
52
}
···
53
54
match event {
54
55
JetstreamEvent::Commit { did, commit, .. } => {
55
56
if let Err(e) = self.handle_commit_event(&did, commit).await {
56
56
-
error!("Error handling commit event: {}", e);
57
57
+
let message = format!("Error handling commit event: {}", e);
58
58
+
error!("{}", message);
59
59
+
Logger::global().log_jetstream(LogLevel::Error, &message, Some(serde_json::json!({
60
60
+
"error": e.to_string(),
61
61
+
"did": did,
62
62
+
"event_type": "commit"
63
63
+
})));
57
64
}
58
65
}
59
66
JetstreamEvent::Delete { did, commit, .. } => {
60
67
if let Err(e) = self.handle_delete_event(&did, commit).await {
61
61
-
error!("Error handling delete event: {}", e);
68
68
+
let message = format!("Error handling delete event: {}", e);
69
69
+
error!("{}", message);
70
70
+
Logger::global().log_jetstream(LogLevel::Error, &message, Some(serde_json::json!({
71
71
+
"error": e.to_string(),
72
72
+
"did": did,
73
73
+
"event_type": "delete"
74
74
+
})));
62
75
}
63
76
}
64
77
_ => {
···
176
189
true
177
190
}
178
191
Err(fresh_e) => {
179
179
-
error!("✗ Validation failed even with fresh validator for collection {} in slice {}: {}",
180
180
-
commit.collection, slice_uri, fresh_e);
192
192
+
let message = format!("Validation failed for collection {} in slice {}", commit.collection, slice_uri);
193
193
+
error!("✗ {}: {}", message, fresh_e);
194
194
+
Logger::global().log_jetstream(LogLevel::Warn, &message, Some(serde_json::json!({
195
195
+
"collection": commit.collection,
196
196
+
"slice_uri": slice_uri,
197
197
+
"did": did
198
198
+
})));
181
199
false
182
200
}
183
201
}
184
202
}
185
203
None => {
186
186
-
error!("✗ No lexicons found for slice {} during fallback", slice_uri);
204
204
+
// Skip logging for missing lexicons - this is expected for many slices
187
205
false
188
206
}
189
207
}
···
211
229
slice_uri: Some(slice_uri.clone()),
212
230
};
213
231
214
214
-
self.database.upsert_record(&record).await
215
215
-
.map_err(|e| anyhow::anyhow!("Database error: {}", e))?;
232
232
+
match self.database.upsert_record(&record).await {
233
233
+
Ok(is_insert) => {
234
234
+
let message = if is_insert { "Record inserted" } else { "Record updated" };
235
235
+
let operation = if is_insert { "insert" } else { "update" };
236
236
+
Logger::global().log_jetstream(LogLevel::Info, message, Some(serde_json::json!({
237
237
+
"operation": operation,
238
238
+
"collection": commit.collection,
239
239
+
"slice_uri": slice_uri,
240
240
+
"did": did,
241
241
+
"record_type": "primary"
242
242
+
})));
243
243
+
}
244
244
+
Err(e) => {
245
245
+
let message = "Failed to insert/update record";
246
246
+
Logger::global().log_jetstream(LogLevel::Error, message, Some(serde_json::json!({
247
247
+
"operation": "upsert",
248
248
+
"collection": commit.collection,
249
249
+
"slice_uri": slice_uri,
250
250
+
"did": did,
251
251
+
"error": e.to_string(),
252
252
+
"record_type": "primary"
253
253
+
})));
254
254
+
return Err(anyhow::anyhow!("Database error: {}", e));
255
255
+
}
256
256
+
}
216
257
217
258
info!("✓ Successfully indexed {} record from primary collection: {}",
218
259
commit.operation, uri);
···
234
275
slice_uri: Some(slice_uri.clone()),
235
276
};
236
277
237
237
-
self.database.upsert_record(&record).await
238
238
-
.map_err(|e| anyhow::anyhow!("Database error: {}", e))?;
278
278
+
match self.database.upsert_record(&record).await {
279
279
+
Ok(is_insert) => {
280
280
+
let message = if is_insert { "Record inserted" } else { "Record updated" };
281
281
+
let operation = if is_insert { "insert" } else { "update" };
282
282
+
Logger::global().log_jetstream(LogLevel::Info, message, Some(serde_json::json!({
283
283
+
"operation": operation,
284
284
+
"collection": commit.collection,
285
285
+
"slice_uri": slice_uri,
286
286
+
"did": did,
287
287
+
"record_type": "external"
288
288
+
})));
289
289
+
}
290
290
+
Err(e) => {
291
291
+
let message = "Failed to insert/update record";
292
292
+
Logger::global().log_jetstream(LogLevel::Error, message, Some(serde_json::json!({
293
293
+
"operation": "upsert",
294
294
+
"collection": commit.collection,
295
295
+
"slice_uri": slice_uri,
296
296
+
"did": did,
297
297
+
"error": e.to_string(),
298
298
+
"record_type": "external"
299
299
+
})));
300
300
+
return Err(anyhow::anyhow!("Database error: {}", e));
301
301
+
}
302
302
+
}
239
303
240
304
info!("✓ Successfully indexed {} record from external collection: {}",
241
305
commit.operation, uri);
···
268
332
Ok(rows_affected) => {
269
333
if rows_affected > 0 {
270
334
info!("✓ Deleted record globally: {} ({} rows)", uri, rows_affected);
335
335
+
Logger::global().log_jetstream(LogLevel::Info, "Record deleted", Some(serde_json::json!({
336
336
+
"operation": "delete",
337
337
+
"collection": commit.collection,
338
338
+
"did": did,
339
339
+
"uri": uri,
340
340
+
"rows_affected": rows_affected
341
341
+
})));
271
342
}
272
343
}
273
344
Err(e) => {
274
274
-
error!("Failed to delete record {}: {}", uri, e);
345
345
+
let message = "Failed to delete record";
346
346
+
error!("{}: {}", message, e);
347
347
+
Logger::global().log_jetstream(LogLevel::Error, message, Some(serde_json::json!({
348
348
+
"operation": "delete",
349
349
+
"collection": commit.collection,
350
350
+
"did": did,
351
351
+
"uri": uri,
352
352
+
"error": e.to_string()
353
353
+
})));
275
354
}
276
355
}
277
356
+64
-15
api/src/logging.rs
···
120
120
let _ = self.sender.send(entry);
121
121
}
122
122
123
123
+
/// Log a jetstream message (queued for batch insertion)
124
124
+
pub fn log_jetstream(
125
125
+
&self,
126
126
+
level: LogLevel,
127
127
+
message: &str,
128
128
+
metadata: Option<Value>,
129
129
+
) {
130
130
+
let entry = QueuedLogEntry {
131
131
+
log_type: LogType::Jetstream.as_str().to_string(),
132
132
+
job_id: None,
133
133
+
user_did: None,
134
134
+
slice_uri: None,
135
135
+
level: level.as_str().to_string(),
136
136
+
message: message.to_string(),
137
137
+
metadata,
138
138
+
created_at: Utc::now(),
139
139
+
};
140
140
+
141
141
+
// Also log to tracing for immediate console output
142
142
+
match level {
143
143
+
LogLevel::Info => info!("[jetstream] {}", message),
144
144
+
LogLevel::Warn => warn!("[jetstream] {}", message),
145
145
+
LogLevel::Error => error!("[jetstream] {}", message),
146
146
+
}
147
147
+
148
148
+
// Queue for database insertion (ignore send errors if channel closed)
149
149
+
let _ = self.sender.send(entry);
150
150
+
}
151
151
+
123
152
/// Background worker that processes the log queue
124
153
async fn background_worker(
125
154
mut receiver: mpsc::UnboundedReceiver<QueuedLogEntry>,
···
263
292
Ok(rows)
264
293
}
265
294
266
266
-
/// Get jetstream logs
267
267
-
#[allow(dead_code)]
295
295
+
/// Get jetstream logs, optionally filtered by slice (still includes global connection logs)
268
296
pub async fn get_jetstream_logs(
269
297
pool: &PgPool,
298
298
+
slice_filter: Option<&str>,
270
299
limit: Option<i64>,
271
300
) -> Result<Vec<LogEntry>, sqlx::Error> {
272
301
let limit = limit.unwrap_or(100);
273
302
274
274
-
let rows = sqlx::query_as!(
275
275
-
LogEntry,
276
276
-
r#"
277
277
-
SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
278
278
-
FROM logs
279
279
-
WHERE log_type = 'jetstream'
280
280
-
ORDER BY created_at DESC
281
281
-
LIMIT $1
282
282
-
"#,
283
283
-
limit
284
284
-
)
285
285
-
.fetch_all(pool)
286
286
-
.await?;
303
303
+
let rows = if let Some(slice_uri) = slice_filter {
304
304
+
// When filtering by slice, include both slice-specific logs and global connection logs (where slice_uri is NULL)
305
305
+
sqlx::query_as!(
306
306
+
LogEntry,
307
307
+
r#"
308
308
+
SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
309
309
+
FROM logs
310
310
+
WHERE log_type = 'jetstream'
311
311
+
AND (slice_uri = $1 OR slice_uri IS NULL)
312
312
+
ORDER BY created_at DESC
313
313
+
LIMIT $2
314
314
+
"#,
315
315
+
slice_uri,
316
316
+
limit
317
317
+
)
318
318
+
.fetch_all(pool)
319
319
+
.await?
320
320
+
} else {
321
321
+
// No filter, return all jetstream logs
322
322
+
sqlx::query_as!(
323
323
+
LogEntry,
324
324
+
r#"
325
325
+
SELECT id, created_at, log_type, job_id, user_did, slice_uri, level, message, metadata
326
326
+
FROM logs
327
327
+
WHERE log_type = 'jetstream'
328
328
+
ORDER BY created_at DESC
329
329
+
LIMIT $1
330
330
+
"#,
331
331
+
limit
332
332
+
)
333
333
+
.fetch_all(pool)
334
334
+
.await?
335
335
+
};
287
336
288
337
Ok(rows)
289
338
}
+35
-10
api/src/main.rs
···
37
37
use crate::database::Database;
38
38
use crate::errors::AppError;
39
39
use crate::jetstream::JetstreamConsumer;
40
40
+
use crate::logging::{Logger, LogLevel};
40
41
41
42
#[derive(Clone)]
42
43
pub struct Config {
···
93
94
};
94
95
95
96
// Initialize global logger
96
96
-
logging::Logger::init_global(pool.clone());
97
97
+
Logger::init_global(pool.clone());
97
98
98
99
// Start job queue runner
99
100
let pool_for_runner = pool.clone();
···
153
154
154
155
loop {
155
156
tracing::info!("Starting Jetstream consumer...");
157
157
+
Logger::global().log_jetstream(
158
158
+
LogLevel::Info,
159
159
+
"Starting Jetstream consumer",
160
160
+
Some(serde_json::json!({"action": "starting_consumer"}))
161
161
+
);
156
162
157
163
// Use existing consumer or create new one
158
164
let consumer_arc = match current_consumer.take() {
···
166
172
{
167
173
Ok(consumer) => std::sync::Arc::new(consumer),
168
174
Err(e) => {
169
169
-
tracing::error!(
170
170
-
"Failed to create Jetstream consumer: {} - will retry in {:?}",
171
171
-
e,
172
172
-
retry_delay
175
175
+
let message = format!("Failed to create Jetstream consumer: {} - will retry in {:?}", e, retry_delay);
176
176
+
tracing::error!("{}", message);
177
177
+
Logger::global().log_jetstream(
178
178
+
LogLevel::Error,
179
179
+
&message,
180
180
+
Some(serde_json::json!({
181
181
+
"error": e.to_string(),
182
182
+
"retry_delay_secs": retry_delay.as_secs(),
183
183
+
"action": "consumer_creation_failed"
184
184
+
}))
173
185
);
174
186
jetstream_connected_clone
175
187
.store(false, std::sync::atomic::Ordering::Relaxed);
···
208
220
// No events for 60+ seconds - mark as disconnected
209
221
health_check_connected
210
222
.store(false, std::sync::atomic::Ordering::Relaxed);
211
211
-
tracing::warn!(
212
212
-
"Jetstream marked as disconnected: no events processed in {} seconds",
213
213
-
no_events_duration
223
223
+
let message = format!("Jetstream marked as disconnected: no events processed in {} seconds", no_events_duration);
224
224
+
tracing::warn!("{}", message);
225
225
+
Logger::global().log_jetstream(
226
226
+
LogLevel::Warn,
227
227
+
&message,
228
228
+
Some(serde_json::json!({
229
229
+
"no_events_duration_secs": no_events_duration,
230
230
+
"action": "health_check_disconnected"
231
231
+
}))
214
232
);
215
233
}
216
234
} else {
···
219
237
health_check_connected
220
238
.store(true, std::sync::atomic::Ordering::Relaxed);
221
239
if last_count == 0 && current_count > 0 {
222
222
-
tracing::info!(
223
223
-
"Jetstream health check: events flowing, marked as connected"
240
240
+
let message = "Jetstream health check: events flowing, marked as connected";
241
241
+
tracing::info!("{}", message);
242
242
+
Logger::global().log_jetstream(
243
243
+
LogLevel::Info,
244
244
+
message,
245
245
+
Some(serde_json::json!({
246
246
+
"event_count": current_count,
247
247
+
"action": "health_check_connected"
248
248
+
}))
224
249
);
225
250
}
226
251
}
+1
-1
frontend/fly.toml
···
25
25
processes = ['app']
26
26
27
27
[[vm]]
28
28
-
memory = '1gb'
28
28
+
memory = '512mb'
29
29
cpu_kind = 'shared'
30
30
cpus = 1
31
31
+19
-1
frontend/src/client.ts
···
1
1
// Generated TypeScript client for AT Protocol records
2
2
-
// Generated at: 2025-09-03 22:34:38 UTC
2
2
+
// Generated at: 2025-09-04 05:32:23 UTC
3
3
// Lexicons: 6
4
4
5
5
/**
···
175
175
}
176
176
177
177
export interface GetJobLogsResponse {
178
178
+
logs: LogEntry[];
179
179
+
}
180
180
+
181
181
+
export interface GetJetstreamLogsParams {
182
182
+
limit?: number;
183
183
+
}
184
184
+
185
185
+
export interface GetJetstreamLogsResponse {
178
186
logs: LogEntry[];
179
187
}
180
188
···
953
961
return await this.makeRequest<JetstreamStatusResponse>(
954
962
"social.slices.slice.getJetstreamStatus",
955
963
"GET"
964
964
+
);
965
965
+
}
966
966
+
967
967
+
async getJetstreamLogs(
968
968
+
params: GetJetstreamLogsParams
969
969
+
): Promise<GetJetstreamLogsResponse> {
970
970
+
return await this.makeRequest<GetJetstreamLogsResponse>(
971
971
+
"social.slices.slice.getJetstreamLogs",
972
972
+
"GET",
973
973
+
params
956
974
);
957
975
}
958
976
+104
frontend/src/components/JetstreamLogs.tsx
···
1
1
+
import type { LogEntry } from "../client.ts";
2
2
+
import { formatTimestamp } from "../utils/time.ts";
3
3
+
4
4
+
interface JetstreamLogsProps {
5
5
+
logs: LogEntry[];
6
6
+
}
7
7
+
8
8
+
function LogLevelBadge({ level }: { level: string }) {
9
9
+
const colors: Record<string, string> = {
10
10
+
error: "bg-red-100 text-red-800",
11
11
+
warn: "bg-yellow-100 text-yellow-800",
12
12
+
info: "bg-blue-100 text-blue-800",
13
13
+
debug: "bg-gray-100 text-gray-800",
14
14
+
};
15
15
+
16
16
+
return (
17
17
+
<span
18
18
+
className={`px-2 py-1 rounded text-xs font-medium ${
19
19
+
colors[level] || colors.debug
20
20
+
}`}
21
21
+
>
22
22
+
{level.toUpperCase()}
23
23
+
</span>
24
24
+
);
25
25
+
}
26
26
+
27
27
+
export function JetstreamLogs({ logs }: JetstreamLogsProps) {
28
28
+
if (logs.length === 0) {
29
29
+
return (
30
30
+
<div className="p-8 text-center text-gray-500">
31
31
+
No Jetstream logs available for this slice.
32
32
+
</div>
33
33
+
);
34
34
+
}
35
35
+
36
36
+
const errorCount = logs.filter((l) => l.level === "error").length;
37
37
+
const warnCount = logs.filter((l) => l.level === "warn").length;
38
38
+
const infoCount = logs.filter((l) => l.level === "info").length;
39
39
+
40
40
+
return (
41
41
+
<div className="divide-y divide-gray-200">
42
42
+
{/* Log Stats Header */}
43
43
+
<div className="p-4 bg-gray-50">
44
44
+
<div className="flex gap-4 text-sm">
45
45
+
<span>
46
46
+
Total logs: <strong>{logs.length}</strong>
47
47
+
</span>
48
48
+
{errorCount > 0 && (
49
49
+
<span className="text-red-600">
50
50
+
Errors: <strong>{errorCount}</strong>
51
51
+
</span>
52
52
+
)}
53
53
+
{warnCount > 0 && (
54
54
+
<span className="text-yellow-600">
55
55
+
Warnings: <strong>{warnCount}</strong>
56
56
+
</span>
57
57
+
)}
58
58
+
<span className="text-blue-600">
59
59
+
Info: <strong>{infoCount}</strong>
60
60
+
</span>
61
61
+
</div>
62
62
+
</div>
63
63
+
64
64
+
{/* Log Entries */}
65
65
+
<div className="max-h-[600px] overflow-y-auto">
66
66
+
{logs.map((log) => (
67
67
+
<div
68
68
+
key={log.id}
69
69
+
className={`p-3 hover:bg-gray-50 font-mono text-sm ${
70
70
+
log.level === "error"
71
71
+
? "bg-red-50"
72
72
+
: log.level === "warn"
73
73
+
? "bg-yellow-50"
74
74
+
: ""
75
75
+
}`}
76
76
+
>
77
77
+
<div className="flex items-start gap-3">
78
78
+
<span className="text-gray-400 text-xs">
79
79
+
{formatTimestamp(log.createdAt)}
80
80
+
</span>
81
81
+
<LogLevelBadge level={log.level} />
82
82
+
<div className="flex-1">
83
83
+
<div className="text-gray-800">{log.message}</div>
84
84
+
{log.metadata && Object.keys(log.metadata).length > 0 && (
85
85
+
<details className="mt-2">
86
86
+
<summary
87
87
+
className="text-xs text-gray-500 cursor-pointer hover:text-gray-700"
88
88
+
_="on click toggle .hidden on next <pre/>"
89
89
+
>
90
90
+
View metadata
91
91
+
</summary>
92
92
+
<pre className="mt-2 p-2 bg-gray-100 rounded text-xs overflow-x-auto hidden">
93
93
+
{JSON.stringify(log.metadata, null, 2)}
94
94
+
</pre>
95
95
+
</details>
96
96
+
)}
97
97
+
</div>
98
98
+
</div>
99
99
+
</div>
100
100
+
))}
101
101
+
</div>
102
102
+
</div>
103
103
+
);
104
104
+
}
+22
-2
frontend/src/components/JetstreamStatus.tsx
···
2
2
connected: boolean;
3
3
status: string;
4
4
error?: string;
5
5
+
sliceId?: string;
5
6
}
6
7
7
8
export function JetstreamStatus({
8
9
connected,
9
10
status,
10
11
error,
12
12
+
sliceId,
11
13
}: JetstreamStatusProps) {
12
14
if (connected) {
13
15
return (
···
25
27
</p>
26
28
</div>
27
29
</div>
28
28
-
<div className="text-xs text-green-600">Live Updates</div>
30
30
+
<div className="flex items-center gap-3">
31
31
+
{sliceId && (
32
32
+
<a
33
33
+
href={`/slices/${sliceId}/jetstream/logs`}
34
34
+
className="bg-green-600 hover:bg-green-700 text-white text-xs px-3 py-1.5 rounded-md transition-colors whitespace-nowrap"
35
35
+
>
36
36
+
View Logs
37
37
+
</a>
38
38
+
)}
39
39
+
</div>
29
40
</div>
30
41
</div>
31
42
);
···
47
58
)}
48
59
</div>
49
60
</div>
50
50
-
<div className="text-xs text-red-600">Offline</div>
61
61
+
<div className="flex items-center gap-3">
62
62
+
{sliceId && (
63
63
+
<a
64
64
+
href={`/slices/${sliceId}/jetstream/logs`}
65
65
+
className="bg-red-600 hover:bg-red-700 text-white text-xs px-3 py-1.5 rounded-md transition-colors whitespace-nowrap"
66
66
+
>
67
67
+
View Logs
68
68
+
</a>
69
69
+
)}
70
70
+
</div>
51
71
</div>
52
72
</div>
53
73
);
+13
frontend/src/components/JetstreamStatusCompact.tsx
···
1
1
+
export function JetstreamStatusCompact({ sliceId }: { sliceId: string }) {
2
2
+
return (
3
3
+
<div
4
4
+
hx-get={`/api/jetstream/status?sliceId=${sliceId}&compact=true`}
5
5
+
hx-trigger="load, every 2m"
6
6
+
hx-swap="outerHTML"
7
7
+
className="inline-flex items-center gap-2 text-xs"
8
8
+
>
9
9
+
<div className="w-2 h-2 bg-gray-400 rounded-full"></div>
10
10
+
<span className="text-gray-500">Checking status...</span>
11
11
+
</div>
12
12
+
);
13
13
+
}
+2
-10
frontend/src/components/SyncJobLogs.tsx
···
1
1
+
import { formatTimestamp } from "../utils/time.ts";
2
2
+
1
3
interface LogEntry {
2
4
id: number;
3
5
createdAt: string;
···
13
15
interface SyncJobLogsProps {
14
16
logs: LogEntry[];
15
17
jobId?: string;
16
16
-
}
17
17
-
18
18
-
function formatTimestamp(dateString: string): string {
19
19
-
const date = new Date(dateString);
20
20
-
return date.toLocaleTimeString([], {
21
21
-
hour: "2-digit",
22
22
-
minute: "2-digit",
23
23
-
second: "2-digit",
24
24
-
fractionalSecondDigits: 3,
25
25
-
});
26
18
}
27
19
28
20
function LogLevelBadge({ level }: { level: string }) {
+44
frontend/src/pages/JetstreamLogsPage.tsx
···
1
1
+
import type { LogEntry } from "../client.ts";
2
2
+
import { Layout } from "../components/Layout.tsx";
3
3
+
import { JetstreamLogs } from "../components/JetstreamLogs.tsx";
4
4
+
import { JetstreamStatusCompact } from "../components/JetstreamStatusCompact.tsx";
5
5
+
6
6
+
interface JetstreamLogsPageProps {
7
7
+
logs: LogEntry[];
8
8
+
sliceId: string;
9
9
+
currentUser?: { handle?: string; isAuthenticated: boolean };
10
10
+
}
11
11
+
12
12
+
export function JetstreamLogsPage({
13
13
+
logs,
14
14
+
sliceId,
15
15
+
currentUser,
16
16
+
}: JetstreamLogsPageProps) {
17
17
+
return (
18
18
+
<Layout title={`Jetstream Logs`} currentUser={currentUser}>
19
19
+
<div>
20
20
+
<div className="flex items-center justify-between mb-8">
21
21
+
<div className="flex items-center">
22
22
+
<a
23
23
+
href={`/slices/${sliceId}`}
24
24
+
className="text-blue-600 hover:text-blue-800 mr-4"
25
25
+
>
26
26
+
← Back to Slice
27
27
+
</a>
28
28
+
<h1 className="text-3xl font-bold text-gray-800">Jetstream Logs</h1>
29
29
+
</div>
30
30
+
<JetstreamStatusCompact sliceId={sliceId} />
31
31
+
</div>
32
32
+
33
33
+
<div
34
34
+
className="bg-white rounded-lg shadow-md"
35
35
+
hx-get={`/api/slices/${sliceId}/jetstream/logs`}
36
36
+
hx-trigger="load, every 20s"
37
37
+
hx-swap="innerHTML"
38
38
+
>
39
39
+
<JetstreamLogs logs={logs} />
40
40
+
</div>
41
41
+
</div>
42
42
+
</Layout>
43
43
+
);
44
44
+
}
+1
-1
frontend/src/pages/SlicePage.tsx
···
45
45
46
46
{/* Jetstream Status */}
47
47
<div
48
48
-
hx-get="/api/jetstream/status"
48
48
+
hx-get={`/api/jetstream/status?sliceId=${sliceId}`}
49
49
hx-trigger="load, every 2m"
50
50
hx-swap="outerHTML"
51
51
>
+52
frontend/src/routes/pages.tsx
···
14
14
import { SliceApiDocsPage } from "../pages/SliceApiDocsPage.tsx";
15
15
import { SliceSettingsPage } from "../pages/SliceSettingsPage.tsx";
16
16
import { SyncJobLogsPage } from "../pages/SyncJobLogsPage.tsx";
17
17
+
import { JetstreamLogsPage } from "../pages/JetstreamLogsPage.tsx";
17
18
import { SettingsPage } from "../pages/SettingsPage.tsx";
19
19
+
import type { LogEntry } from "../client.ts";
18
20
19
21
async function handleIndexPage(req: Request): Promise<Response> {
20
22
const context = await withAuth(req);
···
559
561
});
560
562
}
561
563
564
564
+
async function handleJetstreamLogsPage(
565
565
+
req: Request,
566
566
+
params?: URLPatternResult
567
567
+
): Promise<Response> {
568
568
+
const context = await withAuth(req);
569
569
+
570
570
+
if (!context.currentUser.isAuthenticated) {
571
571
+
return Response.redirect(new URL("/login", req.url), 302);
572
572
+
}
573
573
+
574
574
+
const sliceId = params?.pathname.groups.id;
575
575
+
576
576
+
if (!sliceId) {
577
577
+
return new Response("Invalid slice ID", { status: 400 });
578
578
+
}
579
579
+
580
580
+
// Fetch Jetstream logs
581
581
+
let logs: LogEntry[] = [];
582
582
+
583
583
+
try {
584
584
+
const sliceClient = getSliceClient(context, sliceId);
585
585
+
586
586
+
const logsResult = await sliceClient.social.slices.slice.getJetstreamLogs({ limit: 100 });
587
587
+
logs = logsResult.logs.sort((a, b) => new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime());
588
588
+
} catch (error) {
589
589
+
console.error("Failed to fetch Jetstream logs:", error);
590
590
+
}
591
591
+
592
592
+
const html = render(
593
593
+
<JetstreamLogsPage
594
594
+
logs={logs}
595
595
+
sliceId={sliceId}
596
596
+
currentUser={context.currentUser}
597
597
+
/>
598
598
+
);
599
599
+
600
600
+
const responseHeaders: Record<string, string> = {
601
601
+
"content-type": "text/html",
602
602
+
};
603
603
+
604
604
+
return new Response(`<!DOCTYPE html>${html}`, {
605
605
+
status: 200,
606
606
+
headers: responseHeaders,
607
607
+
});
608
608
+
}
609
609
+
562
610
export const pageRoutes: Route[] = [
563
611
{
564
612
pattern: new URLPattern({ pathname: "/" }),
···
583
631
{
584
632
pattern: new URLPattern({ pathname: "/slices/:id/sync/logs/:jobId" }),
585
633
handler: handleSyncJobLogsPage,
634
634
+
},
635
635
+
{
636
636
+
pattern: new URLPattern({ pathname: "/slices/:id/jetstream/logs" }),
637
637
+
handler: handleJetstreamLogsPage,
586
638
},
587
639
{
588
640
pattern: new URLPattern({ pathname: "/slices/:id/:tab" }),
+139
-2
frontend/src/routes/slices.tsx
···
18
18
import { JobHistory } from "../components/JobHistory.tsx";
19
19
import { JetstreamStatus } from "../components/JetstreamStatus.tsx";
20
20
import { SyncJobLogs } from "../components/SyncJobLogs.tsx";
21
21
+
import { JetstreamLogs } from "../components/JetstreamLogs.tsx";
21
22
import { buildAtUri } from "../utils/at-uri.ts";
23
23
+
import { Layout } from "../components/Layout.tsx";
22
24
23
25
async function handleCreateSlice(req: Request): Promise<Response> {
24
26
const context = await withAuth(req);
···
984
986
pattern: new URLPattern({ pathname: "/api/jetstream/status" }),
985
987
handler: handleJetstreamStatus,
986
988
},
989
989
+
{
990
990
+
method: "GET",
991
991
+
pattern: new URLPattern({ pathname: "/api/slices/:id/jetstream/logs" }),
992
992
+
handler: handleJetstreamLogs,
993
993
+
},
987
994
];
988
995
989
989
-
async function handleJetstreamStatus(_req: Request): Promise<Response> {
996
996
+
async function handleJetstreamLogs(
997
997
+
req: Request,
998
998
+
params?: URLPatternResult
999
999
+
): Promise<Response> {
1000
1000
+
const context = await withAuth(req);
1001
1001
+
const authResponse = requireAuth(context);
1002
1002
+
if (authResponse) return authResponse;
1003
1003
+
1004
1004
+
const sliceId = params?.pathname.groups.id;
1005
1005
+
if (!sliceId) {
1006
1006
+
const html = render(
1007
1007
+
<div className="p-8 text-center text-red-600">
1008
1008
+
❌ Invalid slice ID
1009
1009
+
</div>
1010
1010
+
);
1011
1011
+
return new Response(html, {
1012
1012
+
status: 400,
1013
1013
+
headers: { "content-type": "text/html" },
1014
1014
+
});
1015
1015
+
}
1016
1016
+
1017
1017
+
try {
1018
1018
+
// Use the slice-specific client
1019
1019
+
const sliceClient = getSliceClient(context, sliceId);
1020
1020
+
1021
1021
+
// Get Jetstream logs
1022
1022
+
const result = await sliceClient.social.slices.slice.getJetstreamLogs({
1023
1023
+
limit: 100,
1024
1024
+
});
1025
1025
+
1026
1026
+
const logs = result?.logs || [];
1027
1027
+
1028
1028
+
// Sort logs in descending order (newest first)
1029
1029
+
const sortedLogs = logs.sort((a, b) =>
1030
1030
+
new Date(b.createdAt).getTime() - new Date(a.createdAt).getTime()
1031
1031
+
);
1032
1032
+
1033
1033
+
// Render the log content
1034
1034
+
const html = render(<JetstreamLogs logs={sortedLogs} />);
1035
1035
+
1036
1036
+
return new Response(html, {
1037
1037
+
status: 200,
1038
1038
+
headers: { "content-type": "text/html" },
1039
1039
+
});
1040
1040
+
} catch (error) {
1041
1041
+
console.error("Failed to get Jetstream logs:", error);
1042
1042
+
const errorMessage = error instanceof Error ? error.message : String(error);
1043
1043
+
const html = render(
1044
1044
+
<Layout title="Error">
1045
1045
+
<div className="max-w-6xl mx-auto">
1046
1046
+
<div className="flex items-center gap-4 mb-6">
1047
1047
+
<a
1048
1048
+
href={`/slices/${sliceId}`}
1049
1049
+
className="text-blue-600 hover:text-blue-800"
1050
1050
+
>
1051
1051
+
← Back to Slice
1052
1052
+
</a>
1053
1053
+
<h1 className="text-2xl font-semibold text-gray-900">
1054
1054
+
✈️ Jetstream Logs
1055
1055
+
</h1>
1056
1056
+
</div>
1057
1057
+
<div className="p-8 text-center text-red-600">
1058
1058
+
❌ Error loading Jetstream logs: {errorMessage}
1059
1059
+
</div>
1060
1060
+
</div>
1061
1061
+
</Layout>
1062
1062
+
);
1063
1063
+
return new Response(html, {
1064
1064
+
status: 500,
1065
1065
+
headers: { "content-type": "text/html" },
1066
1066
+
});
1067
1067
+
}
1068
1068
+
}
1069
1069
+
1070
1070
+
1071
1071
+
async function handleJetstreamStatus(
1072
1072
+
req: Request,
1073
1073
+
_params?: URLPatternResult
1074
1074
+
): Promise<Response> {
990
1075
try {
1076
1076
+
// Extract parameters from query
1077
1077
+
const url = new URL(req.url);
1078
1078
+
const sliceId = url.searchParams.get("sliceId");
1079
1079
+
const isCompact = url.searchParams.get("compact") === "true";
1080
1080
+
991
1081
// Fetch jetstream status using the atproto client
992
1082
const data = await atprotoClient.social.slices.slice.getJetstreamStatus();
993
1083
1084
1084
+
// Render compact version for logs page
1085
1085
+
if (isCompact) {
1086
1086
+
const html = render(
1087
1087
+
<div className="inline-flex items-center gap-2 text-xs">
1088
1088
+
{data.connected ? (
1089
1089
+
<>
1090
1090
+
<div className="w-2 h-2 bg-green-500 rounded-full animate-pulse"></div>
1091
1091
+
<span className="text-green-700">Jetstream Connected</span>
1092
1092
+
</>
1093
1093
+
) : (
1094
1094
+
<>
1095
1095
+
<div className="w-2 h-2 bg-red-500 rounded-full"></div>
1096
1096
+
<span className="text-red-700">Jetstream Offline</span>
1097
1097
+
</>
1098
1098
+
)}
1099
1099
+
</div>
1100
1100
+
);
1101
1101
+
1102
1102
+
return new Response(html, {
1103
1103
+
status: 200,
1104
1104
+
headers: { "content-type": "text/html" },
1105
1105
+
});
1106
1106
+
}
1107
1107
+
1108
1108
+
// Render full version for main page
994
1109
const html = render(
995
1110
<JetstreamStatus
996
1111
connected={data.connected}
997
1112
status={data.status}
998
1113
error={data.error}
1114
1114
+
sliceId={sliceId || undefined}
999
1115
/>
1000
1116
);
1001
1117
···
1004
1120
headers: { "content-type": "text/html" },
1005
1121
});
1006
1122
} catch (error) {
1007
1007
-
// Fallback to disconnected state on error
1123
1123
+
// Extract parameters for error case too
1124
1124
+
const url = new URL(req.url);
1125
1125
+
const sliceId = url.searchParams.get("sliceId");
1126
1126
+
const isCompact = url.searchParams.get("compact") === "true";
1127
1127
+
1128
1128
+
// Render compact error version
1129
1129
+
if (isCompact) {
1130
1130
+
const html = render(
1131
1131
+
<div className="inline-flex items-center gap-2 text-xs">
1132
1132
+
<div className="w-2 h-2 bg-red-500 rounded-full"></div>
1133
1133
+
<span className="text-red-700">Jetstream Offline</span>
1134
1134
+
</div>
1135
1135
+
);
1136
1136
+
1137
1137
+
return new Response(html, {
1138
1138
+
status: 200,
1139
1139
+
headers: { "content-type": "text/html" },
1140
1140
+
});
1141
1141
+
}
1142
1142
+
1143
1143
+
// Fallback to disconnected state on error for full version
1008
1144
const html = render(
1009
1145
<JetstreamStatus
1010
1146
connected={false}
1011
1147
status="Connection error"
1012
1148
error={error instanceof Error ? error.message : "Unknown error"}
1149
1149
+
sliceId={sliceId || undefined}
1013
1150
/>
1014
1151
);
1015
1152
+9
frontend/src/utils/time.ts
···
1
1
+
export function formatTimestamp(dateString: string): string {
2
2
+
const date = new Date(dateString);
3
3
+
return date.toLocaleTimeString([], {
4
4
+
hour: "2-digit",
5
5
+
minute: "2-digit",
6
6
+
second: "2-digit",
7
7
+
fractionalSecondDigits: 3,
8
8
+
});
9
9
+
}