26struct InterprocessConnection::ConnectionThread final :
public Thread
29 void run()
override { owner.runThread(); }
32 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
38 explicit SafeActionImpl (InterprocessConnection& p)
41 template <
typename Fn>
44 const ScopedLock lock (mutex);
52 const ScopedLock lock (mutex);
58 const ScopedLock lock (mutex);
63 CriticalSection mutex;
64 InterprocessConnection& ref;
68class InterprocessConnection::SafeAction final :
public SafeActionImpl
70 using SafeActionImpl::SafeActionImpl;
75 : useMessageThread (callbacksOnMessageThread),
76 magicMessageHeader (magicMessageHeaderNumber),
77 safeAction (std::make_shared<SafeAction> (*this))
79 thread.reset (
new ConnectionThread (*
this));
89 jassert (! safeAction->isSafe());
91 callbackConnectionState =
false;
98 int portNumber,
int timeOutMillisecs)
102 auto s = std::make_unique<StreamingSocket>();
104 if (s->connect (hostName, portNumber, timeOutMillisecs))
107 initialiseWithSocket (std::move (s));
118 auto newPipe = std::make_unique<NamedPipe>();
120 if (newPipe->openExisting (pipeName))
123 pipeReceiveMessageTimeout = timeoutMs;
124 initialiseWithPipe (std::move (newPipe));
135 auto newPipe = std::make_unique<NamedPipe>();
137 if (newPipe->createNewPipe (pipeName, mustNotExist))
140 pipeReceiveMessageTimeout = timeoutMs;
141 initialiseWithPipe (std::move (newPipe));
150 thread->signalThreadShouldExit();
154 if (socket !=
nullptr) socket->close();
155 if (pipe !=
nullptr) pipe->close();
158 thread->stopThread (timeoutMs);
159 deletePipeAndSocket();
161 if (notify == Notify::yes)
164 callbackConnectionState =
false;
165 safeAction->setSafe (
false);
168void InterprocessConnection::deletePipeAndSocket()
179 return ((socket !=
nullptr && socket->isConnected())
180 || (pipe !=
nullptr && pipe->isOpen()))
189 if (pipe ==
nullptr && socket ==
nullptr)
192 if (socket !=
nullptr && ! socket->isLocal())
193 return socket->getHostName();
196 return IPAddress::local().toString();
206 messageData.
copyFrom (messageHeader, 0,
sizeof (messageHeader));
209 return writeData (messageData.
getData(), (
int) messageData.
getSize()) == (int) messageData.
getSize();
212int InterprocessConnection::writeData (
void* data,
int dataSize)
216 if (socket !=
nullptr)
217 return socket->write (data, dataSize);
220 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
226void InterprocessConnection::initialise()
228 safeAction->setSafe (
true);
229 threadIsRunning =
true;
231 thread->startThread();
234void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
236 jassert (socket ==
nullptr && pipe ==
nullptr);
237 socket = std::move (newSocket);
241void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
243 jassert (socket ==
nullptr && pipe ==
nullptr);
244 pipe = std::move (newPipe);
249struct ConnectionStateMessage final :
public MessageManager::MessageBase
251 ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc,
bool connected) noexcept
252 : safeAction (ipc), connectionMade (connected)
255 void messageCallback()
override
257 safeAction->ifSafe ([
this] (InterprocessConnection& owner)
260 owner.connectionMade();
262 owner.connectionLost();
266 std::shared_ptr<SafeActionImpl> safeAction;
269 JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
272void InterprocessConnection::connectionMadeInt()
274 if (! callbackConnectionState)
276 callbackConnectionState =
true;
278 if (useMessageThread)
279 (
new ConnectionStateMessage (safeAction,
true))->post();
285void InterprocessConnection::connectionLostInt()
287 if (callbackConnectionState)
289 callbackConnectionState =
false;
291 if (useMessageThread)
292 (
new ConnectionStateMessage (safeAction,
false))->post();
298struct DataDeliveryMessage final :
public Message
300 DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc,
const MemoryBlock& d)
301 : safeAction (ipc), data (d)
304 void messageCallback()
override
306 safeAction->ifSafe ([
this] (InterprocessConnection& owner)
308 owner.messageReceived (data);
312 std::shared_ptr<SafeActionImpl> safeAction;
316void InterprocessConnection::deliverDataInt (
const MemoryBlock& data)
318 jassert (callbackConnectionState);
320 if (useMessageThread)
321 (
new DataDeliveryMessage (safeAction, data))->post();
327int InterprocessConnection::readData (
void* data,
int num)
329 const ScopedReadLock sl (pipeAndSocketLock);
331 if (socket !=
nullptr)
332 return socket->read (data, num,
true);
335 return pipe->read (data, num, pipeReceiveMessageTimeout);
341bool InterprocessConnection::readNextMessage()
343 uint32 messageHeader[2];
344 auto bytes = readData (messageHeader,
sizeof (messageHeader));
346 if (bytes == (
int)
sizeof (messageHeader)
351 if (bytesInMessage > 0)
353 MemoryBlock messageData ((
size_t) bytesInMessage,
true);
356 while (bytesInMessage > 0)
358 if (thread->threadShouldExit())
361 auto numThisTime = jmin (bytesInMessage, 65536);
362 auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
367 bytesRead += bytesIn;
368 bytesInMessage -= bytesIn;
372 deliverDataInt (messageData);
380 if (socket !=
nullptr)
381 deletePipeAndSocket();
389void InterprocessConnection::runThread()
391 while (! thread->threadShouldExit())
393 if (socket !=
nullptr)
395 auto ready = socket->waitUntilReady (
true, 100);
399 deletePipeAndSocket();
410 else if (pipe !=
nullptr)
412 if (! pipe->isOpen())
414 deletePipeAndSocket();
424 if (thread->threadShouldExit() || ! readNextMessage())
428 threadIsRunning =
false;
static Type swapIfBigEndian(Type value) noexcept
virtual void connectionMade()=0
virtual void messageReceived(const MemoryBlock &message)=0
void disconnect(int timeoutMs=-1, Notify notify=Notify::yes)
String getConnectedHostName() const
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
virtual ~InterprocessConnection()
virtual void connectionLost()=0
bool sendMessage(const MemoryBlock &message)
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
void * getData() noexcept
size_t getSize() const noexcept
Thread(const String &threadName, size_t threadStackSize=osDefaultStackSize)