/****************************************************************************** QtAV: Media play library based on Qt and FFmpeg Copyright (C) 2012-2016 Wang Bin * This file is part of QtAV This library is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 2.1 of the License, or (at your option) any later version. This library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this library; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA ******************************************************************************/ #ifndef QTAV_BLOCKINGQUEUE_H #define QTAV_BLOCKINGQUEUE_H #include #include #include //TODO: block full and empty condition separately QT_BEGIN_NAMESPACE template class QQueue; QT_END_NAMESPACE namespace FAV { template class Container = QQueue> class BlockingQueue { friend class VideoThread; public: BlockingQueue(); virtual ~BlockingQueue() {} void setCapacity(int max); //enqueue is allowed if less than capacity /*! * \brief setThreshold * do nothing if min >= capacity() * \param min */ void setThreshold(int min); //wake up and enqueue void put(const T& t); T take(); void setBlocking(bool block); //will wake if false. called when no more data can enqueue void blockEmpty(bool block); void blockFull(bool block); //TODO:setMinBlock,MaxBlock inline void clear(); // TODO: checkEmpty, Enough, Full? inline bool isEmpty() const; inline bool isEnough() const; //size > thres inline bool isFull() const; //size >= cap inline int size() const; inline int threshold() const; inline int capacity() const; class StateChangeCallback { public: virtual ~StateChangeCallback(){} virtual void call() = 0; }; void setEmptyCallback(StateChangeCallback* call); void setThresholdCallback(StateChangeCallback* call); void setFullCallback(StateChangeCallback* call); protected: /*! * \brief checkFull * Check whether the queue is full. Default implemention is compare queue size to capacity. * Full is now a more generic notion. You can implement it as checking queued bytes etc. * \return true if queue is full */ virtual bool checkFull() const; virtual bool checkEmpty() const; virtual bool checkEnough() const; virtual void onPut(const T&) {} virtual void onTake(const T&) {} bool block_empty, block_full; int cap, thres; Container queue; private: mutable QReadWriteLock lock; //locker in const func QReadWriteLock block_change_lock; QWaitCondition cond_full, cond_empty; //upto_threshold_callback, downto_threshold_callback QScopedPointer empty_callback, threshold_callback, full_callback; }; /* cap - thres = 24, about 1s * if fps is large, then larger capacity and threshold is preferred */ template class Container> BlockingQueue::BlockingQueue() :block_empty(true),block_full(true),cap(48),thres(32) , empty_callback(0) , threshold_callback(0) , full_callback(0) { } template class Container> void BlockingQueue::setCapacity(int max) { //qDebug("queue capacity==>>%d", max); QWriteLocker locker(&lock); Q_UNUSED(locker); cap = max; if (thres > cap) thres = cap; } template class Container> void BlockingQueue::setThreshold(int min) { //qDebug("queue threshold==>>%d", min); QWriteLocker locker(&lock); Q_UNUSED(locker); if (min > cap) return; thres = min; } template class Container> void BlockingQueue::put(const T& t) { QWriteLocker locker(&lock); Q_UNUSED(locker); if (checkFull()) { //qDebug("queue full"); //too frequent if (full_callback) { full_callback->call(); } if (block_full) { if(cond_full.wait(&lock,1000) == false) { return; //qInfo() << __FUNCTION__ << __LINE__ << "ERRRRRROOOO!"; } } } queue.enqueue(t); onPut(t); // emit bufferProgressChanged here if buffering if (checkEnough()) { cond_empty.wakeOne(); //emit buffering finished here //qDebug("queue is enough: %d/%d~%d", queue.size(), thres, cap); } else { //qDebug("buffering: %d/%d~%d", queue.size(), thres, cap); } } template class Container> T BlockingQueue::take() { QWriteLocker locker(&lock); Q_UNUSED(locker); //TODO:always block? if (checkEmpty()) { //qDebug("queue empty!!"); if (empty_callback) { empty_callback->call(); } if (block_empty) { // 여기서 S/W 정지함 //block when empty only //Q_ASSERT if(cond_empty.wait(&lock,1000) == false) // 00 { return T(); // qInfo() << __FUNCTION__ << __LINE__ << "ERRRRRROOOO!"; } } } if (checkEmpty()) { //qWarning("Queue is still empty"); if (empty_callback) { empty_callback->call(); } return T(); } T t(queue.dequeue()); cond_full.wakeOne(); onTake(t); // emit start buffering here if empty return t; } template class Container> void BlockingQueue::setBlocking(bool block) { QWriteLocker locker(&lock); Q_UNUSED(locker); block_empty = block_full = block; if (!block) { cond_empty.wakeAll(); //empty still wait. setBlock=>setCapacity(-1) cond_full.wakeAll(); } } template class Container> void BlockingQueue::blockEmpty(bool block) { if (!block) { cond_empty.wakeAll(); } QWriteLocker locker(&block_change_lock); Q_UNUSED(locker); block_empty = block; } template class Container> void BlockingQueue::blockFull(bool block) { if (!block) { cond_full.wakeAll(); } //DO NOT use the same lock that put() get() use. it may be already locked //this function usualy called in demux thread, so no lock is ok QWriteLocker locker(&block_change_lock); Q_UNUSED(locker); block_full = block; } template class Container> void BlockingQueue::clear() { QWriteLocker locker(&lock); Q_UNUSED(locker); //cond_empty.wakeAll(); cond_full.wakeAll(); queue.clear(); //TODO: assert not empty onTake(T()); } template class Container> bool BlockingQueue::isEmpty() const { QReadLocker locker(&lock); Q_UNUSED(locker); return queue.isEmpty(); } template class Container> bool BlockingQueue::isEnough() const { QReadLocker locker(&lock); Q_UNUSED(locker); return queue.size() >= thres; } template class Container> bool BlockingQueue::isFull() const { QReadLocker locker(&lock); Q_UNUSED(locker); return queue.size() >= cap; } template class Container> int BlockingQueue::size() const { QReadLocker locker(&lock); Q_UNUSED(locker); return queue.size(); } template class Container> int BlockingQueue::threshold() const { QReadLocker locker(&lock); Q_UNUSED(locker); return thres; } template class Container> int BlockingQueue::capacity() const { QReadLocker locker(&lock); Q_UNUSED(locker); return cap; } template class Container> void BlockingQueue::setEmptyCallback(StateChangeCallback *call) { QWriteLocker locker(&lock); Q_UNUSED(locker); empty_callback.reset(call); } template class Container> void BlockingQueue::setThresholdCallback(StateChangeCallback *call) { QWriteLocker locker(&lock); Q_UNUSED(locker); threshold_callback.reset(call); } template class Container> void BlockingQueue::setFullCallback(StateChangeCallback *call) { QWriteLocker locker(&lock); Q_UNUSED(locker); full_callback.reset(call); } template class Container> bool BlockingQueue::checkFull() const { return queue.size() >= cap; } template class Container> bool BlockingQueue::checkEmpty() const { return queue.isEmpty(); } template class Container> bool BlockingQueue::checkEnough() const { return queue.size() >= thres && !checkEmpty(); } } //namespace FAV #endif // QTAV_BLOCKINGQUEUE_H