tangled
alpha
login
or
join now
angrydutchman.peedee.es
/
plcbundle
forked from
atscan.net/plcbundle
0
fork
atom
A Transparent and Verifiable Way to Sync the AT Protocol's PLC Directory
0
fork
atom
overview
issues
pulls
pipelines
update ws
tree.fail
4 months ago
fb37be17
a4d84733
+89
-26
2 changed files
expand all
collapse all
unified
split
cmd
plcbundle
main.go
server.go
+28
-1
cmd/plcbundle/main.go
···
7
"net/http"
8
"os"
9
"path/filepath"
0
10
"sort"
11
"strings"
12
"time"
···
15
"github.com/atscan/plcbundle/plc"
16
)
17
18
-
// Version information (injected at build time via ldflags)
19
var (
20
version = "dev"
21
gitCommit = "unknown"
22
buildDate = "unknown"
23
)
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
24
25
func main() {
26
if len(os.Args) < 2 {
···
7
"net/http"
8
"os"
9
"path/filepath"
10
+
"runtime/debug"
11
"sort"
12
"strings"
13
"time"
···
16
"github.com/atscan/plcbundle/plc"
17
)
18
19
+
// Version information (injected at build time via ldflags or read from build info)
20
var (
21
version = "dev"
22
gitCommit = "unknown"
23
buildDate = "unknown"
24
)
25
+
26
+
func init() {
27
+
// Try to get version from build info (works with go install)
28
+
if info, ok := debug.ReadBuildInfo(); ok {
29
+
if info.Main.Version != "" && info.Main.Version != "(devel)" {
30
+
version = info.Main.Version
31
+
}
32
+
33
+
// Extract git commit and build time from build settings
34
+
for _, setting := range info.Settings {
35
+
switch setting.Key {
36
+
case "vcs.revision":
37
+
if setting.Value != "" {
38
+
gitCommit = setting.Value
39
+
if len(gitCommit) > 7 {
40
+
gitCommit = gitCommit[:7] // Short hash
41
+
}
42
+
}
43
+
case "vcs.time":
44
+
if setting.Value != "" {
45
+
buildDate = setting.Value
46
+
}
47
+
}
48
+
}
49
+
}
50
+
}
51
52
func main() {
53
if len(os.Args) < 2 {
+61
-25
cmd/plcbundle/server.go
···
12
"time"
13
14
"github.com/atscan/plcbundle/bundle"
0
15
"github.com/gorilla/websocket"
16
)
17
···
143
index := mgr.GetIndex()
144
bundles := index.GetBundles()
145
146
-
if len(bundles) == 0 {
147
-
return
148
-
}
149
-
150
currentRecord := 0
151
152
// Stream all operations from all bundles
···
166
continue
167
}
168
169
-
// Send raw JSON if available, otherwise marshal the operation
170
-
var data []byte
171
-
if len(op.RawJSON) > 0 {
172
-
data = op.RawJSON
173
-
} else {
174
-
data, err = json.Marshal(op)
175
-
if err != nil {
176
-
fmt.Fprintf(os.Stderr, "Failed to marshal operation: %v\n", err)
177
-
currentRecord++
178
-
continue
179
-
}
180
-
}
181
-
182
-
// Send raw JSON as text message
183
-
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
184
-
// Connection closed or error
185
-
fmt.Fprintf(os.Stderr, "WebSocket write error: %v\n", err)
186
return
187
}
188
189
currentRecord++
190
191
-
// Optional: Send ping periodically to keep connection alive
192
if currentRecord%1000 == 0 {
193
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
194
return
···
196
}
197
}
198
}
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
199
}
200
201
func handleRoot(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, syncMode bool, wsEnabled bool) {
···
315
fmt.Fprintf(w, "━━━━━━━━\n")
316
fmt.Fprintf(w, " # Get bundle metadata\n")
317
fmt.Fprintf(w, " curl %s/bundle/1\n\n", baseURL)
318
-
fmt.Fprintf(w, " # Download compressed bundle\n")
319
-
fmt.Fprintf(w, " curl %s/data/1 -o 000001.jsonl.zst\n\n", baseURL)
320
-
fmt.Fprintf(w, " # Stream decompressed operations\n")
321
fmt.Fprintf(w, " curl %s/jsonl/1\n\n", baseURL)
322
323
if wsEnabled {
···
12
"time"
13
14
"github.com/atscan/plcbundle/bundle"
15
+
"github.com/atscan/plcbundle/plc"
16
"github.com/gorilla/websocket"
17
)
18
···
144
index := mgr.GetIndex()
145
bundles := index.GetBundles()
146
0
0
0
0
147
currentRecord := 0
148
149
// Stream all operations from all bundles
···
163
continue
164
}
165
166
+
// Send raw JSON
167
+
if err := sendOperation(conn, op); err != nil {
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
168
return
169
}
170
171
currentRecord++
172
173
+
// Send ping periodically to keep connection alive
174
if currentRecord%1000 == 0 {
175
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
176
return
···
178
}
179
}
180
}
181
+
182
+
// Stream mempool operations (seamlessly after bundles)
183
+
mempoolOps, err := mgr.GetMempoolOperations()
184
+
if err != nil {
185
+
fmt.Fprintf(os.Stderr, "Failed to get mempool operations: %v\n", err)
186
+
return
187
+
}
188
+
189
+
for _, op := range mempoolOps {
190
+
// Skip records before cursor
191
+
if currentRecord < cursor {
192
+
currentRecord++
193
+
continue
194
+
}
195
+
196
+
// Send raw JSON
197
+
if err := sendOperation(conn, op); err != nil {
198
+
return
199
+
}
200
+
201
+
currentRecord++
202
+
203
+
// Send ping periodically
204
+
if currentRecord%1000 == 0 {
205
+
if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
206
+
return
207
+
}
208
+
}
209
+
}
210
+
}
211
+
212
+
// sendOperation sends a single operation over WebSocket as raw JSON
213
+
func sendOperation(conn *websocket.Conn, op plc.PLCOperation) error {
214
+
var data []byte
215
+
var err error
216
+
217
+
// Use raw JSON if available, otherwise marshal
218
+
if len(op.RawJSON) > 0 {
219
+
data = op.RawJSON
220
+
} else {
221
+
data, err = json.Marshal(op)
222
+
if err != nil {
223
+
fmt.Fprintf(os.Stderr, "Failed to marshal operation: %v\n", err)
224
+
return nil // Skip this operation but continue
225
+
}
226
+
}
227
+
228
+
// Send as text message
229
+
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
230
+
fmt.Fprintf(os.Stderr, "WebSocket write error: %v\n", err)
231
+
return err
232
+
}
233
+
234
+
return nil
235
}
236
237
func handleRoot(w http.ResponseWriter, r *http.Request, mgr *bundle.Manager, syncMode bool, wsEnabled bool) {
···
351
fmt.Fprintf(w, "━━━━━━━━\n")
352
fmt.Fprintf(w, " # Get bundle metadata\n")
353
fmt.Fprintf(w, " curl %s/bundle/1\n\n", baseURL)
354
+
fmt.Fprintf(w, " # Download compressed bundle 42\n")
355
+
fmt.Fprintf(w, " curl %s/data/42 -o 000042.jsonl.zst\n\n", baseURL)
356
+
fmt.Fprintf(w, " # Stream decompressed operations from bundle 42\n")
357
fmt.Fprintf(w, " curl %s/jsonl/1\n\n", baseURL)
358
359
if wsEnabled {