Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 116 lines 3.4 kB view raw
//// OAuth request validation utilities.
////
//// Pure validation helpers for redirect URIs, PKCE challenge methods and
//// scopes. Every validator returns `Ok(Nil)` on success and an `OAuthError`
//// describing the first problem found otherwise.

import gleam/list
import gleam/option.{type Option}
import gleam/string
import gleam/uri
import lib/oauth/types/error.{type OAuthError, InvalidRequest, InvalidScope}

/// Validate redirect URI format.
///
/// Rules, checked in this order:
/// - the string must parse as a URI,
/// - it must have a scheme,
/// - it must not contain a fragment (for any scheme),
/// - `https` is accepted,
/// - `http` is accepted only when the host is `localhost`, `127.0.0.1`
///   or `[::1]`,
/// - any other scheme is accepted.
///
/// Returns `Error(InvalidRequest(_))` with a descriptive message on failure.
pub fn validate_redirect_uri(uri_string: String) -> Result(Nil, OAuthError) {
  case uri.parse(uri_string) {
    Error(_) -> Error(InvalidRequest("Invalid redirect URI format"))
    Ok(parsed) ->
      case parsed.scheme, parsed.fragment {
        option.None, _ ->
          Error(InvalidRequest("Redirect URI must have a scheme"))
        // The fragment rule applies to every scheme, not only to the
        // non-http(s) ones, so it is checked before dispatching on scheme.
        option.Some(_), option.Some(_) ->
          Error(InvalidRequest("Redirect URI must not contain fragment"))
        option.Some("https"), option.None -> Ok(Nil)
        option.Some("http"), option.None ->
          // Plain HTTP is only tolerated for loopback hosts.
          case parsed.host {
            option.Some("localhost") -> Ok(Nil)
            option.Some("127.0.0.1") -> Ok(Nil)
            option.Some("[::1]") -> Ok(Nil)
            _ ->
              Error(InvalidRequest(
                "HTTP redirect URIs only allowed for localhost",
              ))
          }
        // Any other scheme is accepted once the fragment check has passed.
        option.Some(_), option.None -> Ok(Nil)
      }
  }
}

/// Check if redirect URI matches registered URI.
///
/// When `require_exact` is `True` the two strings must be equal. Otherwise
/// `requested` only needs to start with `registered`.
///
/// NOTE(review): the non-exact mode is a raw string prefix test, so a
/// registered value of `https://example.com` also matches
/// `https://example.com.evil.test/cb`. Callers handling untrusted input
/// should prefer `require_exact: True` — confirm whether any caller relies on
/// prefix matching before tightening this.
pub fn matches_redirect_uri(
  requested: String,
  registered: String,
  require_exact: Bool,
) -> Bool {
  case require_exact {
    True -> requested == registered
    False -> string.starts_with(requested, registered)
  }
}

/// Validate that redirect URI matches one of the registered URIs.
///
/// Each entry of `registered_uris` is compared with `matches_redirect_uri`
/// using the given `require_exact` mode. Returns `Ok(Nil)` if at least one
/// entry matches and `Error(InvalidRequest(_))` otherwise (including when the
/// list is empty).
pub fn validate_redirect_uri_match(
  requested: String,
  registered_uris: List(String),
  require_exact: Bool,
) -> Result(Nil, OAuthError) {
  let is_registered = fn(registered) {
    matches_redirect_uri(requested, registered, require_exact)
  }

  case list.any(registered_uris, is_registered) {
    True -> Ok(Nil)
    False ->
      Error(InvalidRequest("Redirect URI does not match any registered URIs"))
  }
}

/// Validate PKCE code challenge method.
///
/// Only `"S256"` is accepted. `"plain"` is rejected with a dedicated message;
/// any other value is rejected with a message that echoes the value.
pub fn validate_code_challenge_method(method: String) -> Result(Nil, OAuthError) {
  case method {
    "S256" -> Ok(Nil)
    "plain" ->
      Error(InvalidRequest("PKCE method 'plain' is not allowed, use S256"))
    _ -> Error(InvalidRequest("Invalid code_challenge_method: " <> method))
  }
}

/// Validate scope format and allowed scopes.
///
/// - No requested scope (`None`) is always valid.
/// - An empty requested scope string is rejected.
/// - A non-empty request with no allowed scopes (`None`) is rejected.
/// - Otherwise both strings are split on single spaces and every requested
///   token must appear in the allowed list.
///
/// Returns `Error(InvalidScope(_))` on failure.
pub fn validate_scope(
  requested: Option(String),
  allowed: Option(String),
) -> Result(Nil, OAuthError) {
  case requested, allowed {
    option.None, _ -> Ok(Nil)
    option.Some(""), _ -> Error(InvalidScope("Scope cannot be empty string"))
    option.Some(_), option.None ->
      Error(InvalidScope("No scopes allowed for this client"))
    option.Some(req), option.Some(allow) -> {
      let allow_scopes = string.split(allow, " ")
      let all_allowed =
        req
        |> string.split(" ")
        |> list.all(fn(scope) { list.contains(allow_scopes, scope) })

      case all_allowed {
        True -> Ok(Nil)
        False -> Error(InvalidScope("Requested scope not allowed"))
      }
    }
  }
}