mirror of
				https://github.com/PabloMK7/citra.git
				synced 2025-10-31 05:40:04 +00:00 
			
		
		
		
	threadsafe_queue: Add WaitIfEmpty and use it in logging
This commit is contained in:
		
							parent
							
								
									874a95cea7
								
							
						
					
					
						commit
						9b49a79a72
					
				
					 3 changed files with 27 additions and 13 deletions
				
			
		|  | @ -38,9 +38,7 @@ public: | ||||||
|     const Impl& operator=(Impl const&) = delete; |     const Impl& operator=(Impl const&) = delete; | ||||||
| 
 | 
 | ||||||
|     void PushEntry(Entry e) { |     void PushEntry(Entry e) { | ||||||
|         std::lock_guard<std::mutex> lock(message_mutex); |  | ||||||
|         message_queue.Push(std::move(e)); |         message_queue.Push(std::move(e)); | ||||||
|         message_cv.notify_one(); |  | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void AddBackend(std::unique_ptr<Backend> backend) { |     void AddBackend(std::unique_ptr<Backend> backend) { | ||||||
|  | @ -83,14 +81,13 @@ private: | ||||||
|                     backend->Write(e); |                     backend->Write(e); | ||||||
|                 } |                 } | ||||||
|             }; |             }; | ||||||
|             while (true) { |             while (message_queue.PopWait(entry)) { | ||||||
|                 std::unique_lock<std::mutex> lock(message_mutex); |                 if (entry.final_entry) { | ||||||
|                 message_cv.wait(lock, [&] { return !running || message_queue.Pop(entry); }); |  | ||||||
|                 if (!running) { |  | ||||||
|                     break; |                     break; | ||||||
|                 } |                 } | ||||||
|                 write_logs(entry); |                 write_logs(entry); | ||||||
|             } |             } | ||||||
|  | 
 | ||||||
|             // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a case
 |             // Drain the logging queue. Only writes out up to MAX_LOGS_TO_WRITE to prevent a case
 | ||||||
|             // where a system is repeatedly spamming logs even on close.
 |             // where a system is repeatedly spamming logs even on close.
 | ||||||
|             constexpr int MAX_LOGS_TO_WRITE = 100; |             constexpr int MAX_LOGS_TO_WRITE = 100; | ||||||
|  | @ -102,14 +99,13 @@ private: | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     ~Impl() { |     ~Impl() { | ||||||
|         running = false; |         Entry entry; | ||||||
|         message_cv.notify_one(); |         entry.final_entry = true; | ||||||
|  |         message_queue.Push(entry); | ||||||
|         backend_thread.join(); |         backend_thread.join(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     std::atomic_bool running{true}; |     std::mutex writing_mutex; | ||||||
|     std::mutex message_mutex, writing_mutex; |  | ||||||
|     std::condition_variable message_cv; |  | ||||||
|     std::thread backend_thread; |     std::thread backend_thread; | ||||||
|     std::vector<std::unique_ptr<Backend>> backends; |     std::vector<std::unique_ptr<Backend>> backends; | ||||||
|     Common::MPSCQueue<Log::Entry> message_queue; |     Common::MPSCQueue<Log::Entry> message_queue; | ||||||
|  |  | ||||||
|  | @ -28,6 +28,7 @@ struct Entry { | ||||||
|     unsigned int line_num; |     unsigned int line_num; | ||||||
|     std::string function; |     std::string function; | ||||||
|     std::string message; |     std::string message; | ||||||
|  |     bool final_entry = false; | ||||||
| 
 | 
 | ||||||
|     Entry() = default; |     Entry() = default; | ||||||
|     Entry(Entry&& o) = default; |     Entry(Entry&& o) = default; | ||||||
|  |  | ||||||
|  | @ -9,6 +9,7 @@ | ||||||
| 
 | 
 | ||||||
| #include <algorithm> | #include <algorithm> | ||||||
| #include <atomic> | #include <atomic> | ||||||
|  | #include <condition_variable> | ||||||
| #include <cstddef> | #include <cstddef> | ||||||
| #include <mutex> | #include <mutex> | ||||||
| #include "common/common_types.h" | #include "common/common_types.h" | ||||||
|  | @ -41,7 +42,7 @@ public: | ||||||
|     template <typename Arg> |     template <typename Arg> | ||||||
|     void Push(Arg&& t) { |     void Push(Arg&& t) { | ||||||
|         // create the element, add it to the queue
 |         // create the element, add it to the queue
 | ||||||
|         write_ptr->current = std::forward<Arg>(t); |         write_ptr->current = std::move(t); | ||||||
|         // set the next pointer to a new element ptr
 |         // set the next pointer to a new element ptr
 | ||||||
|         // then advance the write pointer
 |         // then advance the write pointer
 | ||||||
|         ElementPtr* new_ptr = new ElementPtr(); |         ElementPtr* new_ptr = new ElementPtr(); | ||||||
|  | @ -49,6 +50,7 @@ public: | ||||||
|         write_ptr = new_ptr; |         write_ptr = new_ptr; | ||||||
|         if (NeedSize) |         if (NeedSize) | ||||||
|             size++; |             size++; | ||||||
|  |         cv.notify_one(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     void Pop() { |     void Pop() { | ||||||
|  | @ -66,10 +68,11 @@ public: | ||||||
|         if (Empty()) |         if (Empty()) | ||||||
|             return false; |             return false; | ||||||
| 
 | 
 | ||||||
|  |         ElementPtr* tmpptr = read_ptr; | ||||||
|  | 
 | ||||||
|         if (NeedSize) |         if (NeedSize) | ||||||
|             size--; |             size--; | ||||||
| 
 | 
 | ||||||
|         ElementPtr* tmpptr = read_ptr; |  | ||||||
|         read_ptr = tmpptr->next.load(std::memory_order_acquire); |         read_ptr = tmpptr->next.load(std::memory_order_acquire); | ||||||
|         t = std::move(tmpptr->current); |         t = std::move(tmpptr->current); | ||||||
|         tmpptr->next.store(nullptr); |         tmpptr->next.store(nullptr); | ||||||
|  | @ -77,6 +80,14 @@ public: | ||||||
|         return true; |         return true; | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     bool PopWait(T& t) { | ||||||
|  |         if (Empty()) { | ||||||
|  |             std::unique_lock<std::mutex> lock(cv_mutex); | ||||||
|  |             cv.wait(lock, [this]() { return !Empty(); }); | ||||||
|  |         } | ||||||
|  |         return Pop(t); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     // not thread-safe
 |     // not thread-safe
 | ||||||
|     void Clear() { |     void Clear() { | ||||||
|         size.store(0); |         size.store(0); | ||||||
|  | @ -104,6 +115,8 @@ private: | ||||||
|     ElementPtr* write_ptr; |     ElementPtr* write_ptr; | ||||||
|     ElementPtr* read_ptr; |     ElementPtr* read_ptr; | ||||||
|     std::atomic<u32> size; |     std::atomic<u32> size; | ||||||
|  |     std::mutex cv_mutex; | ||||||
|  |     std::condition_variable cv; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| // a simple thread-safe,
 | // a simple thread-safe,
 | ||||||
|  | @ -138,6 +151,10 @@ public: | ||||||
|         return spsc_queue.Pop(t); |         return spsc_queue.Pop(t); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     bool PopWait(T& t) { | ||||||
|  |         return spsc_queue.PopWait(t); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     // not thread-safe
 |     // not thread-safe
 | ||||||
|     void Clear() { |     void Clear() { | ||||||
|         spsc_queue.Clear(); |         spsc_queue.Clear(); | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue