Monorepo for Tangled
at push-rukyyyptkmtm 239 lines 6.5 kB view raw
1package notify 2 3import ( 4 "bytes" 5 "context" 6 "crypto/hmac" 7 "crypto/sha256" 8 "encoding/hex" 9 "encoding/json" 10 "fmt" 11 "io" 12 "log/slog" 13 "net/http" 14 "time" 15 16 "github.com/avast/retry-go/v4" 17 "github.com/google/uuid" 18 "tangled.org/core/appview/db" 19 "tangled.org/core/appview/models" 20 "tangled.org/core/log" 21) 22 23type WebhookNotifier struct { 24 BaseNotifier 25 db *db.DB 26 logger *slog.Logger 27 client *http.Client 28} 29 30func NewWebhookNotifier(database *db.DB) *WebhookNotifier { 31 return &WebhookNotifier{ 32 db: database, 33 logger: log.New("webhook-notifier"), 34 client: &http.Client{ 35 Timeout: 30 * time.Second, 36 }, 37 } 38} 39 40// Push implements the Notifier interface for git push events 41func (w *WebhookNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 42 webhooks, err := db.GetActiveWebhooksForRepo(w.db, repo.RepoAt()) 43 if err != nil { 44 w.logger.Error("failed to get webhooks for repo", "repo", repo.RepoAt(), "err", err) 45 return 46 } 47 48 // check if any webhooks are subscribed to push events 49 var pushWebhooks []models.Webhook 50 for _, webhook := range webhooks { 51 if webhook.HasEvent(models.WebhookEventPush) { 52 pushWebhooks = append(pushWebhooks, webhook) 53 } 54 } 55 56 if len(pushWebhooks) == 0 { 57 return 58 } 59 60 payload, err := w.buildPushPayload(repo, ref, oldSha, newSha, committerDid) 61 if err != nil { 62 w.logger.Error("failed to build push payload", "repo", repo.RepoAt(), "err", err) 63 return 64 } 65 66 // Send webhooks 67 for _, webhook := range pushWebhooks { 68 go w.sendWebhook(ctx, webhook, string(models.WebhookEventPush), payload) 69 } 70} 71 72// buildPushPayload creates the webhook payload 73func (w *WebhookNotifier) buildPushPayload(repo *models.Repo, ref, oldSha, newSha, committerDid string) (*models.WebhookPayload, error) { 74 owner := repo.Did 75 76 pusher := committerDid 77 if committerDid == "" { 78 pusher = owner 79 } 80 81 // Build repository object 82 repository := models.WebhookRepository{ 83 Name: repo.Name, 84 FullName: fmt.Sprintf("%s/%s", repo.Did, repo.Name), 85 Description: repo.Description, 86 Fork: repo.Source != "", 87 HtmlUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Name), 88 CloneUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Name), 89 SshUrl: fmt.Sprintf("ssh://git@%s/%s/%s", repo.Knot, repo.Did, repo.Name), 90 CreatedAt: repo.Created.Format(time.RFC3339), 91 UpdatedAt: repo.Created.Format(time.RFC3339), 92 Owner: models.WebhookUser{ 93 Did: owner, 94 }, 95 } 96 97 // Add optional fields 98 if repo.Website != "" { 99 repository.Website = repo.Website 100 } 101 if repo.RepoStats != nil { 102 repository.StarsCount = repo.RepoStats.StarCount 103 repository.OpenIssues = repo.RepoStats.IssueCount.Open 104 } 105 106 // Build payload 107 payload := &models.WebhookPayload{ 108 Ref: ref, 109 Before: oldSha, 110 After: newSha, 111 Repository: repository, 112 Pusher: models.WebhookUser{ 113 Did: pusher, 114 }, 115 } 116 117 return payload, nil 118} 119 120// sendWebhook sends the webhook http request 121func (w *WebhookNotifier) sendWebhook(ctx context.Context, webhook models.Webhook, event string, payload *models.WebhookPayload) { 122 deliveryId := uuid.New().String() 123 124 payloadBytes, err := json.Marshal(payload) 125 if err != nil { 126 w.logger.Error("failed to marshal webhook payload", "webhook_id", webhook.Id, "err", err) 127 return 128 } 129 130 req, err := http.NewRequestWithContext(ctx, "POST", webhook.Url, bytes.NewReader(payloadBytes)) 131 if err != nil { 132 w.logger.Error("failed to create webhook request", "webhook_id", webhook.Id, "err", err) 133 return 134 } 135 136 shortSha := payload.After[:7] 137 138 req.Header.Set("Content-Type", "application/json") 139 req.Header.Set("User-Agent", "Tangled-Hook/"+shortSha) 140 req.Header.Set("X-Tangled-Event", event) 141 req.Header.Set("X-Tangled-Hook-ID", fmt.Sprintf("%d", webhook.Id)) 142 req.Header.Set("X-Tangled-Delivery", deliveryId) 143 req.Header.Set("X-Tangled-Repo", payload.Repository.FullName) 144 145 if webhook.Secret != "" { 146 signature := w.computeSignature(payloadBytes, webhook.Secret) 147 req.Header.Set("X-Tangled-Signature-256", "sha256="+signature) 148 } 149 150 delivery := &models.WebhookDelivery{ 151 WebhookId: webhook.Id, 152 Event: event, 153 DeliveryId: deliveryId, 154 Url: webhook.Url, 155 RequestBody: string(payloadBytes), 156 } 157 158 // retry webhook delivery with exponential backoff 159 retryOpts := []retry.Option{ 160 retry.Attempts(3), 161 retry.Delay(1 * time.Second), 162 retry.MaxDelay(10 * time.Second), 163 retry.DelayType(retry.BackOffDelay), 164 retry.LastErrorOnly(true), 165 retry.OnRetry(func(n uint, err error) { 166 w.logger.Info("retrying webhook delivery", 167 "webhook_id", webhook.Id, 168 "attempt", n+1, 169 "err", err) 170 }), 171 retry.Context(ctx), 172 retry.RetryIf(func(err error) bool { 173 // only retry on network errors or 5xx responses 174 if err != nil { 175 return true 176 } 177 return false 178 }), 179 } 180 181 var resp *http.Response 182 err = retry.Do(func() error { 183 var err error 184 resp, err = w.client.Do(req) 185 if err != nil { 186 return err 187 } 188 189 // retry on 5xx server errors 190 if resp.StatusCode >= 500 { 191 defer resp.Body.Close() 192 return fmt.Errorf("server error: %d", resp.StatusCode) 193 } 194 195 return nil 196 }, retryOpts...) 197 198 if err != nil { 199 w.logger.Error("webhook request failed after retries", "webhook_id", webhook.Id, "err", err) 200 delivery.Success = false 201 delivery.ResponseBody = err.Error() 202 } else { 203 defer resp.Body.Close() 204 205 delivery.ResponseCode = resp.StatusCode 206 delivery.Success = resp.StatusCode >= 200 && resp.StatusCode < 300 207 208 // Read response body (limit to 10KB) 209 bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024)) 210 if err != nil { 211 w.logger.Warn("failed to read webhook response body", "webhook_id", webhook.Id, "err", err) 212 } else { 213 delivery.ResponseBody = string(bodyBytes) 214 } 215 216 if !delivery.Success { 217 w.logger.Warn("webhook delivery failed", 218 "webhook_id", webhook.Id, 219 "status", resp.StatusCode, 220 "url", webhook.Url) 221 } else { 222 w.logger.Info("webhook delivered successfully", 223 "webhook_id", webhook.Id, 224 "url", webhook.Url, 225 "delivery_id", deliveryId) 226 } 227 } 228 229 if err := db.AddWebhookDelivery(w.db, delivery); err != nil { 230 w.logger.Error("failed to record webhook delivery", "webhook_id", webhook.Id, "err", err) 231 } 232} 233 234// computeSignature computes HMAC-SHA256 signature for the payload 235func (w *WebhookNotifier) computeSignature(payload []byte, secret string) string { 236 mac := hmac.New(sha256.New, []byte(secret)) 237 mac.Write(payload) 238 return hex.EncodeToString(mac.Sum(nil)) 239}