···44 last_time_us integer not null
45 );
4647- create table if not exists oplog (
48- tid text primary key,
49- did text not null,
50- repo text not null,
51- old_sha text not null,
52- new_sha text not null,
53- ref text not null
54 );
55 `)
56 if err != nil {
···44 last_time_us integer not null
45 );
4647+ create table if not exists events (
48+ rkey text not null,
49+ nsid text not null,
50+ event text not null, -- json
51+ primary key (rkey, nsid)
0052 );
53 `)
54 if err != nil {
-70
knotserver/db/oplog.go
···1-package db
2-3-import (
4- "fmt"
5-6- "tangled.sh/tangled.sh/core/knotserver/notifier"
7-)
8-9-type Op struct {
10- Tid string // time based ID, easy to enumerate & monotonic
11- Did string // did of pusher
12- Repo string // <did/repo> fully qualified repo
13- OldSha string // old sha of reference being updated
14- NewSha string // new sha of reference being updated
15- Ref string // the reference being updated
16-}
17-18-func (d *DB) InsertOp(op Op, notifier *notifier.Notifier) error {
19- _, err := d.db.Exec(
20- `insert into oplog (tid, did, repo, old_sha, new_sha, ref) values (?, ?, ?, ?, ?, ?)`,
21- op.Tid,
22- op.Did,
23- op.Repo,
24- op.OldSha,
25- op.NewSha,
26- op.Ref,
27- )
28- if err != nil {
29- return err
30- }
31-32- notifier.NotifyAll()
33- return nil
34-}
35-36-func (d *DB) GetOps(cursor string) ([]Op, error) {
37- whereClause := ""
38- args := []any{}
39- if cursor != "" {
40- whereClause = "where tid > ?"
41- args = append(args, cursor)
42- }
43-44- query := fmt.Sprintf(`
45- select tid, did, repo, old_sha, new_sha, ref
46- from oplog
47- %s
48- order by tid asc
49- limit 100
50- `, whereClause)
51-52- rows, err := d.db.Query(query, args...)
53- if err != nil {
54- return nil, err
55- }
56- defer rows.Close()
57-58- var ops []Op
59- for rows.Next() {
60- var op Op
61- rows.Scan(&op.Tid, &op.Did, &op.Repo, &op.OldSha, &op.NewSha, &op.Ref)
62- ops = append(ops, op)
63- }
64-65- if err := rows.Err(); err != nil {
66- return nil, err
67- }
68-69- return ops, nil
70-}
···23import (
4 "context"
5+ "encoding/json"
6 "net/http"
7 "time"
8···14 WriteBufferSize: 1024,
15}
1617+func (h *Handle) Events(w http.ResponseWriter, r *http.Request) {
18 l := h.l.With("handler", "OpLog")
19 l.Info("received new connection")
20···75}
7677func (h *Handle) streamOps(conn *websocket.Conn, cursor *string) error {
78+ events, err := h.db.GetEvents(*cursor)
79 if err != nil {
80+ h.l.Error("failed to fetch events from db", "err", err, "cursor", cursor)
81 return err
82 }
83+ h.l.Debug("ops", "ops", events)
8485+ for _, event := range events {
86+ // first extract the inner json into a map
87+ var eventJson map[string]any
88+ err := json.Unmarshal([]byte(event.EventJson), &eventJson)
89+ if err != nil {
90+ h.l.Error("failed to unmarshal event", "err", err)
91+ return err
92+ }
93+94+ jsonMsg, err := json.Marshal(map[string]any{
95+ "rkey": event.Rkey,
96+ "nsid": event.Nsid,
97+ "event": eventJson,
98+ })
99+ if err != nil {
100+ h.l.Error("failed to marshal record", "err", err)
101+ return err
102+ }
103+104+ if err := conn.WriteMessage(websocket.TextMessage, jsonMsg); err != nil {
105 h.l.Debug("err", "err", err)
106 return err
107 }
108+ *cursor = event.Rkey
109 }
110111 return nil
+1-1
knotserver/handler.go
···149 })
150151 // Socket that streams git oplogs
152- r.Get("/oplog", h.OpLog)
153154 // Initialize the knot with an owner and public key.
155 r.With(h.VerifySignature).Post("/init", h.Init)
···149 })
150151 // Socket that streams git oplogs
152+ r.Get("/events", h.Events)
153154 // Initialize the knot with an owner and public key.
155 r.With(h.VerifySignature).Post("/init", h.Init)