summaryrefslogtreecommitdiff
path: root/third_party/BaseClasses/outputq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'third_party/BaseClasses/outputq.cpp')
-rw-r--r--third_party/BaseClasses/outputq.cpp801
1 files changed, 801 insertions, 0 deletions
diff --git a/third_party/BaseClasses/outputq.cpp b/third_party/BaseClasses/outputq.cpp
new file mode 100644
index 00000000..d3ab6175
--- /dev/null
+++ b/third_party/BaseClasses/outputq.cpp
@@ -0,0 +1,801 @@
+//------------------------------------------------------------------------------
+// File: OutputQ.cpp
+//
+// Desc: DirectShow base classes - implements COutputQueue class used by an
+// output pin which may sometimes want to queue output samples on a
+// separate thread and sometimes call Receive() directly on the input
+// pin.
+//
+// Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved.
+//------------------------------------------------------------------------------
+
+
+#include <streams.h>
+
+
+//
+// COutputQueue Constructor :
+//
+// Determines if a thread is to be created and creates resources
+//
+// pInputPin - the downstream input pin we're queueing samples to
+//
+// phr - changed to a failure code if this function fails
+// (otherwise unchanges)
+//
+// bAuto - Ask pInputPin if it can block in Receive by calling
+// its ReceiveCanBlock method and create a thread if
+// it can block, otherwise not.
+//
+// bQueue - if bAuto == FALSE then we create a thread if and only
+// if bQueue == TRUE
+//
+// lBatchSize - work in batches of lBatchSize
+//
+// bBatchEact - Use exact batch sizes so don't send until the
+// batch is full or SendAnyway() is called
+//
+// lListSize - If we create a thread make the list of samples queued
+// to the thread have this size cache
+//
+// dwPriority - If we create a thread set its priority to this
+//
+COutputQueue::COutputQueue(
+ IPin *pInputPin, // Pin to send stuff to
+ __inout HRESULT *phr, // 'Return code'
+ BOOL bAuto, // Ask pin if queue or not
+ BOOL bQueue, // Send through queue
+ LONG lBatchSize, // Batch
+ BOOL bBatchExact, // Batch exactly to BatchSize
+ LONG lListSize,
+ DWORD dwPriority,
+ bool bFlushingOpt // flushing optimization
+ ) : m_lBatchSize(lBatchSize),
+ m_bBatchExact(bBatchExact && (lBatchSize > 1)),
+ m_hThread(NULL),
+ m_hSem(NULL),
+ m_List(NULL),
+ m_pPin(pInputPin),
+ m_ppSamples(NULL),
+ m_lWaiting(0),
+ m_evFlushComplete(FALSE, phr),
+ m_pInputPin(NULL),
+ m_bSendAnyway(FALSE),
+ m_nBatched(0),
+ m_bFlushing(FALSE),
+ m_bFlushed(TRUE),
+ m_bFlushingOpt(bFlushingOpt),
+ m_bTerminate(FALSE),
+ m_hEventPop(NULL),
+ m_hr(S_OK)
+{
+ ASSERT(m_lBatchSize > 0);
+
+
+ if (FAILED(*phr)) {
+ return;
+ }
+
+ // Check the input pin is OK and cache its IMemInputPin interface
+
+ *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
+ if (FAILED(*phr)) {
+ return;
+ }
+
+ // See if we should ask the downstream pin
+
+ if (bAuto) {
+ HRESULT hr = m_pInputPin->ReceiveCanBlock();
+ if (SUCCEEDED(hr)) {
+ bQueue = hr == S_OK;
+ }
+ }
+
+ // Create our sample batch
+
+ m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
+ if (m_ppSamples == NULL) {
+ *phr = E_OUTOFMEMORY;
+ return;
+ }
+
+ // If we're queueing allocate resources
+
+ if (bQueue) {
+ DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
+ m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
+ if (m_hSem == NULL) {
+ DWORD dwError = GetLastError();
+ *phr = AmHresultFromWin32(dwError);
+ return;
+ }
+ m_List = new CSampleList(NAME("Sample Queue List"),
+ lListSize,
+ FALSE // No lock
+ );
+ if (m_List == NULL) {
+ *phr = E_OUTOFMEMORY;
+ return;
+ }
+
+
+ DWORD dwThreadId;
+ m_hThread = CreateThread(NULL,
+ 0,
+ InitialThreadProc,
+ (LPVOID)this,
+ 0,
+ &dwThreadId);
+ if (m_hThread == NULL) {
+ DWORD dwError = GetLastError();
+ *phr = AmHresultFromWin32(dwError);
+ return;
+ }
+ SetThreadPriority(m_hThread, dwPriority);
+ } else {
+ DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
+ }
+}
+
+//
+// COutputQueuee Destructor :
+//
+// Free all resources -
+//
+// Thread,
+// Batched samples
+//
+COutputQueue::~COutputQueue()
+{
+ DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
+ /* Free our pointer */
+ if (m_pInputPin != NULL) {
+ m_pInputPin->Release();
+ }
+ if (m_hThread != NULL) {
+ {
+ CAutoLock lck(this);
+ m_bTerminate = TRUE;
+ m_hr = S_FALSE;
+ NotifyThread();
+ }
+ DbgWaitForSingleObject(m_hThread);
+ EXECUTE_ASSERT(CloseHandle(m_hThread));
+
+ // The thread frees the samples when asked to terminate
+
+ ASSERT(m_List->GetCount() == 0);
+ delete m_List;
+ } else {
+ FreeSamples();
+ }
+ if (m_hSem != NULL) {
+ EXECUTE_ASSERT(CloseHandle(m_hSem));
+ }
+ delete [] m_ppSamples;
+}
+
+//
+// Call the real thread proc as a member function
+//
+DWORD WINAPI COutputQueue::InitialThreadProc(__in LPVOID pv)
+{
+ HRESULT hrCoInit = CAMThread::CoInitializeHelper();
+
+ COutputQueue *pSampleQueue = (COutputQueue *)pv;
+ DWORD dwReturn = pSampleQueue->ThreadProc();
+
+ if(hrCoInit == S_OK) {
+ CoUninitialize();
+ }
+
+ return dwReturn;
+}
+
+//
+// Thread sending the samples downstream :
+//
+// When there is nothing to do the thread sets m_lWaiting (while
+// holding the critical section) and then waits for m_hSem to be
+// set (not holding the critical section)
+//
+DWORD COutputQueue::ThreadProc()
+{
+ while (TRUE) {
+ BOOL bWait = FALSE;
+ IMediaSample *pSample;
+ LONG lNumberToSend; // Local copy
+ NewSegmentPacket* ppacket;
+
+ //
+ // Get a batch of samples and send it if possible
+ // In any case exit the loop if there is a control action
+ // requested
+ //
+ {
+ CAutoLock lck(this);
+ while (TRUE) {
+
+ if (m_bTerminate) {
+ FreeSamples();
+ return 0;
+ }
+ if (m_bFlushing) {
+ FreeSamples();
+ SetEvent(m_evFlushComplete);
+ }
+
+ // Get a sample off the list
+
+ pSample = m_List->RemoveHead();
+ // inform derived class we took something off the queue
+ if (m_hEventPop) {
+ //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
+ SetEvent(m_hEventPop);
+ }
+
+ if (pSample != NULL &&
+ !IsSpecialSample(pSample)) {
+
+ // If its just a regular sample just add it to the batch
+ // and exit the loop if the batch is full
+
+ m_ppSamples[m_nBatched++] = pSample;
+ if (m_nBatched == m_lBatchSize) {
+ break;
+ }
+ } else {
+
+ // If there was nothing in the queue and there's nothing
+ // to send (either because there's nothing or the batch
+ // isn't full) then prepare to wait
+
+ if (pSample == NULL &&
+ (m_bBatchExact || m_nBatched == 0)) {
+
+ // Tell other thread to set the event when there's
+ // something do to
+
+ ASSERT(m_lWaiting == 0);
+ m_lWaiting++;
+ bWait = TRUE;
+ } else {
+
+ // We break out of the loop on SEND_PACKET unless
+ // there's nothing to send
+
+ if (pSample == SEND_PACKET && m_nBatched == 0) {
+ continue;
+ }
+
+ if (pSample == NEW_SEGMENT) {
+ // now we need the parameters - we are
+ // guaranteed that the next packet contains them
+ ppacket = (NewSegmentPacket *) m_List->RemoveHead();
+ // we took something off the queue
+ if (m_hEventPop) {
+ //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
+ SetEvent(m_hEventPop);
+ }
+
+ ASSERT(ppacket);
+ }
+ // EOS_PACKET falls through here and we exit the loop
+ // In this way it acts like SEND_PACKET
+ }
+ break;
+ }
+ }
+ if (!bWait) {
+ // We look at m_nBatched from the client side so keep
+ // it up to date inside the critical section
+ lNumberToSend = m_nBatched; // Local copy
+ m_nBatched = 0;
+ }
+ }
+
+ // Wait for some more data
+
+ if (bWait) {
+ DbgWaitForSingleObject(m_hSem);
+ continue;
+ }
+
+
+
+ // OK - send it if there's anything to send
+ // We DON'T check m_bBatchExact here because either we've got
+ // a full batch or we dropped through because we got
+ // SEND_PACKET or EOS_PACKET - both of which imply we should
+ // flush our batch
+
+ if (lNumberToSend != 0) {
+ long nProcessed;
+ if (m_hr == S_OK) {
+ ASSERT(!m_bFlushed);
+ HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
+ lNumberToSend,
+ &nProcessed);
+ /* Don't overwrite a flushing state HRESULT */
+ CAutoLock lck(this);
+ if (m_hr == S_OK) {
+ m_hr = hr;
+ }
+ ASSERT(!m_bFlushed);
+ }
+ while (lNumberToSend != 0) {
+ m_ppSamples[--lNumberToSend]->Release();
+ }
+ if (m_hr != S_OK) {
+
+ // In any case wait for more data - S_OK just
+ // means there wasn't an error
+
+ DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
+ m_hr));
+ }
+ }
+
+ // Check for end of stream
+
+ if (pSample == EOS_PACKET) {
+
+ // We don't send even end of stream on if we've previously
+ // returned something other than S_OK
+ // This is because in that case the pin which returned
+ // something other than S_OK should have either sent
+ // EndOfStream() or notified the filter graph
+
+ if (m_hr == S_OK) {
+ DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
+ HRESULT hr = m_pPin->EndOfStream();
+ if (FAILED(hr)) {
+ DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
+ }
+ }
+ }
+
+ // Data from a new source
+
+ if (pSample == RESET_PACKET) {
+ m_hr = S_OK;
+ SetEvent(m_evFlushComplete);
+ }
+
+ if (pSample == NEW_SEGMENT) {
+ m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
+ delete ppacket;
+ }
+ }
+}
+
+// Send batched stuff anyway
+void COutputQueue::SendAnyway()
+{
+ if (!IsQueued()) {
+
+ // m_bSendAnyway is a private parameter checked in ReceiveMultiple
+
+ m_bSendAnyway = TRUE;
+ LONG nProcessed;
+ ReceiveMultiple(NULL, 0, &nProcessed);
+ m_bSendAnyway = FALSE;
+
+ } else {
+ CAutoLock lck(this);
+ QueueSample(SEND_PACKET);
+ NotifyThread();
+ }
+}
+
+void
+COutputQueue::NewSegment(
+ REFERENCE_TIME tStart,
+ REFERENCE_TIME tStop,
+ double dRate)
+{
+ if (!IsQueued()) {
+ if (S_OK == m_hr) {
+ if (m_bBatchExact) {
+ SendAnyway();
+ }
+ m_pPin->NewSegment(tStart, tStop, dRate);
+ }
+ } else {
+ if (m_hr == S_OK) {
+ //
+ // we need to queue the new segment to appear in order in the
+ // data, but we need to pass parameters to it. Rather than
+ // take the hit of wrapping every single sample so we can tell
+ // special ones apart, we queue special pointers to indicate
+ // special packets, and we guarantee (by holding the
+ // critical section) that the packet immediately following a
+ // NEW_SEGMENT value is a NewSegmentPacket containing the
+ // parameters.
+ NewSegmentPacket * ppack = new NewSegmentPacket;
+ if (ppack == NULL) {
+ return;
+ }
+ ppack->tStart = tStart;
+ ppack->tStop = tStop;
+ ppack->dRate = dRate;
+
+ CAutoLock lck(this);
+ QueueSample(NEW_SEGMENT);
+ QueueSample( (IMediaSample*) ppack);
+ NotifyThread();
+ }
+ }
+}
+
+
+//
+// End of Stream is queued to output device
+//
+void COutputQueue::EOS()
+{
+ CAutoLock lck(this);
+ if (!IsQueued()) {
+ if (m_bBatchExact) {
+ SendAnyway();
+ }
+ if (m_hr == S_OK) {
+ DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
+ m_bFlushed = FALSE;
+ HRESULT hr = m_pPin->EndOfStream();
+ if (FAILED(hr)) {
+ DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
+ }
+ }
+ } else {
+ if (m_hr == S_OK) {
+ m_bFlushed = FALSE;
+ QueueSample(EOS_PACKET);
+ NotifyThread();
+ }
+ }
+}
+
+//
+// Flush all the samples in the queue
+//
+void COutputQueue::BeginFlush()
+{
+ if (IsQueued()) {
+ {
+ CAutoLock lck(this);
+
+ // block receives -- we assume this is done by the
+ // filter in which we are a component
+
+ // discard all queued data
+
+ m_bFlushing = TRUE;
+
+ // Make sure we discard all samples from now on
+
+ if (m_hr == S_OK) {
+ m_hr = S_FALSE;
+ }
+
+ // Optimize so we don't keep calling downstream all the time
+
+ if (m_bFlushed && m_bFlushingOpt) {
+ return;
+ }
+
+ // Make sure we really wait for the flush to complete
+ m_evFlushComplete.Reset();
+
+ NotifyThread();
+ }
+
+ // pass this downstream
+
+ m_pPin->BeginFlush();
+ } else {
+ // pass downstream first to avoid deadlocks
+ m_pPin->BeginFlush();
+ CAutoLock lck(this);
+ // discard all queued data
+
+ m_bFlushing = TRUE;
+
+ // Make sure we discard all samples from now on
+
+ if (m_hr == S_OK) {
+ m_hr = S_FALSE;
+ }
+ }
+
+}
+
+//
+// leave flush mode - pass this downstream
+void COutputQueue::EndFlush()
+{
+ {
+ CAutoLock lck(this);
+ ASSERT(m_bFlushing);
+ if (m_bFlushingOpt && m_bFlushed && IsQueued()) {
+ m_bFlushing = FALSE;
+ m_hr = S_OK;
+ return;
+ }
+ }
+
+ // sync with pushing thread -- done in BeginFlush
+ // ensure no more data to go downstream -- done in BeginFlush
+ //
+ // Because we are synching here there is no need to hold the critical
+ // section (in fact we'd deadlock if we did!)
+
+ if (IsQueued()) {
+ m_evFlushComplete.Wait();
+ } else {
+ FreeSamples();
+ }
+
+ // Be daring - the caller has guaranteed no samples will arrive
+ // before EndFlush() returns
+
+ m_bFlushing = FALSE;
+ m_bFlushed = TRUE;
+
+ // call EndFlush on downstream pins
+
+ m_pPin->EndFlush();
+
+ m_hr = S_OK;
+}
+
+// COutputQueue::QueueSample
+//
+// private method to Send a sample to the output queue
+// The critical section MUST be held when this is called
+
+void COutputQueue::QueueSample(IMediaSample *pSample)
+{
+ if (NULL == m_List->AddTail(pSample)) {
+ if (!IsSpecialSample(pSample)) {
+ pSample->Release();
+ }
+ }
+}
+
+//
+// COutputQueue::Receive()
+//
+// Send a single sample by the multiple sample route
+// (NOTE - this could be optimized if necessary)
+//
+// On return the sample will have been Release()'d
+//
+
+HRESULT COutputQueue::Receive(IMediaSample *pSample)
+{
+ LONG nProcessed;
+ return ReceiveMultiple(&pSample, 1, &nProcessed);
+}
+
+//
+// COutputQueue::ReceiveMultiple()
+//
+// Send a set of samples to the downstream pin
+//
+// ppSamples - array of samples
+// nSamples - how many
+// nSamplesProcessed - How many were processed
+//
+// On return all samples will have been Release()'d
+//
+
+HRESULT COutputQueue::ReceiveMultiple (
+ __in_ecount(nSamples) IMediaSample **ppSamples,
+ long nSamples,
+ __out long *nSamplesProcessed)
+{
+ if (nSamples < 0) {
+ return E_INVALIDARG;
+ }
+
+ CAutoLock lck(this);
+ // Either call directly or queue up the samples
+
+ if (!IsQueued()) {
+
+ // If we already had a bad return code then just return
+
+ if (S_OK != m_hr) {
+
+ // If we've never received anything since the last Flush()
+ // and the sticky return code is not S_OK we must be
+ // flushing
+ // ((!A || B) is equivalent to A implies B)
+ ASSERT(!m_bFlushed || m_bFlushing);
+
+ // We're supposed to Release() them anyway!
+ *nSamplesProcessed = 0;
+ for (int i = 0; i < nSamples; i++) {
+ DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
+ nSamples, m_hr));
+ ppSamples[i]->Release();
+ }
+
+ return m_hr;
+ }
+ //
+ // If we're flushing the sticky return code should be S_FALSE
+ //
+ ASSERT(!m_bFlushing);
+ m_bFlushed = FALSE;
+
+ ASSERT(m_nBatched < m_lBatchSize);
+ ASSERT(m_nBatched == 0 || m_bBatchExact);
+
+ // Loop processing the samples in batches
+
+ LONG iLost = 0;
+ long iDone = 0;
+ for (iDone = 0;
+ iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
+ ) {
+
+//pragma message (REMIND("Implement threshold scheme"))
+ ASSERT(m_nBatched < m_lBatchSize);
+ if (iDone < nSamples) {
+ m_ppSamples[m_nBatched++] = ppSamples[iDone++];
+ }
+ if (m_nBatched == m_lBatchSize ||
+ nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
+ LONG nDone;
+ DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
+ m_nBatched));
+
+ if (m_hr == S_OK) {
+ m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
+ m_nBatched,
+ &nDone);
+ } else {
+ nDone = 0;
+ }
+ iLost += m_nBatched - nDone;
+ for (LONG i = 0; i < m_nBatched; i++) {
+ m_ppSamples[i]->Release();
+ }
+ m_nBatched = 0;
+ }
+ }
+ *nSamplesProcessed = iDone - iLost;
+ if (*nSamplesProcessed < 0) {
+ *nSamplesProcessed = 0;
+ }
+ return m_hr;
+ } else {
+ /* We're sending to our thread */
+
+ if (m_hr != S_OK) {
+ *nSamplesProcessed = 0;
+ DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
+ nSamples, m_hr));
+ for (int i = 0; i < nSamples; i++) {
+ ppSamples[i]->Release();
+ }
+ return m_hr;
+ }
+ m_bFlushed = FALSE;
+ for (long i = 0; i < nSamples; i++) {
+ QueueSample(ppSamples[i]);
+ }
+ *nSamplesProcessed = nSamples;
+ if (!m_bBatchExact ||
+ m_nBatched + m_List->GetCount() >= m_lBatchSize) {
+ NotifyThread();
+ }
+ return S_OK;
+ }
+}
+
+// Get ready for new data - cancels sticky m_hr
+void COutputQueue::Reset()
+{
+ if (!IsQueued()) {
+ m_hr = S_OK;
+ } else {
+ {
+ CAutoLock lck(this);
+ QueueSample(RESET_PACKET);
+ NotifyThread();
+ }
+ m_evFlushComplete.Wait();
+ }
+}
+
+// Remove and Release() all queued and Batched samples
+void COutputQueue::FreeSamples()
+{
+ CAutoLock lck(this);
+ if (IsQueued()) {
+ while (TRUE) {
+ IMediaSample *pSample = m_List->RemoveHead();
+ // inform derived class we took something off the queue
+ if (m_hEventPop) {
+ //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
+ SetEvent(m_hEventPop);
+ }
+
+ if (pSample == NULL) {
+ break;
+ }
+ if (!IsSpecialSample(pSample)) {
+ pSample->Release();
+ } else {
+ if (pSample == NEW_SEGMENT) {
+ // Free NEW_SEGMENT packet
+ NewSegmentPacket *ppacket =
+ (NewSegmentPacket *) m_List->RemoveHead();
+ // inform derived class we took something off the queue
+ if (m_hEventPop) {
+ //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT")));
+ SetEvent(m_hEventPop);
+ }
+
+ ASSERT(ppacket != NULL);
+ delete ppacket;
+ }
+ }
+ }
+ }
+ for (int i = 0; i < m_nBatched; i++) {
+ m_ppSamples[i]->Release();
+ }
+ m_nBatched = 0;
+}
+
+// Notify the thread if there is something to do
+//
+// The critical section MUST be held when this is called
+void COutputQueue::NotifyThread()
+{
+ // Optimize - no need to signal if it's not waiting
+ ASSERT(IsQueued());
+ if (m_lWaiting) {
+ ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
+ m_lWaiting = 0;
+ }
+}
+
+// See if there's any work to do
+// Returns
+// TRUE if there is nothing on the queue and nothing in the batch
+// and all data has been sent
+// FALSE otherwise
+//
+BOOL COutputQueue::IsIdle()
+{
+ CAutoLock lck(this);
+
+ // We're idle if
+ // there is no thread (!IsQueued()) OR
+ // the thread is waiting for more work (m_lWaiting != 0)
+ // AND
+ // there's nothing in the current batch (m_nBatched == 0)
+
+ if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
+ return FALSE;
+ } else {
+
+ // If we're idle it shouldn't be possible for there
+ // to be anything on the work queue
+
+ ASSERT(!IsQueued() || m_List->GetCount() == 0);
+ return TRUE;
+ }
+}
+
+
+void COutputQueue::SetPopEvent(HANDLE hEvent)
+{
+ m_hEventPop = hEvent;
+}