1 using System;
2 using System.Collections;
3 using System.Threading;
4 using ThreadSynchronizationExample;
5
6 namespace ThreadSynchronizationExample
7 {
// Process-wide collector of output lines.
// Producer threads hand lines to EnqueueLine(); a single consumer thread
// executing Run() prints them to the console prefixed with a running line
// number. Exit() asks the consumer to stop.
public class OutputCollector
{
    // Static constructor: creates the single shared instance.
    static OutputCollector() : instance(new OutputCollector())
    {
    }
    // Returns a reference to the shared collector instance.
    public static OutputCollector& Instance()
    {
        return *instance;
    }
    // Private so that the only instance is the one made by the static constructor.
    private OutputCollector() : exiting(false)
    {
    }
    // Appends 'line' to the pending queue and signals the consumer.
    public void EnqueueLine(const string& line)
    {
        LockGuard<RecursiveMutex> lock(outputMutex);
        outputLines.Add(line);
        outputAvailableOrExiting.NotifyOne();
    }
    // Consumer loop: waits until lines are available or exit has been
    // requested, prints every queued line as "<lineNumber> : <line>", and
    // returns once exit has been requested and the queue has been drained.
    // Any Exception is reported on the error stream and ends the loop.
    public void Run()
    {
        try
        {
            int lineNumber = 1;
            bool done = false;
            while (!done)
            {
                LockGuard<RecursiveMutex> lock(outputMutex);
                // Presumably blocks until the predicate returns true; the
                // predicate is true when lines are queued or exiting is set.
                outputAvailableOrExiting.Wait(outputMutex, OutputLinesAvailableOrExiting, null);
                while (!outputLines.IsEmpty())
                {
                    string outputLine = outputLines.RemoveFirst();
                    Console.WriteLine(ToString(lineNumber) + " : " + outputLine);
                    ++lineNumber;
                }
                // Sample the flag only after draining and while outputMutex is
                // still held. Testing it at the top of the loop, outside the
                // lock, could drop lines that were enqueued just before Exit().
                done = exiting;
            }
        }
        catch (const Exception& ex)
        {
            Console.Error() << ex.ToString() << endl();
        }
    }
    // Requests the consumer loop in Run() to finish and wakes it up.
    public void Exit()
    {
        LockGuard<RecursiveMutex> lock(outputMutex);
        exiting = true;
        outputAvailableOrExiting.NotifyOne();
    }
    // Wait predicate: true when there is something to print or exit was
    // requested. 'arg' is unused.
    private bool OutputLinesAvailableOrExiting(void* arg)
    {
        return !outputLines.IsEmpty() || exiting;
    }
    private static UniquePtr<OutputCollector> instance;     // the shared instance
    private List<string> outputLines;                       // pending lines, oldest first
    private RecursiveMutex outputMutex;                     // locked around outputLines and writes to exiting
    private bool exiting;                                   // set by Exit()
    private ConditionVariable outputAvailableOrExiting;     // signalled by EnqueueLine() and Exit()
}
64
// Thread entry function for one producer thread.
// 'arg' must point to an int holding this thread's number; it is
// dereferenced without a null check.
// Enqueues ten greeting lines to the OutputCollector, sleeping for
// RandomNumber(20u) milliseconds after each one.
// Any Exception is reported on the error stream and ends the thread function.
public void OutputThreadMain(void* arg)
{
    try
    {
        int* threadNumberPtr = cast<int*>(arg);
        int threadNumber = *threadNumberPtr;
        // The loop expression of a for header takes no trailing semicolon.
        for (int i = 0; i < 10; ++i)
        {
            // NOTE(review): presumably a random value bounded by 20 - confirm against RandomNumber.
            uint randomMs = RandomNumber(20u);
            OutputCollector.Instance().EnqueueLine("Hello " + ToString(i) + " from thread " + ToString(threadNumber) + ". Sleeping " + ToString(randomMs) + " ms.");
            Duration duration = Duration.FromMilliseconds(randomMs);
            Sleep(duration);
        }
    }
    catch (const Exception& ex)
    {
        Console.Error() << ex.ToString() << endl();
    }
}
84 }
85
// Program entry point.
// Starts the collector thread, then 2 * HardwareConcurrency() producer
// threads running OutputThreadMain. Waits for all producers, asks the
// collector to exit, and waits for the collector thread.
// Any Exception is reported on the error stream.
public void main()
{
    try
    {
        List<Thread> outputThreads;
        ThreadStartMethod threadMethod = OutputCollector.Instance().Run;
        Thread outputCollectorThread = Thread.StartMethod(threadMethod);
        int n = 2 * HardwareConcurrency();
        List<int> threadNumbers;
        // Fill the whole list before taking element addresses below;
        // presumably so the list no longer grows while producer threads
        // hold pointers into it.
        for (int i = 0; i < n; ++i)
        {
            threadNumbers.Add(i);
        }
        for (int i = 0; i < n; ++i)
        {
            int* threadNumber = &threadNumbers[i];
            Thread outputThread = Thread.StartFunction(OutputThreadMain, threadNumber);
            outputThreads.Add(Rvalue(outputThread));
        }
        for (Thread& outputThread : outputThreads)
        {
            outputThread.Join();
        }
        // All producers are done: let the collector finish, then wait for it.
        OutputCollector.Instance().Exit();
        outputCollectorThread.Join();
    }
    catch (const Exception& ex)
    {
        Console.Error() << ex.ToString() << endl();
    }
}