2 * The copyright in this software is being made available under the 2-clauses
3 * BSD License, included below. This software may be subject to other third
4 * party and contributor rights, including patent rights, and no such rights
5 * are granted under this license.
7 * Copyright (c) 2016, Even Rouault
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 #include "opj_includes.h"
/* Feature gate for the InterlockedCompareExchange()-based spin guard used */
/* below to serialize one-time TLS-key creation (see opj_cond_create() and */
/* opj_thread_callback_adapter()). */
39 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
40 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
41 /* a rather unlikely race, skip it */
42 #if !(defined(__MINGW32__) && defined(__i386__))
43 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
/* Win32 build: reports whether real threading is available. */
/* NOTE(review): function bodies are partially elided in this view. */
49 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
/* Win32 build: returns the logical CPU count read from a SYSTEM_INFO */
/* structure (the GetSystemInfo() call is elided here -- confirm). */
54 int OPJ_CALLCONV opj_get_num_cpus(void)
59 dwNum = info.dwNumberOfProcessors;
/* Allocates a mutex backed by a Win32 CRITICAL_SECTION, initialized with a */
/* spin count of 4000 (spin briefly before sleeping on contention). */
/* NOTE(review): the NULL check on the opj_malloc() result is elided here. */
70 opj_mutex_t* opj_mutex_create(void)
72 opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
75 InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
/* Acquire the mutex (blocks until the critical section is owned). */
79 void opj_mutex_lock(opj_mutex_t* mutex)
81 EnterCriticalSection( &(mutex->cs) );
/* Release the mutex; caller must currently own it. */
84 void opj_mutex_unlock(opj_mutex_t* mutex)
86 LeaveCriticalSection( &(mutex->cs) );
/* Destroy the mutex and free its storage (free elided in this view). */
89 void opj_mutex_destroy(opj_mutex_t* mutex)
92 DeleteCriticalSection( &(mutex->cs) );
/* Singly-linked list of threads blocked on a condition; each node holds the */
/* waiter's per-thread event handle (hEvent member elided in this view). */
96 struct opj_cond_waiter_list_t
99 struct opj_cond_waiter_list_t* next;
101 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
/* opj_cond_t members: internal_mutex protects waiter_list. */
105 opj_mutex_t *internal_mutex;
106 opj_cond_waiter_list_t *waiter_list;
/* Process-wide TLS slot holding each thread's wait event, plus a one-time */
/* init flag and the spin-guard word used with InterlockedCompareExchange(). */
109 static DWORD TLSKey = 0;
110 static volatile LONG inTLSLockedSection = 0;
111 static volatile int TLSKeyInit = OPJ_FALSE;
/* Creates a condition variable for pre-Vista Win32: allocates the cond, */
/* lazily allocates the process-wide TLS key exactly once (guarded by the */
/* InterlockedCompareExchange spin trick), then creates the internal mutex */
/* and empties the waiter list. Returns NULL on any failure (cleanup paths */
/* are elided in this view). */
113 opj_cond_t* opj_cond_create(void)
115 opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
119 /* Make sure that the TLS key is allocated in a thread-safe way */
120 /* We cannot use a global mutex/critical section since its creation itself would not be */
121 /* thread-safe, so use InterlockedCompareExchange trick */
/* Enter the one-time-init critical region: 0 -> 1 transition wins. */
125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
126 if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
132 TLSKeyInit = OPJ_TRUE;
/* Leave the critical region: 1 -> 0. */
134 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
135 InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
/* TlsAlloc() failure check (the TlsAlloc() call itself is elided here). */
141 if( TLSKey == TLS_OUT_OF_INDEXES )
146 cond->internal_mutex = opj_mutex_create();
147 if (cond->internal_mutex == NULL)
152 cond->waiter_list = NULL;
/* Waits on the condition. Each thread keeps one auto-reset event in TLS */
/* (created on first use); the waiter registers that event in the cond's */
/* waiter list under internal_mutex, releases the caller's mutex, blocks on */
/* the event, then reacquires the caller's mutex. Caller must hold `mutex`. */
156 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
158 opj_cond_waiter_list_t* item;
159 HANDLE hEvent = (HANDLE) TlsGetValue( TLSKey );
/* First wait on this thread: create its event and cache it in TLS. */
162 hEvent = CreateEvent(NULL, /* security attributes */
163 0, /* manual reset = no */
164 0, /* initial state = unsignaled */
168 TlsSetValue( TLSKey, hEvent );
171 /* Insert the waiter into the waiter list of the condition */
172 opj_mutex_lock(cond->internal_mutex);
174 item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
175 assert(item != NULL);
/* Push onto the head of the waiter list. */
177 item->hEvent = hEvent;
178 item->next = cond->waiter_list;
180 cond->waiter_list = item;
182 opj_mutex_unlock(cond->internal_mutex);
184 /* Release the client mutex before waiting for the event being signaled */
185 opj_mutex_unlock(mutex);
187 /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
188 /* to report a failure. */
189 WaitForSingleObject(hEvent, INFINITE);
191 /* Reacquire the client mutex */
192 opj_mutex_lock(mutex);
/* Wakes one waiter: signals the event at the head of the waiter list and */
/* unlinks that node, all under internal_mutex. No-op path for an empty */
/* list is elided in this view. */
195 void opj_cond_signal(opj_cond_t* cond)
197 opj_cond_waiter_list_t* psIter;
199 /* Signal the first registered event, and remove it from the list */
200 opj_mutex_lock(cond->internal_mutex);
202 psIter = cond->waiter_list;
205 SetEvent(psIter->hEvent);
206 cond->waiter_list = psIter->next;
210 opj_mutex_unlock(cond->internal_mutex);
/* Destroys the condition. Requires that no thread is still registered as a */
/* waiter (asserted); destroys the internal mutex (final free elided here). */
213 void opj_cond_destroy(opj_cond_t* cond)
216 opj_mutex_destroy(cond->internal_mutex);
217 assert(cond->waiter_list == NULL);
/* opj_thread_t member: the user entry point (other members elided here). */
223 opj_thread_fn thread_fn;
/* _beginthreadex-compatible trampoline: runs the user function, then frees */
/* the per-thread condition event cached in TLS (if any), using the same */
/* spin guard as opj_cond_create() so it cannot race TLS-key creation. */
228 unsigned int __stdcall opj_thread_callback_adapter( void *info )
230 opj_thread_t* thread = (opj_thread_t*) info;
231 HANDLE hEvent = NULL;
233 thread->thread_fn( thread->user_data );
235 /* Free the handle possible allocated by a cond */
238 /* Make sure TLSKey is not being created just at that moment... */
239 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
240 if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
245 hEvent = (HANDLE) TlsGetValue( TLSKey );
/* Leave the guarded region (CloseHandle on hEvent is elided here). */
247 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
248 InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
/* Starts a new thread running thread_fn(user_data) via _beginthreadex (the */
/* CRT-safe counterpart of CreateThread). Returns NULL on allocation or */
/* creation failure (cleanup of `thread` on failure is elided here). */
259 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
261 opj_thread_t* thread;
265 thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
268 thread->thread_fn = thread_fn;
269 thread->user_data = user_data;
271 thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
272 opj_thread_callback_adapter, thread, 0, NULL);
274 if( thread->hThread == NULL )
/* Blocks until the thread terminates, then releases its handle (the free */
/* of the opj_thread_t wrapper is elided in this view). */
282 void opj_thread_join( opj_thread_t* thread )
284 WaitForSingleObject(thread->hThread, INFINITE);
285 CloseHandle( thread->hThread );
/* pthread build: reports whether real threading is available. */
296 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
/* pthread build: online CPU count via sysconf when the POSIX option is */
/* available; the fallback branch for other platforms is elided here. */
301 int OPJ_CALLCONV opj_get_num_cpus(void)
303 #ifdef _SC_NPROCESSORS_ONLN
304 return (int)sysconf(_SC_NPROCESSORS_ONLN);
/* opj_mutex_t member on the pthread build: the wrapped pthread mutex. */
312 pthread_mutex_t mutex;
/* Allocates and initializes a pthread-backed mutex with default attributes; */
/* on pthread_mutex_init failure the error path (elided here) must release */
/* the allocation and return NULL. */
315 opj_mutex_t* opj_mutex_create(void)
317 opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t));
318 if( mutex != NULL ) {
319 if ( pthread_mutex_init(&mutex->mutex, NULL) != 0) {
/* Acquire the mutex (blocking). */
327 void opj_mutex_lock(opj_mutex_t* mutex)
329 pthread_mutex_lock(&(mutex->mutex));
/* Release the mutex; caller must currently own it. */
332 void opj_mutex_unlock(opj_mutex_t* mutex)
334 pthread_mutex_unlock(&(mutex->mutex));
/* Destroy the mutex (free of the wrapper is elided in this view). */
337 void opj_mutex_destroy(opj_mutex_t* mutex)
340 pthread_mutex_destroy(&(mutex->mutex));
/* Allocates a pthread-backed condition variable; returns NULL if either the */
/* allocation or pthread_cond_init fails (error cleanup elided here). */
349 opj_cond_t* opj_cond_create(void)
351 opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
354 if( pthread_cond_init(&(cond->cond), NULL) != 0 )
/* Direct mapping to pthread_cond_wait; caller must hold `mutex`. */
362 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
364 pthread_cond_wait(&(cond->cond), &(mutex->mutex));
/* Wake one waiter; the return code is captured (its assert/use is elided). */
367 void opj_cond_signal(opj_cond_t* cond)
369 int ret = pthread_cond_signal(&(cond->cond));
/* Destroy the condition (free of the wrapper is elided in this view). */
374 void opj_cond_destroy(opj_cond_t* cond)
377 pthread_cond_destroy(&(cond->cond));
/* opj_thread_t member on the pthread build: the user entry point. */
384 opj_thread_fn thread_fn;
/* pthread trampoline: unwraps the opj_thread_t and invokes the user */
/* function (return statement elided in this view). */
389 static void* opj_thread_callback_adapter( void* info )
391 opj_thread_t* thread = (opj_thread_t*) info;
392 thread->thread_fn( thread->user_data );
/* Starts a joinable pthread running thread_fn(user_data). Explicitly sets */
/* PTHREAD_CREATE_JOINABLE so opj_thread_join() works regardless of the */
/* platform default. Returns NULL on failure (cleanup elided here). */
396 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
399 opj_thread_t* thread;
403 thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
406 thread->thread_fn = thread_fn;
407 thread->user_data = user_data;
409 pthread_attr_init( &attr );
410 pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
411 if( pthread_create( &(thread->thread), &attr,
412 opj_thread_callback_adapter, (void *) thread ) != 0 )
/* Blocks until the thread terminates; its exit status is collected but */
/* discarded (the free of the wrapper is elided in this view). */
420 void opj_thread_join( opj_thread_t* thread )
423 pthread_join( thread->thread, &status);
429 /* Stub implementation */
/* Single-threaded fallback used when neither Win32 nor pthreads is */
/* available: mutex/cond/thread primitives degrade to no-ops or NULL */
/* returns (bodies elided in this view). */
431 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
436 int OPJ_CALLCONV opj_get_num_cpus(void)
441 opj_mutex_t* opj_mutex_create(void)
446 void opj_mutex_lock(opj_mutex_t* mutex)
451 void opj_mutex_unlock(opj_mutex_t* mutex)
456 void opj_mutex_destroy(opj_mutex_t* mutex)
461 opj_cond_t* opj_cond_create(void)
466 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
472 void opj_cond_signal(opj_cond_t* cond)
477 void opj_cond_destroy(opj_cond_t* cond)
482 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
489 void opj_thread_join( opj_thread_t* thread )
/* TLS store entry: per-key destructor and the dynamic (key,value) array */
/* held by opj_tls_t (remaining struct members elided in this view). */
500 opj_tls_free_func opj_free_func;
505 opj_tls_key_val_t* key_val;
/* Allocates an empty, zero-initialized TLS store (key_val NULL, count 0). */
509 static opj_tls_t* opj_tls_new(void)
511 return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
/* Destroys a TLS store: runs each entry's destructor (when set) on its */
/* value, then frees the entry array (NULL guard and free of `tls` itself */
/* are elided in this view). */
514 static void opj_tls_destroy(opj_tls_t* tls)
518 for(i=0;i<tls->key_val_count;i++)
520 if( tls->key_val[i].opj_free_func )
521 tls->key_val[i].opj_free_func(tls->key_val[i].value);
523 opj_free(tls->key_val);
/* Linear lookup of `key` in the store; returns the stored value, or NULL */
/* when absent (the NULL return is elided in this view). */
527 void* opj_tls_get(opj_tls_t* tls, int key)
530 for(i=0;i<tls->key_val_count;i++)
532 if( tls->key_val[i].key == key )
533 return tls->key_val[i].value;
/* Sets (key -> value, opj_free_func) in the store. If the key already */
/* exists, its old value is released via its destructor and the slot is */
/* reused; otherwise the array is grown by one entry with opj_realloc. */
/* Returns OPJ_FALSE on overflow or allocation failure (those returns are */
/* elided in this view). */
538 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, opj_tls_free_func opj_free_func)
540 opj_tls_key_val_t* new_key_val;
/* Guard against int overflow of the entry count. */
543 if (tls->key_val_count == INT_MAX) {
/* In-place update path for an existing key. */
546 for(i=0;i<tls->key_val_count;i++)
548 if( tls->key_val[i].key == key )
550 if( tls->key_val[i].opj_free_func ) {
551 tls->key_val[i].opj_free_func(tls->key_val[i].value);
553 tls->key_val[i].value = value;
554 tls->key_val[i].opj_free_func = opj_free_func;
/* Append path: grow by one; size_t cast avoids int-arithmetic overflow. */
558 new_key_val = (opj_tls_key_val_t*) opj_realloc( tls->key_val,
559 ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t) );
562 tls->key_val = new_key_val;
563 new_key_val[tls->key_val_count].key = key;
564 new_key_val[tls->key_val_count].value = value;
565 new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
566 tls->key_val_count ++;
/* A queued unit of work: function + user data (fields elided here). */
575 } opj_worker_thread_job_t;
/* Per-worker state: back-pointer to the pool, the OS thread, and a flag */
/* telling whether this worker is parked on the waiting list (per-worker */
/* mutex/cond members elided in this view). */
579 opj_thread_pool_t *tp;
580 opj_thread_t *thread;
581 int marked_as_waiting;
585 } opj_worker_thread_t;
/* Pool state machine values (OPJWTS_OK/ERROR/STOP; enumerators elided). */
592 } opj_worker_thread_state;
/* FIFO-ish singly-linked job queue node. */
594 struct opj_job_list_t
596 opj_worker_thread_job_t* job;
597 struct opj_job_list_t* next;
599 typedef struct opj_job_list_t opj_job_list_t;
/* Linked list of idle workers available to receive a job. */
601 struct opj_worker_thread_list_t
603 opj_worker_thread_t* worker_thread;
604 struct opj_worker_thread_list_t* next;
606 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
/* The thread pool. All mutable fields below are protected by the pool */
/* mutex (member elided in this view); `state` and `pending_jobs_count` */
/* are additionally marked volatile as they are polled across threads. */
608 struct opj_thread_pool_t
610 opj_worker_thread_t* worker_threads;
611 int worker_threads_count;
614 volatile opj_worker_thread_state state;
615 opj_job_list_t* job_queue;
616 volatile int pending_jobs_count;
617 opj_worker_thread_list_t* waiting_worker_thread_list;
618 int waiting_worker_thread_count;
/* Producers signal the pool cond when pending jobs drop to this level. */
620 int signaling_threshold;
/* Forward declarations for the internal pool machinery defined below. */
623 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
624 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
625 opj_worker_thread_t* worker_thread,
626 OPJ_BOOL signal_job_finished);
/* Creates a pool with `num_threads` workers. With num_threads <= 0 the */
/* pool degrades to a threadless mode backed only by a TLS store (jobs then */
/* run synchronously in opj_thread_pool_submit_job). Returns NULL on */
/* failure; opj_thread_pool_destroy handles partial construction. */
628 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
630 opj_thread_pool_t* tp;
632 tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
635 tp->state = OPJWTS_OK;
637 if( num_threads <= 0 )
639 tp->tls = opj_tls_new();
648 tp->mutex = opj_mutex_create();
654 if( !opj_thread_pool_setup(tp, num_threads) )
656 opj_thread_pool_destroy(tp);
/* Worker main loop: repeatedly pull a job from the pool and run it with a */
/* per-worker TLS store. `job_finished` tells get_next_job to decrement the */
/* pending counter for the previously completed job. The loop exits when */
/* get_next_job returns no job (pool stopping); the TLS store is destroyed */
/* on the way out. (Loop construct and job free are elided in this view.) */
662 static void opj_worker_thread_function(void* user_data)
664 opj_worker_thread_t* worker_thread;
665 opj_thread_pool_t* tp;
667 OPJ_BOOL job_finished = OPJ_FALSE;
669 worker_thread = (opj_worker_thread_t* ) user_data;
670 tp = worker_thread->tp;
675 opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, job_finished);
681 job->job_fn(job->user_data, tls);
684 job_finished = OPJ_TRUE;
687 opj_tls_destroy(tls);
/* Spawns the pool's workers: creates the pool cond, the worker array, and */
/* per-worker mutex/cond/thread. On any per-worker failure the count is */
/* trimmed to the workers actually started so destroy can unwind them */
/* (the goto/cleanup jumps are elided in this view). Finally blocks until */
/* every started worker has parked itself on the waiting list, then reports */
/* failure if a worker pushed the pool into OPJWTS_ERROR. */
690 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
693 OPJ_BOOL bRet = OPJ_TRUE;
695 assert( num_threads > 0 );
697 tp->cond = opj_cond_create();
698 if( tp->cond == NULL )
701 tp->worker_threads = (opj_worker_thread_t*) opj_calloc( (size_t)num_threads,
702 sizeof(opj_worker_thread_t) );
703 if( tp->worker_threads == NULL )
705 tp->worker_threads_count = num_threads;
707 for(i=0;i<num_threads;i++)
709 tp->worker_threads[i].tp = tp;
711 tp->worker_threads[i].mutex = opj_mutex_create();
712 if( tp->worker_threads[i].mutex == NULL )
/* Trim to the i workers fully constructed so far, then bail out. */
714 tp->worker_threads_count = i;
719 tp->worker_threads[i].cond = opj_cond_create();
720 if( tp->worker_threads[i].cond == NULL )
722 opj_mutex_destroy(tp->worker_threads[i].mutex);
723 tp->worker_threads_count = i;
728 tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
730 tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
731 &(tp->worker_threads[i]));
732 if( tp->worker_threads[i].thread == NULL )
734 tp->worker_threads_count = i;
740 /* Wait all threads to be started */
741 /* printf("waiting for all threads to be started\n"); */
742 opj_mutex_lock(tp->mutex);
743 while( tp->waiting_worker_thread_count < num_threads )
745 opj_cond_wait(tp->cond, tp->mutex);
747 opj_mutex_unlock(tp->mutex);
748 /* printf("all threads started\n"); */
750 if( tp->state == OPJWTS_ERROR )
/* NOTE(review): the line below belongs to a commented-out debug helper in */
/* the original source; the surrounding comment delimiters are elided here. */
759 printf("waiting!\n");
/* Called by a worker to fetch its next job. With signal_job_finished set, */
/* first decrements pending_jobs_count and signals the pool cond once the */
/* backlog reaches signaling_threshold (unblocking submit/wait_completion). */
/* Returns NULL when the pool is stopping. If the queue is empty, parks the */
/* worker: registers it on the waiting list, signals the pool cond so */
/* setup/submitters see it, and sleeps on the per-worker cond. The */
/* worker-mutex is taken BEFORE releasing the pool mutex so a submitter's */
/* signal cannot be lost between the two. (The enclosing retry loop and */
/* several returns are elided in this view.) */
763 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
764 opj_worker_thread_t* worker_thread,
765 OPJ_BOOL signal_job_finished)
769 opj_job_list_t* top_job_iter;
771 opj_mutex_lock(tp->mutex);
773 if( signal_job_finished )
775 signal_job_finished = OPJ_FALSE;
776 tp->pending_jobs_count --;
777 /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
778 if( tp->pending_jobs_count <= tp->signaling_threshold )
779 opj_cond_signal(tp->cond);
/* Pool shutting down: report "no more jobs" to the worker. */
782 if( tp->state == OPJWTS_STOP )
784 opj_mutex_unlock(tp->mutex);
/* Pop the head of the job queue when one is available. */
787 top_job_iter = tp->job_queue;
790 opj_worker_thread_job_t* job;
791 tp->job_queue = top_job_iter->next;
793 job = top_job_iter->job;
794 opj_mutex_unlock(tp->mutex);
795 opj_free(top_job_iter);
/* Queue empty: park this worker on the waiting list (once). */
800 if( !worker_thread->marked_as_waiting )
802 opj_worker_thread_list_t* item;
804 worker_thread->marked_as_waiting = OPJ_TRUE;
805 tp->waiting_worker_thread_count ++;
806 assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
808 item= (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
/* Allocation failure: flag the pool as broken and wake any waiter. */
811 tp->state = OPJWTS_ERROR;
812 opj_cond_signal(tp->cond);
814 opj_mutex_unlock(tp->mutex);
818 item->worker_thread = worker_thread;
819 item->next = tp->waiting_worker_thread_list;
820 tp->waiting_worker_thread_list = item;
823 /* printf("signaling that worker thread is ready\n"); */
824 opj_cond_signal(tp->cond);
/* Lock the worker mutex BEFORE dropping the pool mutex: prevents a lost */
/* wakeup from a submitter that signals between the two locks. */
826 opj_mutex_lock(worker_thread->mutex);
827 opj_mutex_unlock(tp->mutex);
829 /* printf("waiting for job\n"); */
830 opj_cond_wait( worker_thread->cond, worker_thread->mutex );
832 opj_mutex_unlock(worker_thread->mutex);
833 /* printf("got job\n"); */
/* Enqueues job_fn(user_data) on the pool. In threadless mode (no pool */
/* mutex) the job runs synchronously on the caller's thread with the pool's */
/* TLS store. Otherwise: allocates job + queue node, applies backpressure */
/* when the backlog exceeds 100 jobs per worker, pushes the job, and if an */
/* idle worker exists, unparks it with the same worker-mutex handshake used */
/* in get_next_job. Returns OPJ_FALSE on allocation failure (those returns */
/* and the remaining signature lines are elided in this view). */
837 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
841 opj_worker_thread_job_t* job;
842 opj_job_list_t* item;
/* Threadless pool: run the job inline and succeed. */
844 if( tp->mutex == NULL )
846 job_fn( user_data, tp->tls );
850 job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
853 job->job_fn = job_fn;
854 job->user_data = user_data;
856 item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
864 opj_mutex_lock(tp->mutex);
/* Backpressure: block until workers drain the queue below the threshold. */
866 tp->signaling_threshold = 100 * tp->worker_threads_count;
867 while( tp->pending_jobs_count > tp->signaling_threshold )
869 /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
870 opj_cond_wait(tp->cond, tp->mutex);
871 /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
/* Push the new job at the head of the queue. */
874 item->next = tp->job_queue;
875 tp->job_queue = item;
876 tp->pending_jobs_count ++;
/* Wake one parked worker, if any. */
878 if( tp->waiting_worker_thread_list )
880 opj_worker_thread_t* worker_thread;
881 opj_worker_thread_list_t* next;
882 opj_worker_thread_list_t* to_opj_free;
884 worker_thread = tp->waiting_worker_thread_list->worker_thread;
886 assert( worker_thread->marked_as_waiting );
887 worker_thread->marked_as_waiting = OPJ_FALSE;
889 next = tp->waiting_worker_thread_list->next;
890 to_opj_free = tp->waiting_worker_thread_list;
891 tp->waiting_worker_thread_list = next;
892 tp->waiting_worker_thread_count --;
/* Take the worker mutex before releasing the pool mutex, then signal: */
/* pairs with the parking sequence in opj_thread_pool_get_next_job(). */
894 opj_mutex_lock(worker_thread->mutex);
895 opj_mutex_unlock(tp->mutex);
896 opj_cond_signal(worker_thread->cond);
897 opj_mutex_unlock(worker_thread->mutex);
899 opj_free(to_opj_free);
902 opj_mutex_unlock(tp->mutex);
/* Blocks until at most max_remaining_jobs jobs remain pending (negative */
/* values are clamped to 0, i.e. full drain). Publishes the target as the */
/* signaling_threshold so workers wake this caller as soon as the backlog */
/* drops far enough. No-op in threadless mode. */
907 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, int max_remaining_jobs)
909 if( tp->mutex == NULL )
914 if( max_remaining_jobs < 0 )
915 max_remaining_jobs = 0;
916 opj_mutex_lock(tp->mutex);
917 tp->signaling_threshold = max_remaining_jobs;
918 while( tp->pending_jobs_count > max_remaining_jobs )
920 /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
921 opj_cond_wait(tp->cond, tp->mutex);
922 /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
924 opj_mutex_unlock(tp->mutex);
/* Returns the number of worker threads the pool was built with (0 for a */
/* threadless pool). */
927 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
929 return tp->worker_threads_count;
/* Tears down the pool: drains all pending jobs, flips state to OPJWTS_STOP */
/* under the pool mutex, then wakes and joins each worker (signal is done */
/* while holding that worker's mutex so the wakeup is not lost), destroys */
/* per-worker primitives, frees the waiting list, and releases the pool's */
/* cond/mutex/TLS. Also handles partially-constructed pools from */
/* opj_thread_pool_create (NULL guards elided in this view). */
932 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
938 opj_thread_pool_wait_completion(tp, 0);
940 opj_mutex_lock(tp->mutex);
941 tp->state = OPJWTS_STOP;
942 opj_mutex_unlock(tp->mutex);
944 for(i=0;i<tp->worker_threads_count;i++)
946 opj_mutex_lock(tp->worker_threads[i].mutex);
947 opj_cond_signal(tp->worker_threads[i].cond);
948 opj_mutex_unlock(tp->worker_threads[i].mutex);
949 opj_thread_join(tp->worker_threads[i].thread);
950 opj_cond_destroy(tp->worker_threads[i].cond);
951 opj_mutex_destroy(tp->worker_threads[i].mutex);
954 opj_free(tp->worker_threads);
/* Free any leftover waiting-list nodes (workers already joined). */
956 while( tp->waiting_worker_thread_list != NULL )
958 opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
959 opj_free( tp->waiting_worker_thread_list );
960 tp->waiting_worker_thread_list = next;
963 opj_cond_destroy(tp->cond);
965 opj_mutex_destroy(tp->mutex);
966 opj_tls_destroy(tp->tls);