mirror of
				https://github.com/PabloMK7/citra.git
				synced 2025-10-31 13:50:03 +00:00 
			
		
		
		
	Merge pull request #4197 from B3n30/spsc_queue_wait
threadsafe_queue: Add PopWait and use it where possible
This commit is contained in:
		
						commit
						18caa787d2
					
				
					 7 changed files with 37 additions and 30 deletions
				
			
		|  | @ -38,9 +38,7 @@ public: | |||
|     const Impl& operator=(Impl const&) = delete; | ||||
| 
 | ||||
|     void PushEntry(Entry e) { | ||||
|         std::lock_guard<std::mutex> lock(message_mutex); | ||||
|         message_queue.Push(std::move(e)); | ||||
|         message_cv.notify_one(); | ||||
|     } | ||||
| 
 | ||||
|     void AddBackend(std::unique_ptr<Backend> backend) { | ||||
|  | @ -84,13 +82,13 @@ private: | |||
|                 } | ||||
|             }; | ||||
|             while (true) { | ||||
|                 std::unique_lock<std::mutex> lock(message_mutex); | ||||
|                 message_cv.wait(lock, [&] { return !running || message_queue.Pop(entry); }); | ||||
|                 if (!running) { | ||||
|                 entry = message_queue.PopWait(); | ||||
|                 if (entry.final_entry) { | ||||
|                     break; | ||||
|                 } | ||||
|                 write_logs(entry); | ||||
|             } | ||||
| 
 | ||||
|             // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a case
 | ||||
|             // where a system is repeatedly spamming logs even on close.
 | ||||
|             constexpr int MAX_LOGS_TO_WRITE = 100; | ||||
|  | @ -102,14 +100,13 @@ private: | |||
|     } | ||||
| 
 | ||||
|     ~Impl() { | ||||
|         running = false; | ||||
|         message_cv.notify_one(); | ||||
|         Entry entry; | ||||
|         entry.final_entry = true; | ||||
|         message_queue.Push(entry); | ||||
|         backend_thread.join(); | ||||
|     } | ||||
| 
 | ||||
|     std::atomic_bool running{true}; | ||||
|     std::mutex message_mutex, writing_mutex; | ||||
|     std::condition_variable message_cv; | ||||
|     std::mutex writing_mutex; | ||||
|     std::thread backend_thread; | ||||
|     std::vector<std::unique_ptr<Backend>> backends; | ||||
|     Common::MPSCQueue<Log::Entry> message_queue; | ||||
|  |  | |||
|  | @ -28,6 +28,7 @@ struct Entry { | |||
|     unsigned int line_num; | ||||
|     std::string function; | ||||
|     std::string message; | ||||
|     bool final_entry = false; | ||||
| 
 | ||||
|     Entry() = default; | ||||
|     Entry(Entry&& o) = default; | ||||
|  |  | |||
|  | @ -9,6 +9,7 @@ | |||
| 
 | ||||
| #include <algorithm> | ||||
| #include <atomic> | ||||
| #include <condition_variable> | ||||
| #include <cstddef> | ||||
| #include <mutex> | ||||
| #include "common/common_types.h" | ||||
|  | @ -49,6 +50,7 @@ public: | |||
|         write_ptr = new_ptr; | ||||
|         if (NeedSize) | ||||
|             size++; | ||||
|         cv.notify_one(); | ||||
|     } | ||||
| 
 | ||||
|     void Pop() { | ||||
|  | @ -77,6 +79,16 @@ public: | |||
|         return true; | ||||
|     } | ||||
| 
 | ||||
|     T PopWait() { | ||||
|         if (Empty()) { | ||||
|             std::unique_lock<std::mutex> lock(cv_mutex); | ||||
|             cv.wait(lock, [this]() { return !Empty(); }); | ||||
|         } | ||||
|         T t; | ||||
|         Pop(t); | ||||
|         return t; | ||||
|     } | ||||
| 
 | ||||
|     // not thread-safe
 | ||||
|     void Clear() { | ||||
|         size.store(0); | ||||
|  | @ -104,6 +116,8 @@ private: | |||
|     ElementPtr* write_ptr; | ||||
|     ElementPtr* read_ptr; | ||||
|     std::atomic<u32> size; | ||||
|     std::mutex cv_mutex; | ||||
|     std::condition_variable cv; | ||||
| }; | ||||
| 
 | ||||
| // a simple thread-safe,
 | ||||
|  | @ -138,6 +152,10 @@ public: | |||
|         return spsc_queue.Pop(t); | ||||
|     } | ||||
| 
 | ||||
|     T PopWait() { | ||||
|         return spsc_queue.PopWait(); | ||||
|     } | ||||
| 
 | ||||
|     // not thread-safe
 | ||||
|     void Clear() { | ||||
|         spsc_queue.Clear(); | ||||
|  |  | |||
|  | @ -106,34 +106,24 @@ void RPCServer::HandleRequestsLoop() { | |||
| 
 | ||||
|     LOG_INFO(RPC_Server, "Request handler started."); | ||||
| 
 | ||||
|     while (true) { | ||||
|         std::unique_lock<std::mutex> lock(request_queue_mutex); | ||||
|         request_queue_cv.wait(lock, [&] { return !running || request_queue.Pop(request_packet); }); | ||||
|         if (!running) { | ||||
|             break; | ||||
|         } | ||||
|     while ((request_packet = request_queue.PopWait())) { | ||||
|         HandleSingleRequest(std::move(request_packet)); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| void RPCServer::QueueRequest(std::unique_ptr<RPC::Packet> request) { | ||||
|     std::unique_lock<std::mutex> lock(request_queue_mutex); | ||||
|     request_queue.Push(std::move(request)); | ||||
|     request_queue_cv.notify_one(); | ||||
| } | ||||
| 
 | ||||
| void RPCServer::Start() { | ||||
|     running = true; | ||||
|     const auto threadFunction = [this]() { HandleRequestsLoop(); }; | ||||
|     request_handler_thread = std::thread(threadFunction); | ||||
|     server.Start(); | ||||
| } | ||||
| 
 | ||||
| void RPCServer::Stop() { | ||||
|     running = false; | ||||
|     request_queue_cv.notify_one(); | ||||
|     request_handler_thread.join(); | ||||
|     server.Stop(); | ||||
|     request_handler_thread.join(); | ||||
| } | ||||
| 
 | ||||
| }; // namespace RPC
 | ||||
|  |  | |||
|  | @ -31,10 +31,7 @@ private: | |||
| 
 | ||||
|     Server server; | ||||
|     Common::SPSCQueue<std::unique_ptr<Packet>> request_queue; | ||||
|     bool running = false; | ||||
|     std::thread request_handler_thread; | ||||
|     std::mutex request_queue_mutex; | ||||
|     std::condition_variable request_queue_cv; | ||||
| }; | ||||
| 
 | ||||
| } // namespace RPC
 | ||||
|  |  | |||
|  | @ -1,6 +1,5 @@ | |||
| #include <functional> | ||||
| 
 | ||||
| #include "common/threadsafe_queue.h" | ||||
| #include "core/core.h" | ||||
| #include "core/rpc/rpc_server.h" | ||||
| #include "core/rpc/server.h" | ||||
|  | @ -26,9 +25,13 @@ void Server::Stop() { | |||
| } | ||||
| 
 | ||||
| void Server::NewRequestCallback(std::unique_ptr<RPC::Packet> new_request) { | ||||
|     LOG_INFO(RPC_Server, "Received request version={} id={} type={} size={}", | ||||
|              new_request->GetVersion(), new_request->GetId(), | ||||
|              static_cast<u32>(new_request->GetPacketType()), new_request->GetPacketDataSize()); | ||||
|     if (new_request) { | ||||
|         LOG_INFO(RPC_Server, "Received request version={} id={} type={} size={}", | ||||
|                  new_request->GetVersion(), new_request->GetId(), | ||||
|                  static_cast<u32>(new_request->GetPacketType()), new_request->GetPacketDataSize()); | ||||
|     } else { | ||||
|         LOG_INFO(RPC_Server, "Received end packet"); | ||||
|     } | ||||
|     rpc_server.QueueRequest(std::move(new_request)); | ||||
| } | ||||
| 
 | ||||
|  |  | |||
|  | @ -52,7 +52,8 @@ void ZMQServer::WorkerLoop() { | |||
|             LOG_WARNING(RPC_Server, "Failed to receive data on ZeroMQ socket"); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     std::unique_ptr<Packet> end_packet = nullptr; | ||||
|     new_request_callback(std::move(end_packet)); | ||||
|     // Destroying the socket must be done by this thread.
 | ||||
|     zmq_socket.reset(); | ||||
| } | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue