1 module gnuplot_crafter.multithreaded.singlevar_crafter;
2 
3 import test.gnuplot_crafter.multithreaded.singlevar_crafter_test : test;
4 mixin test;
5 
6 import gnuplot_crafter.unmatching_length_exception;
7 
8 import std.concurrency;
9 import std.stdio;
10 import std.exception;
11 import std.format;
12 import std.conv;
13 import std.variant;
14 
/**
 * Writes (x, y) value pairs to a sink file through a dedicated worker thread.
 *
 * The constructor spawns the worker, registers it under a generated name and
 * waits for the worker to report whether the sink file could be opened. Data
 * and orders are then delivered to the worker as std.concurrency messages;
 * each pair is written on its own line as "x y".
 *
 * Params:
 *     T = element type of the values being written (defaults to float).
 */
public shared struct SingleVarCrafter(T = float)
{
    /// Name under which the worker thread is registered; used by every method
    /// to look the worker up again via `locate`.
    immutable string workerThreadName;

    /// Running counter used to build `workerThreadName`.
    /// NOTE(review): this counter is per template instantiation while the
    /// generated name does not encode T, so instances with different T may end
    /// up with the same name; the result of `register` is not checked.
    static size_t counter = 0;

    /**
     * Spawns the worker thread and waits for its start report.
     *
     * Params:
     *     sinkFilename = path of the file the worker writes to.
     *     append       = open the file in append mode ("a") when true,
     *                    otherwise truncate it ("w").
     *
     * Throws: Exception when the worker reports that it could not open the
     *         sink file.
     */
    this(string sinkFilename, bool append = true)
    {
        Tid t = spawn(&threadingLoop, sinkFilename, append);

        workerThreadName = "SingleVarCrafter" ~ to!string(counter++);
        register(workerThreadName, t);

        // The worker answers exactly once with the outcome of opening the file.
        auto msg = receiveOnly!StartReport();

        enforce(msg == StartReport.success,
            "Failed to open sink file " ~ sinkFilename);
    }

    /**
     * Asks the worker to finish and waits for its FinishReport.
     *
     * Nothing is done when no thread is registered under `workerThreadName`.
     */
    ~this()
    {
        Tid workerThread = locate(workerThreadName);
        if(workerThread != Tid.init)
        {
            workerThread.send(Order.finish);
            receive((FinishReport msg){});
        }
    }

    /**
     * Sends a single (x, y) pair to the worker.
     *
     * Params:
     *     flush = when true, also orders a flush and waits for the worker's
     *             FlushReport before returning.
     *     x     = first value of the pair.
     *     y     = second value of the pair.
     */
    public void put(bool flush = false)(T x, T y)
    {
        Tid t = locate(workerThreadName);

        // Wrap the scalars in one-element immutable slices so they travel in
        // the same DataMessage shape as the slice overloads.
        t.send(DataMessage([to!(immutable(T))(x)],
                            [to!(immutable(T))(y)]));

        static if (flush)
        {
            t.send(Order.flush);
            receive((FlushReport msg){});
        }
    }

    /**
     * Sends copies of the given slices to the worker.
     *
     * Params:
     *     flush = when true, also orders a flush and waits for the worker's
     *             FlushReport before returning.
     *     xs    = first values of the pairs.
     *     ys    = second values of the pairs; must be as long as `xs`.
     *
     * Throws: UnmatchingLengthException when the slices differ in length.
     */
    public void put(bool flush = false)(const T[] xs, const T[] ys)
    {
        enforceEqualLengths(xs.length, ys.length);

        Tid t = locate(workerThreadName);

        // idup: the message must carry immutable data, so the caller's
        // buffers are copied.
        t.send(DataMessage(xs.idup, ys.idup));

        static if (flush)
        {
            t.send(Order.flush);
            receive((FlushReport msg){});
        }
    }

    /**
     * Sends the given immutable slices to the worker without copying them.
     *
     * Params:
     *     flush = when true, also orders a flush and waits for the worker's
     *             FlushReport before returning.
     *     xs    = first values of the pairs.
     *     ys    = second values of the pairs; must be as long as `xs`.
     *
     * Throws: UnmatchingLengthException when the slices differ in length.
     */
    public void put(bool flush = false)(immutable(T)[] xs, immutable(T)[] ys)
    {
        enforceEqualLengths(xs.length, ys.length);

        Tid t = locate(workerThreadName);

        t.send(DataMessage(xs, ys));

        static if (flush)
        {
            t.send(Order.flush);
            receive((FlushReport msg){});
        }
    }

    /// Orders the worker to flush the sink file and waits for its FlushReport.
    public void flush()
    {
        Tid t = locate(workerThreadName);
        t.send(Order.flush);
        receive((FlushReport msg){});
    }

    /**
     * Verifies that both slice lengths match.
     *
     * Params:
     *     xsLength = length of the xs slice.
     *     ysLength = length of the ys slice.
     *
     * Throws: UnmatchingLengthException when the lengths differ; the message
     *         reports both lengths.
     */
    private void enforceEqualLengths(size_t xsLength, size_t ysLength) const
    {
        enforce!UnmatchingLengthException(xsLength == ysLength,
            format("Unequal lengths for slices xs(%s) and ys(%s)",
                    xsLength, ysLength));
    }

    /**
     * Body of the worker thread.
     *
     * Opens the sink file, reports the outcome to the owner thread and then
     * processes messages until a finish order arrives, after which a
     * FinishReport is sent to the owner.
     *
     * Params:
     *     sinkFilename = path of the file to write to.
     *     append       = open in append mode ("a") when true, otherwise "w".
     */
    private void threadingLoop(string sinkFilename, bool append)
    {
        File sinkFile;

        try
        {
            sinkFile = File(sinkFilename, append ? "a" : "w");
        }
        catch(ErrnoException e)
        {
            debug writeln(e);
            // Tell the owner the file could not be opened and stop the worker.
            ownerTid.send(StartReport.failure);
            return;
        }

        ownerTid.send(StartReport.success);

        bool shouldContinue = true;
        while(shouldContinue)
        {
            receive(
                (DataMessage msg)
                {
                    // One "x y" line per pair; the index range is size_t-typed
                    // so it matches the slice length.
                    with(msg)
                        foreach(i; 0 .. xs.length)
                            sinkFile.writefln("%s %s", xs[i], ys[i]);
                },
                (Order msg)
                {
                    with (Order) final switch(msg)
                    {
                        case flush:
                            sinkFile.flush();
                            ownerTid.send(FlushReport.success);
                            break;

                        case finish:
                            shouldContinue = false;
                            break;
                    }
                },
                (Variant v)
                {
                    // Any other message type is consumed here; it is only
                    // printed in debug builds.
                    debug writeln("Received variant " ~ to!string(v));
                }
            );
        }

        ownerTid.send(FinishReport.success);
    }

    /// Message carrying a batch of pairs to the worker; `xs[i]` pairs with
    /// `ys[i]`.
    private struct DataMessage
    {
        immutable(T)[] xs;
        immutable(T)[] ys;

        /// Stores both slices; the in-contract asserts equal lengths.
        this(immutable(T)[] xs, immutable(T)[] ys)
        in
        {
            assert(xs.length == ys.length);
        }
        body
        {
            this.xs = xs;
            this.ys = ys;
        }
    }
}
169 
/// Outcome the worker thread reports to its owner right after trying to open
/// the sink file.
private enum StartReport
{
    /// The sink file was opened.
    success = 0,

    /// Opening the sink file raised an ErrnoException.
    failure = 1
}
175 
/// Acknowledgement the worker thread sends to its owner once its message loop
/// has ended.
private enum FinishReport
{
    /// The worker left its message loop.
    success = 0
}
180 
/// Acknowledgement the worker thread sends to its owner after handling a
/// flush order.
private enum FlushReport
{
    /// The sink file was flushed.
    success = 0
}
185 
/// Commands the owner thread can send to the worker thread.
private enum Order
{
    /// Flush the sink file and answer with a FlushReport.
    flush = 0,

    /// Leave the message loop; the worker then answers with a FinishReport.
    finish = 1
}