//------------------------------------------------------------------------------ // File: PullPin.cpp // // Desc: DirectShow base classes - implements CPullPin class that pulls data // from IAsyncReader. // // Copyright (c) 1992-2001 Microsoft Corporation. All rights reserved. //------------------------------------------------------------------------------ #include #include "pullpin.h" #ifdef DXMPERF #include "dxmperf.h" #endif // DXMPERF CPullPin::CPullPin() : m_pReader(NULL), m_pAlloc(NULL), m_State(TM_Exit) { #ifdef DXMPERF PERFLOG_CTOR( L"CPullPin", this ); #endif // DXMPERF } CPullPin::~CPullPin() { Disconnect(); #ifdef DXMPERF PERFLOG_DTOR( L"CPullPin", this ); #endif // DXMPERF } // returns S_OK if successfully connected to an IAsyncReader interface // from this object // Optional allocator should be proposed as a preferred allocator if // necessary HRESULT CPullPin::Connect(IUnknown* pUnk, IMemAllocator* pAlloc, BOOL bSync) { CAutoLock lock(&m_AccessLock); if (m_pReader) { return VFW_E_ALREADY_CONNECTED; } HRESULT hr = pUnk->QueryInterface(IID_IAsyncReader, (void**)&m_pReader); if (FAILED(hr)) { #ifdef DXMPERF { AM_MEDIA_TYPE * pmt = NULL; PERFLOG_CONNECT( this, pUnk, hr, pmt ); } #endif // DXMPERF return(hr); } hr = DecideAllocator(pAlloc, NULL); if (FAILED(hr)) { Disconnect(); #ifdef DXMPERF { AM_MEDIA_TYPE * pmt = NULL; PERFLOG_CONNECT( this, pUnk, hr, pmt ); } #endif // DXMPERF return hr; } LONGLONG llTotal, llAvail; hr = m_pReader->Length(&llTotal, &llAvail); if (FAILED(hr)) { Disconnect(); #ifdef DXMPERF { AM_MEDIA_TYPE * pmt = NULL; PERFLOG_CONNECT( this, pUnk, hr, pmt ); } #endif return hr; } // convert from file position to reference time m_tDuration = llTotal * UNITS; m_tStop = m_tDuration; m_tStart = 0; m_bSync = bSync; #ifdef DXMPERF { AM_MEDIA_TYPE * pmt = NULL; PERFLOG_CONNECT( this, pUnk, S_OK, pmt ); } #endif // DXMPERF return S_OK; } // disconnect any connection made in Connect HRESULT CPullPin::Disconnect() { CAutoLock lock(&m_AccessLock); StopThread(); #ifdef DXMPERF PERFLOG_DISCONNECT( this, m_pReader, S_OK ); #endif // DXMPERF if (m_pReader) { m_pReader->Release(); m_pReader = NULL; } if (m_pAlloc) { m_pAlloc->Release(); m_pAlloc = NULL; } return S_OK; } // agree an allocator using RequestAllocator - optional // props param specifies your requirements (non-zero fields). // returns an error code if fail to match requirements. // optional IMemAllocator interface is offered as a preferred allocator // but no error occurs if it can't be met. HRESULT CPullPin::DecideAllocator( IMemAllocator * pAlloc, __inout_opt ALLOCATOR_PROPERTIES * pProps) { ALLOCATOR_PROPERTIES *pRequest; ALLOCATOR_PROPERTIES Request; if (pProps == NULL) { Request.cBuffers = 3; Request.cbBuffer = 64*1024; Request.cbAlign = 0; Request.cbPrefix = 0; pRequest = &Request; } else { pRequest = pProps; } HRESULT hr = m_pReader->RequestAllocator( pAlloc, pRequest, &m_pAlloc); return hr; } // start pulling data HRESULT CPullPin::Active(void) { ASSERT(!ThreadExists()); return StartThread(); } // stop pulling data HRESULT CPullPin::Inactive(void) { StopThread(); return S_OK; } HRESULT CPullPin::Seek(REFERENCE_TIME tStart, REFERENCE_TIME tStop) { CAutoLock lock(&m_AccessLock); ThreadMsg AtStart = m_State; if (AtStart == TM_Start) { BeginFlush(); PauseThread(); EndFlush(); } m_tStart = tStart; m_tStop = tStop; HRESULT hr = S_OK; if (AtStart == TM_Start) { hr = StartThread(); } return hr; } HRESULT CPullPin::Duration(__out REFERENCE_TIME* ptDuration) { *ptDuration = m_tDuration; return S_OK; } HRESULT CPullPin::StartThread() { CAutoLock lock(&m_AccessLock); if (!m_pAlloc || !m_pReader) { return E_UNEXPECTED; } HRESULT hr; if (!ThreadExists()) { // commit allocator hr = m_pAlloc->Commit(); if (FAILED(hr)) { return hr; } // start thread if (!Create()) { return E_FAIL; } } m_State = TM_Start; hr = (HRESULT) CallWorker(m_State); return hr; } HRESULT CPullPin::PauseThread() { CAutoLock lock(&m_AccessLock); if (!ThreadExists()) { return E_UNEXPECTED; } // need to flush to ensure the thread is not blocked // in WaitForNext HRESULT hr = m_pReader->BeginFlush(); if (FAILED(hr)) { return hr; } m_State = TM_Pause; hr = CallWorker(TM_Pause); m_pReader->EndFlush(); return hr; } HRESULT CPullPin::StopThread() { CAutoLock lock(&m_AccessLock); if (!ThreadExists()) { return S_FALSE; } // need to flush to ensure the thread is not blocked // in WaitForNext HRESULT hr = m_pReader->BeginFlush(); if (FAILED(hr)) { return hr; } m_State = TM_Exit; hr = CallWorker(TM_Exit); m_pReader->EndFlush(); // wait for thread to completely exit Close(); // decommit allocator if (m_pAlloc) { m_pAlloc->Decommit(); } return S_OK; } DWORD CPullPin::ThreadProc(void) { while(1) { DWORD cmd = GetRequest(); switch(cmd) { case TM_Exit: Reply(S_OK); return 0; case TM_Pause: // we are paused already Reply(S_OK); break; case TM_Start: Reply(S_OK); Process(); break; } // at this point, there should be no outstanding requests on the // upstream filter. // We should force begin/endflush to ensure that this is true. // !!!Note that we may currently be inside a BeginFlush/EndFlush pair // on another thread, but the premature EndFlush will do no harm now // that we are idle. m_pReader->BeginFlush(); CleanupCancelled(); m_pReader->EndFlush(); } } HRESULT CPullPin::QueueSample( __inout REFERENCE_TIME& tCurrent, REFERENCE_TIME tAlignStop, BOOL bDiscontinuity ) { IMediaSample* pSample; HRESULT hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0); if (FAILED(hr)) { return hr; } LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS); if (tStopThis > tAlignStop) { tStopThis = tAlignStop; } pSample->SetTime(&tCurrent, &tStopThis); tCurrent = tStopThis; pSample->SetDiscontinuity(bDiscontinuity); hr = m_pReader->Request( pSample, 0); if (FAILED(hr)) { pSample->Release(); CleanupCancelled(); OnError(hr); } return hr; } HRESULT CPullPin::CollectAndDeliver( REFERENCE_TIME tStart, REFERENCE_TIME tStop) { IMediaSample* pSample = NULL; // better be sure pSample is set DWORD_PTR dwUnused; HRESULT hr = m_pReader->WaitForNext( INFINITE, &pSample, &dwUnused); if (FAILED(hr)) { if (pSample) { pSample->Release(); } } else { hr = DeliverSample(pSample, tStart, tStop); } if (FAILED(hr)) { CleanupCancelled(); OnError(hr); } return hr; } HRESULT CPullPin::DeliverSample( IMediaSample* pSample, REFERENCE_TIME tStart, REFERENCE_TIME tStop ) { // fix up sample if past actual stop (for sector alignment) REFERENCE_TIME t1, t2; if (S_OK == pSample->GetTime(&t1, &t2)) { if (t2 > tStop) { t2 = tStop; } // adjust times to be relative to (aligned) start time t1 -= tStart; t2 -= tStart; HRESULT hr = pSample->SetTime(&t1, &t2); if (FAILED(hr)) { return hr; } } #ifdef DXMPERF { AM_MEDIA_TYPE * pmt = NULL; pSample->GetMediaType( &pmt ); PERFLOG_RECEIVE( L"CPullPin", m_pReader, this, pSample, pmt ); } #endif HRESULT hr = Receive(pSample); pSample->Release(); return hr; } void CPullPin::Process(void) { // is there anything to do? if (m_tStop <= m_tStart) { EndOfStream(); return; } BOOL bDiscontinuity = TRUE; // if there is more than one sample at the allocator, // then try to queue 2 at once in order to overlap. // -- get buffer count and required alignment ALLOCATOR_PROPERTIES Actual; HRESULT hr = m_pAlloc->GetProperties(&Actual); // align the start position downwards REFERENCE_TIME tStart = AlignDown(m_tStart / UNITS, Actual.cbAlign) * UNITS; REFERENCE_TIME tCurrent = tStart; REFERENCE_TIME tStop = m_tStop; if (tStop > m_tDuration) { tStop = m_tDuration; } // align the stop position - may be past stop, but that // doesn't matter REFERENCE_TIME tAlignStop = AlignUp(tStop / UNITS, Actual.cbAlign) * UNITS; DWORD dwRequest; if (!m_bSync) { // Break out of the loop either if we get to the end or we're asked // to do something else while (tCurrent < tAlignStop) { // Break out without calling EndOfStream if we're asked to // do something different if (CheckRequest(&dwRequest)) { return; } // queue a first sample if (Actual.cBuffers > 1) { hr = QueueSample(tCurrent, tAlignStop, TRUE); bDiscontinuity = FALSE; if (FAILED(hr)) { return; } } // loop queueing second and waiting for first.. while (tCurrent < tAlignStop) { hr = QueueSample(tCurrent, tAlignStop, bDiscontinuity); bDiscontinuity = FALSE; if (FAILED(hr)) { return; } hr = CollectAndDeliver(tStart, tStop); if (S_OK != hr) { // stop if error, or if downstream filter said // to stop. return; } } if (Actual.cBuffers > 1) { hr = CollectAndDeliver(tStart, tStop); if (FAILED(hr)) { return; } } } } else { // sync version of above loop while (tCurrent < tAlignStop) { // Break out without calling EndOfStream if we're asked to // do something different if (CheckRequest(&dwRequest)) { return; } IMediaSample* pSample; hr = m_pAlloc->GetBuffer(&pSample, NULL, NULL, 0); if (FAILED(hr)) { OnError(hr); return; } LONGLONG tStopThis = tCurrent + (pSample->GetSize() * UNITS); if (tStopThis > tAlignStop) { tStopThis = tAlignStop; } pSample->SetTime(&tCurrent, &tStopThis); tCurrent = tStopThis; if (bDiscontinuity) { pSample->SetDiscontinuity(TRUE); bDiscontinuity = FALSE; } hr = m_pReader->SyncReadAligned(pSample); if (FAILED(hr)) { pSample->Release(); OnError(hr); return; } hr = DeliverSample(pSample, tStart, tStop); if (hr != S_OK) { if (FAILED(hr)) { OnError(hr); } return; } } } EndOfStream(); } // after a flush, cancelled i/o will be waiting for collection // and release void CPullPin::CleanupCancelled(void) { while (1) { IMediaSample * pSample; DWORD_PTR dwUnused; HRESULT hr = m_pReader->WaitForNext( 0, // no wait &pSample, &dwUnused); if(pSample) { pSample->Release(); } else { // no more samples return; } } }