From 179b34d76adc548f6e4517eef03da8dec228a5a5 Mon Sep 17 00:00:00 2001 From: Liong Sauw Ming Date: Mon, 6 Feb 2012 08:27:28 +0000 Subject: Closed #1450: Add support for SDL job queue to grow in size. git-svn-id: http://svn.pjsip.org/repos/pjproject/trunk@3950 74dad513-b988-da41-8d7b-12977e46ad98 --- pjmedia/src/pjmedia-videodev/sdl_dev.c | 93 +++++++++++++++++++++++++--------- 1 file changed, 68 insertions(+), 25 deletions(-) (limited to 'pjmedia/src/pjmedia-videodev/sdl_dev.c') diff --git a/pjmedia/src/pjmedia-videodev/sdl_dev.c b/pjmedia/src/pjmedia-videodev/sdl_dev.c index c6d68746..d2743154 100644 --- a/pjmedia/src/pjmedia-videodev/sdl_dev.c +++ b/pjmedia/src/pjmedia-videodev/sdl_dev.c @@ -98,7 +98,8 @@ struct stream_list struct sdl_stream *stream; }; -#define MAX_JOBS 8 +#define INITIAL_MAX_JOBS 64 +#define JOB_QUEUE_INC_FACTOR 2 typedef pj_status_t (*job_func_ptr)(void *data); @@ -128,13 +129,16 @@ typedef struct job { #endif /* PJ_DARWINOS */ typedef struct job_queue { - job *jobs[MAX_JOBS]; - pj_sem_t *job_sem[MAX_JOBS]; + pj_pool_t *pool; + job **jobs; + pj_sem_t **job_sem; + pj_sem_t **old_sem; pj_mutex_t *mutex; pj_thread_t *thread; pj_sem_t *sem; - int head, tail; + unsigned size; + unsigned head, tail; pj_bool_t is_full; pj_bool_t is_quitting; } job_queue; @@ -1233,11 +1237,39 @@ static int job_thread(void * data) jb = jq->jobs[jq->head]; jb->retval = (*jb->func)(jb->data); - pj_sem_post(jq->job_sem[jq->head]); - pj_mutex_lock(jq->mutex); - jq->head = (jq->head + 1) % MAX_JOBS; - jq->is_full = PJ_FALSE; - pj_mutex_unlock(jq->mutex); + /* If job queue is full and we already finish all the pending + * jobs, increase the size. + */ + if (jq->is_full && ((jq->head + 1) % jq->size == jq->tail)) { + unsigned i, head; + + if (jq->old_sem) { + for (i = 0; i < jq->size / JOB_QUEUE_INC_FACTOR; i++) { + pj_sem_destroy(jq->old_sem[i]); + } + } + jq->old_sem = jq->job_sem; + + /* Double the job queue size. */ + jq->size *= JOB_QUEUE_INC_FACTOR; + pj_sem_destroy(jq->sem); + pj_sem_create(jq->pool, "thread_sem", 0, jq->size + 1, + &jq->sem); + jq->jobs = (job **)pj_pool_calloc(jq->pool, jq->size, + sizeof(job *)); + jq->job_sem = (pj_sem_t **) pj_pool_calloc(jq->pool, jq->size, + sizeof(pj_sem_t *)); + for (i = 0; i < jq->size; i++) { + pj_sem_create(jq->pool, "job_sem", 0, 1, &jq->job_sem[i]); + } + jq->is_full = PJ_FALSE; + head = jq->head; + jq->head = jq->tail = 0; + pj_sem_post(jq->old_sem[head]); + } else { + pj_sem_post(jq->job_sem[jq->head]); + jq->head = (jq->head + 1) % jq->size; + } } return 0; @@ -1249,7 +1281,6 @@ static pj_status_t job_queue_create(pj_pool_t *pool, job_queue **pjq) pj_status_t status; job_queue *jq = PJ_POOL_ZALLOC_T(pool, job_queue); - pj_sem_create(pool, "thread_sem", 0, MAX_JOBS + 1, &jq->sem); #if defined(PJ_DARWINOS) && PJ_DARWINOS!=0 PJ_UNUSED_ARG(status); @@ -1262,7 +1293,13 @@ static pj_status_t job_queue_create(pj_pool_t *pool, job_queue **pjq) } #endif /* PJ_DARWINOS */ - for (i = 0; i < MAX_JOBS; i++) { + jq->pool = pool; + jq->size = INITIAL_MAX_JOBS; + pj_sem_create(pool, "thread_sem", 0, jq->size + 1, &jq->sem); + jq->jobs = (job **)pj_pool_calloc(pool, jq->size, sizeof(job *)); + jq->job_sem = (pj_sem_t **) pj_pool_calloc(pool, jq->size, + sizeof(pj_sem_t *)); + for (i = 0; i < jq->size; i++) { pj_sem_create(pool, "job_sem", 0, 1, &jq->job_sem[i]); } pj_mutex_create_recursive(pool, "job_mutex", &jq->mutex); @@ -1296,22 +1333,23 @@ static pj_status_t job_queue_post_job(job_queue *jq, job_func_ptr func, [apool release]; #else /* PJ_DARWINOS */ pj_mutex_lock(jq->mutex); - if (jq->is_full) { - /* Sorry, the queue is full! */ - pj_mutex_unlock(jq->mutex); - *retval = PJ_ETOOMANY; - return PJ_ETOOMANY; - } jq->jobs[jq->tail] = &jb; tail = jq->tail; - jq->tail = (jq->tail + 1) % MAX_JOBS; - if (jq->tail == jq->head) + jq->tail = (jq->tail + 1) % jq->size; + if (jq->tail == jq->head) { jq->is_full = PJ_TRUE; - pj_mutex_unlock(jq->mutex); - - pj_sem_post(jq->sem); - /* Wait until our posted job is completed. */ - pj_sem_wait(jq->job_sem[tail]); + PJ_LOG(4, (THIS_FILE, "SDL job queue is full, increasing " + "the queue size.")); + pj_sem_post(jq->sem); + /* Wait until our posted job is completed. */ + pj_sem_wait(jq->job_sem[tail]); + pj_mutex_unlock(jq->mutex); + } else { + pj_mutex_unlock(jq->mutex); + pj_sem_post(jq->sem); + /* Wait until our posted job is completed. */ + pj_sem_wait(jq->job_sem[tail]); + } #endif /* PJ_DARWINOS */ *retval = jb.retval; @@ -1334,9 +1372,14 @@ static pj_status_t job_queue_destroy(job_queue *jq) pj_sem_destroy(jq->sem); jq->sem = NULL; } - for (i = 0; i < MAX_JOBS; i++) { + for (i = 0; i < jq->size; i++) { pj_sem_destroy(jq->job_sem[i]); } + if (jq->old_sem) { + for (i = 0; i < jq->size / JOB_QUEUE_INC_FACTOR; i++) { + pj_sem_destroy(jq->old_sem[i]); + } + } pj_mutex_destroy(jq->mutex); return PJ_SUCCESS; -- cgit v1.2.3