Monorepo for Tangled
at master 308 lines 6.5 kB view raw
1package db 2 3import ( 4 "database/sql" 5 "fmt" 6 "strings" 7 "time" 8 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "tangled.org/core/appview/models" 11 "tangled.org/core/orm" 12) 13 14// GetWebhooks returns all webhooks for a repository 15func GetWebhooks(e Execer, filters ...orm.Filter) ([]models.Webhook, error) { 16 var conditions []string 17 var args []any 18 for _, filter := range filters { 19 conditions = append(conditions, filter.Condition()) 20 args = append(args, filter.Arg()...) 21 } 22 23 whereClause := "" 24 if conditions != nil { 25 whereClause = " where " + strings.Join(conditions, " and ") 26 } 27 28 query := fmt.Sprintf(` 29 select 30 id, 31 repo_at, 32 url, 33 secret, 34 active, 35 events, 36 created_at, 37 updated_at, 38 repo_did 39 from webhooks 40 %s 41 order by created_at desc 42 `, whereClause) 43 44 rows, err := e.Query(query, args...) 45 if err != nil { 46 return nil, fmt.Errorf("failed to query webhooks: %w", err) 47 } 48 defer rows.Close() 49 50 var webhooks []models.Webhook 51 for rows.Next() { 52 var wh models.Webhook 53 var createdAt, updatedAt, eventsStr string 54 var secret, whRepoDid sql.NullString 55 var active int 56 57 err := rows.Scan( 58 &wh.Id, 59 &wh.RepoAt, 60 &wh.Url, 61 &secret, 62 &active, 63 &eventsStr, 64 &createdAt, 65 &updatedAt, 66 &whRepoDid, 67 ) 68 if err != nil { 69 return nil, fmt.Errorf("failed to scan webhook: %w", err) 70 } 71 72 if secret.Valid { 73 wh.Secret = secret.String 74 } 75 wh.Active = active == 1 76 if eventsStr != "" { 77 wh.Events = strings.Split(eventsStr, ",") 78 } 79 80 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 81 wh.CreatedAt = t 82 } 83 if t, err := time.Parse(time.RFC3339, updatedAt); err == nil { 84 wh.UpdatedAt = t 85 } 86 if whRepoDid.Valid { 87 wh.RepoDid = whRepoDid.String 88 } 89 90 webhooks = append(webhooks, wh) 91 } 92 93 if err = rows.Err(); err != nil { 94 return nil, fmt.Errorf("failed to iterate webhooks: %w", err) 95 } 96 97 return webhooks, nil 98} 99 100// GetWebhook returns a single webhook by ID 101func GetWebhook(e Execer, id int64) (*models.Webhook, error) { 102 webhooks, err := GetWebhooks(e, orm.FilterEq("id", id)) 103 if err != nil { 104 return nil, err 105 } 106 107 if len(webhooks) == 0 { 108 return nil, sql.ErrNoRows 109 } 110 111 if len(webhooks) != 1 { 112 return nil, fmt.Errorf("expected 1 webhook, got %d", len(webhooks)) 113 } 114 115 return &webhooks[0], nil 116} 117 118// AddWebhook creates a new webhook 119func AddWebhook(e Execer, webhook *models.Webhook) error { 120 eventsStr := strings.Join(webhook.Events, ",") 121 active := 0 122 if webhook.Active { 123 active = 1 124 } 125 126 var repoDid *string 127 if webhook.RepoDid != "" { 128 repoDid = &webhook.RepoDid 129 } 130 131 result, err := e.Exec(` 132 insert into webhooks (repo_at, url, secret, active, events, repo_did) 133 values (?, ?, ?, ?, ?, ?) 134 `, webhook.RepoAt.String(), webhook.Url, webhook.Secret, active, eventsStr, repoDid) 135 136 if err != nil { 137 return fmt.Errorf("failed to insert webhook: %w", err) 138 } 139 140 id, err := result.LastInsertId() 141 if err != nil { 142 return fmt.Errorf("failed to get webhook id: %w", err) 143 } 144 145 webhook.Id = id 146 return nil 147} 148 149// UpdateWebhook updates an existing webhook 150func UpdateWebhook(e Execer, webhook *models.Webhook) error { 151 eventsStr := strings.Join(webhook.Events, ",") 152 active := 0 153 if webhook.Active { 154 active = 1 155 } 156 157 _, err := e.Exec(` 158 update webhooks 159 set url = ?, secret = ?, active = ?, events = ?, updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') 160 where id = ? 161 `, webhook.Url, webhook.Secret, active, eventsStr, webhook.Id) 162 163 if err != nil { 164 return fmt.Errorf("failed to update webhook: %w", err) 165 } 166 167 return nil 168} 169 170// DeleteWebhook deletes a webhook 171func DeleteWebhook(e Execer, id int64) error { 172 _, err := e.Exec(`delete from webhooks where id = ?`, id) 173 if err != nil { 174 return fmt.Errorf("failed to delete webhook: %w", err) 175 } 176 return nil 177} 178 179// AddWebhookDelivery records a webhook delivery attempt 180func AddWebhookDelivery(e Execer, delivery *models.WebhookDelivery) error { 181 success := 0 182 if delivery.Success { 183 success = 1 184 } 185 186 result, err := e.Exec(` 187 insert into webhook_deliveries ( 188 webhook_id, 189 event, 190 delivery_id, 191 url, 192 request_body, 193 response_code, 194 response_body, 195 success 196 ) values (?, ?, ?, ?, ?, ?, ?, ?) 197 `, 198 delivery.WebhookId, 199 delivery.Event, 200 delivery.DeliveryId, 201 delivery.Url, 202 delivery.RequestBody, 203 delivery.ResponseCode, 204 delivery.ResponseBody, 205 success, 206 ) 207 208 if err != nil { 209 return fmt.Errorf("failed to insert webhook delivery: %w", err) 210 } 211 212 id, err := result.LastInsertId() 213 if err != nil { 214 return fmt.Errorf("failed to get delivery id: %w", err) 215 } 216 217 delivery.Id = id 218 return nil 219} 220 221// GetWebhookDeliveries returns recent deliveries for a webhook 222func GetWebhookDeliveries(e Execer, webhookId int64, limit int) ([]models.WebhookDelivery, error) { 223 if limit <= 0 { 224 limit = 20 225 } 226 227 query := ` 228 select 229 id, 230 webhook_id, 231 event, 232 delivery_id, 233 url, 234 request_body, 235 response_code, 236 response_body, 237 success, 238 created_at 239 from webhook_deliveries 240 where webhook_id = ? 241 order by created_at desc 242 limit ? 243 ` 244 245 rows, err := e.Query(query, webhookId, limit) 246 if err != nil { 247 return nil, fmt.Errorf("failed to query webhook deliveries: %w", err) 248 } 249 defer rows.Close() 250 251 var deliveries []models.WebhookDelivery 252 for rows.Next() { 253 var d models.WebhookDelivery 254 var createdAt string 255 var success int 256 var responseCode sql.NullInt64 257 var responseBody sql.NullString 258 259 err := rows.Scan( 260 &d.Id, 261 &d.WebhookId, 262 &d.Event, 263 &d.DeliveryId, 264 &d.Url, 265 &d.RequestBody, 266 &responseCode, 267 &responseBody, 268 &success, 269 &createdAt, 270 ) 271 if err != nil { 272 return nil, fmt.Errorf("failed to scan delivery: %w", err) 273 } 274 275 d.Success = success == 1 276 if responseCode.Valid { 277 d.ResponseCode = int(responseCode.Int64) 278 } 279 if responseBody.Valid { 280 d.ResponseBody = responseBody.String 281 } 282 283 if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 284 d.CreatedAt = t 285 } 286 287 deliveries = append(deliveries, d) 288 } 289 290 if err = rows.Err(); err != nil { 291 return nil, fmt.Errorf("failed to iterate deliveries: %w", err) 292 } 293 294 return deliveries, nil 295} 296 297// GetWebhooksForRepo is a convenience function to get all webhooks for a repository 298func GetWebhooksForRepo(e Execer, repoAt syntax.ATURI) ([]models.Webhook, error) { 299 return GetWebhooks(e, orm.FilterEq("repo_at", repoAt.String())) 300} 301 302// GetActiveWebhooksForRepo returns only active webhooks for a repository 303func GetActiveWebhooksForRepo(e Execer, repoAt syntax.ATURI) ([]models.Webhook, error) { 304 return GetWebhooks(e, 305 orm.FilterEq("repo_at", repoAt.String()), 306 orm.FilterEq("active", 1), 307 ) 308}