Git fork

Merge branch 'jk/fsmonitor-event-listener-race-fix'

On macOS, fsmonitor could fall into a race condition that resulted in
a client waiting forever to be notified of an event that had
already happened. This problem has been corrected.

* jk/fsmonitor-event-listener-race-fix:
fsmonitor: initialize fs event listener before accepting clients
simple-ipc: split async server initialization and running

+98 -18
+3 -3
builtin/fsmonitor--daemon.c
··· 1208 * system event listener thread so that we have the IPC handle 1209 * before we need it. 1210 */ 1211 - if (ipc_server_run_async(&state->ipc_server_data, 1212 - state->path_ipc.buf, &ipc_opts, 1213 - handle_client, state)) 1214 return error_errno( 1215 _("could not start IPC thread pool on '%s'"), 1216 state->path_ipc.buf);
··· 1208 * system event listener thread so that we have the IPC handle 1209 * before we need it. 1210 */ 1211 + if (ipc_server_init_async(&state->ipc_server_data, 1212 + state->path_ipc.buf, &ipc_opts, 1213 + handle_client, state)) 1214 return error_errno( 1215 _("could not start IPC thread pool on '%s'"), 1216 state->path_ipc.buf);
+6
compat/fsmonitor/fsm-listen-darwin.c
··· 516 } 517 data->stream_started = 1; 518 519 pthread_mutex_lock(&data->dq_lock); 520 pthread_cond_wait(&data->dq_finished, &data->dq_lock); 521 pthread_mutex_unlock(&data->dq_lock);
··· 516 } 517 data->stream_started = 1; 518 519 + /* 520 + * Our fs event listener is now running, so it's safe to start 521 + * serving client requests. 522 + */ 523 + ipc_server_start_async(state->ipc_server_data); 524 + 525 pthread_mutex_lock(&data->dq_lock); 526 pthread_cond_wait(&data->dq_finished, &data->dq_lock); 527 pthread_mutex_unlock(&data->dq_lock);
+6
compat/fsmonitor/fsm-listen-win32.c
··· 741 start_rdcw_watch(data->watch_gitdir) == -1) 742 goto force_error_stop; 743 744 for (;;) { 745 dwWait = WaitForMultipleObjects(data->nr_listener_handles, 746 data->hListener,
··· 741 start_rdcw_watch(data->watch_gitdir) == -1) 742 goto force_error_stop; 743 744 + /* 745 + * Now that we've established the rdcw watches, we can start 746 + * serving clients. 747 + */ 748 + ipc_server_start_async(state->ipc_server_data); 749 + 750 for (;;) { 751 dwWait = WaitForMultipleObjects(data->nr_listener_handles, 752 data->hListener,
+3 -2
compat/simple-ipc/ipc-shared.c
··· 16 struct ipc_server_data *server_data = NULL; 17 int ret; 18 19 - ret = ipc_server_run_async(&server_data, path, opts, 20 - application_cb, application_data); 21 if (ret) 22 return ret; 23 24 ret = ipc_server_await(server_data); 25 26 ipc_server_free(server_data);
··· 16 struct ipc_server_data *server_data = NULL; 17 int ret; 18 19 + ret = ipc_server_init_async(&server_data, path, opts, 20 + application_cb, application_data); 21 if (ret) 22 return ret; 23 24 + ipc_server_start_async(server_data); 25 ret = ipc_server_await(server_data); 26 27 ipc_server_free(server_data);
+23 -5
compat/simple-ipc/ipc-unix-socket.c
··· 328 int back_pos; 329 int front_pos; 330 331 int shutdown_requested; 332 int is_stopped; 333 }; ··· 824 /* 825 * Start IPC server in a pool of background threads. 826 */ 827 - int ipc_server_run_async(struct ipc_server_data **returned_server_data, 828 - const char *path, const struct ipc_server_opts *opts, 829 - ipc_server_application_cb *application_cb, 830 - void *application_data) 831 { 832 struct unix_ss_socket *server_socket = NULL; 833 struct ipc_server_data *server_data; ··· 887 server_data->accept_thread->server_socket = server_socket; 888 server_data->accept_thread->fd_send_shutdown = sv[0]; 889 server_data->accept_thread->fd_wait_shutdown = sv[1]; 890 891 if (pthread_create(&server_data->accept_thread->pthread_id, NULL, 892 accept_thread_proc, server_data->accept_thread)) ··· 918 return 0; 919 } 920 921 /* 922 * Gently tell the IPC server treads to shutdown. 923 * Can be run on any thread. ··· 933 934 trace2_region_enter("ipc-server", "server-stop-async", NULL); 935 936 - pthread_mutex_lock(&server_data->work_available_mutex); 937 938 server_data->shutdown_requested = 1; 939
··· 328 int back_pos; 329 int front_pos; 330 331 + int started; 332 int shutdown_requested; 333 int is_stopped; 334 }; ··· 825 /* 826 * Start IPC server in a pool of background threads. 827 */ 828 + int ipc_server_init_async(struct ipc_server_data **returned_server_data, 829 + const char *path, const struct ipc_server_opts *opts, 830 + ipc_server_application_cb *application_cb, 831 + void *application_data) 832 { 833 struct unix_ss_socket *server_socket = NULL; 834 struct ipc_server_data *server_data; ··· 888 server_data->accept_thread->server_socket = server_socket; 889 server_data->accept_thread->fd_send_shutdown = sv[0]; 890 server_data->accept_thread->fd_wait_shutdown = sv[1]; 891 + 892 + /* 893 + * Hold work-available mutex so that no work can start until 894 + * we unlock it. 895 + */ 896 + pthread_mutex_lock(&server_data->work_available_mutex); 897 898 if (pthread_create(&server_data->accept_thread->pthread_id, NULL, 899 accept_thread_proc, server_data->accept_thread)) ··· 925 return 0; 926 } 927 928 + void ipc_server_start_async(struct ipc_server_data *server_data) 929 + { 930 + if (!server_data || server_data->started) 931 + return; 932 + 933 + server_data->started = 1; 934 + pthread_mutex_unlock(&server_data->work_available_mutex); 935 + } 936 + 937 /* 938 * Gently tell the IPC server treads to shutdown. 939 * Can be run on any thread. ··· 949 950 trace2_region_enter("ipc-server", "server-stop-async", NULL); 951 952 + /* If we haven't started yet, we are already holding lock. */ 953 + if (server_data->started) 954 + pthread_mutex_lock(&server_data->work_available_mutex); 955 956 server_data->shutdown_requested = 1; 957
+44 -4
compat/simple-ipc/ipc-win32.c
··· 371 HANDLE hEventStopRequested; 372 struct ipc_server_thread_data *thread_list; 373 int is_stopped; 374 }; 375 376 enum connect_result { ··· 526 return ret; 527 } 528 529 /* 530 * Thread proc for an IPC server worker thread. It handles a series of 531 * connections from clients. It cleans and reuses the hPipe between each ··· 549 550 memset(&oConnect, 0, sizeof(oConnect)); 551 oConnect.hEvent = hEventConnected; 552 553 for (;;) { 554 cr = wait_for_connection(server_thread_data, &oConnect); ··· 752 return hPipe; 753 } 754 755 - int ipc_server_run_async(struct ipc_server_data **returned_server_data, 756 - const char *path, const struct ipc_server_opts *opts, 757 - ipc_server_application_cb *application_cb, 758 - void *application_data) 759 { 760 struct ipc_server_data *server_data; 761 wchar_t wpath[MAX_PATH]; ··· 787 strbuf_addstr(&server_data->buf_path, path); 788 wcscpy(server_data->wpath, wpath); 789 790 if (nr_threads < 1) 791 nr_threads = 1; 792 ··· 837 return 0; 838 } 839 840 int ipc_server_stop_async(struct ipc_server_data *server_data) 841 { 842 if (!server_data) ··· 850 * We DO NOT attempt to force them to drop an active connection. 851 */ 852 SetEvent(server_data->hEventStopRequested); 853 return 0; 854 } 855 ··· 899 server_data->thread_list = std->next_thread; 900 free(std); 901 } 902 903 free(server_data); 904 }
··· 371 HANDLE hEventStopRequested; 372 struct ipc_server_thread_data *thread_list; 373 int is_stopped; 374 + 375 + pthread_mutex_t startup_barrier; 376 + int started; 377 }; 378 379 enum connect_result { ··· 529 return ret; 530 } 531 532 + static void wait_for_startup_barrier(struct ipc_server_data *server_data) 533 + { 534 + /* 535 + * Temporarily hold the startup_barrier mutex before starting, 536 + * which lets us know that it's OK to start serving requests. 537 + */ 538 + pthread_mutex_lock(&server_data->startup_barrier); 539 + pthread_mutex_unlock(&server_data->startup_barrier); 540 + } 541 + 542 /* 543 * Thread proc for an IPC server worker thread. It handles a series of 544 * connections from clients. It cleans and reuses the hPipe between each ··· 562 563 memset(&oConnect, 0, sizeof(oConnect)); 564 oConnect.hEvent = hEventConnected; 565 + 566 + wait_for_startup_barrier(server_thread_data->server_data); 567 568 for (;;) { 569 cr = wait_for_connection(server_thread_data, &oConnect); ··· 767 return hPipe; 768 } 769 770 + int ipc_server_init_async(struct ipc_server_data **returned_server_data, 771 + const char *path, const struct ipc_server_opts *opts, 772 + ipc_server_application_cb *application_cb, 773 + void *application_data) 774 { 775 struct ipc_server_data *server_data; 776 wchar_t wpath[MAX_PATH]; ··· 802 strbuf_addstr(&server_data->buf_path, path); 803 wcscpy(server_data->wpath, wpath); 804 805 + /* 806 + * Hold the startup_barrier lock so that no threads will progress 807 + * until ipc_server_start_async() is called. 
808 + */ 809 + pthread_mutex_init(&server_data->startup_barrier, NULL); 810 + pthread_mutex_lock(&server_data->startup_barrier); 811 + 812 if (nr_threads < 1) 813 nr_threads = 1; 814 ··· 859 return 0; 860 } 861 862 + void ipc_server_start_async(struct ipc_server_data *server_data) 863 + { 864 + if (!server_data || server_data->started) 865 + return; 866 + 867 + server_data->started = 1; 868 + pthread_mutex_unlock(&server_data->startup_barrier); 869 + } 870 + 871 int ipc_server_stop_async(struct ipc_server_data *server_data) 872 { 873 if (!server_data) ··· 881 * We DO NOT attempt to force them to drop an active connection. 882 */ 883 SetEvent(server_data->hEventStopRequested); 884 + 885 + /* 886 + * If we haven't yet told the threads they are allowed to run, 887 + * do so now, so they can receive the shutdown event. 888 + */ 889 + ipc_server_start_async(server_data); 890 + 891 return 0; 892 } 893 ··· 937 server_data->thread_list = std->next_thread; 938 free(std); 939 } 940 + 941 + pthread_mutex_destroy(&server_data->startup_barrier); 942 943 free(server_data); 944 }
+13 -4
simple-ipc.h
··· 179 * When a client IPC message is received, the `application_cb` will be 180 * called (possibly on a random thread) to handle the message and 181 * optionally compose a reply message. 182 */ 183 - int ipc_server_run_async(struct ipc_server_data **returned_server_data, 184 - const char *path, const struct ipc_server_opts *opts, 185 - ipc_server_application_cb *application_cb, 186 - void *application_data); 187 188 /* 189 * Gently signal the IPC server pool to shutdown. No new client
··· 179 * When a client IPC message is received, the `application_cb` will be 180 * called (possibly on a random thread) to handle the message and 181 * optionally compose a reply message. 182 + * 183 + * This initializes all threads but no actual work will be done until 184 + * ipc_server_start_async() is called. 185 */ 186 + int ipc_server_init_async(struct ipc_server_data **returned_server_data, 187 + const char *path, const struct ipc_server_opts *opts, 188 + ipc_server_application_cb *application_cb, 189 + void *application_data); 190 + 191 + /* 192 + * Let an async server start running. This needs to be called only once 193 + * after initialization. 194 + */ 195 + void ipc_server_start_async(struct ipc_server_data *server_data); 196 197 /* 198 * Gently signal the IPC server pool to shutdown. No new client