backend for xcvr appview
1package model
2
3import (
4 "context"
5 "errors"
6 "net/http"
7 "os"
8 "rvcx/internal/db"
9 "rvcx/internal/log"
10 "rvcx/internal/oauth"
11 "rvcx/internal/recordmanager"
12 "rvcx/internal/types"
13 "sync"
14 "time"
15
16 "github.com/rachel-mp4/lrcd"
17 lrcpb "github.com/rachel-mp4/lrcproto/gen/go"
18)
19
// Model is the in-memory registry of the channels this backend knows about.
// Init constructs it with a positional literal, so the field order below is
// load-bearing.
type Model struct {
	// store is queried for the channel list (Init) and for handle
	// resolution (getServer's resolver callback).
	store *db.Store

	// uriMap maps a channel URI to its per-channel state.
	uriMap map[string]*channelModel

	// logger receives this package's diagnostic output.
	logger *log.Logger

	// cli is not used by the code visible in this file.
	// NOTE(review): presumably consumed elsewhere in the package - confirm.
	cli *oauth.PasswordClient

	// mu is held by getServer while it looks up a channel and lazily
	// starts its server.
	mu sync.Mutex

	// rm posts signet records for init events (see handleInitEvents).
	rm *recordmanager.RecordManager
}
28
// channelModel holds the runtime state of a single channel.
type channelModel struct {
	// uri is the channel's URI; it is also its key in Model.uriMap.
	uri string

	// logger receives per-channel diagnostic output.
	logger *log.Logger

	// valid records whether the channel's host matched this backend's own
	// identity (the MY_DID environment variable) when it was last computed.
	// getServer refuses to serve channels for which it is false.
	valid bool

	// welcome is handed to the lrcd server via lrcd.WithWelcome.
	welcome string

	// server is nil until getServer starts it, and is reset to nil when
	// handleInitEvents shuts an idle server down.
	server *lrcd.Server

	// lastID seeds lrcd.WithInitialID and is refreshed from server.Stop().
	lastID uint32

	// initChan is the receive side of the channel passed to
	// lrcd.WithInitChannel.
	initChan <-chan lrcd.InitChanMsg

	// mediainitChan is the receive side of the channel passed to
	// lrcd.WithMediainitChannel.
	mediainitChan <-chan lrcd.MediaInitChanMsg

	// ctx and cancel bound the lifetime of the handleInitEvents goroutine.
	// Both are unset until getServer has started a server.
	ctx context.Context

	cancel func()

	// clients and clientsmu are not used by the code visible in this file.
	// NOTE(review): presumably the set of connected stream clients and its
	// guard - confirm against the client implementation.
	clients map[*client]bool

	clientsmu sync.Mutex
}
45
46func (m *Model) GetWSHandlerFrom(uri string) (http.HandlerFunc, error) {
47 server, err := m.getServer(uri)
48 if err != nil {
49 return nil, err
50 }
51 return server.WSHandler(), nil
52}
53
54func (m *Model) GetLexStreamFrom(uri string) (http.HandlerFunc, error) {
55 cm, ok := m.uriMap[uri]
56 if !ok {
57 return nil, errors.New("not a valid server")
58 }
59 return cm.WSHandler(uri, m), nil
60}
61
62func Init(store *db.Store, logger *log.Logger, cli *oauth.PasswordClient, rm *recordmanager.RecordManager) *Model {
63 uris, err := store.GetChannelURIs(context.Background())
64 if err != nil {
65 panic(err)
66 }
67 uriToServerModel := make(map[string]*channelModel, len(uris))
68 myid := os.Getenv("MY_DID")
69 for _, uri := range uris {
70 valid := (uri.Host == myid)
71 beep := channelModel{
72 welcome: uri.Topic,
73 uri: uri.URI,
74 logger: logger,
75 lastID: uri.LastID,
76 valid: valid,
77 clients: make(map[*client]bool),
78 clientsmu: sync.Mutex{},
79 }
80 uriToServerModel[uri.URI] = &beep
81 }
82 return &Model{
83 store,
84 uriToServerModel,
85 logger,
86 cli,
87 sync.Mutex{},
88 rm,
89 }
90}
91
92func (m *Model) AddChannel(c *types.Channel) error {
93 _, ok := m.uriMap[c.URI]
94 if ok {
95 return errors.New("tried to add existing server!")
96 }
97 valid := (c.Host == os.Getenv("MY_DID"))
98 var welcome string
99 if c.Topic == nil {
100 welcome = "and now you're connected"
101 } else {
102 welcome = *c.Topic
103 }
104 beep := channelModel{
105 welcome: welcome,
106 uri: c.URI,
107 logger: m.logger,
108 lastID: 1,
109 valid: valid,
110 clients: make(map[*client]bool),
111 clientsmu: sync.Mutex{},
112 }
113 m.uriMap[c.URI] = &beep
114 return nil
115}
116
117func (m *Model) UpdateChannel(c *types.Channel) error {
118 cm, ok := m.uriMap[c.URI]
119 if !ok {
120 return m.AddChannel(c)
121 }
122 valid := (c.Host == os.Getenv("my_IDENTITY"))
123 if valid != cm.valid {
124 if valid {
125 cm.valid = true
126 } else {
127 cm.valid = false
128 cm.cancel()
129 }
130 }
131 var welcome string
132 if c.Topic == nil {
133 welcome = "and now you're connected"
134 } else {
135 welcome = *c.Topic
136 }
137 cm.welcome = welcome
138 return nil
139}
140
141func (m *Model) DeleteChannel(uri string) error {
142 cm, ok := m.uriMap[uri]
143 if !ok {
144 return nil
145 }
146 delete(m.uriMap, uri)
147 // this case is for if a malformed channel record is ingested which
148 // doesn't create a channel, but it still shows up in uriMap. probs
149 // shouldn't be in uriMap but idk
150 if cm != nil {
151 cm.cancel()
152 }
153 return nil
154}
155
156func (m *Model) getServer(uri string) (*lrcd.Server, error) {
157 m.mu.Lock()
158 defer m.mu.Unlock()
159
160 cm := m.uriMap[uri]
161 if cm == nil {
162 return nil, errors.New("uri doesn't refer to a channel i am aware of")
163 }
164 if !cm.valid {
165 return nil, errors.New("Not hosted on this backend!")
166 }
167
168 if cm.server == nil {
169 m.logger.Deprintln("i think the server should exist, so i'm making it")
170 var err error
171 lastID := cm.lastID
172 initChan := make(chan lrcd.InitChanMsg, 100)
173 mediainitChan := make(chan lrcd.MediaInitChanMsg, 100)
174
175 server, err := lrcd.NewServer(
176 lrcd.WithResolver(func(externalID string, ctx context.Context) *string {
177 did, err := m.store.FullResolveHandle(externalID, ctx)
178 if err != nil {
179 select {
180 case <-ctx.Done():
181 return nil
182 default:
183 return &did
184 }
185 }
186 return &did
187 }),
188 lrcd.WithWelcome(cm.welcome),
189 lrcd.WithLogging(os.Stdout, true),
190 lrcd.WithInitialID(lastID),
191 lrcd.WithInitChannel(initChan),
192 lrcd.WithMediainitChannel(mediainitChan),
193 lrcd.WithServerURIAndSecret(uri, os.Getenv("LRCD_SECRET")),
194 )
195 if err != nil {
196 return nil, errors.New("Error creating server")
197 }
198
199 err = server.Start()
200 if err != nil {
201 return nil, errors.New("Error starting server")
202 }
203
204 if cm.cancel != nil {
205 m.logger.Println("that's weird, old cancel lying around")
206 cm.cancel()
207 }
208
209 ctx, cancel := context.WithCancel(context.Background())
210 cm.server = server
211 cm.initChan = initChan
212 cm.mediainitChan = mediainitChan
213 cm.cancel = cancel
214 cm.ctx = ctx
215
216 go m.handleInitEvents(cm)
217 }
218 return cm.server, nil
219}
220
221func (m *Model) handleInitEvents(cm *channelModel) {
222 ticker := time.NewTicker(5 * time.Minute)
223 defer ticker.Stop()
224
225 for {
226 select {
227 case <-cm.ctx.Done():
228 cm.logger.Deprintln("i'm a handleinitevent goroutine and my context is done")
229 return
230 case <-ticker.C:
231 c := cm.server.Connected()
232 if c == 0 {
233 cm.logger.Deprintln("i think the server is empty! gonna break some things")
234 lastID, err := cm.server.Stop()
235 if err != nil {
236 panic(err)
237 }
238 cm.lastID = lastID
239 cm.server = nil
240 cm.initChan = nil
241 cm.cancel()
242 cm.cancel = nil
243 return
244 }
245 case e, ok := <-cm.initChan:
246 if !ok {
247 cm.logger.Println("this is a weird case!")
248 return
249 }
250 err := m.rm.PostSignet(e.ResolvedId, e.Init, cm.uri, context.Background())
251 if err != nil {
252 m.logger.Println("error posting signet: " + err.Error())
253 }
254 case me, ok := <-cm.mediainitChan:
255 if !ok {
256 cm.logger.Println("this is a weird case!")
257 return
258 }
259 e := lrcpb.Event_Init{
260 Init: &lrcpb.Init{
261 Id: me.Mediainit.Mediainit.Id,
262 ExternalID: me.Mediainit.Mediainit.ExternalID,
263 },
264 }
265 err := m.rm.PostSignet(me.ResolvedId, e, cm.uri, context.Background())
266 if err != nil {
267 m.logger.Println("error posting signet: " + err.Error())
268 }
269 }
270 }
271}