Monorepo for Tangled tangled.org

knotmirror: add manual resync/cancel

Signed-off-by: Seongmin Lee <git@boltless.me>

boltless.me 02be1066 67975aeb

verified
+157 -8
+74 -3
knotmirror/adminpage.go
··· 3 3 import ( 4 4 "database/sql" 5 5 "embed" 6 + "fmt" 7 + "html" 6 8 "html/template" 7 9 "log/slog" 8 10 "net/http" 9 11 "strconv" 10 12 "time" 11 13 14 + "github.com/bluesky-social/indigo/atproto/syntax" 12 15 "github.com/go-chi/chi/v5" 13 16 "tangled.org/core/appview/pagination" 14 17 "tangled.org/core/knotmirror/db" ··· 21 24 const repoPageSize = 20 22 25 23 26 type AdminServer struct { 24 - db *sql.DB 27 + db *sql.DB 28 + resyncer *Resyncer 29 + logger *slog.Logger 25 30 } 26 31 27 - func NewAdminServer(database *sql.DB) *AdminServer { 28 - return &AdminServer{db: database} 32 + func NewAdminServer(l *slog.Logger, database *sql.DB, resyncer *Resyncer) *AdminServer { 33 + return &AdminServer{ 34 + db: database, 35 + resyncer: resyncer, 36 + logger: l, 37 + } 29 38 } 30 39 31 40 func (s *AdminServer) Router() http.Handler { 32 41 r := chi.NewRouter() 33 42 r.Get("/repos", s.handleRepos()) 34 43 r.Get("/hosts", s.handleHosts()) 44 + 45 + // not sure how to use these. should we vibe-code the admin page with React? 46 + r.Post("/api/triggerRepoResync", s.handleRepoResyncTrigger()) 47 + r.Post("/api/cancelRepoResync", s.handleRepoResyncCancel()) 48 + r.Post("/api/testNotif", s.handleTestNotif) 35 49 return r 36 50 } 37 51 ··· 106 120 } 107 121 } 108 122 } 123 + 124 + func (s *AdminServer) handleRepoResyncTrigger() http.HandlerFunc { 125 + return func(w http.ResponseWriter, r *http.Request) { 126 + var repoQuery = r.FormValue("repo") 127 + 128 + repo, err := syntax.ParseATURI(repoQuery) 129 + if err != nil || repo.RecordKey() == "" { 130 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 131 + return 132 + } 133 + 134 + if err := s.resyncer.TriggerResyncJob(r.Context(), repo); err != nil { 135 + s.logger.Error("failed to trigger resync job", "err", err) 136 + writeNotif(w, http.StatusInternalServerError, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 137 + return 138 + } 139 + writeNotif(w, http.StatusOK, "success") 140 + } 141 + } 142 + 143 + func (s *AdminServer) handleRepoResyncCancel() http.HandlerFunc { 144 + return func(w http.ResponseWriter, r *http.Request) { 145 + var repoQuery = r.FormValue("repo") 146 + 147 + repo, err := syntax.ParseATURI(repoQuery) 148 + if err != nil || repo.RecordKey() == "" { 149 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 150 + return 151 + } 152 + 153 + s.resyncer.CancelResyncJob(repo) 154 + writeNotif(w, http.StatusOK, "success") 155 + } 156 + } 157 + 158 + func (s *AdminServer) handleTestNotif(w http.ResponseWriter, r *http.Request) { 159 + writeNotif(w, http.StatusOK, "new notifi") 160 + } 161 + 162 + func writeNotif(w http.ResponseWriter, status int, msg string) { 163 + w.Header().Set("Content-Type", "text/html") 164 + w.WriteHeader(status) 165 + 166 + class := "info" 167 + switch { 168 + case status >= 500: 169 + class = "error" 170 + case status >= 400: 171 + class = "warn" 172 + } 173 + 174 + fmt.Fprintf(w, 175 + `<div class="notif %s" hx-swap-oob="beforeend:#notifications">%s</div>`, 176 + class, 177 + html.EscapeString(msg), 178 + ) 179 + }
+1 -1
knotmirror/knotmirror.go
··· 59 59 knotstream := knotstream.NewKnotStream(logger, db, cfg) 60 60 crawler := NewCrawler(logger, db) 61 61 resyncer := NewResyncer(logger, db, gitc, cfg) 62 - adminpage := NewAdminServer(db) 62 + adminpage := NewAdminServer(logger, db, resyncer) 63 63 64 64 // maintain repository list with tap 65 65 // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events.
+70 -4
knotmirror/resyncer.go
··· 25 25 26 26 claimJobMu sync.Mutex 27 27 28 - repoFetchTimeout time.Duration 28 + runningJobs map[syntax.ATURI]context.CancelFunc 29 + runningJobsMu sync.Mutex 30 + 31 + repoFetchTimeout time.Duration 32 + manualResyncTimeout time.Duration 29 33 30 34 parallelism int 31 35 } ··· 38 42 39 43 repoFetchTimeout: cfg.GitRepoFetchTimeout, 40 44 parallelism: cfg.ResyncParallelism, 45 + 46 + manualResyncTimeout: 30 * time.Minute, 41 47 } 42 48 } 43 49 ··· 73 79 } 74 80 } 75 81 82 + func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 83 + r.runningJobsMu.Lock() 84 + defer r.runningJobsMu.Unlock() 85 + 86 + if _, exists := r.runningJobs[repo]; exists { 87 + return 88 + } 89 + r.runningJobs[repo] = cancel 90 + } 91 + 92 + func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 93 + r.runningJobsMu.Lock() 94 + defer r.runningJobsMu.Unlock() 95 + 96 + delete(r.runningJobs, repo) 97 + } 98 + 99 + func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 100 + r.runningJobsMu.Lock() 101 + defer r.runningJobsMu.Unlock() 102 + 103 + cancel, ok := r.runningJobs[repo] 104 + if !ok { 105 + return 106 + } 107 + delete(r.runningJobs, repo) 108 + cancel() 109 + } 110 + 111 + // TriggerResyncJob manually triggers the resync job 112 + func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 113 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 114 + if err != nil { 115 + return fmt.Errorf("failed to get repo: %w", err) 116 + } 117 + if repo == nil { 118 + return fmt.Errorf("repo not found: %s", repoAt) 119 + } 120 + 121 + if repo.State == models.RepoStateResyncing { 122 + return fmt.Errorf("repo already resyncing") 123 + } 124 + 125 + repo.State = models.RepoStatePending 126 + repo.RetryAfter = -1 // resyncer will prioritize this 127 + 128 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 129 + return fmt.Errorf("updating repo state to pending %w", err) 130 + } 131 + return nil 132 + } 133 + 76 134 func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 77 135 // use mutex to prevent duplicated jobs 78 136 r.claimJobMu.Lock() ··· 86 144 where at_uri = ( 87 145 select at_uri from repos 88 146 where state in ($2, $3, $4) 89 - and (retry_after = 0 or retry_after < $5) 147 + and (retry_after = -1 or retry_after = 0 or retry_after < $5) 90 148 limit 1 91 149 ) 92 150 returning at_uri ··· 112 170 resyncsStarted.Inc() 113 171 startTime := time.Now() 114 172 115 - success, err := r.doResync(ctx, repoAt) 173 + jobCtx, cancel := context.WithCancel(ctx) 174 + r.registerRunning(repoAt, cancel) 175 + defer r.unregisterRunning(repoAt) 176 + 177 + success, err := r.doResync(jobCtx, repoAt) 116 178 if !success { 117 179 resyncsFailed.Inc() 118 180 resyncDuration.Observe(time.Since(startTime).Seconds()) ··· 140 202 // TODO: check if Knot is on backoff list. If so, return (false, nil) 141 203 // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 142 204 143 - fetchCtx, cancel := context.WithTimeout(ctx, r.repoFetchTimeout) 205 + timeout := r.repoFetchTimeout 206 + if repo.RetryAfter == -1 { 207 + timeout = r.manualResyncTimeout 208 + } 209 + fetchCtx, cancel := context.WithTimeout(ctx, timeout) 144 210 defer cancel() 145 211 146 212 if err := r.gitc.Sync(fetchCtx, repo); err != nil {
+12
knotmirror/templates/base.html
··· 21 21 <main id="main"> 22 22 {{template "content" .}} 23 23 </main> 24 + <div id="notifications"></div> 25 + <button hx-trigger="click" hx-post="/api/testNotif">click me</button> 26 + <script> 27 + document.body.addEventListener("htmx:afterProcessNode", (evt) => { 28 + const el = evt.detail.elt; 29 + if (!el.classList?.contains("notif")) return; 30 + 31 + setTimeout(() => { 32 + el.remove(); 33 + }, 30 * 1000); 34 + }); 35 + </script> 24 36 </body> 25 37 </html> 26 38 {{end}}