···2020 create table if not exists known_dids (
2121 did text primary key
2222 );
2323+2324 create table if not exists public_keys (
2425 id integer primary key autoincrement,
2526 did text not null,
···3738 created timestamp default current_timestamp,
3839 unique(did, name)
3940 );
4040- create table if not exists access_levels (
4141+4242+ create table if not exists _jetstream (
4143 id integer primary key autoincrement,
4242- repo_id integer not null,
4343- did text not null,
4444- access text not null check (access in ('OWNER', 'WRITER')),
4545- created timestamp default current_timestamp,
4646- unique(repo_id, did),
4747- foreign key (repo_id) references repos(id) on delete cascade
4444+ last_time_us integer not null
4845 );
4946 `)
5047 if err != nil {
···66 "fmt"
77 "log"
88 "net/http"
99+ "time"
9101011 "github.com/go-chi/chi/v5"
1112 tangled "github.com/sotangled/tangled/api/tangled"
···109110 collections := []string{tangled.PublicKeyNSID, tangled.KnotMemberNSID}
110111 dids := []string{}
111112113113+ var lastTimeUs int64
114114+ var err error
115115+ lastTimeUs, err = h.db.GetLastTimeUs()
116116+ if err != nil {
117117+ log.Println("couldn't get last time us, starting from now")
118118+ lastTimeUs = time.Now().UnixMicro()
119119+ }
120120+ // If last time is older than a week, start from now
121121+ if time.Now().UnixMicro()-lastTimeUs > 7*24*60*60*1000*1000 {
122122+ lastTimeUs = time.Now().UnixMicro()
123123+ log.Printf("last time us is older than a week. discarding that and starting from now.")
124124+ err = h.db.SaveLastTimeUs(lastTimeUs)
125125+ if err != nil {
126126+ log.Println("failed to save last time us")
127127+ }
128128+ }
129129+130130+ log.Printf("found last time_us %d", lastTimeUs)
131131+112132 h.js = jsclient.NewJetstreamClient(collections, dids)
113113- messages, err := h.js.ReadJetstream(ctx)
133133+ messages, err := h.js.ReadJetstream(ctx, lastTimeUs)
114134 if err != nil {
115135 return fmt.Errorf("failed to read from jetstream: %w", err)
116136 }
···150170 h.e.AddMember(ThisServer, record["member"].(string))
151171 }
152172 default:
173173+ }
174174+175175+ lastTimeUs := int64(data["time_us"].(float64))
176176+ if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil {
177177+ log.Printf("failed to save last time us: %v", err)
153178 }
154179 }
155180