tangled
alpha
login
or
join now
atscan.net
/
atscand
1
fork
atom
wip
1
fork
atom
overview
issues
pulls
pipelines
update
tree.fail
4 months ago
c36071cd
b361f1eb
+374
-506
3 changed files
expand all
collapse all
unified
split
internal
api
handlers.go
server.go
plc
bundle.go
+342
-497
internal/api/handlers.go
···
1
1
package api
2
2
3
3
import (
4
4
-
"bufio"
5
5
-
"bytes"
4
4
+
"context"
6
5
"crypto/sha256"
7
6
"encoding/hex"
8
7
"encoding/json"
···
17
16
"github.com/atscan/atscanner/internal/plc"
18
17
"github.com/atscan/atscanner/internal/storage"
19
18
"github.com/gorilla/mux"
20
20
-
"github.com/klauspost/compress/zstd"
21
19
)
22
20
23
23
-
// ====================
24
24
-
// Endpoint Handlers (new)
25
25
-
// ====================
21
21
+
// ===== RESPONSE HELPERS =====
22
22
+
23
23
+
type response struct {
24
24
+
w http.ResponseWriter
25
25
+
}
26
26
+
27
27
+
func newResponse(w http.ResponseWriter) *response {
28
28
+
return &response{w: w}
29
29
+
}
30
30
+
31
31
+
func (r *response) json(data interface{}) {
32
32
+
r.w.Header().Set("Content-Type", "application/json")
33
33
+
json.NewEncoder(r.w).Encode(data)
34
34
+
}
35
35
+
36
36
+
func (r *response) error(msg string, code int) {
37
37
+
http.Error(r.w, msg, code)
38
38
+
}
26
39
27
27
-
func (s *Server) handleGetEndpoints(w http.ResponseWriter, r *http.Request) {
28
28
-
ctx := r.Context()
40
40
+
func (r *response) bundleHeaders(bundle *storage.PLCBundle) {
41
41
+
r.w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", bundle.BundleNumber))
42
42
+
r.w.Header().Set("X-Bundle-Hash", bundle.Hash)
43
43
+
r.w.Header().Set("X-Bundle-Compressed-Hash", bundle.CompressedHash)
44
44
+
r.w.Header().Set("X-Bundle-Start-Time", bundle.StartTime.Format(time.RFC3339Nano))
45
45
+
r.w.Header().Set("X-Bundle-End-Time", bundle.EndTime.Format(time.RFC3339Nano))
46
46
+
r.w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", plc.BUNDLE_SIZE))
47
47
+
r.w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", len(bundle.DIDs)))
48
48
+
}
29
49
30
30
-
filter := &storage.EndpointFilter{}
50
50
+
// ===== REQUEST HELPERS =====
31
51
32
32
-
if typ := r.URL.Query().Get("type"); typ != "" {
33
33
-
filter.Type = typ
34
34
-
}
52
52
+
func getBundleNumber(r *http.Request) (int, error) {
53
53
+
vars := mux.Vars(r)
54
54
+
return strconv.Atoi(vars["number"])
55
55
+
}
35
56
36
36
-
if status := r.URL.Query().Get("status"); status != "" {
37
37
-
filter.Status = status
57
57
+
func getQueryInt(r *http.Request, key string, defaultVal int) int {
58
58
+
if val := r.URL.Query().Get(key); val != "" {
59
59
+
if parsed, err := strconv.Atoi(val); err == nil {
60
60
+
return parsed
61
61
+
}
38
62
}
63
63
+
return defaultVal
64
64
+
}
39
65
40
40
-
if minUserCount := r.URL.Query().Get("min_user_count"); minUserCount != "" {
41
41
-
if count, err := strconv.ParseInt(minUserCount, 10, 64); err == nil {
42
42
-
filter.MinUserCount = count
66
66
+
func getQueryInt64(r *http.Request, key string, defaultVal int64) int64 {
67
67
+
if val := r.URL.Query().Get(key); val != "" {
68
68
+
if parsed, err := strconv.ParseInt(val, 10, 64); err == nil {
69
69
+
return parsed
43
70
}
44
71
}
72
72
+
return defaultVal
73
73
+
}
45
74
46
46
-
if limit := r.URL.Query().Get("limit"); limit != "" {
47
47
-
if l, err := strconv.Atoi(limit); err == nil {
48
48
-
filter.Limit = l
49
49
-
}
75
75
+
// ===== FORMATTING HELPERS =====
76
76
+
77
77
+
func formatBundleResponse(bundle *storage.PLCBundle) map[string]interface{} {
78
78
+
return map[string]interface{}{
79
79
+
"plc_bundle_number": bundle.BundleNumber,
80
80
+
"start_time": bundle.StartTime,
81
81
+
"end_time": bundle.EndTime,
82
82
+
"operation_count": plc.BUNDLE_SIZE,
83
83
+
"did_count": len(bundle.DIDs),
84
84
+
"hash": bundle.Hash,
85
85
+
"compressed_hash": bundle.CompressedHash,
86
86
+
"compressed_size": bundle.CompressedSize,
87
87
+
"prev_bundle_hash": bundle.PrevBundleHash,
88
88
+
"created_at": bundle.CreatedAt,
50
89
}
90
90
+
}
51
91
52
52
-
if offset := r.URL.Query().Get("offset"); offset != "" {
53
53
-
if o, err := strconv.Atoi(offset); err == nil {
54
54
-
filter.Offset = o
55
55
-
}
92
92
+
func formatEndpointResponse(ep *storage.Endpoint) map[string]interface{} {
93
93
+
return map[string]interface{}{
94
94
+
"id": ep.ID,
95
95
+
"endpoint_type": ep.EndpointType,
96
96
+
"endpoint": ep.Endpoint,
97
97
+
"discovered_at": ep.DiscoveredAt,
98
98
+
"last_checked": ep.LastChecked,
99
99
+
"status": statusToString(ep.Status),
100
100
+
"user_count": ep.UserCount,
56
101
}
102
102
+
}
57
103
58
58
-
endpoints, err := s.db.GetEndpoints(ctx, filter)
104
104
+
func statusToString(status int) string {
105
105
+
switch status {
106
106
+
case storage.EndpointStatusOnline:
107
107
+
return "online"
108
108
+
case storage.EndpointStatusOffline:
109
109
+
return "offline"
110
110
+
default:
111
111
+
return "unknown"
112
112
+
}
113
113
+
}
114
114
+
115
115
+
// ===== ENDPOINT HANDLERS =====
116
116
+
117
117
+
func (s *Server) handleGetEndpoints(w http.ResponseWriter, r *http.Request) {
118
118
+
resp := newResponse(w)
119
119
+
120
120
+
filter := &storage.EndpointFilter{
121
121
+
Type: r.URL.Query().Get("type"),
122
122
+
Status: r.URL.Query().Get("status"),
123
123
+
MinUserCount: getQueryInt64(r, "min_user_count", 0),
124
124
+
Limit: getQueryInt(r, "limit", 0),
125
125
+
Offset: getQueryInt(r, "offset", 0),
126
126
+
}
127
127
+
128
128
+
endpoints, err := s.db.GetEndpoints(r.Context(), filter)
59
129
if err != nil {
60
60
-
http.Error(w, err.Error(), http.StatusInternalServerError)
130
130
+
resp.error(err.Error(), http.StatusInternalServerError)
61
131
return
62
132
}
63
133
64
64
-
// Convert status codes to strings for API
65
134
response := make([]map[string]interface{}, len(endpoints))
66
135
for i, ep := range endpoints {
67
67
-
response[i] = map[string]interface{}{
68
68
-
"id": ep.ID,
69
69
-
"endpoint_type": ep.EndpointType,
70
70
-
"endpoint": ep.Endpoint,
71
71
-
"discovered_at": ep.DiscoveredAt,
72
72
-
"last_checked": ep.LastChecked,
73
73
-
"status": statusToString(ep.Status),
74
74
-
"user_count": ep.UserCount,
75
75
-
}
136
136
+
response[i] = formatEndpointResponse(ep)
76
137
}
77
138
78
78
-
respondJSON(w, response)
139
139
+
resp.json(response)
79
140
}
80
141
81
142
func (s *Server) handleGetEndpoint(w http.ResponseWriter, r *http.Request) {
82
82
-
ctx := r.Context()
143
143
+
resp := newResponse(w)
83
144
vars := mux.Vars(r)
84
145
endpoint := vars["endpoint"]
85
85
-
86
86
-
// Get type from query param, default to "pds" for backward compatibility
87
146
endpointType := r.URL.Query().Get("type")
88
147
if endpointType == "" {
89
148
endpointType = "pds"
90
149
}
91
150
92
92
-
ep, err := s.db.GetEndpoint(ctx, endpoint, endpointType)
151
151
+
ep, err := s.db.GetEndpoint(r.Context(), endpoint, endpointType)
93
152
if err != nil {
94
94
-
http.Error(w, "Endpoint not found", http.StatusNotFound)
153
153
+
resp.error("Endpoint not found", http.StatusNotFound)
95
154
return
96
155
}
97
156
98
98
-
// Get recent scans
99
99
-
scans, _ := s.db.GetEndpointScans(ctx, ep.ID, 10)
157
157
+
scans, _ := s.db.GetEndpointScans(r.Context(), ep.ID, 10)
100
158
101
101
-
response := map[string]interface{}{
102
102
-
"id": ep.ID,
103
103
-
"endpoint_type": ep.EndpointType,
104
104
-
"endpoint": ep.Endpoint,
105
105
-
"discovered_at": ep.DiscoveredAt,
106
106
-
"last_checked": ep.LastChecked,
107
107
-
"status": statusToString(ep.Status),
108
108
-
"user_count": ep.UserCount,
109
109
-
"recent_scans": scans,
110
110
-
}
159
159
+
result := formatEndpointResponse(ep)
160
160
+
result["recent_scans"] = scans
111
161
112
112
-
respondJSON(w, response)
162
162
+
resp.json(result)
113
163
}
114
164
115
165
func (s *Server) handleGetEndpointStats(w http.ResponseWriter, r *http.Request) {
116
116
-
ctx := r.Context()
117
117
-
118
118
-
stats, err := s.db.GetEndpointStats(ctx)
166
166
+
resp := newResponse(w)
167
167
+
stats, err := s.db.GetEndpointStats(r.Context())
119
168
if err != nil {
120
120
-
http.Error(w, err.Error(), http.StatusInternalServerError)
169
169
+
resp.error(err.Error(), http.StatusInternalServerError)
121
170
return
122
171
}
123
123
-
124
124
-
respondJSON(w, stats)
172
172
+
resp.json(stats)
125
173
}
126
174
127
127
-
// ====================
128
128
-
// DID Handlers
129
129
-
// ====================
175
175
+
// ===== DID HANDLERS =====
130
176
131
177
func (s *Server) handleGetDID(w http.ResponseWriter, r *http.Request) {
132
132
-
ctx := r.Context()
178
178
+
resp := newResponse(w)
133
179
vars := mux.Vars(r)
134
180
did := vars["did"]
135
181
136
136
-
bundles, err := s.db.GetBundlesForDID(ctx, did)
182
182
+
bundles, err := s.db.GetBundlesForDID(r.Context(), did)
137
183
if err != nil {
138
138
-
http.Error(w, err.Error(), http.StatusInternalServerError)
184
184
+
resp.error(err.Error(), http.StatusInternalServerError)
139
185
return
140
186
}
141
187
142
188
if len(bundles) == 0 {
143
143
-
http.Error(w, "DID not found in bundles", http.StatusNotFound)
189
189
+
resp.error("DID not found in bundles", http.StatusNotFound)
144
190
return
145
191
}
146
192
147
193
lastBundle := bundles[len(bundles)-1]
148
148
-
149
149
-
// Compute file path
150
150
-
filePath := filepath.Join(s.plcBundleDir, fmt.Sprintf("%06d.jsonl.zst", lastBundle.BundleNumber))
151
151
-
152
152
-
operations, err := s.loadBundleOperations(filePath)
194
194
+
ops, err := s.bundleManager.LoadBundleOperations(r.Context(), lastBundle.BundleNumber)
153
195
if err != nil {
154
154
-
http.Error(w, fmt.Sprintf("failed to load bundle: %v", err), http.StatusInternalServerError)
196
196
+
resp.error(fmt.Sprintf("failed to load bundle: %v", err), http.StatusInternalServerError)
155
197
return
156
198
}
157
199
158
200
// Find latest operation for this DID
159
159
-
var latestOp *plc.PLCOperation
160
160
-
for i := len(operations) - 1; i >= 0; i-- {
161
161
-
if operations[i].DID == did {
162
162
-
latestOp = &operations[i]
163
163
-
break
201
201
+
for i := len(ops) - 1; i >= 0; i-- {
202
202
+
if ops[i].DID == did {
203
203
+
resp.json(ops[i])
204
204
+
return
164
205
}
165
206
}
166
207
167
167
-
if latestOp == nil {
168
168
-
http.Error(w, "DID operation not found", http.StatusNotFound)
169
169
-
return
170
170
-
}
171
171
-
172
172
-
respondJSON(w, latestOp)
208
208
+
resp.error("DID operation not found", http.StatusNotFound)
173
209
}
174
210
175
211
func (s *Server) handleGetDIDHistory(w http.ResponseWriter, r *http.Request) {
176
176
-
ctx := r.Context()
212
212
+
resp := newResponse(w)
177
213
vars := mux.Vars(r)
178
214
did := vars["did"]
179
215
180
180
-
bundles, err := s.db.GetBundlesForDID(ctx, did)
216
216
+
bundles, err := s.db.GetBundlesForDID(r.Context(), did)
181
217
if err != nil {
182
182
-
http.Error(w, err.Error(), http.StatusInternalServerError)
218
218
+
resp.error(err.Error(), http.StatusInternalServerError)
183
219
return
184
220
}
185
221
186
222
if len(bundles) == 0 {
187
187
-
http.Error(w, "DID not found in bundles", http.StatusNotFound)
223
223
+
resp.error("DID not found in bundles", http.StatusNotFound)
188
224
return
189
225
}
190
226
···
192
228
var currentOp *plc.PLCOperation
193
229
194
230
for _, bundle := range bundles {
195
195
-
// Compute file path
196
196
-
filePath := filepath.Join(s.plcBundleDir, fmt.Sprintf("%06d.jsonl.zst", bundle.BundleNumber))
197
197
-
198
198
-
operations, err := s.loadBundleOperations(filePath)
231
231
+
ops, err := s.bundleManager.LoadBundleOperations(r.Context(), bundle.BundleNumber)
199
232
if err != nil {
200
233
log.Error("Warning: failed to load bundle: %v", err)
201
234
continue
202
235
}
203
236
204
204
-
for _, op := range operations {
237
237
+
for _, op := range ops {
205
238
if op.DID == did {
206
239
entry := plc.DIDHistoryEntry{
207
240
Operation: op,
···
213
246
}
214
247
}
215
248
216
216
-
history := plc.DIDHistory{
249
249
+
resp.json(plc.DIDHistory{
217
250
DID: did,
218
251
Current: currentOp,
219
252
Operations: allOperations,
220
220
-
}
221
221
-
222
222
-
respondJSON(w, history)
253
253
+
})
223
254
}
224
255
225
225
-
// ====================
226
226
-
// PLC Bundle Handlers
227
227
-
// ====================
256
256
+
// ===== PLC BUNDLE HANDLERS =====
228
257
229
258
func (s *Server) handleGetPLCBundle(w http.ResponseWriter, r *http.Request) {
230
230
-
ctx := r.Context()
231
231
-
vars := mux.Vars(r)
259
259
+
resp := newResponse(w)
232
260
233
233
-
bundleNumber, err := strconv.Atoi(vars["number"])
261
261
+
bundleNum, err := getBundleNumber(r)
234
262
if err != nil {
235
235
-
http.Error(w, "invalid bundle number", http.StatusBadRequest)
263
263
+
resp.error("invalid bundle number", http.StatusBadRequest)
236
264
return
237
265
}
238
266
239
239
-
bundle, err := s.db.GetBundleByNumber(ctx, bundleNumber)
267
267
+
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
240
268
if err != nil {
241
241
-
http.Error(w, "bundle not found", http.StatusNotFound)
269
269
+
resp.error("bundle not found", http.StatusNotFound)
242
270
return
243
271
}
244
272
245
245
-
response := map[string]interface{}{
246
246
-
"plc_bundle_number": bundle.BundleNumber,
247
247
-
"start_time": bundle.StartTime,
248
248
-
"end_time": bundle.EndTime,
249
249
-
"operation_count": plc.BUNDLE_SIZE,
250
250
-
"did_count": len(bundle.DIDs),
251
251
-
"hash": bundle.Hash,
252
252
-
"compressed_hash": bundle.CompressedHash,
253
253
-
"compressed_size": bundle.CompressedSize,
254
254
-
"prev_bundle_hash": bundle.PrevBundleHash,
255
255
-
"created_at": bundle.CreatedAt,
256
256
-
}
257
257
-
258
258
-
respondJSON(w, response)
273
273
+
resp.json(formatBundleResponse(bundle))
259
274
}
260
275
261
276
func (s *Server) handleGetPLCBundleDIDs(w http.ResponseWriter, r *http.Request) {
262
262
-
ctx := r.Context()
263
263
-
vars := mux.Vars(r)
277
277
+
resp := newResponse(w)
264
278
265
265
-
bundleNumber, err := strconv.Atoi(vars["number"])
279
279
+
bundleNum, err := getBundleNumber(r)
266
280
if err != nil {
267
267
-
http.Error(w, "invalid bundle number", http.StatusBadRequest)
281
281
+
resp.error("invalid bundle number", http.StatusBadRequest)
268
282
return
269
283
}
270
284
271
271
-
bundle, err := s.db.GetBundleByNumber(ctx, bundleNumber)
285
285
+
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
272
286
if err != nil {
273
273
-
http.Error(w, "bundle not found", http.StatusNotFound)
287
287
+
resp.error("bundle not found", http.StatusNotFound)
274
288
return
275
289
}
276
290
277
277
-
respondJSON(w, map[string]interface{}{
291
291
+
resp.json(map[string]interface{}{
278
292
"plc_bundle_number": bundle.BundleNumber,
279
293
"did_count": len(bundle.DIDs),
280
294
"dids": bundle.DIDs,
···
282
296
}
283
297
284
298
func (s *Server) handleDownloadPLCBundle(w http.ResponseWriter, r *http.Request) {
285
285
-
ctx := r.Context()
286
286
-
vars := mux.Vars(r)
299
299
+
resp := newResponse(w)
287
300
288
288
-
bundleNumber, err := strconv.Atoi(vars["number"])
301
301
+
bundleNum, err := getBundleNumber(r)
289
302
if err != nil {
290
290
-
http.Error(w, "invalid bundle number", http.StatusBadRequest)
303
303
+
resp.error("invalid bundle number", http.StatusBadRequest)
291
304
return
292
305
}
293
306
294
294
-
// Check if client wants uncompressed data
295
295
-
compressed := true
296
296
-
if r.URL.Query().Get("compressed") == "false" {
297
297
-
compressed = false
298
298
-
}
307
307
+
compressed := r.URL.Query().Get("compressed") != "false"
299
308
300
300
-
// Verify bundle exists in database
301
301
-
bundle, err := s.db.GetBundleByNumber(ctx, bundleNumber)
309
309
+
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNum)
302
310
if err != nil {
303
303
-
http.Error(w, "bundle not found", http.StatusNotFound)
311
311
+
resp.error("bundle not found", http.StatusNotFound)
304
312
return
305
313
}
306
314
307
307
-
// Build file path
308
308
-
filePath := filepath.Join(s.plcBundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNumber))
315
315
+
resp.bundleHeaders(bundle)
316
316
+
317
317
+
if compressed {
318
318
+
s.serveCompressedBundle(w, r, bundle)
319
319
+
} else {
320
320
+
s.serveUncompressedBundle(w, r, bundle)
321
321
+
}
322
322
+
}
309
323
310
310
-
// Check if file exists
311
311
-
fileInfo, err := os.Stat(filePath)
324
324
+
func (s *Server) serveCompressedBundle(w http.ResponseWriter, r *http.Request, bundle *storage.PLCBundle) {
325
325
+
resp := newResponse(w)
326
326
+
path := bundle.GetFilePath(s.plcBundleDir)
327
327
+
328
328
+
file, err := os.Open(path)
312
329
if err != nil {
313
313
-
if os.IsNotExist(err) {
314
314
-
http.Error(w, "bundle file not found on disk", http.StatusNotFound)
315
315
-
return
316
316
-
}
317
317
-
http.Error(w, fmt.Sprintf("error accessing bundle file: %v", err), http.StatusInternalServerError)
330
330
+
resp.error("bundle file not found on disk", http.StatusNotFound)
318
331
return
319
332
}
333
333
+
defer file.Close()
320
334
321
321
-
// Set common headers
322
322
-
w.Header().Set("X-Bundle-Number", fmt.Sprintf("%d", bundleNumber))
323
323
-
w.Header().Set("X-Bundle-Hash", bundle.Hash)
324
324
-
w.Header().Set("X-Bundle-Compressed-Hash", bundle.CompressedHash)
325
325
-
w.Header().Set("X-Bundle-Start-Time", bundle.StartTime.Format(time.RFC3339Nano))
326
326
-
w.Header().Set("X-Bundle-End-Time", bundle.EndTime.Format(time.RFC3339Nano))
327
327
-
w.Header().Set("X-Bundle-Operation-Count", fmt.Sprintf("%d", plc.BUNDLE_SIZE))
328
328
-
w.Header().Set("X-Bundle-DID-Count", fmt.Sprintf("%d", len(bundle.DIDs)))
335
335
+
fileInfo, _ := file.Stat()
329
336
330
330
-
if compressed {
331
331
-
// Serve compressed file
332
332
-
file, err := os.Open(filePath)
333
333
-
if err != nil {
334
334
-
http.Error(w, fmt.Sprintf("error opening bundle file: %v", err), http.StatusInternalServerError)
335
335
-
return
336
336
-
}
337
337
-
defer file.Close()
337
337
+
w.Header().Set("Content-Type", "application/zstd")
338
338
+
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl.zst", bundle.BundleNumber))
339
339
+
w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
340
340
+
w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", fileInfo.Size()))
338
341
339
339
-
w.Header().Set("Content-Type", "application/zstd")
340
340
-
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl.zst", bundleNumber))
341
341
-
w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
342
342
-
w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", fileInfo.Size()))
342
342
+
http.ServeContent(w, r, filepath.Base(path), bundle.CreatedAt, file)
343
343
+
}
343
344
344
344
-
http.ServeContent(w, r, filepath.Base(filePath), bundle.CreatedAt, file)
345
345
-
} else {
346
346
-
// Serve uncompressed data
347
347
-
compressedData, err := os.ReadFile(filePath)
348
348
-
if err != nil {
349
349
-
http.Error(w, fmt.Sprintf("error reading bundle file: %v", err), http.StatusInternalServerError)
350
350
-
return
351
351
-
}
345
345
+
func (s *Server) serveUncompressedBundle(w http.ResponseWriter, r *http.Request, bundle *storage.PLCBundle) {
346
346
+
resp := newResponse(w)
352
347
353
353
-
// Decompress
354
354
-
decoder, err := zstd.NewReader(nil)
355
355
-
if err != nil {
356
356
-
http.Error(w, fmt.Sprintf("error creating decompressor: %v", err), http.StatusInternalServerError)
357
357
-
return
358
358
-
}
359
359
-
defer decoder.Close()
348
348
+
ops, err := s.bundleManager.LoadBundleOperations(r.Context(), bundle.BundleNumber)
349
349
+
if err != nil {
350
350
+
resp.error(fmt.Sprintf("error loading bundle: %v", err), http.StatusInternalServerError)
351
351
+
return
352
352
+
}
360
353
361
361
-
decompressed, err := decoder.DecodeAll(compressedData, nil)
362
362
-
if err != nil {
363
363
-
http.Error(w, fmt.Sprintf("error decompressing bundle: %v", err), http.StatusInternalServerError)
364
364
-
return
365
365
-
}
354
354
+
// Serialize to JSONL
355
355
+
var buf []byte
356
356
+
for _, op := range ops {
357
357
+
buf = append(buf, op.RawJSON...)
358
358
+
buf = append(buf, '\n')
359
359
+
}
366
360
367
367
-
// Set headers for uncompressed data
368
368
-
w.Header().Set("Content-Type", "application/jsonl")
369
369
-
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl", bundleNumber))
370
370
-
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(decompressed)))
371
371
-
w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", fileInfo.Size()))
372
372
-
w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", len(decompressed)))
373
373
-
w.Header().Set("X-Compression-Ratio", fmt.Sprintf("%.2f", float64(len(decompressed))/float64(fileInfo.Size())))
361
361
+
fileInfo, _ := os.Stat(bundle.GetFilePath(s.plcBundleDir))
362
362
+
compressedSize := int64(0)
363
363
+
if fileInfo != nil {
364
364
+
compressedSize = fileInfo.Size()
365
365
+
}
374
366
375
375
-
// Write decompressed data
376
376
-
w.WriteHeader(http.StatusOK)
377
377
-
w.Write(decompressed)
367
367
+
w.Header().Set("Content-Type", "application/jsonl")
368
368
+
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%06d.jsonl", bundle.BundleNumber))
369
369
+
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(buf)))
370
370
+
w.Header().Set("X-Compressed-Size", fmt.Sprintf("%d", compressedSize))
371
371
+
w.Header().Set("X-Uncompressed-Size", fmt.Sprintf("%d", len(buf)))
372
372
+
if compressedSize > 0 {
373
373
+
w.Header().Set("X-Compression-Ratio", fmt.Sprintf("%.2f", float64(len(buf))/float64(compressedSize)))
378
374
}
375
375
+
376
376
+
w.WriteHeader(http.StatusOK)
377
377
+
w.Write(buf)
379
378
}
380
379
381
380
func (s *Server) handleGetPLCBundles(w http.ResponseWriter, r *http.Request) {
382
382
-
ctx := r.Context()
383
383
-
384
384
-
limit := 50
385
385
-
if l := r.URL.Query().Get("limit"); l != "" {
386
386
-
if parsed, err := strconv.Atoi(l); err == nil {
387
387
-
limit = parsed
388
388
-
}
389
389
-
}
381
381
+
resp := newResponse(w)
382
382
+
limit := getQueryInt(r, "limit", 50)
390
383
391
391
-
bundles, err := s.db.GetBundles(ctx, limit)
384
384
+
bundles, err := s.db.GetBundles(r.Context(), limit)
392
385
if err != nil {
393
393
-
http.Error(w, err.Error(), http.StatusInternalServerError)
386
386
+
resp.error(err.Error(), http.StatusInternalServerError)
394
387
return
395
388
}
396
389
397
390
response := make([]map[string]interface{}, len(bundles))
398
391
for i, bundle := range bundles {
399
399
-
response[i] = map[string]interface{}{
400
400
-
"plc_bundle_number": bundle.BundleNumber,
401
401
-
"start_time": bundle.StartTime,
402
402
-
"end_time": bundle.EndTime,
403
403
-
"operation_count": plc.BUNDLE_SIZE,
404
404
-
"did_count": len(bundle.DIDs),
405
405
-
"hash": bundle.Hash,
406
406
-
"compressed_hash": bundle.CompressedHash,
407
407
-
"compressed_size": bundle.CompressedSize,
408
408
-
"prev_bundle_hash": bundle.PrevBundleHash,
409
409
-
}
392
392
+
response[i] = formatBundleResponse(bundle)
410
393
}
411
394
412
412
-
respondJSON(w, response)
395
395
+
resp.json(response)
413
396
}
414
397
415
398
func (s *Server) handleGetPLCBundleStats(w http.ResponseWriter, r *http.Request) {
416
416
-
ctx := r.Context()
399
399
+
resp := newResponse(w)
417
400
418
418
-
count, size, err := s.db.GetBundleStats(ctx)
401
401
+
count, size, err := s.db.GetBundleStats(r.Context())
419
402
if err != nil {
420
420
-
http.Error(w, err.Error(), http.StatusInternalServerError)
403
403
+
resp.error(err.Error(), http.StatusInternalServerError)
421
404
return
422
405
}
423
406
424
424
-
respondJSON(w, map[string]interface{}{
407
407
+
resp.json(map[string]interface{}{
425
408
"plc_bundle_count": count,
426
409
"total_size": size,
427
410
"total_size_mb": float64(size) / 1024 / 1024,
428
411
})
429
412
}
430
413
431
431
-
// ====================
432
432
-
// Mempool Handlers
433
433
-
// ====================
414
414
+
// ===== MEMPOOL HANDLERS =====
434
415
435
416
func (s *Server) handleGetMempoolStats(w http.ResponseWriter, r *http.Request) {
417
417
+
resp := newResponse(w)
436
418
ctx := r.Context()
437
419
438
420
count, err := s.db.GetMempoolCount(ctx)
439
421
if err != nil {
440
440
-
http.Error(w, err.Error(), http.StatusInternalServerError)
422
422
+
resp.error(err.Error(), http.StatusInternalServerError)
441
423
return
442
424
}
443
425
444
444
-
response := map[string]interface{}{
426
426
+
result := map[string]interface{}{
445
427
"operation_count": count,
446
428
"can_create_bundle": count >= plc.BUNDLE_SIZE,
447
429
}
448
430
449
449
-
// Get mempool start time (first item)
450
431
if count > 0 {
451
451
-
firstOp, err := s.db.GetFirstMempoolOperation(ctx)
452
452
-
if err == nil && firstOp != nil {
453
453
-
response["mempool_start_time"] = firstOp.CreatedAt
432
432
+
if firstOp, err := s.db.GetFirstMempoolOperation(ctx); err == nil && firstOp != nil {
433
433
+
result["mempool_start_time"] = firstOp.CreatedAt
454
434
455
455
-
// Calculate estimated next bundle time
456
435
if count < plc.BUNDLE_SIZE {
457
457
-
lastOp, err := s.db.GetLastMempoolOperation(ctx)
458
458
-
if err == nil && lastOp != nil {
459
459
-
// Calculate rate of operations per second
436
436
+
if lastOp, err := s.db.GetLastMempoolOperation(ctx); err == nil && lastOp != nil {
460
437
timeSpan := lastOp.CreatedAt.Sub(firstOp.CreatedAt).Seconds()
461
461
-
462
438
if timeSpan > 0 {
463
439
opsPerSecond := float64(count) / timeSpan
464
464
-
465
440
if opsPerSecond > 0 {
466
441
remainingOps := plc.BUNDLE_SIZE - count
467
442
secondsNeeded := float64(remainingOps) / opsPerSecond
468
468
-
estimatedTime := time.Now().Add(time.Duration(secondsNeeded) * time.Second)
469
469
-
470
470
-
response["estimated_next_bundle_time"] = estimatedTime
471
471
-
response["operations_needed"] = remainingOps
472
472
-
response["current_rate_per_second"] = opsPerSecond
443
443
+
result["estimated_next_bundle_time"] = time.Now().Add(time.Duration(secondsNeeded) * time.Second)
444
444
+
result["operations_needed"] = remainingOps
445
445
+
result["current_rate_per_second"] = opsPerSecond
473
446
}
474
447
}
475
448
}
476
449
} else {
477
477
-
// Bundle can be created now
478
478
-
response["estimated_next_bundle_time"] = time.Now()
479
479
-
response["operations_needed"] = 0
450
450
+
result["estimated_next_bundle_time"] = time.Now()
451
451
+
result["operations_needed"] = 0
480
452
}
481
453
}
482
454
} else {
483
483
-
response["mempool_start_time"] = nil
484
484
-
response["estimated_next_bundle_time"] = nil
455
455
+
result["mempool_start_time"] = nil
456
456
+
result["estimated_next_bundle_time"] = nil
485
457
}
486
458
487
487
-
respondJSON(w, response)
459
459
+
resp.json(result)
488
460
}
489
461
490
490
-
// ====================
491
491
-
// PLC Metrics Handlers
492
492
-
// ====================
462
462
+
// ===== PLC METRICS HANDLERS =====
493
463
494
464
func (s *Server) handleGetPLCMetrics(w http.ResponseWriter, r *http.Request) {
495
495
-
ctx := r.Context()
496
496
-
497
497
-
limit := 10
498
498
-
if l := r.URL.Query().Get("limit"); l != "" {
499
499
-
if parsed, err := strconv.Atoi(l); err == nil {
500
500
-
limit = parsed
501
501
-
}
502
502
-
}
465
465
+
resp := newResponse(w)
466
466
+
limit := getQueryInt(r, "limit", 10)
503
467
504
504
-
metrics, err := s.db.GetPLCMetrics(ctx, limit)
468
468
+
metrics, err := s.db.GetPLCMetrics(r.Context(), limit)
505
469
if err != nil {
506
506
-
http.Error(w, err.Error(), http.StatusInternalServerError)
470
470
+
resp.error(err.Error(), http.StatusInternalServerError)
507
471
return
508
472
}
509
473
510
510
-
respondJSON(w, metrics)
474
474
+
resp.json(metrics)
511
475
}
512
476
513
513
-
// ====================
514
514
-
// Verification Handlers
515
515
-
// ====================
477
477
+
// ===== VERIFICATION HANDLERS =====
516
478
517
479
func (s *Server) handleVerifyPLCBundle(w http.ResponseWriter, r *http.Request) {
518
518
-
ctx := r.Context()
480
480
+
resp := newResponse(w)
519
481
vars := mux.Vars(r)
520
520
-
bundleNumberStr := vars["bundleNumber"]
521
482
522
522
-
bundleNumber, err := strconv.Atoi(bundleNumberStr)
483
483
+
bundleNumber, err := strconv.Atoi(vars["bundleNumber"])
523
484
if err != nil {
524
524
-
http.Error(w, "Invalid bundle number", http.StatusBadRequest)
485
485
+
resp.error("Invalid bundle number", http.StatusBadRequest)
525
486
return
526
487
}
527
488
528
528
-
// Get bundle from DB
529
529
-
bundle, err := s.db.GetBundleByNumber(ctx, bundleNumber)
489
489
+
bundle, err := s.db.GetBundleByNumber(r.Context(), bundleNumber)
530
490
if err != nil {
531
531
-
http.Error(w, "Bundle not found", http.StatusNotFound)
491
491
+
resp.error("Bundle not found", http.StatusNotFound)
532
492
return
533
493
}
534
494
535
535
-
// Get previous bundle for boundary state
495
495
+
// Fetch from PLC and verify
496
496
+
remoteOps, prevCIDs, err := s.fetchRemoteBundleOps(r.Context(), bundleNumber)
497
497
+
if err != nil {
498
498
+
resp.error(fmt.Sprintf("Failed to fetch from PLC directory: %v", err), http.StatusInternalServerError)
499
499
+
return
500
500
+
}
501
501
+
502
502
+
remoteHash := computeOperationsHash(remoteOps)
503
503
+
verified := bundle.Hash == remoteHash
504
504
+
505
505
+
resp.json(map[string]interface{}{
506
506
+
"bundle_number": bundleNumber,
507
507
+
"verified": verified,
508
508
+
"local_hash": bundle.Hash,
509
509
+
"remote_hash": remoteHash,
510
510
+
"local_op_count": plc.BUNDLE_SIZE,
511
511
+
"remote_op_count": len(remoteOps),
512
512
+
"boundary_cids_used": len(prevCIDs),
513
513
+
})
514
514
+
}
515
515
+
516
516
+
func (s *Server) fetchRemoteBundleOps(ctx context.Context, bundleNum int) ([]plc.PLCOperation, map[string]bool, error) {
536
517
var after string
537
518
var prevBoundaryCIDs map[string]bool
538
519
539
539
-
if bundleNumber > 1 {
540
540
-
prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNumber-1)
520
520
+
if bundleNum > 1 {
521
521
+
prevBundle, err := s.db.GetBundleByNumber(ctx, bundleNum-1)
541
522
if err != nil {
542
542
-
http.Error(w, "Failed to get previous bundle", http.StatusInternalServerError)
543
543
-
return
523
523
+
return nil, nil, fmt.Errorf("failed to get previous bundle: %w", err)
544
524
}
545
525
546
526
after = prevBundle.EndTime.Format("2006-01-02T15:04:05.000Z")
547
527
548
548
-
// Convert stored boundary CIDs to map
549
528
if len(prevBundle.BoundaryCIDs) > 0 {
550
529
prevBoundaryCIDs = make(map[string]bool)
551
530
for _, cid := range prevBundle.BoundaryCIDs {
···
554
533
}
555
534
}
556
535
557
557
-
// Collect remote operations (may need multiple fetches for large bundles)
558
536
var allRemoteOps []plc.PLCOperation
559
537
seenCIDs := make(map[string]bool)
560
538
561
561
-
// Track boundary CIDs
562
539
for cid := range prevBoundaryCIDs {
563
540
seenCIDs[cid] = true
564
541
}
565
542
566
543
currentAfter := after
567
567
-
maxFetches := 20 // Enough for up to 20k operations
544
544
+
maxFetches := 20
568
545
569
546
for fetchNum := 0; fetchNum < maxFetches && len(allRemoteOps) < plc.BUNDLE_SIZE; fetchNum++ {
570
570
-
// Fetch from PLC directory
571
547
batch, err := s.plcClient.Export(ctx, plc.ExportOptions{
572
548
Count: 1000,
573
549
After: currentAfter,
574
550
})
575
575
-
if err != nil {
576
576
-
http.Error(w, fmt.Sprintf("Failed to fetch from PLC directory: %v", err), http.StatusInternalServerError)
577
577
-
return
578
578
-
}
579
579
-
580
580
-
if len(batch) == 0 {
551
551
+
if err != nil || len(batch) == 0 {
581
552
break
582
553
}
583
554
584
584
-
// Deduplicate and add unique operations
585
555
for _, op := range batch {
586
556
if !seenCIDs[op.CID] {
587
557
seenCIDs[op.CID] = true
···
592
562
}
593
563
}
594
564
595
595
-
// Update cursor for next fetch
596
565
if len(batch) > 0 {
597
566
lastOp := batch[len(batch)-1]
598
567
currentAfter = lastOp.CreatedAt.Format("2006-01-02T15:04:05.000Z")
599
568
}
600
569
601
601
-
// If we got less than 1000, we've reached the end
602
570
if len(batch) < 1000 {
603
571
break
604
572
}
605
573
}
606
574
607
607
-
// Trim to exact bundle size
608
575
if len(allRemoteOps) > plc.BUNDLE_SIZE {
609
576
allRemoteOps = allRemoteOps[:plc.BUNDLE_SIZE]
610
577
}
611
578
612
612
-
// Compute remote hash (uncompressed JSONL)
613
613
-
remoteHash, err := computeRemoteOperationsHash(allRemoteOps)
614
614
-
if err != nil {
615
615
-
http.Error(w, fmt.Sprintf("Failed to compute remote hash: %v", err), http.StatusInternalServerError)
616
616
-
return
617
617
-
}
618
618
-
619
619
-
// Compare hashes (use uncompressed hash)
620
620
-
verified := bundle.Hash == remoteHash
621
621
-
622
622
-
respondJSON(w, map[string]interface{}{
623
623
-
"bundle_number": bundleNumber,
624
624
-
"verified": verified,
625
625
-
"local_hash": bundle.Hash,
626
626
-
"remote_hash": remoteHash,
627
627
-
"local_op_count": plc.BUNDLE_SIZE,
628
628
-
"remote_op_count": len(allRemoteOps),
629
629
-
"boundary_cids_used": len(prevBoundaryCIDs),
630
630
-
})
579
579
+
return allRemoteOps, prevBoundaryCIDs, nil
631
580
}
632
581
633
582
func (s *Server) handleVerifyChain(w http.ResponseWriter, r *http.Request) {
583
583
+
resp := newResponse(w)
634
584
ctx := r.Context()
635
585
636
636
-
// Get last bundle number
637
586
lastBundle, err := s.db.GetLastBundleNumber(ctx)
638
587
if err != nil {
639
639
-
http.Error(w, err.Error(), http.StatusInternalServerError)
588
588
+
resp.error(err.Error(), http.StatusInternalServerError)
640
589
return
641
590
}
642
591
643
592
if lastBundle == 0 {
644
644
-
respondJSON(w, map[string]interface{}{
593
593
+
resp.json(map[string]interface{}{
645
594
"status": "empty",
646
595
"message": "No bundles to verify",
647
596
})
648
597
return
649
598
}
650
599
651
651
-
// Verify chain
652
600
valid := true
653
601
var brokenAt int
654
602
var errorMsg string
···
662
610
break
663
611
}
664
612
665
665
-
// Verify chain link
666
613
if i > 1 {
667
614
prevBundle, err := s.db.GetBundleByNumber(ctx, i-1)
668
615
if err != nil {
···
681
628
}
682
629
}
683
630
684
684
-
response := map[string]interface{}{
631
631
+
result := map[string]interface{}{
685
632
"chain_length": lastBundle,
686
633
"valid": valid,
687
634
}
688
635
689
636
if !valid {
690
690
-
response["broken_at"] = brokenAt
691
691
-
response["error"] = errorMsg
637
637
+
result["broken_at"] = brokenAt
638
638
+
result["error"] = errorMsg
692
639
}
693
640
694
694
-
respondJSON(w, response)
641
641
+
resp.json(result)
695
642
}
696
643
697
644
func (s *Server) handleGetChainInfo(w http.ResponseWriter, r *http.Request) {
645
645
+
resp := newResponse(w)
698
646
ctx := r.Context()
699
647
700
648
lastBundle, err := s.db.GetLastBundleNumber(ctx)
701
649
if err != nil {
702
702
-
http.Error(w, err.Error(), http.StatusInternalServerError)
650
650
+
resp.error(err.Error(), http.StatusInternalServerError)
703
651
return
704
652
}
705
653
706
654
if lastBundle == 0 {
707
707
-
respondJSON(w, map[string]interface{}{
655
655
+
resp.json(map[string]interface{}{
708
656
"chain_length": 0,
709
657
"status": "empty",
710
658
})
···
713
661
714
662
firstBundle, _ := s.db.GetBundleByNumber(ctx, 1)
715
663
lastBundleData, _ := s.db.GetBundleByNumber(ctx, lastBundle)
716
716
-
717
664
count, size, _ := s.db.GetBundleStats(ctx)
718
665
719
719
-
respondJSON(w, map[string]interface{}{
666
666
+
resp.json(map[string]interface{}{
720
667
"chain_length": lastBundle,
721
668
"total_bundles": count,
722
669
"total_size_mb": float64(size) / 1024 / 1024,
···
728
675
})
729
676
}
730
677
731
731
-
// ====================
732
732
-
// PLC Export Handler
733
733
-
// ====================
678
678
+
// ===== PLC EXPORT HANDLER =====
734
679
735
680
func (s *Server) handlePLCExport(w http.ResponseWriter, r *http.Request) {
681
681
+
resp := newResponse(w)
736
682
ctx := r.Context()
737
683
738
738
-
// Parse query parameters
739
739
-
countStr := r.URL.Query().Get("count")
740
740
-
afterStr := r.URL.Query().Get("after")
684
684
+
count := getQueryInt(r, "count", 1000)
685
685
+
if count > 10000 {
686
686
+
count = 10000
687
687
+
}
741
688
742
742
-
count := 1000 // Default
743
743
-
if countStr != "" {
744
744
-
if c, err := strconv.Atoi(countStr); err == nil && c > 0 {
745
745
-
count = c
746
746
-
if count > 10000 {
747
747
-
count = 10000 // Max limit
748
748
-
}
689
689
+
afterTime, err := parseAfterParam(r.URL.Query().Get("after"))
690
690
+
if err != nil {
691
691
+
resp.error(fmt.Sprintf("Invalid after parameter: %v", err), http.StatusBadRequest)
692
692
+
return
693
693
+
}
694
694
+
695
695
+
startBundle := s.findStartBundle(ctx, afterTime)
696
696
+
ops := s.collectOperations(ctx, startBundle, afterTime, count)
697
697
+
698
698
+
w.Header().Set("Content-Type", "application/jsonl")
699
699
+
w.Header().Set("X-Operation-Count", strconv.Itoa(len(ops)))
700
700
+
701
701
+
for _, op := range ops {
702
702
+
if len(op.RawJSON) > 0 {
703
703
+
w.Write(op.RawJSON)
704
704
+
} else {
705
705
+
jsonData, _ := json.Marshal(op)
706
706
+
w.Write(jsonData)
749
707
}
708
708
+
w.Write([]byte("\n"))
750
709
}
710
710
+
}
751
711
752
752
-
var afterTime time.Time
753
753
-
if afterStr != "" {
754
754
-
// Try multiple timestamp formats (from most specific to least)
755
755
-
formats := []string{
756
756
-
time.RFC3339Nano,
757
757
-
time.RFC3339,
758
758
-
"2006-01-02T15:04:05.000Z",
759
759
-
"2006-01-02T15:04:05",
760
760
-
"2006-01-02T15:04",
761
761
-
"2006-01-02",
712
712
+
func parseAfterParam(afterStr string) (time.Time, error) {
713
713
+
if afterStr == "" {
714
714
+
return time.Time{}, nil
715
715
+
}
716
716
+
717
717
+
formats := []string{
718
718
+
time.RFC3339Nano,
719
719
+
time.RFC3339,
720
720
+
"2006-01-02T15:04:05.000Z",
721
721
+
"2006-01-02T15:04:05",
722
722
+
"2006-01-02T15:04",
723
723
+
"2006-01-02",
724
724
+
}
725
725
+
726
726
+
for _, format := range formats {
727
727
+
if parsed, err := time.Parse(format, afterStr); err == nil {
728
728
+
return parsed, nil
762
729
}
730
730
+
}
763
731
764
764
-
var parsed time.Time
765
765
-
var parseErr error
766
766
-
parsed = time.Time{}
732
732
+
return time.Time{}, fmt.Errorf("invalid timestamp format")
733
733
+
}
767
734
768
768
-
for _, format := range formats {
769
769
-
parsed, parseErr = time.Parse(format, afterStr)
770
770
-
if parseErr == nil {
771
771
-
afterTime = parsed
772
772
-
break
773
773
-
}
774
774
-
}
735
735
+
func (s *Server) findStartBundle(ctx context.Context, afterTime time.Time) int {
736
736
+
if afterTime.IsZero() {
737
737
+
return 1
738
738
+
}
775
739
776
776
-
if parseErr != nil {
777
777
-
http.Error(w, fmt.Sprintf("Invalid after parameter: %v", parseErr), http.StatusBadRequest)
778
778
-
return
779
779
-
}
740
740
+
foundBundle, err := s.db.GetBundleForTimestamp(ctx, afterTime)
741
741
+
if err != nil {
742
742
+
return 1
780
743
}
781
744
782
782
-
// Find starting bundle (FAST - single query)
783
783
-
startBundle := 1
784
784
-
if !afterTime.IsZero() {
785
785
-
foundBundle, err := s.db.GetBundleForTimestamp(ctx, afterTime)
786
786
-
if err != nil {
787
787
-
log.Error("Failed to find bundle for timestamp: %v", err)
788
788
-
// Fallback to bundle 1
789
789
-
} else {
790
790
-
startBundle = foundBundle
791
791
-
// Go back one bundle to catch boundary timestamps
792
792
-
if startBundle > 1 {
793
793
-
startBundle--
794
794
-
}
795
795
-
}
745
745
+
if foundBundle > 1 {
746
746
+
return foundBundle - 1
796
747
}
748
748
+
return foundBundle
749
749
+
}
797
750
798
798
-
// Collect operations from bundles
751
751
+
func (s *Server) collectOperations(ctx context.Context, startBundle int, afterTime time.Time, count int) []plc.PLCOperation {
799
752
var allOps []plc.PLCOperation
800
753
seenCIDs := make(map[string]bool)
801
754
802
802
-
// Load bundles sequentially until we have enough operations
803
755
lastBundle, _ := s.db.GetLastBundleNumber(ctx)
804
756
805
757
for bundleNum := startBundle; bundleNum <= lastBundle && len(allOps) < count; bundleNum++ {
806
806
-
bundlePath := filepath.Join(s.plcBundleDir, fmt.Sprintf("%06d.jsonl.zst", bundleNum))
807
807
-
808
808
-
ops, err := s.loadBundleOperations(bundlePath)
758
758
+
ops, err := s.bundleManager.LoadBundleOperations(ctx, bundleNum)
809
759
if err != nil {
810
760
log.Error("Warning: failed to load bundle %d: %v", bundleNum, err)
811
761
continue
812
762
}
813
763
814
814
-
// Filter operations
815
764
for _, op := range ops {
816
816
-
// Skip if STRICTLY BEFORE "after" timestamp
817
817
-
// Include operations AT or AFTER the timestamp
818
765
if !afterTime.IsZero() && op.CreatedAt.Before(afterTime) {
819
766
continue
820
767
}
821
768
822
822
-
// Skip duplicates (by CID)
823
769
if seenCIDs[op.CID] {
824
770
continue
825
771
}
···
833
779
}
834
780
}
835
781
836
836
-
// Set headers for JSONL response
837
837
-
w.Header().Set("Content-Type", "application/jsonl")
838
838
-
w.Header().Set("X-Operation-Count", strconv.Itoa(len(allOps)))
839
839
-
840
840
-
// Write JSONL response (newline-delimited JSON with trailing newline)
841
841
-
for _, op := range allOps {
842
842
-
// Use raw JSON if available
843
843
-
if len(op.RawJSON) > 0 {
844
844
-
w.Write(op.RawJSON)
845
845
-
} else {
846
846
-
// Fallback: marshal the operation
847
847
-
jsonData, err := json.Marshal(op)
848
848
-
if err != nil {
849
849
-
log.Error("Failed to marshal operation: %v", err)
850
850
-
continue
851
851
-
}
852
852
-
w.Write(jsonData)
853
853
-
}
854
854
-
855
855
-
// Always add newline after each operation (including the last)
856
856
-
w.Write([]byte("\n"))
857
857
-
}
782
782
+
return allOps
858
783
}
859
784
860
860
-
// ====================
861
861
-
// Health Handler
862
862
-
// ====================
785
785
+
// ===== HEALTH HANDLER =====
863
786
864
787
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
865
865
-
respondJSON(w, map[string]string{"status": "ok"})
788
788
+
newResponse(w).json(map[string]string{"status": "ok"})
866
789
}
867
790
868
868
-
// ====================
869
869
-
// Helper Functions
870
870
-
// ====================
871
871
-
872
872
-
// loadBundleOperations loads operations from a bundle file
873
873
-
func (s *Server) loadBundleOperations(path string) ([]plc.PLCOperation, error) {
874
874
-
decoder, err := zstd.NewReader(nil)
875
875
-
if err != nil {
876
876
-
return nil, err
877
877
-
}
878
878
-
defer decoder.Close()
879
879
-
880
880
-
compressedData, err := os.ReadFile(path)
881
881
-
if err != nil {
882
882
-
return nil, err
883
883
-
}
884
884
-
885
885
-
decompressed, err := decoder.DecodeAll(compressedData, nil)
886
886
-
if err != nil {
887
887
-
return nil, err
888
888
-
}
889
889
-
890
890
-
// Parse JSONL (newline-delimited JSON)
891
891
-
var operations []plc.PLCOperation
892
892
-
scanner := bufio.NewScanner(bytes.NewReader(decompressed))
893
893
-
894
894
-
lineNum := 0
895
895
-
for scanner.Scan() {
896
896
-
lineNum++
897
897
-
line := scanner.Bytes()
898
898
-
899
899
-
// Skip empty lines
900
900
-
if len(line) == 0 {
901
901
-
continue
902
902
-
}
903
903
-
904
904
-
var op plc.PLCOperation
905
905
-
if err := json.Unmarshal(line, &op); err != nil {
906
906
-
return nil, fmt.Errorf("failed to parse operation on line %d: %w", lineNum, err)
907
907
-
}
908
908
-
909
909
-
// CRITICAL: Store the original raw JSON bytes
910
910
-
op.RawJSON = make([]byte, len(line))
911
911
-
copy(op.RawJSON, line)
912
912
-
913
913
-
operations = append(operations, op)
914
914
-
}
915
915
-
916
916
-
if err := scanner.Err(); err != nil {
917
917
-
return nil, fmt.Errorf("error reading JSONL: %w", err)
918
918
-
}
791
791
+
// ===== UTILITY FUNCTIONS =====
919
792
920
920
-
return operations, nil
921
921
-
}
922
922
-
923
923
-
// computeRemoteOperationsHash computes hash for remote operations
924
924
-
func computeRemoteOperationsHash(ops []plc.PLCOperation) (string, error) {
793
793
+
func computeOperationsHash(ops []plc.PLCOperation) string {
925
794
var jsonlData []byte
926
926
-
for i, op := range ops {
927
927
-
if len(op.RawJSON) > 0 {
928
928
-
jsonlData = append(jsonlData, op.RawJSON...)
929
929
-
} else {
930
930
-
return "", fmt.Errorf("operation %d missing raw JSON data", i)
931
931
-
}
932
932
-
// Add newline ONLY between operations
795
795
+
for _, op := range ops {
796
796
+
jsonlData = append(jsonlData, op.RawJSON...)
933
797
jsonlData = append(jsonlData, '\n')
934
798
}
935
935
-
936
799
hash := sha256.Sum256(jsonlData)
937
937
-
return hex.EncodeToString(hash[:]), nil
938
938
-
}
939
939
-
940
940
-
// statusToString converts status int to string
941
941
-
func statusToString(status int) string {
942
942
-
switch status {
943
943
-
case storage.PDSStatusOnline: // Use PDSStatusOnline (alias)
944
944
-
return "online"
945
945
-
case storage.PDSStatusOffline: // Use PDSStatusOffline (alias)
946
946
-
return "offline"
947
947
-
default:
948
948
-
return "unknown"
949
949
-
}
950
950
-
}
951
951
-
952
952
-
// respondJSON writes JSON response
953
953
-
func respondJSON(w http.ResponseWriter, data interface{}) {
954
954
-
w.Header().Set("Content-Type", "application/json")
955
955
-
json.NewEncoder(w).Encode(data)
800
800
+
return hex.EncodeToString(hash[:])
956
801
}
+13
-9
internal/api/server.go
···
15
15
)
16
16
17
17
type Server struct {
18
18
-
router *mux.Router
19
19
-
server *http.Server
20
20
-
db storage.Database
21
21
-
plcClient *plc.Client
22
22
-
plcBundleDir string // NEW: Store cache dir
18
18
+
router *mux.Router
19
19
+
server *http.Server
20
20
+
db storage.Database
21
21
+
plcClient *plc.Client
22
22
+
plcBundleDir string
23
23
+
bundleManager *plc.BundleManager
23
24
}
24
25
25
26
func NewServer(db storage.Database, apiCfg config.APIConfig, plcCfg config.PLCConfig) *Server {
27
27
+
bundleManager, _ := plc.NewBundleManager(plcCfg.BundleDir, plcCfg.UseCache, db)
28
28
+
26
29
s := &Server{
27
27
-
router: mux.NewRouter(),
28
28
-
db: db,
29
29
-
plcClient: plc.NewClient(plcCfg.DirectoryURL),
30
30
-
plcBundleDir: plcCfg.BundleDir, // NEW
30
30
+
router: mux.NewRouter(),
31
31
+
db: db,
32
32
+
plcClient: plc.NewClient(plcCfg.DirectoryURL),
33
33
+
plcBundleDir: plcCfg.BundleDir,
34
34
+
bundleManager: bundleManager,
31
35
}
32
36
33
37
s.setupRoutes()
+19
internal/plc/bundle.go
···
576
576
577
577
return operations[startIdx:]
578
578
}
579
579
+
580
580
+
// LoadBundleOperations is a public method for external access (e.g., API handlers)
581
581
+
func (bm *BundleManager) LoadBundleOperations(ctx context.Context, bundleNum int) ([]PLCOperation, error) {
582
582
+
if !bm.enabled {
583
583
+
return nil, fmt.Errorf("bundle manager disabled")
584
584
+
}
585
585
+
586
586
+
bf := bm.newBundleFile(bundleNum)
587
587
+
588
588
+
if !bf.exists() {
589
589
+
return nil, fmt.Errorf("bundle %06d not found", bundleNum)
590
590
+
}
591
591
+
592
592
+
if err := bm.load(bf); err != nil {
593
593
+
return nil, err
594
594
+
}
595
595
+
596
596
+
return bf.operations, nil
597
597
+
}