1 module gnuplot_crafter.multithreaded.singlevar_crafter; 2 3 import test.gnuplot_crafter.multithreaded.singlevar_crafter_test : test; 4 mixin test; 5 6 import gnuplot_crafter.unmatching_length_exception; 7 8 import std.concurrency; 9 import std.stdio; 10 import std.exception; 11 import std.format; 12 import std.conv; 13 import std.variant; 14 15 public shared struct SingleVarCrafter(T = float) 16 { 17 immutable string workerThreadName; 18 19 static size_t counter = 0; 20 21 this(string sinkFilename, bool append = true) 22 { 23 Tid t = spawn(&threadingLoop, sinkFilename, append); 24 25 workerThreadName = "SingleVarCrafter" ~ to!string(counter++); 26 register(workerThreadName, t); 27 28 auto msg = receiveOnly!StartReport(); 29 30 enforce(msg == StartReport.success); 31 } 32 33 ~this() 34 { 35 Tid workerThread = locate(workerThreadName); 36 if(workerThread != Tid.init) 37 { 38 workerThread.send(Order.finish); 39 receive((FinishReport msg){}); 40 } 41 } 42 43 public void put(bool flush = false)(T x, T y) 44 { 45 Tid t = locate(workerThreadName); 46 47 t.send(DataMessage([to!(immutable(T))(x)], 48 [to!(immutable(T))(y)])); 49 50 static if (flush) 51 { 52 t.send(Order.flush); 53 receive((FlushReport msg){}); 54 } 55 } 56 57 public void put(bool flush = false)(const T[] xs, const T[] ys) 58 { 59 enforceEqualLengths(xs.length, ys.length); 60 61 Tid t = locate(workerThreadName); 62 63 t.send(DataMessage(xs.idup, ys.idup)); 64 65 static if (flush) 66 { 67 t.send(Order.flush); 68 receive((FlushReport msg){}); 69 } 70 } 71 72 public void put(bool flush = false)(immutable(T)[] xs, immutable(T)[] ys) 73 { 74 enforceEqualLengths(xs.length, ys.length); 75 76 Tid t = locate(workerThreadName); 77 78 t.send(DataMessage(xs, ys)); 79 80 static if (flush) 81 { 82 t.send(Order.flush); 83 receive((FlushReport msg){}); 84 } 85 } 86 87 public void flush() 88 { 89 Tid t = locate(workerThreadName); 90 t.send(Order.flush); 91 receive((FlushReport msg){}); 92 } 93 94 private void enforceEqualLengths(size_t xsLength, size_t ysLength) const 95 { 96 enforce!UnmatchingLengthException(xsLength == ysLength, 97 format("Unequal lengths for slices xs(%s) and ys(%s)", 98 xsLength, xsLength)); 99 } 100 101 private void threadingLoop(string sinkFilename, bool append) 102 { 103 File sinkFile; 104 105 try 106 { 107 sinkFile = File(sinkFilename, append ? "a" : "w"); 108 } 109 catch(ErrnoException e) 110 { 111 debug writeln(e); 112 ownerTid.send(StartReport.failure); 113 return; 114 } 115 116 ownerTid.send(StartReport.success); 117 118 bool shouldContinue = true; 119 while(shouldContinue) 120 { 121 receive( 122 (DataMessage msg) 123 { 124 with(msg) 125 for(int i = 0; i < xs.length; i++) 126 sinkFile.writefln("%s %s", xs[i], ys[i]); 127 }, 128 (Order msg) 129 { 130 with (Order) final switch(msg) 131 { 132 case flush: 133 sinkFile.flush(); 134 ownerTid.send(FlushReport.success); 135 break; 136 137 case finish: 138 shouldContinue = false; 139 break; 140 } 141 }, 142 (Variant v) 143 { 144 debug writeln("Received variant " ~ to!string(v)); 145 } 146 ); 147 } 148 149 ownerTid.send(FinishReport.success); 150 } 151 152 private struct DataMessage 153 { 154 immutable(T)[] xs; 155 immutable(T)[] ys; 156 157 this(immutable(T)[] xs, immutable(T)[] ys) 158 in 159 { 160 assert(xs.length == ys.length); 161 } 162 body 163 { 164 this.xs = xs; 165 this.ys = ys; 166 } 167 } 168 } 169 170 private enum StartReport 171 { 172 success, 173 failure 174 } 175 176 private enum FinishReport 177 { 178 success 179 } 180 181 private enum FlushReport 182 { 183 success 184 } 185 186 private enum Order 187 { 188 flush, 189 finish 190 }