Auto-indexing service and GraphQL API for AT Protocol Records quickslice.slices.network/
atproto gleam graphql
at main 731 lines 19 kB view raw
1/// Edge case and error handling tests for where clause functionality 2/// 3/// Tests various edge cases, error conditions, and potential SQL injection attempts 4import database/executor.{Int, Text} 5import database/queries/where_clause 6import gleam/dict 7import gleam/list 8import gleam/option.{None, Some} 9import gleam/string 10import gleeunit 11import gleeunit/should 12import lexicon_graphql/input/where as where_input 13import swell/value 14import test_helpers 15 16pub fn main() { 17 gleeunit.main() 18} 19 20// ===== Empty/Nil Tests ===== 21 22pub fn empty_where_clause_test() { 23 let assert Ok(exec) = test_helpers.create_test_db() 24 let clause = 25 where_clause.WhereClause(conditions: dict.new(), and: None, or: None) 26 27 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 28 29 sql |> should.equal("") 30 list.length(params) |> should.equal(0) 31} 32 33pub fn all_conditions_none_test() { 34 let assert Ok(exec) = test_helpers.create_test_db() 35 let condition = 36 where_clause.WhereCondition( 37 eq: None, 38 in_values: None, 39 contains: None, 40 gt: None, 41 gte: None, 42 lt: None, 43 lte: None, 44 is_null: None, 45 is_numeric: False, 46 ) 47 let clause = 48 where_clause.WhereClause( 49 conditions: dict.from_list([#("field", condition)]), 50 and: None, 51 or: None, 52 ) 53 54 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 55 56 // Should produce no SQL since all conditions are None 57 sql |> should.equal("") 58 list.length(params) |> should.equal(0) 59} 60 61pub fn empty_in_list_test() { 62 let assert Ok(exec) = test_helpers.create_test_db() 63 let condition = 64 where_clause.WhereCondition( 65 eq: None, 66 in_values: Some([]), 67 contains: None, 68 gt: None, 69 gte: None, 70 lt: None, 71 lte: None, 72 is_null: None, 73 is_numeric: False, 74 ) 75 let clause = 76 where_clause.WhereClause( 77 conditions: dict.from_list([#("status", condition)]), 78 and: None, 79 or: None, 80 ) 81 82 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 83 84 // Empty IN list should produce no SQL 85 sql |> should.equal("") 86 list.length(params) |> should.equal(0) 87} 88 89pub fn empty_and_clause_list_test() { 90 let assert Ok(exec) = test_helpers.create_test_db() 91 let clause = 92 where_clause.WhereClause(conditions: dict.new(), and: Some([]), or: None) 93 94 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 95 96 sql |> should.equal("") 97 list.length(params) |> should.equal(0) 98} 99 100pub fn empty_or_clause_list_test() { 101 let assert Ok(exec) = test_helpers.create_test_db() 102 let clause = 103 where_clause.WhereClause(conditions: dict.new(), and: None, or: Some([])) 104 105 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 106 107 sql |> should.equal("") 108 list.length(params) |> should.equal(0) 109} 110 111// ===== SQL Injection Prevention Tests ===== 112 113pub fn sql_injection_in_string_value_test() { 114 let assert Ok(exec) = test_helpers.create_test_db() 115 // Try common SQL injection patterns - should be safely parameterized 116 let malicious_strings = [ 117 "'; DROP TABLE records; --", 118 "' OR '1'='1", 119 "admin'--", 120 "1' UNION SELECT * FROM records--", 121 "'; DELETE FROM records WHERE ''='", 122 ] 123 124 list.each(malicious_strings, fn(malicious) { 125 let condition = 126 where_clause.WhereCondition( 127 eq: Some(Text(malicious)), 128 in_values: None, 129 contains: None, 130 gt: None, 131 gte: None, 132 lt: None, 133 lte: None, 134 is_null: None, 135 is_numeric: False, 136 ) 137 let clause = 138 where_clause.WhereClause( 139 conditions: dict.from_list([#("username", condition)]), 140 and: None, 141 or: None, 142 ) 143 144 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 145 146 // Should use parameterized query with json_extract for non-table columns 147 sql |> should.equal("json_extract(json, '$.username') = ?") 148 list.length(params) |> should.equal(1) 149 150 // The malicious string should be in params, not in SQL 151 // SQL should not contain the injection string 152 should.be_false(string.contains(sql, "DROP TABLE")) 153 should.be_false(string.contains(sql, "DELETE FROM")) 154 }) 155} 156 157pub fn sql_injection_in_contains_test() { 158 let assert Ok(exec) = test_helpers.create_test_db() 159 // Contains should also be parameterized 160 let malicious = "'; DROP TABLE records; --" 161 162 let condition = 163 where_clause.WhereCondition( 164 eq: None, 165 in_values: None, 166 contains: Some(malicious), 167 gt: None, 168 gte: None, 169 lt: None, 170 lte: None, 171 is_null: None, 172 is_numeric: False, 173 ) 174 let clause = 175 where_clause.WhereClause( 176 conditions: dict.from_list([#("description", condition)]), 177 and: None, 178 or: None, 179 ) 180 181 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 182 183 // Should use LIKE with parameterized value and json_extract for non-table fields 184 sql 185 |> should.equal( 186 "json_extract(json, '$.description') LIKE '%' || ? || '%' COLLATE NOCASE", 187 ) 188 list.length(params) |> should.equal(1) 189} 190 191// ===== Type Edge Cases ===== 192 193pub fn integer_boundary_values_test() { 194 let assert Ok(exec) = test_helpers.create_test_db() 195 // Test with very large and very small integers 196 let large_int = 2_147_483_647 197 let small_int = -2_147_483_648 198 199 let condition_large = 200 where_clause.WhereCondition( 201 eq: Some(Int(large_int)), 202 in_values: None, 203 contains: None, 204 gt: None, 205 gte: None, 206 lt: None, 207 lte: None, 208 is_null: None, 209 is_numeric: False, 210 ) 211 212 let condition_small = 213 where_clause.WhereCondition( 214 eq: Some(Int(small_int)), 215 in_values: None, 216 contains: None, 217 gt: None, 218 gte: None, 219 lt: None, 220 lte: None, 221 is_null: None, 222 is_numeric: False, 223 ) 224 225 let clause_large = 226 where_clause.WhereClause( 227 conditions: dict.from_list([#("count", condition_large)]), 228 and: None, 229 or: None, 230 ) 231 232 let clause_small = 233 where_clause.WhereClause( 234 conditions: dict.from_list([#("count", condition_small)]), 235 and: None, 236 or: None, 237 ) 238 239 let #(sql_large, params_large) = 240 where_clause.build_where_sql(exec, clause_large, False, 1) 241 let #(sql_small, params_small) = 242 where_clause.build_where_sql(exec, clause_small, False, 1) 243 244 // count is a JSON field, not a table column 245 sql_large |> should.equal("json_extract(json, '$.count') = ?") 246 list.length(params_large) |> should.equal(1) 247 248 sql_small |> should.equal("json_extract(json, '$.count') = ?") 249 list.length(params_small) |> should.equal(1) 250} 251 252pub fn empty_string_value_test() { 253 let assert Ok(exec) = test_helpers.create_test_db() 254 let condition = 255 where_clause.WhereCondition( 256 eq: Some(Text("")), 257 in_values: None, 258 contains: None, 259 gt: None, 260 gte: None, 261 lt: None, 262 lte: None, 263 is_null: None, 264 is_numeric: False, 265 ) 266 let clause = 267 where_clause.WhereClause( 268 conditions: dict.from_list([#("name", condition)]), 269 and: None, 270 or: None, 271 ) 272 273 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 274 275 // Empty string is still valid - should use json_extract for non-table columns 276 sql |> should.equal("json_extract(json, '$.name') = ?") 277 list.length(params) |> should.equal(1) 278} 279 280pub fn unicode_string_value_test() { 281 let assert Ok(exec) = test_helpers.create_test_db() 282 // Test with various Unicode characters 283 let unicode_strings = [ 284 "Hello 世界", 285 "🚀 Rocket", 286 "Café", 287 "Москва", 288 "مرحبا", 289 ] 290 291 list.each(unicode_strings, fn(unicode_str) { 292 let condition = 293 where_clause.WhereCondition( 294 eq: Some(Text(unicode_str)), 295 in_values: None, 296 contains: None, 297 gt: None, 298 gte: None, 299 lt: None, 300 lte: None, 301 is_null: None, 302 is_numeric: False, 303 ) 304 let clause = 305 where_clause.WhereClause( 306 conditions: dict.from_list([#("text", condition)]), 307 and: None, 308 or: None, 309 ) 310 311 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 312 313 sql |> should.equal("json_extract(json, '$.text') = ?") 314 list.length(params) |> should.equal(1) 315 }) 316} 317 318// ===== Complex Nesting Edge Cases ===== 319 320pub fn deeply_nested_and_clauses_test() { 321 let assert Ok(exec) = test_helpers.create_test_db() 322 // Create deeply nested AND clauses (5 levels) 323 let inner_condition = 324 where_clause.WhereCondition( 325 eq: Some(Text("value")), 326 in_values: None, 327 contains: None, 328 gt: None, 329 gte: None, 330 lt: None, 331 lte: None, 332 is_null: None, 333 is_numeric: False, 334 ) 335 336 let level1 = 337 where_clause.WhereClause( 338 conditions: dict.from_list([#("field1", inner_condition)]), 339 and: None, 340 or: None, 341 ) 342 let level2 = 343 where_clause.WhereClause( 344 conditions: dict.new(), 345 and: Some([level1]), 346 or: None, 347 ) 348 let level3 = 349 where_clause.WhereClause( 350 conditions: dict.new(), 351 and: Some([level2]), 352 or: None, 353 ) 354 let level4 = 355 where_clause.WhereClause( 356 conditions: dict.new(), 357 and: Some([level3]), 358 or: None, 359 ) 360 let level5 = 361 where_clause.WhereClause( 362 conditions: dict.new(), 363 and: Some([level4]), 364 or: None, 365 ) 366 367 let #(sql, params) = where_clause.build_where_sql(exec, level5, False, 1) 368 369 // With single condition at each level, no extra parentheses needed 370 // The implementation correctly doesn't add unnecessary parentheses for single conditions 371 sql |> should.equal("json_extract(json, '$.field1') = ?") 372 list.length(params) |> should.equal(1) 373} 374 375pub fn mixed_empty_and_non_empty_conditions_test() { 376 let assert Ok(exec) = test_helpers.create_test_db() 377 // Mix conditions with Some and None values 378 let condition1 = 379 where_clause.WhereCondition( 380 eq: Some(Text("value1")), 381 in_values: None, 382 contains: None, 383 gt: None, 384 gte: None, 385 lt: None, 386 lte: None, 387 is_null: None, 388 is_numeric: False, 389 ) 390 391 let condition2 = 392 where_clause.WhereCondition( 393 eq: None, 394 in_values: None, 395 contains: None, 396 gt: None, 397 gte: None, 398 lt: None, 399 lte: None, 400 is_null: None, 401 is_numeric: False, 402 ) 403 404 let condition3 = 405 where_clause.WhereCondition( 406 eq: Some(Text("value3")), 407 in_values: None, 408 contains: None, 409 gt: None, 410 gte: None, 411 lt: None, 412 lte: None, 413 is_null: None, 414 is_numeric: False, 415 ) 416 417 let clause = 418 where_clause.WhereClause( 419 conditions: dict.from_list([ 420 #("field1", condition1), 421 #("field2", condition2), 422 #("field3", condition3), 423 ]), 424 and: None, 425 or: None, 426 ) 427 428 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 429 430 // Should only include non-empty conditions 431 list.length(params) |> should.equal(2) 432 // SQL should contain field1 and field3 (with json_extract), but not field2 433 // Dict iteration order is not guaranteed, so check for both possible orders 434 let expected1 = 435 "json_extract(json, '$.field1') = ? AND json_extract(json, '$.field3') = ?" 436 let expected2 = 437 "json_extract(json, '$.field3') = ? AND json_extract(json, '$.field1') = ?" 438 should.be_true(sql == expected1 || sql == expected2) 439} 440 441// ===== GraphQL Parser Edge Cases ===== 442 443pub fn parse_invalid_graphql_value_test() { 444 // Parse a non-object value 445 let invalid = value.String("not an object") 446 447 let result = where_input.parse_where_clause(invalid) 448 449 // Should return empty clause 450 where_input.is_clause_empty(result) |> should.be_true 451} 452 453pub fn parse_empty_object_test() { 454 let empty_object = value.Object([]) 455 456 let result = where_input.parse_where_clause(empty_object) 457 458 where_input.is_clause_empty(result) |> should.be_true 459} 460 461pub fn parse_unknown_operator_test() { 462 // Include an unknown operator that should be ignored 463 let condition_value = 464 value.Object([ 465 #("eq", value.String("test")), 466 #("unknown_op", value.String("should_be_ignored")), 467 ]) 468 469 let where_object = value.Object([#("field1", condition_value)]) 470 471 let result = where_input.parse_where_clause(where_object) 472 473 // Should parse eq but ignore unknown_op 474 let conditions = result.conditions 475 case dict.get(conditions, "field1") { 476 Ok(cond) -> { 477 // eq should be parsed 478 case cond.eq { 479 Some(where_input.StringValue("test")) -> should.be_true(True) 480 _ -> should.fail() 481 } 482 } 483 Error(_) -> should.fail() 484 } 485} 486 487pub fn parse_type_mismatch_in_operator_test() { 488 // Try to parse eq with a list instead of a scalar 489 let condition_value = 490 value.Object([ 491 #("eq", value.List([value.String("test")])), 492 ]) 493 494 let where_object = value.Object([#("field1", condition_value)]) 495 496 let result = where_input.parse_where_clause(where_object) 497 498 // Should handle gracefully - eq should be None 499 let conditions = result.conditions 500 case dict.get(conditions, "field1") { 501 Ok(cond) -> { 502 case cond.eq { 503 None -> should.be_true(True) 504 _ -> should.fail() 505 } 506 } 507 Error(_) -> should.fail() 508 } 509} 510 511pub fn parse_mixed_types_in_in_list_test() { 512 // IN list with mixed types - should filter out invalid ones 513 let condition_value = 514 value.Object([ 515 #( 516 "in", 517 value.List([ 518 value.String("valid1"), 519 value.Int(42), 520 value.List([]), 521 // Invalid - nested list 522 value.String("valid2"), 523 value.Object([]), 524 // Invalid - object 525 ]), 526 ), 527 ]) 528 529 let where_object = value.Object([#("field1", condition_value)]) 530 531 let result = where_input.parse_where_clause(where_object) 532 533 // Should only include valid scalar values 534 let conditions = result.conditions 535 case dict.get(conditions, "field1") { 536 Ok(cond) -> { 537 case cond.in_values { 538 Some(values) -> { 539 // Should have 3 valid values (2 strings + 1 int) 540 list.length(values) |> should.equal(3) 541 } 542 None -> should.fail() 543 } 544 } 545 Error(_) -> should.fail() 546 } 547} 548 549// ===== Multiple Operators on Same Field ===== 550 551pub fn multiple_operators_same_field_test() { 552 let assert Ok(exec) = test_helpers.create_test_db() 553 // Test range query: gt AND lt on same field 554 let condition = 555 where_clause.WhereCondition( 556 eq: None, 557 in_values: None, 558 contains: None, 559 gt: Some(Int(10)), 560 gte: None, 561 lt: Some(Int(100)), 562 lte: None, 563 is_null: None, 564 is_numeric: True, 565 ) 566 let clause = 567 where_clause.WhereClause( 568 conditions: dict.from_list([#("age", condition)]), 569 and: None, 570 or: None, 571 ) 572 573 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 574 575 // Should combine both operators with json_extract and CAST for numeric comparison 576 sql 577 |> should.equal( 578 "CAST(json_extract(json, '$.age') AS INTEGER) > ? AND CAST(json_extract(json, '$.age') AS INTEGER) < ?", 579 ) 580 list.length(params) |> should.equal(2) 581} 582 583pub fn conflicting_operators_test() { 584 let assert Ok(exec) = test_helpers.create_test_db() 585 // eq and in on same field - both should be applied with AND 586 let condition = 587 where_clause.WhereCondition( 588 eq: Some(Text("exact")), 589 in_values: Some([Text("val1"), Text("val2")]), 590 contains: None, 591 gt: None, 592 gte: None, 593 lt: None, 594 lte: None, 595 is_null: None, 596 is_numeric: False, 597 ) 598 let clause = 599 where_clause.WhereClause( 600 conditions: dict.from_list([#("status", condition)]), 601 and: None, 602 or: None, 603 ) 604 605 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 606 607 // Should apply both (though logically this might not make sense) 608 list.length(params) |> should.equal(3) 609 // Check for both possible orders with json_extract 610 let expected1 = 611 "json_extract(json, '$.status') = ? AND json_extract(json, '$.status') IN (?, ?)" 612 let expected2 = 613 "json_extract(json, '$.status') IN (?, ?) AND json_extract(json, '$.status') = ?" 614 should.be_true(sql == expected1 || sql == expected2) 615} 616 617// ===== Large IN Lists ===== 618 619pub fn large_in_list_test() { 620 let assert Ok(exec) = test_helpers.create_test_db() 621 // Test with 100 items in IN list 622 let large_list = 623 list.range(1, 100) 624 |> list.map(fn(i) { Int(i) }) 625 626 let condition = 627 where_clause.WhereCondition( 628 eq: None, 629 in_values: Some(large_list), 630 contains: None, 631 gt: None, 632 gte: None, 633 lt: None, 634 lte: None, 635 is_null: None, 636 is_numeric: False, 637 ) 638 let clause = 639 where_clause.WhereClause( 640 conditions: dict.from_list([#("id", condition)]), 641 and: None, 642 or: None, 643 ) 644 645 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 646 647 // Should generate correct number of placeholders 648 list.length(params) |> should.equal(100) 649 // Check SQL has correct IN clause structure 650 should.be_true( 651 sql 652 |> fn(s) { 653 s 654 |> fn(str) { 655 str 656 |> fn(_) { True } 657 } 658 }, 659 ) 660} 661 662// ===== Special Characters in Field Names ===== 663 664pub fn field_name_with_json_path_test() { 665 let assert Ok(exec) = test_helpers.create_test_db() 666 // Test JSON path field names 667 let condition = 668 where_clause.WhereCondition( 669 eq: Some(Text("value")), 670 in_values: None, 671 contains: None, 672 gt: None, 673 gte: None, 674 lt: None, 675 lte: None, 676 is_null: None, 677 is_numeric: False, 678 ) 679 let clause = 680 where_clause.WhereClause( 681 conditions: dict.from_list([#("value.nested.field", condition)]), 682 and: None, 683 or: None, 684 ) 685 686 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 687 688 // Should use json_extract for dotted field names 689 sql 690 |> should.equal("json_extract(json, '$.value.nested.field') = ?") 691 list.length(params) |> should.equal(1) 692} 693 694pub fn field_name_with_special_chars_test() { 695 let assert Ok(exec) = test_helpers.create_test_db() 696 // Field names with underscores, numbers, etc. 697 let special_field_names = [ 698 "field_with_underscore", 699 "field123", 700 "field_123_test", 701 "UPPERCASE_FIELD", 702 ] 703 704 list.each(special_field_names, fn(field_name) { 705 let condition = 706 where_clause.WhereCondition( 707 eq: Some(Text("test")), 708 in_values: None, 709 contains: None, 710 gt: None, 711 gte: None, 712 lt: None, 713 lte: None, 714 is_null: None, 715 is_numeric: False, 716 ) 717 let clause = 718 where_clause.WhereClause( 719 conditions: dict.from_list([#(field_name, condition)]), 720 and: None, 721 or: None, 722 ) 723 724 let #(sql, params) = where_clause.build_where_sql(exec, clause, False, 1) 725 726 // Should use json_extract even for non-dotted field names 727 let expected = "json_extract(json, '$." <> field_name <> "') = ?" 728 sql |> should.equal(expected) 729 list.length(params) |> should.equal(1) 730 }) 731}