2 * The copyright in this software is being made available under the 2-clauses
3 * BSD License, included below. This software may be subject to other third
4 * party and contributor rights, including patent rights, and no such rights
5 * are granted under this license.
7 * Copyright (c) 2016, Even Rouault
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
32 #include "opj_includes.h"
39 /* Some versions of x86_64-w64-mingw32-gcc -m32 resolve InterlockedCompareExchange() */
40 /* as __sync_val_compare_and_swap_4 but fail to link it. As this protects against */
41 /* a rather unlikely race, skip it */
/* The InterlockedCompareExchange()-based guard is enabled everywhere except */
/* when both __MINGW32__ and __i386__ are defined. */
42 #if !(defined(__MINGW32__) && defined(__i386__))
43 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
/* Reports whether thread support is available. This is the first of three */
/* definitions of this function in the file; the definitions that follow it use */
/* Windows API calls (critical sections, events). */
/* NOTE(review): the function body is not visible in this excerpt. */
49 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
/* Per its name, reports the number of CPUs. This variant takes the value from */
/* the dwNumberOfProcessors member of `info`. */
/* NOTE(review): the declaration and filling of `info`, and the return */
/* statement(s), are not visible in this excerpt. */
54 int OPJ_CALLCONV opj_get_num_cpus(void)
59 dwNum = info.dwNumberOfProcessors;
/* Creates a mutex: allocates an opj_mutex_t with opj_malloc() and initialises */
/* its critical section `cs` with a spin count of 4000. */
/* NOTE(review): the allocation-failure handling and the return statement are */
/* not visible in this excerpt. */
70 opj_mutex_t* opj_mutex_create(void)
72 opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
75 InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
/* Acquires the mutex by entering its critical section. */
/* `mutex` is dereferenced without a NULL check on the visible lines. */
79 void opj_mutex_lock(opj_mutex_t* mutex)
81 EnterCriticalSection( &(mutex->cs) );
/* Releases the mutex by leaving its critical section. */
/* `mutex` is dereferenced without a NULL check on the visible lines. */
84 void opj_mutex_unlock(opj_mutex_t* mutex)
86 LeaveCriticalSection( &(mutex->cs) );
/* Destroys the mutex: deletes its critical section. */
/* NOTE(review): any NULL guard and the release of the opj_mutex_t memory are */
/* not visible in this excerpt. */
89 void opj_mutex_destroy(opj_mutex_t* mutex)
92 DeleteCriticalSection( &(mutex->cs) );
/* Node of the singly linked list of waiters registered on a condition. */
/* opj_cond_wait() also stores an event handle in an `hEvent` member of this */
/* struct; that member's declaration is not visible in this excerpt. */
96 struct opj_cond_waiter_list_t
/* Next node of the list. */
99 struct opj_cond_waiter_list_t* next;
101 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
/* Members of the condition object (the enclosing struct header is not visible */
/* in this excerpt). */
/* Mutex held by opj_cond_wait() and opj_cond_signal() while they modify waiter_list. */
105 opj_mutex_t *internal_mutex;
/* Head of the list of registered waiters; set to NULL by opj_cond_create(). */
106 opj_cond_waiter_list_t *waiter_list;
/* TLS slot index under which opj_cond_wait() looks up and stores a per-thread event handle. */
109 static DWORD TLSKey = 0;
/* Guard flag switched 0 -> 1 and back with InterlockedCompareExchange() around */
/* the code that touches the TLS key state. */
110 static volatile LONG inTLSLockedSection = 0;
/* Set to OPJ_TRUE inside opj_cond_create(). */
111 static volatile int TLSKeyInit = OPJ_FALSE;
/* Creates a condition object backed by an internal mutex and a waiter list. */
/* NOTE(review): several lines of this function (allocation check, the */
/* surrounding loop/branches, the TLS key allocation itself, failure handling */
/* and the return statement) are not visible in this excerpt; the comments */
/* below describe the visible lines only. */
113 opj_cond_t* opj_cond_create(void)
115 opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
119 /* Make sure that the TLS key is allocated in a thread-safe way */
120 /* We cannot use a global mutex/critical section since its creation itself would not be */
121 /* thread-safe, so use InterlockedCompareExchange trick */
/* Enter the guarded section only when inTLSLockedSection was switched from 0 to 1 by this call. */
125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
126 if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
132 TLSKeyInit = OPJ_TRUE;
/* Leave the guarded section: switch inTLSLockedSection back from 1 to 0. */
134 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
135 InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
/* A TLSKey equal to TLS_OUT_OF_INDEXES is tested for here (the handling is not visible). */
141 if( TLSKey == TLS_OUT_OF_INDEXES )
/* The internal mutex protects waiter_list in opj_cond_wait() and opj_cond_signal(). */
146 cond->internal_mutex = opj_mutex_create();
147 if (cond->internal_mutex == NULL)
/* Start with no registered waiter. */
152 cond->waiter_list = NULL;
/* Waits on `cond`. The visible sequence is: fetch (or create) an event handle */
/* kept in thread-local storage, register it at the head of cond->waiter_list */
/* under cond->internal_mutex, release the caller's `mutex`, block on the event, */
/* then re-acquire `mutex` before returning. */
/* The caller is expected to hold `mutex` on entry, since it is unlocked here. */
/* NOTE(review): some lines (e.g. the condition around CreateEvent and the */
/* remaining CreateEvent arguments) are not visible in this excerpt. */
156 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
158 opj_cond_waiter_list_t* item;
/* Event handle previously stored for this thread under TLSKey, if any. */
159 HANDLE hEvent = (HANDLE) TlsGetValue( TLSKey );
162 hEvent = CreateEvent(NULL, /* security attributes */
163 0, /* manual reset = no */
164 0, /* initial state = unsignaled */
/* Remember the event for this thread so later waits can fetch it with TlsGetValue(). */
168 TlsSetValue( TLSKey, hEvent );
171 /* Insert the waiter into the waiter list of the condition */
172 opj_mutex_lock(cond->internal_mutex);
174 item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
/* NOTE(review): allocation failure is only caught by this assert; when asserts */
/* are compiled out, a NULL `item` would be dereferenced on the next lines. */
175 assert(item != NULL);
177 item->hEvent = hEvent;
/* Prepend: the new waiter becomes the head of the list. */
178 item->next = cond->waiter_list;
180 cond->waiter_list = item;
182 opj_mutex_unlock(cond->internal_mutex);
184 /* Release the client mutex before waiting for the event being signaled */
185 opj_mutex_unlock(mutex);
187 /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
188 /* to report a failure. */
189 WaitForSingleObject(hEvent, INFINITE);
191 /* Reacquire the client mutex */
192 opj_mutex_lock(mutex);
/* Wakes one waiter of `cond`: under cond->internal_mutex, sets the event of the */
/* entry at the head of waiter_list and unlinks that entry. */
/* Because opj_cond_wait() prepends, the head is the most recently registered waiter. */
/* NOTE(review): the empty-list check and the release of the unlinked node are */
/* not visible in this excerpt. */
195 void opj_cond_signal(opj_cond_t* cond)
197 opj_cond_waiter_list_t* psIter;
199 /* Signal the first registered event, and remove it from the list */
200 opj_mutex_lock(cond->internal_mutex);
202 psIter = cond->waiter_list;
205 SetEvent(psIter->hEvent);
206 cond->waiter_list = psIter->next;
210 opj_mutex_unlock(cond->internal_mutex);
/* Destroys a condition object: destroys its internal mutex and asserts that no */
/* waiter is still registered. */
/* NOTE(review): any NULL guard and the release of `cond` itself are not visible */
/* in this excerpt. */
213 void opj_cond_destroy(opj_cond_t* cond)
216 opj_mutex_destroy(cond->internal_mutex);
/* Destroying a condition that still has waiters is treated as a programming error. */
217 assert(cond->waiter_list == NULL);
/* Member of the thread object: the user function that opj_thread_callback_adapter() invokes. */
/* NOTE(review): the enclosing struct header and its other members are not visible here. */
223 opj_thread_fn thread_fn;
/* Thread entry point handed to _beginthreadex(): runs the user function stored */
/* in the opj_thread_t passed as `info`, then fetches the event handle stored for */
/* this thread under TLSKey. */
/* NOTE(review): the lines that act on the fetched handle, and the return */
/* statement, are not visible in this excerpt. */
228 unsigned int __stdcall opj_thread_callback_adapter( void *info )
230 opj_thread_t* thread = (opj_thread_t*) info;
231 HANDLE hEvent = NULL;
233 thread->thread_fn( thread->user_data );
235 /* Free the handle possibly allocated by a cond */
238 /* Make sure TLSKey is not being created just at that moment... */
239 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
240 if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
/* Event handle that opj_cond_wait() may have stored for this thread. */
245 hEvent = (HANDLE) TlsGetValue( TLSKey );
/* Leave the guarded section: switch inTLSLockedSection back from 1 to 0. */
247 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
248 InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
/* Starts a new thread that runs thread_fn(user_data). Allocates an opj_thread_t, */
/* records the function and its argument, and launches opj_thread_callback_adapter() */
/* through _beginthreadex(). */
/* NOTE(review): the allocation check, the failure handling after the NULL test */
/* and the return statement are not visible in this excerpt. */
259 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
261 opj_thread_t* thread;
265 thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
268 thread->thread_fn = thread_fn;
269 thread->user_data = user_data;
271 thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
272 opj_thread_callback_adapter, thread, 0, NULL);
/* A NULL handle means the thread could not be started. */
274 if( thread->hThread == NULL )
/* Blocks until the thread has terminated, then closes its handle. */
/* NOTE(review): whether the opj_thread_t itself is released here is not visible */
/* in this excerpt. */
282 void opj_thread_join( opj_thread_t* thread )
284 WaitForSingleObject(thread->hThread, INFINITE);
285 CloseHandle( thread->hThread );
/* Reports whether thread support is available. Second definition of this */
/* function in the file; the definitions that follow it use pthread calls. */
/* NOTE(review): the function body is not visible in this excerpt. */
296 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
301 int OPJ_CALLCONV opj_get_num_cpus(void)
303 #ifdef _SC_NPROCESSORS_ONLN
304 return (int)sysconf(_SC_NPROCESSORS_ONLN);
/* Member of the pthread-based mutex object: the underlying pthread mutex. */
/* NOTE(review): the enclosing struct header is not visible in this excerpt. */
312 pthread_mutex_t mutex;
315 opj_mutex_t* opj_mutex_create(void)
317 opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
320 pthread_mutex_t pthr_mutex = PTHREAD_MUTEX_INITIALIZER;
321 mutex->mutex = pthr_mutex;
/* Acquires the mutex with pthread_mutex_lock(); its return value is not checked. */
325 void opj_mutex_lock(opj_mutex_t* mutex)
327 pthread_mutex_lock(&(mutex->mutex));
/* Releases the mutex with pthread_mutex_unlock(); its return value is not checked. */
330 void opj_mutex_unlock(opj_mutex_t* mutex)
332 pthread_mutex_unlock(&(mutex->mutex));
/* Destroys the underlying pthread mutex. */
/* NOTE(review): any NULL guard and the release of the opj_mutex_t memory are */
/* not visible in this excerpt. */
335 void opj_mutex_destroy(opj_mutex_t* mutex)
338 pthread_mutex_destroy(&(mutex->mutex));
/* Creates a condition object: allocates an opj_cond_t and initialises its */
/* pthread condition variable with default attributes. */
/* NOTE(review): the allocation check, the handling of a pthread_cond_init() */
/* failure and the return statement are not visible in this excerpt. */
347 opj_cond_t* opj_cond_create(void)
349 opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
/* A non-zero result means the condition variable could not be initialised. */
352 if( pthread_cond_init(&(cond->cond), NULL) != 0 )
/* Waits on `cond` using `mutex`; a thin wrapper around pthread_cond_wait(), */
/* whose return value is not checked. */
360 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
362 pthread_cond_wait(&(cond->cond), &(mutex->mutex));
/* Signals `cond` through pthread_cond_signal(). */
/* NOTE(review): what is done with `ret` is not visible in this excerpt. */
365 void opj_cond_signal(opj_cond_t* cond)
367 int ret = pthread_cond_signal(&(cond->cond));
/* Destroys the underlying pthread condition variable. */
/* NOTE(review): any NULL guard and the release of the opj_cond_t memory are not */
/* visible in this excerpt. */
372 void opj_cond_destroy(opj_cond_t* cond)
375 pthread_cond_destroy(&(cond->cond));
/* Member of the pthread-based thread object: the user function that */
/* opj_thread_callback_adapter() invokes. */
/* NOTE(review): the enclosing struct header and its other members are not visible here. */
382 opj_thread_fn thread_fn;
/* Start routine handed to pthread_create(): calls the user function stored in */
/* the opj_thread_t passed as `info`, with its user_data. */
/* NOTE(review): the return statement is not visible in this excerpt. */
387 static void* opj_thread_callback_adapter( void* info )
389 opj_thread_t* thread = (opj_thread_t*) info;
390 thread->thread_fn( thread->user_data );
394 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
397 opj_thread_t* thread;
401 thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
404 thread->thread_fn = thread_fn;
405 thread->user_data = user_data;
407 pthread_attr_init( &attr );
408 pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
409 if( pthread_create( &(thread->thread), &attr,
410 opj_thread_callback_adapter, (void *) thread ) != 0 )
/* Blocks until the thread has terminated, using pthread_join(). */
/* NOTE(review): the declaration of `status` and whether the opj_thread_t is */
/* released here are not visible in this excerpt. */
418 void opj_thread_join( opj_thread_t* thread )
421 pthread_join( thread->thread, &status);
427 /* Stub implementation */
/* Third set of definitions of the threading entry points. */
/* NOTE(review): only the signatures are visible in this excerpt; what each stub */
/* returns or does cannot be confirmed from here. */
429 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
434 int OPJ_CALLCONV opj_get_num_cpus(void)
439 opj_mutex_t* opj_mutex_create(void)
444 void opj_mutex_lock(opj_mutex_t* mutex)
449 void opj_mutex_unlock(opj_mutex_t* mutex)
454 void opj_mutex_destroy(opj_mutex_t* mutex)
459 opj_cond_t* opj_cond_create(void)
464 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
470 void opj_cond_signal(opj_cond_t* cond)
475 void opj_cond_destroy(opj_cond_t* cond)
480 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
487 void opj_thread_join( opj_thread_t* thread )
/* Members of the key/value storage types (the enclosing struct headers and the */
/* remaining members are not visible in this excerpt). */
/* Optional function called on an entry's value when the entry is replaced or the */
/* storage is destroyed; may be NULL. */
498 opj_tls_free_func opj_free_func;
/* Array of entries; grown by one element per new key in opj_tls_set(). */
503 opj_tls_key_val_t* key_val;
/* Allocates a zero-initialised opj_tls_t (so it starts with no entries). */
/* Returns whatever opj_calloc() returns, i.e. the result is passed through unchecked. */
507 static opj_tls_t* opj_tls_new(void)
509 return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
/* Destroys a key/value storage: calls each entry's free function (when set) on */
/* its value, then releases the entry array. */
/* NOTE(review): any NULL guard on `tls` and the release of `tls` itself are not */
/* visible in this excerpt. */
512 static void opj_tls_destroy(opj_tls_t* tls)
516 for(i=0;i<tls->key_val_count;i++)
/* Entries without a free function are left untouched. */
518 if( tls->key_val[i].opj_free_func )
519 tls->key_val[i].opj_free_func(tls->key_val[i].value);
521 opj_free(tls->key_val);
/* Looks up `key` with a linear scan over the entries and returns the value of */
/* the first match. */
/* NOTE(review): the value returned when no entry matches is not visible in this excerpt. */
525 void* opj_tls_get(opj_tls_t* tls, int key)
528 for(i=0;i<tls->key_val_count;i++)
530 if( tls->key_val[i].key == key )
531 return tls->key_val[i].value;
/* Associates `value` (and an optional free function) with `key`. */
/* If the key already exists, the old value is released with the old free */
/* function (when set) and the entry is overwritten; otherwise the entry array is */
/* grown by one element and the new entry is appended. */
/* NOTE(review): the return statements and the handling of an opj_realloc() */
/* failure are not visible in this excerpt. */
536 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, opj_tls_free_func opj_free_func)
538 opj_tls_key_val_t* new_key_val;
/* First pass: replace an existing entry for this key. */
540 for(i=0;i<tls->key_val_count;i++)
542 if( tls->key_val[i].key == key )
544 if( tls->key_val[i].opj_free_func )
545 tls->key_val[i].opj_free_func(tls->key_val[i].value);
546 tls->key_val[i].value = value;
547 tls->key_val[i].opj_free_func = opj_free_func;
/* Key not found: grow the array. The result goes into a temporary so that */
/* tls->key_val is only overwritten further down. */
551 new_key_val = (opj_tls_key_val_t*) opj_realloc( tls->key_val,
552 (tls->key_val_count + 1) * sizeof(opj_tls_key_val_t) );
555 tls->key_val = new_key_val;
556 new_key_val[tls->key_val_count].key = key;
557 new_key_val[tls->key_val_count].value = value;
558 new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
559 tls->key_val_count ++;
/* Closing line of the job type; elsewhere in the file its job_fn and user_data */
/* members are used (their declarations are not visible in this excerpt). */
568 } opj_worker_thread_job_t;
/* Members of the per-worker record (struct header not visible in this excerpt). */
/* Pool this worker belongs to. */
572 opj_thread_pool_t *tp;
/* Thread object returned by opj_thread_create() for this worker. */
573 opj_thread_t *thread;
/* OPJ_TRUE while the worker is registered in the pool's waiting list. */
574 int marked_as_waiting;
578 } opj_worker_thread_t;
/* Closing line of the pool state type; OPJWTS_OK, OPJWTS_STOP and OPJWTS_ERROR */
/* are the values used in this file. */
585 } opj_worker_thread_state;
/* Node of the singly linked list that forms the pool's job queue. */
587 struct opj_job_list_t
/* Job carried by this node. */
589 opj_worker_thread_job_t* job;
/* Next node of the queue. */
590 struct opj_job_list_t* next;
592 typedef struct opj_job_list_t opj_job_list_t;
/* Node of the singly linked list of worker threads that are waiting for a job. */
594 struct opj_worker_thread_list_t
/* Waiting worker referenced by this node. */
596 opj_worker_thread_t* worker_thread;
/* Next node of the list. */
597 struct opj_worker_thread_list_t* next;
599 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
/* Thread pool state. */
/* NOTE(review): the `mutex`, `cond` and `tls` members used throughout the file */
/* are declared on lines not visible in this excerpt. */
601 struct opj_thread_pool_t
/* Array of per-worker records, allocated in opj_thread_pool_setup(). */
603 opj_worker_thread_t* worker_threads;
/* Number of usable entries in worker_threads. */
604 int worker_threads_count;
/* OPJWTS_OK after creation; set to OPJWTS_STOP by opj_thread_pool_destroy() and */
/* to OPJWTS_ERROR when a worker fails to register itself as waiting. */
607 volatile opj_worker_thread_state state;
/* Head of the pending job list; jobs are pushed and popped at the head. */
608 opj_job_list_t* job_queue;
/* Incremented on submission, decremented when a worker reports a job as finished. */
609 volatile int pending_jobs_count;
/* Head of the list of workers currently waiting for a job. */
610 opj_worker_thread_list_t* waiting_worker_thread_list;
/* Counter kept alongside waiting_worker_thread_list. */
611 int waiting_worker_thread_count;
/* The pool condition is signalled when pending_jobs_count drops to or below this value. */
613 int signaling_threshold;
/* Forward declarations of helpers defined later in this file. */
616 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
617 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
618 opj_worker_thread_t* worker_thread,
619 OPJ_BOOL signal_job_finished);
/* Creates a thread pool. The pool structure is zero-initialised and marked */
/* OPJWTS_OK. The `num_threads <= 0` branch creates only the pool's key/value */
/* storage; otherwise a mutex is created and opj_thread_pool_setup() starts the */
/* workers. If setup fails, the partially built pool is destroyed. */
/* NOTE(review): allocation checks, the exact extent of the `num_threads <= 0` */
/* branch and the return statements are not visible in this excerpt. */
621 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
623 opj_thread_pool_t* tp;
625 tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
628 tp->state = OPJWTS_OK;
630 if( num_threads <= 0 )
632 tp->tls = opj_tls_new();
/* A pool without a mutex runs submitted jobs directly in opj_thread_pool_submit_job(). */
641 tp->mutex = opj_mutex_create();
647 if( !opj_thread_pool_setup(tp, num_threads) )
649 opj_thread_pool_destroy(tp);
/* Body of each worker thread: repeatedly obtains a job from the pool with */
/* opj_thread_pool_get_next_job(), runs it with this worker's `tls`, and reports */
/* the completion on the next call. The `tls` storage is destroyed before the */
/* function ends. */
/* NOTE(review): the creation of `tls`, the loop construct, the loop exit */
/* condition and the release of the job are not visible in this excerpt. */
655 static void opj_worker_thread_function(void* user_data)
657 opj_worker_thread_t* worker_thread;
658 opj_thread_pool_t* tp;
/* Passed to get_next_job so that the completion of the previous job is accounted for. */
660 OPJ_BOOL job_finished = OPJ_FALSE;
662 worker_thread = (opj_worker_thread_t* ) user_data;
663 tp = worker_thread->tp;
668 opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, job_finished);
674 job->job_fn(job->user_data, tls);
677 job_finished = OPJ_TRUE;
680 opj_tls_destroy(tls);
683 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
686 OPJ_BOOL bRet = OPJ_TRUE;
688 assert( num_threads > 0 );
690 tp->cond = opj_cond_create();
691 if( tp->cond == NULL )
694 tp->worker_threads = (opj_worker_thread_t*) opj_calloc( num_threads,
695 sizeof(opj_worker_thread_t) );
696 if( tp->worker_threads == NULL )
698 tp->worker_threads_count = num_threads;
700 for(i=0;i<num_threads;i++)
702 tp->worker_threads[i].tp = tp;
704 tp->worker_threads[i].mutex = opj_mutex_create();
705 if( tp->worker_threads[i].mutex == NULL )
707 tp->worker_threads_count = i;
712 tp->worker_threads[i].cond = opj_cond_create();
713 if( tp->worker_threads[i].cond == NULL )
715 opj_mutex_destroy(tp->worker_threads[i].mutex);
716 tp->worker_threads_count = i;
721 tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
723 tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
724 &(tp->worker_threads[i]));
725 if( tp->worker_threads[i].thread == NULL )
727 tp->worker_threads_count = i;
733 /* Wait all threads to be started */
734 /* printf("waiting for all threads to be started\n"); */
735 opj_mutex_lock(tp->mutex);
736 while( tp->waiting_worker_thread_count < num_threads )
738 opj_cond_wait(tp->cond, tp->mutex);
740 opj_mutex_unlock(tp->mutex);
741 /* printf("all threads started\n"); */
743 if( tp->state == OPJWTS_ERROR )
752 printf("waiting!\n");
/* Called by a worker to obtain its next job. */
/* Under tp->mutex it: optionally accounts for the job the worker just finished, */
/* stops when the pool state is OPJWTS_STOP, pops the job at the head of */
/* tp->job_queue when there is one, and otherwise registers the worker as waiting */
/* and blocks on the worker's own condition until it is signalled. */
/* NOTE(review): the enclosing loop, the branch conditions on some lines and the */
/* return statements are not visible in this excerpt. */
756 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
757 opj_worker_thread_t* worker_thread,
758 OPJ_BOOL signal_job_finished)
762 opj_job_list_t* top_job_iter;
764 opj_mutex_lock(tp->mutex);
/* Account for the previously executed job once only (the flag is cleared here). */
766 if( signal_job_finished )
768 signal_job_finished = OPJ_FALSE;
769 tp->pending_jobs_count --;
770 /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
/* Wake a thread blocked on tp->cond once few enough jobs remain. */
771 if( tp->pending_jobs_count <= tp->signaling_threshold )
772 opj_cond_signal(tp->cond);
/* The pool is shutting down: release the lock before leaving. */
775 if( tp->state == OPJWTS_STOP )
777 opj_mutex_unlock(tp->mutex);
780 top_job_iter = tp->job_queue;
/* Pop the head of the queue; the list node is freed after the lock is released. */
783 opj_worker_thread_job_t* job;
784 tp->job_queue = top_job_iter->next;
786 job = top_job_iter->job;
787 opj_mutex_unlock(tp->mutex);
788 opj_free(top_job_iter);
/* No job available: register this worker in the waiting list, unless it already is. */
793 if( !worker_thread->marked_as_waiting )
795 opj_worker_thread_list_t* item;
797 worker_thread->marked_as_waiting = OPJ_TRUE;
798 tp->waiting_worker_thread_count ++;
799 assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
801 item= (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
/* Failure path: flag the pool as broken and wake a thread blocked on tp->cond. */
804 tp->state = OPJWTS_ERROR;
805 opj_cond_signal(tp->cond);
807 opj_mutex_unlock(tp->mutex);
/* Prepend this worker to the waiting list. */
811 item->worker_thread = worker_thread;
812 item->next = tp->waiting_worker_thread_list;
813 tp->waiting_worker_thread_list = item;
816 /* printf("signaling that worker thread is ready\n"); */
817 opj_cond_signal(tp->cond);
/* The worker mutex is taken BEFORE the pool mutex is released, so there is no */
/* moment at which neither lock is held between registering and waiting. */
819 opj_mutex_lock(worker_thread->mutex);
820 opj_mutex_unlock(tp->mutex);
822 /* printf("waiting for job\n"); */
823 opj_cond_wait( worker_thread->cond, worker_thread->mutex );
825 opj_mutex_unlock(worker_thread->mutex);
826 /* printf("got job\n"); */
/* Submits a job to the pool. */
/* When the pool has no mutex, the job is run immediately in the calling thread */
/* with the pool's own tls. Otherwise the job is wrapped in a list node, pushed */
/* at the head of tp->job_queue and, if a worker is waiting, that worker is */
/* removed from the waiting list and signalled. */
/* Submission blocks while more than 100 * worker_threads_count jobs are pending. */
/* NOTE(review): the remaining parameters of the signature, the allocation */
/* checks, the else-branch structure and the return statements are not visible */
/* in this excerpt. */
830 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
834 opj_worker_thread_job_t* job;
835 opj_job_list_t* item;
/* No mutex: execute the job synchronously. */
837 if( tp->mutex == NULL )
839 job_fn( user_data, tp->tls );
843 job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
846 job->job_fn = job_fn;
847 job->user_data = user_data;
849 item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
857 opj_mutex_lock(tp->mutex);
/* Back-pressure: wait until the backlog is at most 100 jobs per worker. */
859 tp->signaling_threshold = 100 * tp->worker_threads_count;
860 while( tp->pending_jobs_count > tp->signaling_threshold )
862 /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
863 opj_cond_wait(tp->cond, tp->mutex);
864 /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
/* Push the new job at the head of the queue. */
867 item->next = tp->job_queue;
868 tp->job_queue = item;
869 tp->pending_jobs_count ++;
/* Hand the job over to a waiting worker, if there is one. */
871 if( tp->waiting_worker_thread_list )
873 opj_worker_thread_t* worker_thread;
874 opj_worker_thread_list_t* next;
875 opj_worker_thread_list_t* to_opj_free;
877 worker_thread = tp->waiting_worker_thread_list->worker_thread;
879 assert( worker_thread->marked_as_waiting );
880 worker_thread->marked_as_waiting = OPJ_FALSE;
/* Unlink the head of the waiting list; the node is freed after the locks are released. */
882 next = tp->waiting_worker_thread_list->next;
883 to_opj_free = tp->waiting_worker_thread_list;
884 tp->waiting_worker_thread_list = next;
885 tp->waiting_worker_thread_count --;
/* The worker mutex is taken before the pool mutex is released, mirroring the */
/* lock order used by the worker in opj_thread_pool_get_next_job(). */
887 opj_mutex_lock(worker_thread->mutex);
888 opj_mutex_unlock(tp->mutex);
889 opj_cond_signal(worker_thread->cond);
890 opj_mutex_unlock(worker_thread->mutex);
892 opj_free(to_opj_free);
/* Path on which no worker is signalled: just release the pool mutex. */
895 opj_mutex_unlock(tp->mutex);
/* Blocks until at most max_remaining_jobs jobs are still pending. */
/* A negative max_remaining_jobs is treated as 0. The value is also stored as the */
/* pool's signaling threshold so that workers signal tp->cond once it is reached. */
/* NOTE(review): what happens when tp->mutex is NULL is not visible in this excerpt. */
900 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, int max_remaining_jobs)
902 if( tp->mutex == NULL )
907 if( max_remaining_jobs < 0 )
908 max_remaining_jobs = 0;
909 opj_mutex_lock(tp->mutex);
910 tp->signaling_threshold = max_remaining_jobs;
/* Re-check the count after every wake-up. */
911 while( tp->pending_jobs_count > max_remaining_jobs )
913 /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
914 opj_cond_wait(tp->cond, tp->mutex);
915 /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
917 opj_mutex_unlock(tp->mutex);
/* Returns tp->worker_threads_count, the number of worker threads recorded for the pool. */
/* The field is read without taking tp->mutex. */
920 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
922 return tp->worker_threads_count;
/* Destroys a thread pool: waits for all pending jobs to finish, switches the */
/* pool to OPJWTS_STOP, wakes and joins every worker, then releases the */
/* per-worker objects, the waiting list, the pool condition, the pool mutex and */
/* the pool's key/value storage. */
/* NOTE(review): the guards around these steps (e.g. NULL checks on `tp` or its */
/* members) and the release of `tp` itself are not visible in this excerpt. */
925 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
/* Drain the queue first so that no job is dropped. */
931 opj_thread_pool_wait_completion(tp, 0);
933 opj_mutex_lock(tp->mutex);
934 tp->state = OPJWTS_STOP;
935 opj_mutex_unlock(tp->mutex);
937 for(i=0;i<tp->worker_threads_count;i++)
/* Wake the worker so that it sees OPJWTS_STOP, then wait for its thread to end. */
939 opj_mutex_lock(tp->worker_threads[i].mutex);
940 opj_cond_signal(tp->worker_threads[i].cond);
941 opj_mutex_unlock(tp->worker_threads[i].mutex);
942 opj_thread_join(tp->worker_threads[i].thread);
943 opj_cond_destroy(tp->worker_threads[i].cond);
944 opj_mutex_destroy(tp->worker_threads[i].mutex);
947 opj_free(tp->worker_threads);
/* Free the nodes still present in the waiting list. */
949 while( tp->waiting_worker_thread_list != NULL )
951 opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
952 opj_free( tp->waiting_worker_thread_list );
953 tp->waiting_worker_thread_list = next;
956 opj_cond_destroy(tp->cond);
958 opj_mutex_destroy(tp->mutex);
959 opj_tls_destroy(tp->tls);