X-Git-Url: https://main.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fopenjp2%2Fthread.c;h=f2fca2ee4af8e395cab361f053b90c21b30952e5;hb=cd900d96618ab77e79812db654731dd6b5fc7bd8;hp=59b5d87ee9a9e059349f86c40991a9f96bf3183e;hpb=4f9abb9a45ffd711f9717db15d062fa020ed6cf5;p=openjpeg.git diff --git a/src/lib/openjp2/thread.c b/src/lib/openjp2/thread.c index 59b5d87e..f2fca2ee 100644 --- a/src/lib/openjp2/thread.c +++ b/src/lib/openjp2/thread.c @@ -1,6 +1,6 @@ /* - * The copyright in this software is being made available under the 2-clauses - * BSD License, included below. This software may be subject to other third + * The copyright in this software is being made available under the 2-clauses + * BSD License, included below. This software may be subject to other third * party and contributor rights, including patent rights, and no such rights * are granted under this license. * @@ -29,9 +29,6 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include "opj_includes.h" - -#include "thread.h" #include #ifdef MUTEX_win32 @@ -46,6 +43,8 @@ #include #include +#include "opj_includes.h" + OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void) { return OPJ_TRUE; @@ -57,51 +56,52 @@ int OPJ_CALLCONV opj_get_num_cpus(void) DWORD dwNum; GetSystemInfo(&info); dwNum = info.dwNumberOfProcessors; - if( dwNum < 1 ) + if (dwNum < 1) { return 1; + } return (int)dwNum; } -struct opj_mutex_t -{ +struct opj_mutex_t { CRITICAL_SECTION cs; }; opj_mutex_t* opj_mutex_create(void) { opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t)); - if( !mutex ) + if (!mutex) { return NULL; + } InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000); return mutex; } void opj_mutex_lock(opj_mutex_t* mutex) { - EnterCriticalSection( &(mutex->cs) ); + EnterCriticalSection(&(mutex->cs)); } void opj_mutex_unlock(opj_mutex_t* mutex) { - LeaveCriticalSection( &(mutex->cs) ); + LeaveCriticalSection(&(mutex->cs)); } void opj_mutex_destroy(opj_mutex_t* mutex) { - if( !mutex ) return; - DeleteCriticalSection( &(mutex->cs) ); - opj_free( mutex ); + if (!mutex) { + return; + } + DeleteCriticalSection(&(mutex->cs)); + opj_free(mutex); } -struct opj_cond_waiter_list_t -{ +struct opj_cond_waiter_list_t { HANDLE hEvent; struct opj_cond_waiter_list_t* next; }; typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t; -struct opj_cond_t -{ +struct opj_cond_t { opj_mutex_t *internal_mutex; opj_cond_waiter_list_t *waiter_list; }; @@ -113,21 +113,20 @@ static volatile int TLSKeyInit = OPJ_FALSE; opj_cond_t* opj_cond_create(void) { opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t)); - if( !cond ) + if (!cond) { return NULL; + } /* Make sure that the TLS key is allocated in a thread-safe way */ /* We cannot use a global mutex/critical section since its creation itself would not be */ /* thread-safe, so use InterlockedCompareExchange trick */ - while( OPJ_TRUE ) - { + while (OPJ_TRUE) { #if HAVE_INTERLOCKED_COMPARE_EXCHANGE - if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 ) + if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0) #endif { - if( !TLSKeyInit ) - { + if (!TLSKeyInit) { TLSKey = TlsAlloc(); TLSKeyInit = OPJ_TRUE; } @@ -138,14 +137,12 @@ opj_cond_t* opj_cond_create(void) } } - if( TLSKey == TLS_OUT_OF_INDEXES ) - { + if (TLSKey == TLS_OUT_OF_INDEXES) { opj_free(cond); return NULL; } cond->internal_mutex = opj_mutex_create(); - if (cond->internal_mutex == NULL) - { + if (cond->internal_mutex == NULL) { opj_free(cond); return NULL; } @@ -156,16 +153,15 @@ opj_cond_t* opj_cond_create(void) void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex) { opj_cond_waiter_list_t* item; - HANDLE hEvent = (HANDLE) TlsGetValue( TLSKey ); - if (hEvent == NULL) - { + HANDLE hEvent = (HANDLE) TlsGetValue(TLSKey); + if (hEvent == NULL) { hEvent = CreateEvent(NULL, /* security attributes */ 0, /* manual reset = no */ 0, /* initial state = unsignaled */ NULL /* no name */); assert(hEvent); - TlsSetValue( TLSKey, hEvent ); + TlsSetValue(TLSKey, hEvent); } /* Insert the waiter into the waiter list of the condition */ @@ -200,8 +196,7 @@ void opj_cond_signal(opj_cond_t* cond) opj_mutex_lock(cond->internal_mutex); psIter = cond->waiter_list; - if (psIter != NULL) - { + if (psIter != NULL) { SetEvent(psIter->hEvent); cond->waiter_list = psIter->next; opj_free(psIter); @@ -212,37 +207,36 @@ void opj_cond_signal(opj_cond_t* cond) void opj_cond_destroy(opj_cond_t* cond) { - if( !cond ) return; + if (!cond) { + return; + } opj_mutex_destroy(cond->internal_mutex); assert(cond->waiter_list == NULL); opj_free(cond); } -struct opj_thread_t -{ +struct opj_thread_t { opj_thread_fn thread_fn; void* user_data; HANDLE hThread; }; -unsigned int __stdcall opj_thread_callback_adapter( void *info ) +unsigned int __stdcall opj_thread_callback_adapter(void *info) { opj_thread_t* thread = (opj_thread_t*) info; HANDLE hEvent = NULL; - thread->thread_fn( thread->user_data ); + thread->thread_fn(thread->user_data); /* Free the handle possible allocated by a cond */ - while( OPJ_TRUE ) - { + while (OPJ_TRUE) { /* Make sure TLSKey is not being created just at that moment... */ #if HAVE_INTERLOCKED_COMPARE_EXCHANGE - if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 ) + if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0) #endif { - if( TLSKeyInit ) - { - hEvent = (HANDLE) TlsGetValue( TLSKey ); + if (TLSKeyInit) { + hEvent = (HANDLE) TlsGetValue(TLSKey); } #if HAVE_INTERLOCKED_COMPARE_EXCHANGE InterlockedCompareExchange(&inTLSLockedSection, 0, 1); @@ -250,39 +244,40 @@ unsigned int __stdcall opj_thread_callback_adapter( void *info ) break; } } - if( hEvent ) + if (hEvent) { CloseHandle(hEvent); + } return 0; } -opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data ) +opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data) { opj_thread_t* thread; - assert( thread_fn ); + assert(thread_fn); - thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) ); - if( !thread ) + thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t)); + if (!thread) { return NULL; + } thread->thread_fn = thread_fn; thread->user_data = user_data; thread->hThread = (HANDLE)_beginthreadex(NULL, 0, - opj_thread_callback_adapter, thread, 0, NULL); + opj_thread_callback_adapter, thread, 0, NULL); - if( thread->hThread == NULL ) - { - opj_free( thread ); + if (thread->hThread == NULL) { + opj_free(thread); return NULL; } return thread; } -void opj_thread_join( opj_thread_t* thread ) +void opj_thread_join(opj_thread_t* thread) { WaitForSingleObject(thread->hThread, INFINITE); - CloseHandle( thread->hThread ); + CloseHandle(thread->hThread); opj_free(thread); } @@ -293,6 +288,10 @@ void opj_thread_join( opj_thread_t* thread ) #include #include +/* Moved after all system includes, and in particular pthread.h, so as to */ +/* avoid poisoning issuing with malloc() use in pthread.h with ulibc (#1013) */ +#include "opj_includes.h" + OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void) { return OPJ_TRUE; @@ -307,18 +306,19 @@ int OPJ_CALLCONV opj_get_num_cpus(void) #endif } -struct opj_mutex_t -{ +struct opj_mutex_t { pthread_mutex_t mutex; }; opj_mutex_t* opj_mutex_create(void) { - opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t)); - if( !mutex ) - return NULL; - pthread_mutex_t pthr_mutex = PTHREAD_MUTEX_INITIALIZER; - mutex->mutex = pthr_mutex; + opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t)); + if (mutex != NULL) { + if (pthread_mutex_init(&mutex->mutex, NULL) != 0) { + opj_free(mutex); + mutex = NULL; + } + } return mutex; } @@ -334,23 +334,24 @@ void opj_mutex_unlock(opj_mutex_t* mutex) void opj_mutex_destroy(opj_mutex_t* mutex) { - if( !mutex ) return; + if (!mutex) { + return; + } pthread_mutex_destroy(&(mutex->mutex)); opj_free(mutex); } -struct opj_cond_t -{ +struct opj_cond_t { pthread_cond_t cond; }; opj_cond_t* opj_cond_create(void) { opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t)); - if( !cond ) + if (!cond) { return NULL; - if( pthread_cond_init(&(cond->cond), NULL) != 0 ) - { + } + if (pthread_cond_init(&(cond->cond), NULL) != 0) { opj_free(cond); return NULL; } @@ -371,54 +372,55 @@ void opj_cond_signal(opj_cond_t* cond) void opj_cond_destroy(opj_cond_t* cond) { - if( !cond ) return; + if (!cond) { + return; + } pthread_cond_destroy(&(cond->cond)); opj_free(cond); } -struct opj_thread_t -{ +struct opj_thread_t { opj_thread_fn thread_fn; void* user_data; pthread_t thread; }; -static void* opj_thread_callback_adapter( void* info ) +static void* opj_thread_callback_adapter(void* info) { opj_thread_t* thread = (opj_thread_t*) info; - thread->thread_fn( thread->user_data ); + thread->thread_fn(thread->user_data); return NULL; } -opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data ) +opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data) { pthread_attr_t attr; opj_thread_t* thread; - assert( thread_fn ); + assert(thread_fn); - thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) ); - if( !thread ) + thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t)); + if (!thread) { return NULL; + } thread->thread_fn = thread_fn; thread->user_data = user_data; - pthread_attr_init( &attr ); - pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE ); - if( pthread_create( &(thread->thread), &attr, - opj_thread_callback_adapter, (void *) thread ) != 0 ) - { - opj_free( thread ); + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (pthread_create(&(thread->thread), &attr, + opj_thread_callback_adapter, (void *) thread) != 0) { + opj_free(thread); return NULL; } return thread; } -void opj_thread_join( opj_thread_t* thread ) +void opj_thread_join(opj_thread_t* thread) { void* status; - pthread_join( thread->thread, &status); + pthread_join(thread->thread, &status); opj_free(thread); } @@ -426,6 +428,8 @@ void opj_thread_join( opj_thread_t* thread ) #else /* Stub implementation */ +#include "opj_includes.h" + OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void) { return OPJ_FALSE; @@ -477,29 +481,27 @@ void opj_cond_destroy(opj_cond_t* cond) (void) cond; } -opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data ) +opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data) { - (void) thread_fn; + (void) thread_fn; (void) user_data; return NULL; } -void opj_thread_join( opj_thread_t* thread ) +void opj_thread_join(opj_thread_t* thread) { (void) thread; } #endif -typedef struct -{ +typedef struct { int key; void* value; opj_tls_free_func opj_free_func; } opj_tls_key_val_t; -struct opj_tls_t -{ +struct opj_tls_t { opj_tls_key_val_t* key_val; int key_val_count; }; @@ -512,11 +514,13 @@ static opj_tls_t* opj_tls_new(void) static void opj_tls_destroy(opj_tls_t* tls) { int i; - if( !tls ) return; - for(i=0;ikey_val_count;i++) - { - if( tls->key_val[i].opj_free_func ) + if (!tls) { + return; + } + for (i = 0; i < tls->key_val_count; i++) { + if (tls->key_val[i].opj_free_func) { tls->key_val[i].opj_free_func(tls->key_val[i].value); + } } opj_free(tls->key_val); opj_free(tls); @@ -525,33 +529,38 @@ static void opj_tls_destroy(opj_tls_t* tls) void* opj_tls_get(opj_tls_t* tls, int key) { int i; - for(i=0;ikey_val_count;i++) - { - if( tls->key_val[i].key == key ) + for (i = 0; i < tls->key_val_count; i++) { + if (tls->key_val[i].key == key) { return tls->key_val[i].value; + } } return NULL; } -OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, opj_tls_free_func opj_free_func) +OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, + opj_tls_free_func opj_free_func) { opj_tls_key_val_t* new_key_val; int i; - for(i=0;ikey_val_count;i++) - { - if( tls->key_val[i].key == key ) - { - if( tls->key_val[i].opj_free_func ) + + if (tls->key_val_count == INT_MAX) { + return OPJ_FALSE; + } + for (i = 0; i < tls->key_val_count; i++) { + if (tls->key_val[i].key == key) { + if (tls->key_val[i].opj_free_func) { tls->key_val[i].opj_free_func(tls->key_val[i].value); + } tls->key_val[i].value = value; tls->key_val[i].opj_free_func = opj_free_func; return OPJ_TRUE; } } - new_key_val = (opj_tls_key_val_t*) opj_realloc( tls->key_val, - (tls->key_val_count + 1) * sizeof(opj_tls_key_val_t) ); - if( !new_key_val ) + new_key_val = (opj_tls_key_val_t*) opj_realloc(tls->key_val, + ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t)); + if (!new_key_val) { return OPJ_FALSE; + } tls->key_val = new_key_val; new_key_val[tls->key_val_count].key = key; new_key_val[tls->key_val_count].value = value; @@ -561,14 +570,12 @@ OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, opj_tls_free_func opj } -typedef struct -{ +typedef struct { opj_job_fn job_fn; void *user_data; } opj_worker_thread_job_t; -typedef struct -{ +typedef struct { opj_thread_pool_t *tp; opj_thread_t *thread; int marked_as_waiting; @@ -577,29 +584,25 @@ typedef struct opj_cond_t *cond; } opj_worker_thread_t; -typedef enum -{ +typedef enum { OPJWTS_OK, OPJWTS_STOP, OPJWTS_ERROR } opj_worker_thread_state; -struct opj_job_list_t -{ +struct opj_job_list_t { opj_worker_thread_job_t* job; struct opj_job_list_t* next; }; typedef struct opj_job_list_t opj_job_list_t; -struct opj_worker_thread_list_t -{ +struct opj_worker_thread_list_t { opj_worker_thread_t* worker_thread; struct opj_worker_thread_list_t* next; }; typedef struct opj_worker_thread_list_t opj_worker_thread_list_t; -struct opj_thread_pool_t -{ +struct opj_thread_pool_t { opj_worker_thread_t* worker_threads; int worker_threads_count; opj_cond_t* cond; @@ -614,24 +617,24 @@ struct opj_thread_pool_t }; static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads); -static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp, - opj_worker_thread_t* worker_thread, - OPJ_BOOL signal_job_finished); +static opj_worker_thread_job_t* opj_thread_pool_get_next_job( + opj_thread_pool_t* tp, + opj_worker_thread_t* worker_thread, + OPJ_BOOL signal_job_finished); opj_thread_pool_t* opj_thread_pool_create(int num_threads) { opj_thread_pool_t* tp; tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t)); - if( !tp ) + if (!tp) { return NULL; + } tp->state = OPJWTS_OK; - if( num_threads <= 0 ) - { + if (num_threads <= 0) { tp->tls = opj_tls_new(); - if( !tp->tls ) - { + if (!tp->tls) { opj_free(tp); tp = NULL; } @@ -639,13 +642,11 @@ opj_thread_pool_t* opj_thread_pool_create(int num_threads) } tp->mutex = opj_mutex_create(); - if( !tp->mutex ) - { + if (!tp->mutex) { opj_free(tp); return NULL; } - if( !opj_thread_pool_setup(tp, num_threads) ) - { + if (!opj_thread_pool_setup(tp, num_threads)) { opj_thread_pool_destroy(tp); return NULL; } @@ -659,18 +660,18 @@ static void opj_worker_thread_function(void* user_data) opj_tls_t* tls; OPJ_BOOL job_finished = OPJ_FALSE; - worker_thread = (opj_worker_thread_t* ) user_data; + worker_thread = (opj_worker_thread_t*) user_data; tp = worker_thread->tp; tls = opj_tls_new(); - while( OPJ_TRUE ) - { - opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, job_finished); - if( job == NULL ) + while (OPJ_TRUE) { + opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, + job_finished); + if (job == NULL) { break; + } - if( job->job_fn ) - { + if (job->job_fn) { job->job_fn(job->user_data, tls); } opj_free(job); @@ -685,33 +686,32 @@ static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads) int i; OPJ_BOOL bRet = OPJ_TRUE; - assert( num_threads > 0 ); + assert(num_threads > 0); tp->cond = opj_cond_create(); - if( tp->cond == NULL ) + if (tp->cond == NULL) { return OPJ_FALSE; + } - tp->worker_threads = (opj_worker_thread_t*) opj_calloc( num_threads, - sizeof(opj_worker_thread_t) ); - if( tp->worker_threads == NULL ) + tp->worker_threads = (opj_worker_thread_t*) opj_calloc((size_t)num_threads, + sizeof(opj_worker_thread_t)); + if (tp->worker_threads == NULL) { return OPJ_FALSE; + } tp->worker_threads_count = num_threads; - for(i=0;iworker_threads[i].tp = tp; tp->worker_threads[i].mutex = opj_mutex_create(); - if( tp->worker_threads[i].mutex == NULL ) - { + if (tp->worker_threads[i].mutex == NULL) { tp->worker_threads_count = i; bRet = OPJ_FALSE; break; } tp->worker_threads[i].cond = opj_cond_create(); - if( tp->worker_threads[i].cond == NULL ) - { + if (tp->worker_threads[i].cond == NULL) { opj_mutex_destroy(tp->worker_threads[i].mutex); tp->worker_threads_count = i; bRet = OPJ_FALSE; @@ -721,9 +721,10 @@ static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads) tp->worker_threads[i].marked_as_waiting = OPJ_FALSE; tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function, - &(tp->worker_threads[i])); - if( tp->worker_threads[i].thread == NULL ) - { + &(tp->worker_threads[i])); + if (tp->worker_threads[i].thread == NULL) { + opj_mutex_destroy(tp->worker_threads[i].mutex); + opj_cond_destroy(tp->worker_threads[i].cond); tp->worker_threads_count = i; bRet = OPJ_FALSE; break; @@ -733,15 +734,15 @@ static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads) /* Wait all threads to be started */ /* printf("waiting for all threads to be started\n"); */ opj_mutex_lock(tp->mutex); - while( tp->waiting_worker_thread_count < num_threads ) - { + while (tp->waiting_worker_thread_count < tp->worker_threads_count) { opj_cond_wait(tp->cond, tp->mutex); } opj_mutex_unlock(tp->mutex); /* printf("all threads started\n"); */ - if( tp->state == OPJWTS_ERROR ) + if (tp->state == OPJWTS_ERROR) { bRet = OPJ_FALSE; + } return bRet; } @@ -753,33 +754,31 @@ void opj_waiting() } */ -static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp, - opj_worker_thread_t* worker_thread, - OPJ_BOOL signal_job_finished) +static opj_worker_thread_job_t* opj_thread_pool_get_next_job( + opj_thread_pool_t* tp, + opj_worker_thread_t* worker_thread, + OPJ_BOOL signal_job_finished) { - while( OPJ_TRUE ) - { + while (OPJ_TRUE) { opj_job_list_t* top_job_iter; opj_mutex_lock(tp->mutex); - if( signal_job_finished ) - { + if (signal_job_finished) { signal_job_finished = OPJ_FALSE; tp->pending_jobs_count --; /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/ - if( tp->pending_jobs_count <= tp->signaling_threshold ) + if (tp->pending_jobs_count <= tp->signaling_threshold) { opj_cond_signal(tp->cond); + } } - if( tp->state == OPJWTS_STOP ) - { + if (tp->state == OPJWTS_STOP) { opj_mutex_unlock(tp->mutex); return NULL; } top_job_iter = tp->job_queue; - if( top_job_iter ) - { + if (top_job_iter) { opj_worker_thread_job_t* job; tp->job_queue = top_job_iter->next; @@ -790,17 +789,15 @@ static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* } /* opj_waiting(); */ - if( !worker_thread->marked_as_waiting ) - { + if (!worker_thread->marked_as_waiting) { opj_worker_thread_list_t* item; worker_thread->marked_as_waiting = OPJ_TRUE; tp->waiting_worker_thread_count ++; assert(tp->waiting_worker_thread_count <= tp->worker_threads_count); - item= (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t)); - if( item == NULL ) - { + item = (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t)); + if (item == NULL) { tp->state = OPJWTS_ERROR; opj_cond_signal(tp->cond); @@ -820,7 +817,7 @@ static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* opj_mutex_unlock(tp->mutex); /* printf("waiting for job\n"); */ - opj_cond_wait( worker_thread->cond, worker_thread->mutex ); + opj_cond_wait(worker_thread->cond, worker_thread->mutex); opj_mutex_unlock(worker_thread->mutex); /* printf("got job\n"); */ @@ -834,21 +831,20 @@ OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp, opj_worker_thread_job_t* job; opj_job_list_t* item; - if( tp->mutex == NULL ) - { - job_fn( user_data, tp->tls ); + if (tp->mutex == NULL) { + job_fn(user_data, tp->tls); return OPJ_TRUE; } job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t)); - if( job == NULL ) + if (job == NULL) { return OPJ_FALSE; + } job->job_fn = job_fn; job->user_data = user_data; item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t)); - if( item == NULL ) - { + if (item == NULL) { opj_free(job); return OPJ_FALSE; } @@ -857,8 +853,7 @@ OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp, opj_mutex_lock(tp->mutex); tp->signaling_threshold = 100 * tp->worker_threads_count; - while( tp->pending_jobs_count > tp->signaling_threshold ) - { + while (tp->pending_jobs_count > tp->signaling_threshold) { /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */ opj_cond_wait(tp->cond, tp->mutex); /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */ @@ -868,15 +863,14 @@ OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp, tp->job_queue = item; tp->pending_jobs_count ++; - if( tp->waiting_worker_thread_list ) - { + if (tp->waiting_worker_thread_list) { opj_worker_thread_t* worker_thread; opj_worker_thread_list_t* next; opj_worker_thread_list_t* to_opj_free; worker_thread = tp->waiting_worker_thread_list->worker_thread; - assert( worker_thread->marked_as_waiting ); + assert(worker_thread->marked_as_waiting); worker_thread->marked_as_waiting = OPJ_FALSE; next = tp->waiting_worker_thread_list->next; @@ -890,26 +884,26 @@ OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp, opj_mutex_unlock(worker_thread->mutex); opj_free(to_opj_free); - } - else + } else { opj_mutex_unlock(tp->mutex); + } return OPJ_TRUE; } -void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, int max_remaining_jobs) +void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, + int max_remaining_jobs) { - if( tp->mutex == NULL ) - { + if (tp->mutex == NULL) { return; } - if( max_remaining_jobs < 0 ) + if (max_remaining_jobs < 0) { max_remaining_jobs = 0; + } opj_mutex_lock(tp->mutex); tp->signaling_threshold = max_remaining_jobs; - while( tp->pending_jobs_count > max_remaining_jobs ) - { + while (tp->pending_jobs_count > max_remaining_jobs) { /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/ opj_cond_wait(tp->cond, tp->mutex); /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/ @@ -924,16 +918,18 @@ int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp) void opj_thread_pool_destroy(opj_thread_pool_t* tp) { - if( !tp ) return; - if( tp->cond ) - { + if (!tp) { + return; + } + if (tp->cond) { int i; opj_thread_pool_wait_completion(tp, 0); + opj_mutex_lock(tp->mutex); tp->state = OPJWTS_STOP; + opj_mutex_unlock(tp->mutex); - for(i=0;iworker_threads_count;i++) - { + for (i = 0; i < tp->worker_threads_count; i++) { opj_mutex_lock(tp->worker_threads[i].mutex); opj_cond_signal(tp->worker_threads[i].cond); opj_mutex_unlock(tp->worker_threads[i].mutex); @@ -944,10 +940,9 @@ void opj_thread_pool_destroy(opj_thread_pool_t* tp) opj_free(tp->worker_threads); - while( tp->waiting_worker_thread_list != NULL ) - { + while (tp->waiting_worker_thread_list != NULL) { opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next; - opj_free( tp->waiting_worker_thread_list ); + opj_free(tp->waiting_worker_thread_list); tp->waiting_worker_thread_list = next; }