/* * Copyright (C) 1996-2023 The Squid Software Foundation and contributors * * Squid software is distributed under GPLv2+ license and includes * contributions from numerous individuals and organizations. * Please see the COPYING and CONTRIBUTORS files for details. */ #ifndef SQUID_SRC_IPC_QUEUE_H #define SQUID_SRC_IPC_QUEUE_H #include "base/InstanceId.h" #include "debug/Stream.h" #include "ipc/mem/FlexibleArray.h" #include "ipc/mem/Pointer.h" #include "util.h" #include #include class String; namespace Ipc { /// State of the reading end of a queue (i.e., of the code calling pop()). /// Multiple queues attached to one reader share this state. class QueueReader { public: QueueReader(); // the initial state is "blocked without a signal" /// whether the reader is waiting for a notification signal bool blocked() const { return popBlocked.load(); } /// \copydoc popSignal bool signaled() const { return popSignal.load(); } /// marks the reader as blocked, waiting for a notification signal void block() { popBlocked.store(true); } /// removes the block() effects void unblock() { popBlocked.store(false); } /// if reader is blocked and not notified, marks the notification signal /// as sent and not received, returning true; otherwise, returns false bool raiseSignal() { return blocked() && !popSignal.exchange(true); } /// marks sent reader notification as received (also removes pop blocking) void clearSignal() { unblock(); popSignal.store(false); } private: std::atomic popBlocked; ///< whether the reader is blocked on pop() std::atomic popSignal; ///< whether writer has sent and reader has not received notification public: typedef std::atomic Rate; ///< pop()s per second Rate rateLimit; ///< pop()s per second limit if positive // we need a signed atomic type because balance may get negative typedef std::atomic AtomicSignedMsec; typedef AtomicSignedMsec Balance; /// how far ahead the reader is compared to a perfect read/sec event rate Balance balance; /// unique ID for debugging which reader is used (works across processes) const InstanceId id; }; /// shared array of QueueReaders class QueueReaders { public: QueueReaders(const int aCapacity); size_t sharedMemorySize() const; static size_t SharedMemorySize(const int capacity); const int theCapacity; /// number of readers Ipc::Mem::FlexibleArray theReaders; /// readers }; /** * Lockless fixed-capacity queue for a single writer and a single reader. * * If the queue is empty, the reader is considered "blocked" and needs * an out-of-band notification message to notice the next pushed item. * * Current implementation assumes that the writer cannot get blocked: if the * queue is full, the writer will just not push and come back later (with a * different value). We can add support for blocked writers if needed. */ class OneToOneUniQueue { public: // pop() and push() exceptions; TODO: use TextException instead class Full {}; class ItemTooLarge {}; OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity); unsigned int maxItemSize() const { return theMaxItemSize; } int size() const { return theSize; } int capacity() const { return theCapacity; } int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); } bool empty() const { return !theSize; } bool full() const { return theSize == theCapacity; } static int Bytes2Items(const unsigned int maxItemSize, int size); static int Items2Bytes(const unsigned int maxItemSize, const int size); /// returns true iff the value was set; [un]blocks the reader as needed template bool pop(Value &value, QueueReader *const reader = nullptr); /// returns true iff the caller must notify the reader of the pushed item template bool push(const Value &value, QueueReader *const reader = nullptr); /// returns true iff the value was set; the value may be stale! template bool peek(Value &value) const; /// prints incoming queue state; suitable for cache manager reports template void statIn(std::ostream &, int localProcessId, int remoteProcessId) const; /// prints outgoing queue state; suitable for cache manager reports template void statOut(std::ostream &, int localProcessId, int remoteProcessId) const; private: void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const; void statClose(std::ostream &) const; template void statSamples(std::ostream &, unsigned int start, uint32_t size) const; template void statRange(std::ostream &, unsigned int start, uint32_t n) const; // optimization: these non-std::atomic data members are in shared memory, // but each is used only by one process (aside from obscured reporting) unsigned int theIn; ///< current push() position; reporting aside, used only in push() unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek() std::atomic theSize; ///< number of items in the queue const unsigned int theMaxItemSize; ///< maximum item size const uint32_t theCapacity; ///< maximum number of items, i.e. theBuffer size char theBuffer[]; }; /// shared array of OneToOneUniQueues class OneToOneUniQueues { public: OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity); size_t sharedMemorySize() const; static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity); const OneToOneUniQueue &operator [](const int index) const; inline OneToOneUniQueue &operator [](const int index); private: inline const OneToOneUniQueue &front() const; public: const int theCapacity; /// number of OneToOneUniQueues }; /** * Base class for lockless fixed-capacity bidirectional queues for a * limited number processes. */ class BaseMultiQueue { public: BaseMultiQueue(const int aLocalProcessId); virtual ~BaseMultiQueue() {} /// clears the reader notification received by the local process from the remote process void clearReaderSignal(const int remoteProcessId); /// clears all reader notifications received by the local process void clearAllReaderSignals(); /// picks a process and calls OneToOneUniQueue::pop() using its queue template bool pop(int &remoteProcessId, Value &value); /// calls OneToOneUniQueue::push() using the given process queue template bool push(const int remoteProcessId, const Value &value); /// peeks at the item likely to be pop()ed next template bool peek(int &remoteProcessId, Value &value) const; /// prints current state; suitable for cache manager reports template void stat(std::ostream &) const; /// returns local reader's balance QueueReader::Balance &localBalance() { return localReader().balance; } /// returns reader's balance for a given remote process const QueueReader::Balance &balance(const int remoteProcessId) const; /// returns local reader's rate limit QueueReader::Rate &localRateLimit() { return localReader().rateLimit; } /// returns reader's rate limit for a given remote process const QueueReader::Rate &rateLimit(const int remoteProcessId) const; /// number of items in incoming queue from a given remote process int inSize(const int remoteProcessId) const { return inQueue(remoteProcessId).size(); } /// number of items in outgoing queue to a given remote process int outSize(const int remoteProcessId) const { return outQueue(remoteProcessId).size(); } protected: /// incoming queue from a given remote process virtual const OneToOneUniQueue &inQueue(const int remoteProcessId) const = 0; OneToOneUniQueue &inQueue(const int remoteProcessId); /// outgoing queue to a given remote process virtual const OneToOneUniQueue &outQueue(const int remoteProcessId) const = 0; OneToOneUniQueue &outQueue(const int remoteProcessId); virtual const QueueReader &localReader() const = 0; QueueReader &localReader(); virtual const QueueReader &remoteReader(const int remoteProcessId) const = 0; QueueReader &remoteReader(const int remoteProcessId); virtual int remotesCount() const = 0; virtual int remotesIdOffset() const = 0; protected: const int theLocalProcessId; ///< process ID of this queue private: int theLastPopProcessId; ///< the ID of the last process we tried to pop() from }; /** * Lockless fixed-capacity bidirectional queue for a limited number * processes. Allows communication between two groups of processes: * any process in one group may send data to and receive from any * process in another group, but processes in the same group can not * communicate. Process in each group has a unique integer ID in * [groupIdOffset, groupIdOffset + groupSize) range. */ class FewToFewBiQueue: public BaseMultiQueue { public: typedef OneToOneUniQueue::Full Full; typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; private: /// Shared metadata for FewToFewBiQueue struct Metadata { Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset); size_t sharedMemorySize() const { return sizeof(*this); } static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); } const int theGroupASize; const int theGroupAIdOffset; const int theGroupBSize; const int theGroupBIdOffset; }; public: class Owner { public: Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); ~Owner(); private: Mem::Owner *const metadataOwner; Mem::Owner *const queuesOwner; Mem::Owner *const readersOwner; }; static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity); enum Group { groupA = 0, groupB = 1 }; FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId); /// maximum number of items in the queue static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity); /// finds the oldest item in incoming and outgoing queues between /// us and the given remote process template bool findOldest(const int remoteProcessId, Value &value) const; protected: const OneToOneUniQueue &inQueue(const int remoteProcessId) const override; const OneToOneUniQueue &outQueue(const int remoteProcessId) const override; const QueueReader &localReader() const override; const QueueReader &remoteReader(const int processId) const override; int remotesCount() const override; int remotesIdOffset() const override; private: bool validProcessId(const Group group, const int processId) const; int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; const OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const; int readerIndex(const Group group, const int processId) const; Group localGroup() const { return theLocalGroup; } Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; } private: const Mem::Pointer metadata; ///< shared metadata const Mem::Pointer queues; ///< unidirection one-to-one queues const Mem::Pointer readers; ///< readers array const Group theLocalGroup; ///< group of this queue }; /** * Lockless fixed-capacity bidirectional queue for a limited number * processes. Any process may send data to and receive from any other * process (including itself). Each process has a unique integer ID in * [processIdOffset, processIdOffset + processCount) range. */ class MultiQueue: public BaseMultiQueue { public: typedef OneToOneUniQueue::Full Full; typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; private: /// Shared metadata for MultiQueue struct Metadata { Metadata(const int aProcessCount, const int aProcessIdOffset); size_t sharedMemorySize() const { return sizeof(*this); } static size_t SharedMemorySize(const int, const int) { return sizeof(Metadata); } const int theProcessCount; const int theProcessIdOffset; }; public: class Owner { public: Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); ~Owner(); private: Mem::Owner *const metadataOwner; Mem::Owner *const queuesOwner; Mem::Owner *const readersOwner; }; static Owner *Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity); MultiQueue(const String &id, const int localProcessId); protected: const OneToOneUniQueue &inQueue(const int remoteProcessId) const override; const OneToOneUniQueue &outQueue(const int remoteProcessId) const override; const QueueReader &localReader() const override; const QueueReader &remoteReader(const int remoteProcessId) const override; int remotesCount() const override; int remotesIdOffset() const override; private: bool validProcessId(const int processId) const; const OneToOneUniQueue &oneToOneQueue(const int fromProcessId, const int toProcessId) const; const QueueReader &reader(const int processId) const; private: const Mem::Pointer metadata; ///< shared metadata const Mem::Pointer queues; ///< unidirection one-to-one queues const Mem::Pointer readers; ///< readers array }; // OneToOneUniQueue template bool OneToOneUniQueue::pop(Value &value, QueueReader *const reader) { if (sizeof(value) > theMaxItemSize) throw ItemTooLarge(); // A writer might push between the empty test and block() below, so we do // not return false right after calling block(), but test again. if (empty()) { if (!reader) return false; reader->block(); // A writer might push between the empty test and block() below, // so we must test again as such a writer will not signal us. if (empty()) return false; } if (reader) reader->unblock(); const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize; memcpy(&value, theBuffer + pos, sizeof(value)); --theSize; return true; } template bool OneToOneUniQueue::peek(Value &value) const { if (sizeof(value) > theMaxItemSize) throw ItemTooLarge(); if (empty()) return false; // the reader may pop() before we copy; making this method imprecise const unsigned int pos = (theOut % theCapacity) * theMaxItemSize; memcpy(&value, theBuffer + pos, sizeof(value)); return true; } template bool OneToOneUniQueue::push(const Value &value, QueueReader *const reader) { if (sizeof(value) > theMaxItemSize) throw ItemTooLarge(); if (full()) throw Full(); const unsigned int pos = theIn++ % theCapacity * theMaxItemSize; memcpy(theBuffer + pos, &value, sizeof(value)); const bool wasEmpty = !theSize++; return wasEmpty && (!reader || reader->raiseSignal()); } template void OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const { os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": "; // Nobody can modify our theOut so, after capturing some valid theSize value // in count, we can reliably report all [theOut, theOut+count) items that // were queued at theSize capturing time. We will miss new items push()ed by // the other side, but it is OK -- we report state at the capturing time. const auto count = theSize.load(); statOpen(os, "other", "popIndex", count); statSamples(os, theOut, count); statClose(os); } template void OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const { os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": "; // Nobody can modify our theIn so, after capturing some valid theSize value // in count, we can reliably report all [theIn-count, theIn) items that were // queued at theSize capturing time. We may report items already pop()ed by // the other side, but that is OK because pop() does not modify items -- it // only increments theOut. const auto count = theSize.load(); statOpen(os, "pushIndex", "other", count); statSamples(os, theIn - count, count); // unsigned offset underflow OK statClose(os); } /// report a sample of [start, start + size) items template void OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const { if (!count) { os << " "; return; } os << ", items: [\n"; // report a few leading and trailing items, without repetitions const auto sampleSize = std::min(3U, count); // leading (and max) sample statRange(os, start, sampleSize); if (sampleSize < count) { // the first sample did not show some items // The `start` offset aside, the first sample reported all items // below the sampleSize offset. The second sample needs to report // the last sampleSize items (i.e. starting at count-sampleSize // offset) except those already reported by the first sample. const auto secondSampleOffset = std::max(sampleSize, count - sampleSize); const auto secondSampleSize = std::min(sampleSize, count - sampleSize); // but first we print a sample separator, unless there are no items // between the samples or the separator hides the only unsampled item const auto bothSamples = sampleSize + secondSampleSize; if (bothSamples + 1U == count) statRange(os, start + sampleSize, 1); else if (count > bothSamples) os << " # ... " << (count - bothSamples) << " items not shown ...\n"; statRange(os, start + secondSampleOffset, secondSampleSize); } os << " ]"; } /// statSamples() helper that reports n items from start template void OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const { assert(sizeof(Value) <= theMaxItemSize); auto offset = start; for (uint32_t i = 0; i < n; ++i) { // XXX: Throughout this C++ header, these overflow wrapping tricks work // only because theCapacity currently happens to be a power of 2 (e.g., // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset). const auto pos = (offset++ % theCapacity) * theMaxItemSize; Value value; memcpy(&value, theBuffer + pos, sizeof(value)); os << " { "; value.stat(os); os << " },\n"; } } // OneToOneUniQueues inline OneToOneUniQueue & OneToOneUniQueues::operator [](const int index) { return const_cast((*const_cast(this))[index]); } inline const OneToOneUniQueue & OneToOneUniQueues::front() const { const char *const queue = reinterpret_cast(this) + sizeof(*this); return *reinterpret_cast(queue); } // BaseMultiQueue template bool BaseMultiQueue::pop(int &remoteProcessId, Value &value) { // iterate all remote processes, starting after the one we visited last for (int i = 0; i < remotesCount(); ++i) { if (++theLastPopProcessId >= remotesIdOffset() + remotesCount()) theLastPopProcessId = remotesIdOffset(); OneToOneUniQueue &queue = inQueue(theLastPopProcessId); if (queue.pop(value, &localReader())) { remoteProcessId = theLastPopProcessId; debugs(54, 7, "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size()); return true; } } return false; // no process had anything to pop } template bool BaseMultiQueue::push(const int remoteProcessId, const Value &value) { OneToOneUniQueue &remoteQueue = outQueue(remoteProcessId); QueueReader &reader = remoteReader(remoteProcessId); debugs(54, 7, "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size()); return remoteQueue.push(value, &reader); } template bool BaseMultiQueue::peek(int &remoteProcessId, Value &value) const { // mimic FewToFewBiQueue::pop() but quit just before popping int popProcessId = theLastPopProcessId; // preserve for future pop() for (int i = 0; i < remotesCount(); ++i) { if (++popProcessId >= remotesIdOffset() + remotesCount()) popProcessId = remotesIdOffset(); const OneToOneUniQueue &queue = inQueue(popProcessId); if (queue.peek(value)) { remoteProcessId = popProcessId; return true; } } return false; // most likely, no process had anything to pop } template void BaseMultiQueue::stat(std::ostream &os) const { for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) { const auto &queue = inQueue(processId); queue.statIn(os, theLocalProcessId, processId); } os << "\n"; for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) { const auto &queue = outQueue(processId); queue.statOut(os, theLocalProcessId, processId); } os << "\n"; const auto &reader = localReader(); os << " kid" << theLocalProcessId << " reader flags: " << "{ blocked: " << reader.blocked() << ", signaled: " << reader.signaled() << " }\n"; } // FewToFewBiQueue template bool FewToFewBiQueue::findOldest(const int remoteProcessId, Value &value) const { // we may be called before remote process configured its queue end if (!validProcessId(remoteGroup(), remoteProcessId)) return false; // we need the oldest value, so start with the incoming, them-to-us queue: const OneToOneUniQueue &in = inQueue(remoteProcessId); debugs(54, 2, "peeking from " << remoteProcessId << " to " << theLocalProcessId << " at " << in.size()); if (in.peek(value)) return true; // if the incoming queue is empty, check the outgoing, us-to-them queue: const OneToOneUniQueue &out = outQueue(remoteProcessId); debugs(54, 2, "peeking from " << theLocalProcessId << " to " << remoteProcessId << " at " << out.size()); return out.peek(value); } } // namespace Ipc #endif /* SQUID_SRC_IPC_QUEUE_H */