package main import ( "context" "encoding/json" "errors" "fmt" "github.com/bluesky-social/indigo/atproto/identity" "github.com/bluesky-social/indigo/atproto/syntax" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" tea "github.com/charmbracelet/bubbletea" "github.com/charmbracelet/lipgloss" "github.com/gopxl/beep" "github.com/gopxl/beep/speaker" "github.com/gopxl/beep/wav" "hash/fnv" "log/slog" "os" "time" ) type NotificationSystem struct { np *NotificationPlayer tp *tea.Program } type NotificationPlayer struct { audioData beep.Buffer sampleRate beep.SampleRate } var ( hasArgs bool didToHandle map[syntax.DID]*syntax.Handle noNeedForFrom bool ) func main() { fmt.Println("beep") if len(os.Args) != 1 { hasArgs = true } if len(os.Args) == 2 { noNeedForFrom = true } f, err := os.Open("thread_notification.wav") if err != nil { panic(err) } streamer, format, err := wav.Decode(f) if err != nil { panic(err) } defer streamer.Close() buffer := beep.NewBuffer(format) buffer.Append(streamer) np := &NotificationPlayer{ audioData: *buffer, sampleRate: format.SampleRate, } err = speaker.Init(np.sampleRate, np.sampleRate.N(time.Second/10)) if err != nil { panic(err) } tp := tea.NewProgram(initialModel()) ns := &NotificationSystem{ np, tp, } didToHandle = make(map[syntax.DID]*syntax.Handle) go consumeLoop(context.Background(), ns) ns.tp.Run() } func consumeLoop(ctx context.Context, ns *NotificationSystem) { jsServerAddr := os.Getenv("JS_SERVER_ADDR") if jsServerAddr == "" { jsServerAddr = "wss://jetstream.atproto.tools/subscribe" } consumer := NewConsumer(jsServerAddr, ns) for { err := consumer.Consume(ctx) if err != nil { fmt.Printf("error in consumeLoop: %s\n", err.Error()) if errors.Is(err, context.Canceled) { fmt.Println("exiting consume loop") return } } } } type Consumer struct { cfg *client.ClientConfig handler *handler } type handler struct { ns *NotificationSystem } func NewConsumer(jsAddr string, ns *NotificationSystem) *Consumer { cfg := client.DefaultClientConfig() if jsAddr != "" { cfg.WebsocketURL = jsAddr } cfg.WantedCollections = []string{ "place.stream.chat.message", } cfg.WantedDids = []string{} return &Consumer{ cfg: cfg, handler: &handler{ns}, } } func (c *Consumer) Consume(ctx context.Context) error { scheduler := sequential.NewScheduler("jetstream_localdev", slog.Default(), c.handler.HandleEvent) defer scheduler.Shutdown() opts := slog.HandlerOptions{ Level: slog.LevelError, } handler := slog.NewJSONHandler(os.Stdout, &opts) client, err := client.NewClient(c.cfg, slog.New(handler), scheduler) if err != nil { return errors.New("failed to create client: " + err.Error()) } cursor := time.Now().Add(1 * -time.Minute).UnixMicro() err = client.ConnectAndRead(ctx, &cursor) if err != nil { return errors.New("error connecting and reading: " + err.Error()) } return nil } type ChatMessage struct { LexiconTypeID string `json:"$type,const=place.stream.chat.message"` CreatedAt string `json:"createdAt"` Streamer string `json:"streamer"` Text string `json:"text"` } func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { if event.Commit != nil && event.Commit.Collection == "place.stream.chat.message" && event.Commit.Operation == "create" { handle, err := getHandle(event.Did, ctx) if err != nil { panic(err) } var v ChatMessage err = json.Unmarshal(event.Commit.Record, &v) if err != nil { return nil } shouldSend := !hasArgs if hasArgs { for _, streamer := range os.Args[1:] { if streamer == v.Streamer { shouldSend = true } } } if shouldSend { var streamer string if !noNeedForFrom { streamer, err = getHandle(v.Streamer, ctx) if err != nil { panic(err) } } h.ns.Notify(v.Text, handle, streamer) } } return nil } func getHandle(did string, ctx context.Context) (string, error) { sdid, err := syntax.ParseDID(did) if err != nil { return "", err } h, ok := didToHandle[sdid] if ok { return h.String(), nil } dd := identity.DefaultDirectory() id, err := dd.LookupDID(ctx, sdid) if err != nil { return "failed.to.lookup", nil } didToHandle[sdid] = &id.Handle return id.Handle.String(), nil } func (ns *NotificationSystem) Notify(text string, handle string, streamer string) { noise := ns.np.audioData.Streamer(0, ns.np.audioData.Len()) speaker.Play(noise) ns.tp.Send(ChatMsg{text: &text, handle: &handle, streamer: &streamer}) } type model struct { records []*record width int height int } type record struct { handle *string text *string streamer *string } func initialModel() model { return model{ records: make([]*record, 0), } } func (m model) Init() tea.Cmd { return nil } //6c67ea //15191e type ChatMsg struct { handle *string text *string streamer *string } func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { switch msg := msg.(type) { case tea.KeyMsg: if msg.String() == "q" || msg.String() == "ctrl+c" { return m, tea.Quit } case tea.WindowSizeMsg: m.width = msg.Width m.height = msg.Height case ChatMsg: record := record{ text: msg.text, handle: msg.handle, streamer: msg.streamer, } m.records = append(m.records, &record) } return m, nil } func (m model) View() string { s := "" for _, record := range m.records { str := "invalid handle" if record.handle != nil { str = fmt.Sprintf("%s", *record.handle) } bold := lipgloss.NewStyle().Bold(true).Foreground(lipgloss.Color(hashStringToColor(str))) bdy := "" if record.text != nil { bdy = fmt.Sprintf("%s", *record.text) } middleText := "\n" if !noNeedForFrom { boldStrmr := lipgloss.NewStyle().Bold(true).Foreground(lipgloss.Color(hashStringToColor(*record.streamer))) middleText = fmt.Sprintf(" in %s's chat\n", boldStrmr.Render(*record.streamer)) } style := lipgloss.NewStyle().Width(m.width) s = s + "\n" + bold.Render(str) + middleText + style.Render(bdy) + "\n" } return s } func hashStringToColor(s string) string { h := fnv.New32a() h.Write([]byte(s)) ui := h.Sum32() guess := fmt.Sprintf("#%06x", ui) return guess[0:7] }