AT Protocol Terminal Interface Explorer
1package ui
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/jetstream/pkg/models"
11 "github.com/charmbracelet/bubbles/list"
12 tea "github.com/charmbracelet/bubbletea"
13 "github.com/charmbracelet/lipgloss"
14 "github.com/treethought/attie/at"
15)
16
var (
	// opStyle colors the commit operation in a list item's title.
	opStyle = lipgloss.NewStyle().Foreground(lipgloss.Color("205"))
	// didStyle colors the DID in a list item's description.
	didStyle = lipgloss.NewStyle().Foreground(lipgloss.Color("42"))
)
21
// jetEventItem wraps a JetStream event so it can be shown as a row in the
// event list. Its FilterValue, Title and Description methods supply the text
// for that row.
type jetEventItem struct {
	evt *models.Event
}
25
26func (j jetEventItem) FilterValue() string {
27 return ""
28}
29func (j jetEventItem) Title() string {
30 return fmt.Sprintf("%s %s %s",
31 opStyle.Render(j.evt.Commit.Operation), j.evt.Commit.Collection, dimStyle.Render(j.evt.Commit.RKey),
32 )
33}
34
35func (j jetEventItem) Description() string {
36 t := time.Unix(0, j.evt.TimeUS*int64(time.Microsecond))
37 return fmt.Sprintf("%s - %s", didStyle.Render(j.evt.Did), t.Format("2006-01-02 15:04:05"))
38}
39
// eventMsg is the message produced by Listen when an event is received from
// the JetStream client; Update adds the event to the list.
type eventMsg struct {
	evt *models.Event
}
43
// jetStreamErrorMsg is the message produced by Listen when the JetStream
// client reports an error; Update logs it.
type jetStreamErrorMsg struct {
	err error
}
47
// session holds the parameters of the current stream as shown in the header.
type session struct {
	// lastCursor is the cursor passed to Start, then the TimeUS of the most
	// recently added event; nil is displayed as "live".
	lastCursor *int64
	// collections is the collection filter passed to Start; empty is
	// displayed as "all".
	collections []string
	// dids is the DID filter passed to Start; empty is displayed as "all".
	dids []string
}
53
// JetStreamView is the Bubble Tea model for the JetStream screen: a header, a
// list of received events and, when the view is wider than 100 columns, a
// preview pane beside the list.
type JetStreamView struct {
	list    list.Model          // received events; AddEvent inserts at index 0
	preview *JetStreamEventView // shows the event currently selected in list
	jc      *at.JetStreamClient // source of events (Out) and errors (Err)
	ctx     context.Context     // non-nil while a stream is running (see Running)
	cancel  context.CancelFunc  // cancels ctx; invoked by Stop
	session session             // filters and cursor of the current stream
	w, h    int                 // last size passed to SetSize
}
63
64func NewJetStreamView(jc *at.JetStreamClient) *JetStreamView {
65 del := list.DefaultDelegate{
66 ShowDescription: true,
67 Styles: list.NewDefaultItemStyles(),
68 }
69 del.SetHeight(2)
70
71 l := list.New(nil, del, 80, 20)
72 l.SetShowTitle(false)
73 l.SetShowStatusBar(false)
74 l.SetFilteringEnabled(false)
75 return &JetStreamView{
76 list: l,
77 preview: NewJetEventView(true),
78 jc: jc,
79 }
80}
81
82func (m *JetStreamView) Listen() tea.Cmd {
83 return func() tea.Msg {
84 select {
85 case err := <-m.jc.Err():
86 slog.Error("JetStream client error", "error", err)
87 return jetStreamErrorMsg{err: err}
88 case evt := <-m.jc.Out():
89 slog.Info("Received JetStream event", "did", evt.Did, "kind", evt.Kind)
90 return eventMsg{evt: evt}
91 }
92 }
93}
94
95func (m *JetStreamView) AddEvent(evt *models.Event) tea.Cmd {
96 m.session.lastCursor = &evt.TimeUS
97 item := jetEventItem{evt: evt}
98 return m.list.InsertItem(0, item)
99}
100
101func (m *JetStreamView) Running() bool {
102 return m.ctx != nil
103}
104func (m *JetStreamView) Clear() tea.Cmd {
105 return func() tea.Msg {
106 m.session = session{}
107 m.preview.SetEvent(nil)
108 return m.list.SetItems(nil)
109 }
110}
111
112func (m *JetStreamView) Start(cxs, dids []string, cursor *int64) tea.Cmd {
113 if m.ctx != nil {
114 slog.Warn("JetStream client already running")
115 return nil
116 }
117 m.session = session{
118 lastCursor: cursor,
119 collections: cxs,
120 dids: dids,
121 }
122 m.ctx, m.cancel = context.WithCancel(context.Background())
123 slog.Info("Starting JetStream client", "collections", cxs, "dids", dids, "cursor", cursor)
124 go m.jc.Start(m.ctx, cxs, dids, cursor)
125 return m.Listen()
126}
127func (m *JetStreamView) Stop() tea.Cmd {
128 if m.cancel != nil {
129 slog.Info("Stopping JetStream client")
130 m.cancel()
131 m.ctx = nil
132 }
133 return nil
134}
135
136func (m *JetStreamView) Init() tea.Cmd {
137 return nil
138}
139func (m *JetStreamView) SetSize(w, h int) {
140 m.w = w
141 m.h = h
142 hh := lipgloss.Height(m.header())
143 if m.ctx == nil {
144 hh += 1
145 }
146 if w > 100 {
147 m.list.SetSize(w/2, h-hh)
148 m.preview.SetSize(w/2, h-hh)
149 return
150 }
151 m.list.SetSize(w, h-hh)
152 m.preview.SetSize(0, 0)
153}
154
155func (m *JetStreamView) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
156 switch msg := msg.(type) {
157
158 case jetStreamErrorMsg:
159 slog.Error("JetStream client error", "error", msg.err)
160 return m, nil
161
162 case eventMsg:
163 return m, tea.Batch(
164 m.AddEvent(msg.evt),
165 m.Listen(),
166 )
167 }
168 switch msg := msg.(type) {
169 case tea.KeyMsg:
170 if msg.String() == "enter" {
171 if item, ok := m.list.SelectedItem().(jetEventItem); ok {
172 return m, func() tea.Msg {
173 return jetEventSelectedMsg{evt: item.evt}
174 }
175 }
176 }
177 }
178
179 l, cmd := m.list.Update(msg)
180 m.list = l
181 if item, ok := m.list.SelectedItem().(jetEventItem); ok {
182 m.preview.SetEvent(item.evt)
183 }
184 return m, cmd
185}
186
// jetstreamTitleStyle styles the title line of the header: bold, foreground
// color 205, one column of left padding, and a bottom border in color 62.
var jetstreamTitleStyle = lipgloss.NewStyle().
	Bold(true).
	Foreground(lipgloss.Color("205")).
	BorderStyle(lipgloss.NormalBorder()).
	BorderBottom(true).
	BorderForeground(lipgloss.Color("62")).
	PaddingLeft(1)
194
195func (m *JetStreamView) header() string {
196 cxs := dimStyle.Render("all")
197 if len(m.session.collections) > 0 {
198 cxs = strings.Join(m.session.collections, ", ")
199 }
200 dids := dimStyle.Render("all")
201 if len(m.session.dids) > 0 {
202 dids = strings.Join(m.session.dids, ", ")
203 }
204 lastCursor := dimStyle.Render("live")
205 if m.session.lastCursor != nil {
206 lastCursor = fmt.Sprintf("%d", *m.session.lastCursor)
207 }
208
209 title := jetstreamTitleStyle.Render("📡 JetStream Events")
210
211 dot := dimStyle.Render(" · ")
212 filters := lipgloss.JoinHorizontal(lipgloss.Left,
213 dimStyle.Render(" collections: "), cxs,
214 dot, dimStyle.Render("dids: "), dids,
215 dot, dimStyle.Render("cursor: "), lastCursor,
216 )
217
218 return lipgloss.JoinVertical(lipgloss.Left, title, filters)
219}
220
221func (m *JetStreamView) View() string {
222 hdr := m.header()
223 status := ""
224 if m.ctx == nil {
225 status = dimStyle.Render(" not connected · press ctrl+j to start")
226 }
227
228 if m.w > 100 {
229 left := lipgloss.JoinVertical(lipgloss.Left, m.list.View())
230 right := m.preview.View()
231 body := lipgloss.JoinHorizontal(lipgloss.Top, left, right)
232 if status != "" {
233 return lipgloss.JoinVertical(lipgloss.Left, hdr, status, body)
234 }
235 return lipgloss.JoinVertical(lipgloss.Left, hdr, body)
236 }
237
238 if status != "" {
239 return lipgloss.JoinVertical(lipgloss.Left, hdr, status, m.list.View())
240 }
241 return lipgloss.JoinVertical(lipgloss.Left, hdr, m.list.View())
242}