//------------------------------------------------------------------------------ // File: OutputQ.cpp // // Desc: DirectShow base classes - implements COutputQueue class used by an // output pin which may sometimes want to queue output samples on a // separate thread and sometimes call Receive() directly on the input // pin. // // Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved. //------------------------------------------------------------------------------ #include // // COutputQueue Constructor : // // Determines if a thread is to be created and creates resources // // pInputPin - the downstream input pin we're queueing samples to // // phr - changed to a failure code if this function fails // (otherwise unchanges) // // bAuto - Ask pInputPin if it can block in Receive by calling // its ReceiveCanBlock method and create a thread if // it can block, otherwise not. // // bQueue - if bAuto == FALSE then we create a thread if and only // if bQueue == TRUE // // lBatchSize - work in batches of lBatchSize // // bBatchEact - Use exact batch sizes so don't send until the // batch is full or SendAnyway() is called // // lListSize - If we create a thread make the list of samples queued // to the thread have this size cache // // dwPriority - If we create a thread set its priority to this // COutputQueue::COutputQueue( IPin *pInputPin, // Pin to send stuff to __inout HRESULT *phr, // 'Return code' BOOL bAuto, // Ask pin if queue or not BOOL bQueue, // Send through queue LONG lBatchSize, // Batch BOOL bBatchExact, // Batch exactly to BatchSize LONG lListSize, DWORD dwPriority, bool bFlushingOpt // flushing optimization ) : m_lBatchSize(lBatchSize), m_bBatchExact(bBatchExact && (lBatchSize > 1)), m_hThread(NULL), m_hSem(NULL), m_List(NULL), m_pPin(pInputPin), m_ppSamples(NULL), m_lWaiting(0), m_evFlushComplete(FALSE, phr), m_pInputPin(NULL), m_bSendAnyway(FALSE), m_nBatched(0), m_bFlushing(FALSE), m_bFlushed(TRUE), m_bFlushingOpt(bFlushingOpt), m_bTerminate(FALSE), m_hEventPop(NULL), m_hr(S_OK) { ASSERT(m_lBatchSize > 0); if (FAILED(*phr)) { return; } // Check the input pin is OK and cache its IMemInputPin interface *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin); if (FAILED(*phr)) { return; } // See if we should ask the downstream pin if (bAuto) { HRESULT hr = m_pInputPin->ReceiveCanBlock(); if (SUCCEEDED(hr)) { bQueue = hr == S_OK; } } // Create our sample batch m_ppSamples = new PMEDIASAMPLE[m_lBatchSize]; if (m_ppSamples == NULL) { *phr = E_OUTOFMEMORY; return; } // If we're queueing allocate resources if (bQueue) { DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin"))); m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL); if (m_hSem == NULL) { DWORD dwError = GetLastError(); *phr = AmHresultFromWin32(dwError); return; } m_List = new CSampleList(NAME("Sample Queue List"), lListSize, FALSE // No lock ); if (m_List == NULL) { *phr = E_OUTOFMEMORY; return; } DWORD dwThreadId; m_hThread = CreateThread(NULL, 0, InitialThreadProc, (LPVOID)this, 0, &dwThreadId); if (m_hThread == NULL) { DWORD dwError = GetLastError(); *phr = AmHresultFromWin32(dwError); return; } SetThreadPriority(m_hThread, dwPriority); } else { DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread"))); } } // // COutputQueuee Destructor : // // Free all resources - // // Thread, // Batched samples // COutputQueue::~COutputQueue() { DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue"))); /* Free our pointer */ if (m_pInputPin != NULL) { m_pInputPin->Release(); } if (m_hThread != NULL) { { CAutoLock lck(this); m_bTerminate = TRUE; m_hr = S_FALSE; NotifyThread(); } DbgWaitForSingleObject(m_hThread); EXECUTE_ASSERT(CloseHandle(m_hThread)); // The thread frees the samples when asked to terminate ASSERT(m_List->GetCount() == 0); delete m_List; } else { FreeSamples(); } if (m_hSem != NULL) { EXECUTE_ASSERT(CloseHandle(m_hSem)); } delete [] m_ppSamples; } // // Call the real thread proc as a member function // DWORD WINAPI COutputQueue::InitialThreadProc(__in LPVOID pv) { HRESULT hrCoInit = CAMThread::CoInitializeHelper(); COutputQueue *pSampleQueue = (COutputQueue *)pv; DWORD dwReturn = pSampleQueue->ThreadProc(); if(hrCoInit == S_OK) { CoUninitialize(); } return dwReturn; } // // Thread sending the samples downstream : // // When there is nothing to do the thread sets m_lWaiting (while // holding the critical section) and then waits for m_hSem to be // set (not holding the critical section) // DWORD COutputQueue::ThreadProc() { while (TRUE) { BOOL bWait = FALSE; IMediaSample *pSample; LONG lNumberToSend; // Local copy NewSegmentPacket* ppacket; // // Get a batch of samples and send it if possible // In any case exit the loop if there is a control action // requested // { CAutoLock lck(this); while (TRUE) { if (m_bTerminate) { FreeSamples(); return 0; } if (m_bFlushing) { FreeSamples(); SetEvent(m_evFlushComplete); } // Get a sample off the list pSample = m_List->RemoveHead(); // inform derived class we took something off the queue if (m_hEventPop) { //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT"))); SetEvent(m_hEventPop); } if (pSample != NULL && !IsSpecialSample(pSample)) { // If its just a regular sample just add it to the batch // and exit the loop if the batch is full m_ppSamples[m_nBatched++] = pSample; if (m_nBatched == m_lBatchSize) { break; } } else { // If there was nothing in the queue and there's nothing // to send (either because there's nothing or the batch // isn't full) then prepare to wait if (pSample == NULL && (m_bBatchExact || m_nBatched == 0)) { // Tell other thread to set the event when there's // something do to ASSERT(m_lWaiting == 0); m_lWaiting++; bWait = TRUE; } else { // We break out of the loop on SEND_PACKET unless // there's nothing to send if (pSample == SEND_PACKET && m_nBatched == 0) { continue; } if (pSample == NEW_SEGMENT) { // now we need the parameters - we are // guaranteed that the next packet contains them ppacket = (NewSegmentPacket *) m_List->RemoveHead(); // we took something off the queue if (m_hEventPop) { //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT"))); SetEvent(m_hEventPop); } ASSERT(ppacket); } // EOS_PACKET falls through here and we exit the loop // In this way it acts like SEND_PACKET } break; } } if (!bWait) { // We look at m_nBatched from the client side so keep // it up to date inside the critical section lNumberToSend = m_nBatched; // Local copy m_nBatched = 0; } } // Wait for some more data if (bWait) { DbgWaitForSingleObject(m_hSem); continue; } // OK - send it if there's anything to send // We DON'T check m_bBatchExact here because either we've got // a full batch or we dropped through because we got // SEND_PACKET or EOS_PACKET - both of which imply we should // flush our batch if (lNumberToSend != 0) { long nProcessed; if (m_hr == S_OK) { ASSERT(!m_bFlushed); HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples, lNumberToSend, &nProcessed); /* Don't overwrite a flushing state HRESULT */ CAutoLock lck(this); if (m_hr == S_OK) { m_hr = hr; } ASSERT(!m_bFlushed); } while (lNumberToSend != 0) { m_ppSamples[--lNumberToSend]->Release(); } if (m_hr != S_OK) { // In any case wait for more data - S_OK just // means there wasn't an error DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"), m_hr)); } } // Check for end of stream if (pSample == EOS_PACKET) { // We don't send even end of stream on if we've previously // returned something other than S_OK // This is because in that case the pin which returned // something other than S_OK should have either sent // EndOfStream() or notified the filter graph if (m_hr == S_OK) { DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()"))); HRESULT hr = m_pPin->EndOfStream(); if (FAILED(hr)) { DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"))); } } } // Data from a new source if (pSample == RESET_PACKET) { m_hr = S_OK; SetEvent(m_evFlushComplete); } if (pSample == NEW_SEGMENT) { m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate); delete ppacket; } } } // Send batched stuff anyway void COutputQueue::SendAnyway() { if (!IsQueued()) { // m_bSendAnyway is a private parameter checked in ReceiveMultiple m_bSendAnyway = TRUE; LONG nProcessed; ReceiveMultiple(NULL, 0, &nProcessed); m_bSendAnyway = FALSE; } else { CAutoLock lck(this); QueueSample(SEND_PACKET); NotifyThread(); } } void COutputQueue::NewSegment( REFERENCE_TIME tStart, REFERENCE_TIME tStop, double dRate) { if (!IsQueued()) { if (S_OK == m_hr) { if (m_bBatchExact) { SendAnyway(); } m_pPin->NewSegment(tStart, tStop, dRate); } } else { if (m_hr == S_OK) { // // we need to queue the new segment to appear in order in the // data, but we need to pass parameters to it. Rather than // take the hit of wrapping every single sample so we can tell // special ones apart, we queue special pointers to indicate // special packets, and we guarantee (by holding the // critical section) that the packet immediately following a // NEW_SEGMENT value is a NewSegmentPacket containing the // parameters. NewSegmentPacket * ppack = new NewSegmentPacket; if (ppack == NULL) { return; } ppack->tStart = tStart; ppack->tStop = tStop; ppack->dRate = dRate; CAutoLock lck(this); QueueSample(NEW_SEGMENT); QueueSample( (IMediaSample*) ppack); NotifyThread(); } } } // // End of Stream is queued to output device // void COutputQueue::EOS() { CAutoLock lck(this); if (!IsQueued()) { if (m_bBatchExact) { SendAnyway(); } if (m_hr == S_OK) { DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()"))); m_bFlushed = FALSE; HRESULT hr = m_pPin->EndOfStream(); if (FAILED(hr)) { DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()"))); } } } else { if (m_hr == S_OK) { m_bFlushed = FALSE; QueueSample(EOS_PACKET); NotifyThread(); } } } // // Flush all the samples in the queue // void COutputQueue::BeginFlush() { if (IsQueued()) { { CAutoLock lck(this); // block receives -- we assume this is done by the // filter in which we are a component // discard all queued data m_bFlushing = TRUE; // Make sure we discard all samples from now on if (m_hr == S_OK) { m_hr = S_FALSE; } // Optimize so we don't keep calling downstream all the time if (m_bFlushed && m_bFlushingOpt) { return; } // Make sure we really wait for the flush to complete m_evFlushComplete.Reset(); NotifyThread(); } // pass this downstream m_pPin->BeginFlush(); } else { // pass downstream first to avoid deadlocks m_pPin->BeginFlush(); CAutoLock lck(this); // discard all queued data m_bFlushing = TRUE; // Make sure we discard all samples from now on if (m_hr == S_OK) { m_hr = S_FALSE; } } } // // leave flush mode - pass this downstream void COutputQueue::EndFlush() { { CAutoLock lck(this); ASSERT(m_bFlushing); if (m_bFlushingOpt && m_bFlushed && IsQueued()) { m_bFlushing = FALSE; m_hr = S_OK; return; } } // sync with pushing thread -- done in BeginFlush // ensure no more data to go downstream -- done in BeginFlush // // Because we are synching here there is no need to hold the critical // section (in fact we'd deadlock if we did!) if (IsQueued()) { m_evFlushComplete.Wait(); } else { FreeSamples(); } // Be daring - the caller has guaranteed no samples will arrive // before EndFlush() returns m_bFlushing = FALSE; m_bFlushed = TRUE; // call EndFlush on downstream pins m_pPin->EndFlush(); m_hr = S_OK; } // COutputQueue::QueueSample // // private method to Send a sample to the output queue // The critical section MUST be held when this is called void COutputQueue::QueueSample(IMediaSample *pSample) { if (NULL == m_List->AddTail(pSample)) { if (!IsSpecialSample(pSample)) { pSample->Release(); } } } // // COutputQueue::Receive() // // Send a single sample by the multiple sample route // (NOTE - this could be optimized if necessary) // // On return the sample will have been Release()'d // HRESULT COutputQueue::Receive(IMediaSample *pSample) { LONG nProcessed; return ReceiveMultiple(&pSample, 1, &nProcessed); } // // COutputQueue::ReceiveMultiple() // // Send a set of samples to the downstream pin // // ppSamples - array of samples // nSamples - how many // nSamplesProcessed - How many were processed // // On return all samples will have been Release()'d // HRESULT COutputQueue::ReceiveMultiple ( __in_ecount(nSamples) IMediaSample **ppSamples, long nSamples, __out long *nSamplesProcessed) { if (nSamples < 0) { return E_INVALIDARG; } CAutoLock lck(this); // Either call directly or queue up the samples if (!IsQueued()) { // If we already had a bad return code then just return if (S_OK != m_hr) { // If we've never received anything since the last Flush() // and the sticky return code is not S_OK we must be // flushing // ((!A || B) is equivalent to A implies B) ASSERT(!m_bFlushed || m_bFlushing); // We're supposed to Release() them anyway! *nSamplesProcessed = 0; for (int i = 0; i < nSamples; i++) { DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"), nSamples, m_hr)); ppSamples[i]->Release(); } return m_hr; } // // If we're flushing the sticky return code should be S_FALSE // ASSERT(!m_bFlushing); m_bFlushed = FALSE; ASSERT(m_nBatched < m_lBatchSize); ASSERT(m_nBatched == 0 || m_bBatchExact); // Loop processing the samples in batches LONG iLost = 0; long iDone = 0; for (iDone = 0; iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway); ) { //pragma message (REMIND("Implement threshold scheme")) ASSERT(m_nBatched < m_lBatchSize); if (iDone < nSamples) { m_ppSamples[m_nBatched++] = ppSamples[iDone++]; } if (m_nBatched == m_lBatchSize || nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) { LONG nDone; DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"), m_nBatched)); if (m_hr == S_OK) { m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples, m_nBatched, &nDone); } else { nDone = 0; } iLost += m_nBatched - nDone; for (LONG i = 0; i < m_nBatched; i++) { m_ppSamples[i]->Release(); } m_nBatched = 0; } } *nSamplesProcessed = iDone - iLost; if (*nSamplesProcessed < 0) { *nSamplesProcessed = 0; } return m_hr; } else { /* We're sending to our thread */ if (m_hr != S_OK) { *nSamplesProcessed = 0; DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"), nSamples, m_hr)); for (int i = 0; i < nSamples; i++) { ppSamples[i]->Release(); } return m_hr; } m_bFlushed = FALSE; for (long i = 0; i < nSamples; i++) { QueueSample(ppSamples[i]); } *nSamplesProcessed = nSamples; if (!m_bBatchExact || m_nBatched + m_List->GetCount() >= m_lBatchSize) { NotifyThread(); } return S_OK; } } // Get ready for new data - cancels sticky m_hr void COutputQueue::Reset() { if (!IsQueued()) { m_hr = S_OK; } else { { CAutoLock lck(this); QueueSample(RESET_PACKET); NotifyThread(); } m_evFlushComplete.Wait(); } } // Remove and Release() all queued and Batched samples void COutputQueue::FreeSamples() { CAutoLock lck(this); if (IsQueued()) { while (TRUE) { IMediaSample *pSample = m_List->RemoveHead(); // inform derived class we took something off the queue if (m_hEventPop) { //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT"))); SetEvent(m_hEventPop); } if (pSample == NULL) { break; } if (!IsSpecialSample(pSample)) { pSample->Release(); } else { if (pSample == NEW_SEGMENT) { // Free NEW_SEGMENT packet NewSegmentPacket *ppacket = (NewSegmentPacket *) m_List->RemoveHead(); // inform derived class we took something off the queue if (m_hEventPop) { //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered SET EVENT"))); SetEvent(m_hEventPop); } ASSERT(ppacket != NULL); delete ppacket; } } } } for (int i = 0; i < m_nBatched; i++) { m_ppSamples[i]->Release(); } m_nBatched = 0; } // Notify the thread if there is something to do // // The critical section MUST be held when this is called void COutputQueue::NotifyThread() { // Optimize - no need to signal if it's not waiting ASSERT(IsQueued()); if (m_lWaiting) { ReleaseSemaphore(m_hSem, m_lWaiting, NULL); m_lWaiting = 0; } } // See if there's any work to do // Returns // TRUE if there is nothing on the queue and nothing in the batch // and all data has been sent // FALSE otherwise // BOOL COutputQueue::IsIdle() { CAutoLock lck(this); // We're idle if // there is no thread (!IsQueued()) OR // the thread is waiting for more work (m_lWaiting != 0) // AND // there's nothing in the current batch (m_nBatched == 0) if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) { return FALSE; } else { // If we're idle it shouldn't be possible for there // to be anything on the work queue ASSERT(!IsQueued() || m_List->GetCount() == 0); return TRUE; } } void COutputQueue::SetPopEvent(HANDLE hEvent) { m_hEventPop = hEvent; }