wip: currently rewriting the project as a full stack application tangled.org/kacaii.dev/sigo
gleam

:art: move `fetch_user_data` to the on_onit group

+66 -53
+66 -53
src/app/web/socket.gleam
··· 16 16 import gleam/int 17 17 import gleam/json 18 18 import gleam/list 19 - import gleam/option 19 + import gleam/option.{type Option, None, Some} 20 20 import gleam/result 21 21 import gleam/string 22 22 import gleam/time/timestamp ··· 65 65 ///  User connected to this socket 66 66 user_uuid: uuid.Uuid, 67 67 ///  Selector being used for the process 68 - selector: option.Option(process.Selector(msg.Msg)), 69 - /// 󱥁 Notifications that the user wants to receive 68 + selector: Option(process.Selector(msg.Msg)), 69 + /// 󱥁 Occurrence notifications than an user is subscribed to 70 70 subscribed: List(category.Category), 71 71 ///  Brigades that an user has been assigned to 72 72 brigade_list: List(uuid.Uuid), 73 73 ) 74 74 } 75 75 76 + ///  Broadcast a message to all active users 77 + pub fn broadcast( 78 + registry registry: group_registry.GroupRegistry(msg.Msg), 79 + message message: msg.Msg, 80 + ) -> Nil { 81 + let members = group_registry.members(registry, ws_topic) 82 + 83 + use member <- list.each(members) 84 + process.spawn(fn() { process.send(member, message) }) 85 + } 86 + 76 87 fn handle_connection( 77 88 req: request.Request(mist.Connection), 78 89 ctx: Context, ··· 81 92 ) -> response.Response(mist.ResponseData) { 82 93 case fetch_user_data(ctx, user_uuid) { 83 94 Error(err) -> handle_error(err) 84 - Ok(state) -> 85 - case request.path_segments(req) { 86 - ["ws"] -> 87 - mist.websocket( 88 - request: req, 89 - on_init: ws_on_init(_, req:, ctx:, registry:, state:), 90 - on_close: ws_on_close(_, ctx:, registry:), 91 - handler: fn(state, msg, conn) { 92 - ws_handler(state:, msg:, conn:, ctx:, registry:) 93 - }, 94 - ) 95 - 96 - _ -> send_response("Not found", 404) 97 - } 95 + Ok(state) -> route_request(req, ctx, registry, state) 98 96 } 99 97 } 100 98 101 - /// Queries the Database and builds the initial state of the user 102 - fn fetch_user_data( 99 + fn route_request( 100 + req: request.Request(mist.Connection), 103 101 ctx: Context, 104 - user_uuid: uuid.Uuid, 105 - ) -> Result(State, WebSocketError) { 106 - use brigade_list <- result.try(fetch_brigades(ctx, user_uuid)) 107 - use subscribed <- result.map(fetch_categories(ctx, user_uuid)) 108 - State(user_uuid:, subscribed:, brigade_list:, selector: option.None) 102 + registry: group_registry.GroupRegistry(msg.Msg), 103 + state: State, 104 + ) -> response.Response(mist.ResponseData) { 105 + case request.path_segments(req) { 106 + ["ws"] -> 107 + mist.websocket( 108 + request: req, 109 + on_init: ws_on_init(_, req:, ctx:, registry:, state:), 110 + on_close: ws_on_close(_, ctx:, registry:), 111 + handler: fn(state, msg, conn) { 112 + ws_handler(state:, msg:, conn:, ctx:, registry:) 113 + }, 114 + ) 115 + 116 + _ -> send_response("Not found", 404) 117 + } 109 118 } 110 119 111 120 fn ws_handler( ··· 310 319 ctx _ctx: Context, 311 320 registry registry: group_registry.GroupRegistry(msg.Msg), 312 321 state state: State, 313 - ) -> #(State, option.Option(process.Selector(msg.Msg))) { 322 + ) -> #(State, Option(process.Selector(msg.Msg))) { 314 323 let self = process.self() 315 324 let group_subject = group_registry.join(registry, ws_topic, self) 316 325 ··· 323 332 |> process.select(user_subject) 324 333 325 334 let selector = { 326 - use acc, value <- list.fold(over: state.subscribed, from: selector) 327 - let topic = "occurrence:new_" <> category.to_string(value) 335 + use selector, category <- list.fold(over: state.subscribed, from: selector) 336 + let topic = "occurrence:new_" <> category.to_string(category) 328 337 329 - let occ_subj = group_registry.join(registry, topic, self) 330 - process.select(acc, occ_subj) 338 + let subject = group_registry.join(registry, topic, self) 339 + process.select(selector, subject) 331 340 } 332 341 333 - #(state, option.Some(selector)) 342 + #(state, Some(selector)) 343 + } 344 + 345 + /// Queries the Database and builds the initial state of the user 346 + fn fetch_user_data( 347 + ctx: Context, 348 + user_uuid: uuid.Uuid, 349 + ) -> Result(State, WebSocketError) { 350 + use brigade_list <- result.try(fetch_brigades(ctx, user_uuid)) 351 + use subscribed <- result.map(fetch_subscribed_categories(ctx, user_uuid)) 352 + State(user_uuid:, subscribed:, brigade_list:, selector: None) 334 353 } 335 354 336 - /// 󰀖 Find all brigades that an user is assigned to 355 + /// 󰀖 Find all brigades that a given user is assigned to 337 356 fn fetch_brigades( 338 357 ctx: Context, 339 358 for: uuid.Uuid, ··· 347 366 } 348 367 349 368 /// 󰩉 Find all occurrence categories an user wants to be notified of 350 - fn fetch_categories( 369 + fn fetch_subscribed_categories( 351 370 ctx: Context, 352 371 for: uuid.Uuid, 353 372 ) -> Result(List(category.Category), WebSocketError) { ··· 386 405 387 406 // HELPERS --------------------------------------------------------------------- 388 407 389 - ///  Broadcast a message to all active users 390 - pub fn broadcast( 391 - registry registry: group_registry.GroupRegistry(msg.Msg), 392 - message message: msg.Msg, 393 - ) -> Nil { 394 - let members = group_registry.members(registry, ws_topic) 395 - 396 - use member <- list.each(members) 397 - process.spawn(fn() { process.send(member, message) }) 398 - } 399 - 400 408 fn send_response( 401 409 body: String, 402 410 status: Int, ··· 456 464 |> send_response(500) 457 465 458 466 pog.PostgresqlError(code:, name:, message:) -> 459 - json.object([ 467 + [ 460 468 #("code", json.string(code)), 461 469 #("name", json.string(name)), 462 470 #("message", json.string(message)), 463 - ]) 471 + ] 472 + |> json.object 464 473 |> json.to_string 465 474 |> send_response(500) 466 475 ··· 469 478 |> send_response(500) 470 479 471 480 pog.ConstraintViolated(message:, constraint:, detail:) -> 472 - json.object([ 481 + [ 473 482 #("message", json.string(message)), 474 483 #("constraint", json.string(constraint)), 475 484 #("detail", json.string(detail)), 476 - ]) 485 + ] 486 + |> json.object 477 487 |> json.to_string 478 488 |> send_response(409) 479 489 480 490 pog.UnexpectedArgumentCount(expected:, got:) -> { 481 - json.object([ 491 + [ 482 492 #("expected", json.int(expected)), 483 493 #("got", json.int(got)), 484 - ]) 494 + ] 495 + |> json.object 485 496 |> json.to_string 486 497 |> send_response(400) 487 498 } 488 499 489 500 pog.UnexpectedArgumentType(expected:, got:) -> { 490 - json.object([ 501 + [ 491 502 #("expected", json.string(expected)), 492 503 #("got", json.string(got)), 493 - ]) 504 + ] 505 + |> json.object 494 506 |> json.to_string 495 507 |> send_response(400) 496 508 } ··· 505 517 case list.first(decode_errors) { 506 518 Error(_) -> send_response("Ok", 200) 507 519 Ok(err) -> 508 - json.object([ 520 + [ 509 521 #("expected", json.string(err.expected)), 510 522 #("found", json.string(err.found)), 511 523 #("path", json.string(string.join(err.path, "/"))), 512 - ]) 524 + ] 525 + |> json.object 513 526 |> json.to_string 514 527 |> send_response(400) 515 528 }