79a9d5fb496ac20b12dd21eb94fb610c3b0cbae8
[openjpeg.git] / src / lib / openjp2 / thread.c
1 /*
2  * The copyright in this software is being made available under the 2-clauses 
3  * BSD License, included below. This software may be subject to other third 
4  * party and contributor rights, including patent rights, and no such rights
5  * are granted under this license.
6  *
7  * Copyright (c) 2016, Even Rouault
8  * All rights reserved.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31
32 #include "opj_includes.h"
33
34 #include "thread.h"
35 #include <assert.h>
36
37 #ifdef MUTEX_win32
38
39 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
40 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
41 /* a rather unlikely race, skip it */
42 #if !(defined(__MINGW32__) && defined(__i386__))
43 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
44 #endif
45
46 #include <windows.h>
47 #include <process.h>
48
49 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
50 {
51     return OPJ_TRUE;
52 }
53
54 int OPJ_CALLCONV opj_get_num_cpus(void)
55 {
56     SYSTEM_INFO info;
57     DWORD dwNum;
58     GetSystemInfo(&info);
59     dwNum = info.dwNumberOfProcessors;
60     if( dwNum < 1 )
61         return 1;
62     return (int)dwNum;
63 }
64
65 struct opj_mutex_t
66 {
67     CRITICAL_SECTION cs;
68 };
69
70 opj_mutex_t* opj_mutex_create(void)
71 {
72     opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
73     if( !mutex )
74         return NULL;
75     InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
76     return mutex;
77 }
78
79 void opj_mutex_lock(opj_mutex_t* mutex)
80 {
81     EnterCriticalSection( &(mutex->cs) );
82 }
83
84 void opj_mutex_unlock(opj_mutex_t* mutex)
85 {
86     LeaveCriticalSection( &(mutex->cs) );
87 }
88
89 void opj_mutex_destroy(opj_mutex_t* mutex)
90 {
91     if( !mutex ) return;
92     DeleteCriticalSection( &(mutex->cs) );
93     opj_free( mutex );
94 }
95
96 struct opj_cond_waiter_list_t
97 {
98     HANDLE hEvent;
99     struct opj_cond_waiter_list_t* next;
100 };
101 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
102
103 struct opj_cond_t
104 {
105     opj_mutex_t             *internal_mutex;
106     opj_cond_waiter_list_t  *waiter_list;
107 };
108
109 static DWORD TLSKey = 0;
110 static volatile LONG inTLSLockedSection = 0;
111 static volatile int TLSKeyInit = OPJ_FALSE;
112
113 opj_cond_t* opj_cond_create(void)
114 {
115     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
116     if( !cond )
117         return NULL;
118
119     /* Make sure that the TLS key is allocated in a thread-safe way */
120     /* We cannot use a global mutex/critical section since its creation itself would not be */
121     /* thread-safe, so use InterlockedCompareExchange trick */
122     while( OPJ_TRUE )
123     {
124
125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
126         if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
127 #endif
128         {
129             if( !TLSKeyInit )
130             {
131                 TLSKey = TlsAlloc();
132                 TLSKeyInit = OPJ_TRUE;
133             }
134 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
135             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
136 #endif
137             break;
138         }
139     }
140
141     if( TLSKey == TLS_OUT_OF_INDEXES )
142     {
143         opj_free(cond);
144         return NULL;
145     }
146     cond->internal_mutex = opj_mutex_create();
147     if (cond->internal_mutex == NULL)
148     {
149         opj_free(cond);
150         return NULL;
151     }
152     cond->waiter_list = NULL;
153     return cond;
154 }
155
156 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
157 {
158     opj_cond_waiter_list_t* item;
159     HANDLE hEvent = (HANDLE) TlsGetValue( TLSKey );
160     if (hEvent == NULL)
161     {
162         hEvent = CreateEvent(NULL, /* security attributes */
163                              0,    /* manual reset = no */
164                              0,    /* initial state = unsignaled */
165                              NULL  /* no name */);
166         assert(hEvent);
167
168         TlsSetValue( TLSKey, hEvent );
169     }
170
171     /* Insert the waiter into the waiter list of the condition */
172     opj_mutex_lock(cond->internal_mutex);
173
174     item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
175     assert(item != NULL);
176
177     item->hEvent = hEvent;
178     item->next = cond->waiter_list;
179
180     cond->waiter_list = item;
181
182     opj_mutex_unlock(cond->internal_mutex);
183
184     /* Release the client mutex before waiting for the event being signaled */
185     opj_mutex_unlock(mutex);
186
187     /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
188     /* to report a failure. */
189     WaitForSingleObject(hEvent, INFINITE);
190
191     /* Reacquire the client mutex */
192     opj_mutex_lock(mutex);
193 }
194
195 void opj_cond_signal(opj_cond_t* cond)
196 {
197     opj_cond_waiter_list_t* psIter;
198
199     /* Signal the first registered event, and remove it from the list */
200     opj_mutex_lock(cond->internal_mutex);
201
202     psIter = cond->waiter_list;
203     if (psIter != NULL)
204     {
205         SetEvent(psIter->hEvent);
206         cond->waiter_list = psIter->next;
207         opj_free(psIter);
208     }
209
210     opj_mutex_unlock(cond->internal_mutex);
211 }
212
213 void opj_cond_destroy(opj_cond_t* cond)
214 {
215     if( !cond ) return;
216     opj_mutex_destroy(cond->internal_mutex);
217     assert(cond->waiter_list == NULL);
218     opj_free(cond);
219 }
220
221 struct opj_thread_t
222 {
223     opj_thread_fn thread_fn;
224     void* user_data;
225     HANDLE hThread;
226 };
227
228 unsigned int __stdcall opj_thread_callback_adapter( void *info )
229 {
230     opj_thread_t* thread = (opj_thread_t*) info;
231     HANDLE hEvent = NULL;
232
233     thread->thread_fn( thread->user_data );
234
235     /* Free the handle possible allocated by a cond */
236     while( OPJ_TRUE )
237     {
238         /* Make sure TLSKey is not being created just at that moment... */
239 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
240         if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
241 #endif
242         {
243             if( TLSKeyInit )
244             {
245                 hEvent = (HANDLE) TlsGetValue( TLSKey );
246             }
247 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
248             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
249 #endif
250             break;
251         }
252     }
253     if( hEvent )
254         CloseHandle(hEvent);
255
256     return 0;
257 }
258
259 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
260 {
261     opj_thread_t* thread;
262
263     assert( thread_fn );
264
265     thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
266     if( !thread )
267         return NULL;
268     thread->thread_fn = thread_fn;
269     thread->user_data = user_data;
270
271     thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
272                                     opj_thread_callback_adapter, thread, 0, NULL);
273
274     if( thread->hThread == NULL )
275     {
276         opj_free( thread );
277         return NULL;
278     }
279     return thread;
280 }
281
282 void opj_thread_join( opj_thread_t* thread )
283 {
284     WaitForSingleObject(thread->hThread, INFINITE);
285     CloseHandle( thread->hThread );
286
287     opj_free(thread);
288 }
289
290 #elif MUTEX_pthread
291
292 #include <pthread.h>
293 #include <stdlib.h>
294 #include <unistd.h>
295
296 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
297 {
298     return OPJ_TRUE;
299 }
300
301 int OPJ_CALLCONV opj_get_num_cpus(void)
302 {
303 #ifdef _SC_NPROCESSORS_ONLN
304     return (int)sysconf(_SC_NPROCESSORS_ONLN);
305 #else
306     return 1;
307 #endif
308 }
309
310 struct opj_mutex_t
311 {
312     pthread_mutex_t mutex;
313 };
314
315 opj_mutex_t* opj_mutex_create(void)
316 {
317     opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t));
318     if( mutex != NULL ) {
319         if ( pthread_mutex_init(&mutex->mutex, NULL) != 0) {
320             opj_free(mutex);
321             mutex = NULL;
322         }
323     }
324     return mutex;
325 }
326
327 void opj_mutex_lock(opj_mutex_t* mutex)
328 {
329     pthread_mutex_lock(&(mutex->mutex));
330 }
331
332 void opj_mutex_unlock(opj_mutex_t* mutex)
333 {
334     pthread_mutex_unlock(&(mutex->mutex));
335 }
336
337 void opj_mutex_destroy(opj_mutex_t* mutex)
338 {
339     if( !mutex ) return;
340     pthread_mutex_destroy(&(mutex->mutex));
341     opj_free(mutex);
342 }
343
344 struct opj_cond_t
345 {
346     pthread_cond_t cond;
347 };
348
349 opj_cond_t* opj_cond_create(void)
350 {
351     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
352     if( !cond )
353         return NULL;
354     if( pthread_cond_init(&(cond->cond), NULL) != 0 )
355     {
356         opj_free(cond);
357         return NULL;
358     }
359     return cond;
360 }
361
362 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
363 {
364     pthread_cond_wait(&(cond->cond), &(mutex->mutex));
365 }
366
367 void opj_cond_signal(opj_cond_t* cond)
368 {
369     int ret = pthread_cond_signal(&(cond->cond));
370     (void)ret;
371     assert(ret == 0);
372 }
373
374 void opj_cond_destroy(opj_cond_t* cond)
375 {
376     if( !cond ) return;
377     pthread_cond_destroy(&(cond->cond));
378     opj_free(cond);
379 }
380
381
382 struct opj_thread_t
383 {
384     opj_thread_fn thread_fn;
385     void* user_data;
386     pthread_t thread;
387 };
388
389 static void* opj_thread_callback_adapter( void* info )
390 {
391     opj_thread_t* thread = (opj_thread_t*) info;
392     thread->thread_fn( thread->user_data );
393     return NULL;
394 }
395
396 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
397 {
398     pthread_attr_t attr;
399     opj_thread_t* thread;
400
401     assert( thread_fn );
402
403     thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
404     if( !thread )
405         return NULL;
406     thread->thread_fn = thread_fn;
407     thread->user_data = user_data;
408
409     pthread_attr_init( &attr );
410     pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
411     if( pthread_create( &(thread->thread), &attr,
412                         opj_thread_callback_adapter, (void *) thread ) != 0 )
413     {
414         opj_free( thread );
415         return NULL;
416     }
417     return thread;
418 }
419
420 void opj_thread_join( opj_thread_t* thread )
421 {
422     void* status;
423     pthread_join( thread->thread, &status);
424
425     opj_free(thread);
426 }
427
428 #else
429 /* Stub implementation */
430
431 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
432 {
433     return OPJ_FALSE;
434 }
435
436 int OPJ_CALLCONV opj_get_num_cpus(void)
437 {
438     return 1;
439 }
440
441 opj_mutex_t* opj_mutex_create(void)
442 {
443     return NULL;
444 }
445
446 void opj_mutex_lock(opj_mutex_t* mutex)
447 {
448     (void) mutex;
449 }
450
451 void opj_mutex_unlock(opj_mutex_t* mutex)
452 {
453     (void) mutex;
454 }
455
456 void opj_mutex_destroy(opj_mutex_t* mutex)
457 {
458     (void) mutex;
459 }
460
461 opj_cond_t* opj_cond_create(void)
462 {
463     return NULL;
464 }
465
466 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
467 {
468     (void) cond;
469     (void) mutex;
470 }
471
472 void opj_cond_signal(opj_cond_t* cond)
473 {
474     (void) cond;
475 }
476
477 void opj_cond_destroy(opj_cond_t* cond)
478 {
479     (void) cond;
480 }
481
482 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
483 {
484     (void) thread_fn; 
485     (void) user_data;
486     return NULL;
487 }
488
489 void opj_thread_join( opj_thread_t* thread )
490 {
491     (void) thread;
492 }
493
494 #endif
495
496 typedef struct
497 {
498     int key;
499     void* value;
500     opj_tls_free_func opj_free_func;
501 } opj_tls_key_val_t;
502
503 struct opj_tls_t
504 {
505     opj_tls_key_val_t* key_val;
506     int                key_val_count;
507 };
508
509 static opj_tls_t* opj_tls_new(void)
510 {
511     return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
512 }
513
514 static void opj_tls_destroy(opj_tls_t* tls)
515 {
516     int i;
517     if( !tls ) return;
518     for(i=0;i<tls->key_val_count;i++)
519     {
520         if( tls->key_val[i].opj_free_func )
521             tls->key_val[i].opj_free_func(tls->key_val[i].value);
522     }
523     opj_free(tls->key_val);
524     opj_free(tls);
525 }
526
527 void* opj_tls_get(opj_tls_t* tls, int key)
528 {
529     int i;
530     for(i=0;i<tls->key_val_count;i++)
531     {
532         if( tls->key_val[i].key == key )
533             return tls->key_val[i].value;
534     }
535     return NULL;
536 }
537
538 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, opj_tls_free_func opj_free_func)
539 {
540     opj_tls_key_val_t* new_key_val;
541     int i;
542         
543     if (tls->key_val_count == INT_MAX) {
544         return OPJ_FALSE;
545     }
546     for(i=0;i<tls->key_val_count;i++)
547     {
548         if( tls->key_val[i].key == key )
549         {
550             if( tls->key_val[i].opj_free_func ) {
551                 tls->key_val[i].opj_free_func(tls->key_val[i].value);
552             }
553             tls->key_val[i].value = value;
554             tls->key_val[i].opj_free_func = opj_free_func;
555             return OPJ_TRUE;
556         }
557     }
558     new_key_val = (opj_tls_key_val_t*) opj_realloc( tls->key_val,
559                         ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t) );
560     if( !new_key_val )
561         return OPJ_FALSE;
562     tls->key_val = new_key_val;
563     new_key_val[tls->key_val_count].key = key;
564     new_key_val[tls->key_val_count].value = value;
565     new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
566     tls->key_val_count ++;
567     return OPJ_TRUE;
568 }
569
570
571 typedef struct
572 {
573     opj_job_fn          job_fn;
574     void               *user_data;
575 } opj_worker_thread_job_t;
576
577 typedef struct
578 {
579     opj_thread_pool_t   *tp;
580     opj_thread_t        *thread;
581     int                  marked_as_waiting;
582
583     opj_mutex_t         *mutex;
584     opj_cond_t          *cond;
585 } opj_worker_thread_t;
586
587 typedef enum
588 {
589     OPJWTS_OK,
590     OPJWTS_STOP,
591     OPJWTS_ERROR
592 } opj_worker_thread_state;
593
594 struct opj_job_list_t
595 {
596     opj_worker_thread_job_t* job;
597     struct opj_job_list_t* next;
598 };
599 typedef struct opj_job_list_t opj_job_list_t;
600
601 struct opj_worker_thread_list_t
602 {
603     opj_worker_thread_t* worker_thread;
604     struct opj_worker_thread_list_t* next;
605 };
606 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
607
608 struct opj_thread_pool_t
609 {
610     opj_worker_thread_t*             worker_threads;
611     int                              worker_threads_count;
612     opj_cond_t*                      cond;
613     opj_mutex_t*                     mutex;
614     volatile opj_worker_thread_state state;
615     opj_job_list_t*                  job_queue;
616     volatile int                     pending_jobs_count;
617     opj_worker_thread_list_t*        waiting_worker_thread_list;
618     int                              waiting_worker_thread_count;
619     opj_tls_t*                       tls;
620     int                              signaling_threshold;
621 };
622
623 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
624 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
625                                                              opj_worker_thread_t* worker_thread,
626                                                              OPJ_BOOL signal_job_finished);
627
628 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
629 {
630     opj_thread_pool_t* tp;
631
632     tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
633     if( !tp )
634         return NULL;
635     tp->state = OPJWTS_OK;
636
637     if( num_threads <= 0 )
638     {
639         tp->tls = opj_tls_new();
640         if( !tp->tls )
641         {
642             opj_free(tp);
643             tp = NULL;
644         }
645         return tp;
646     }
647
648     tp->mutex = opj_mutex_create();
649     if( !tp->mutex )
650     {
651         opj_free(tp);
652         return NULL;
653     }
654     if( !opj_thread_pool_setup(tp, num_threads) )
655     {
656         opj_thread_pool_destroy(tp);
657         return NULL;
658     }
659     return tp;
660 }
661
662 static void opj_worker_thread_function(void* user_data)
663 {
664     opj_worker_thread_t* worker_thread;
665     opj_thread_pool_t* tp;
666     opj_tls_t* tls;
667     OPJ_BOOL job_finished = OPJ_FALSE;
668
669     worker_thread = (opj_worker_thread_t* ) user_data;
670     tp = worker_thread->tp;
671     tls = opj_tls_new();
672
673     while( OPJ_TRUE )
674     {
675         opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, job_finished);
676         if( job == NULL )
677             break;
678
679         if( job->job_fn )
680         {
681             job->job_fn(job->user_data, tls);
682         }
683         opj_free(job);
684         job_finished = OPJ_TRUE;
685     }
686
687     opj_tls_destroy(tls);
688 }
689
690 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
691 {
692     int i;
693     OPJ_BOOL bRet = OPJ_TRUE;
694
695     assert( num_threads > 0 );
696
697     tp->cond = opj_cond_create();
698     if( tp->cond == NULL )
699         return OPJ_FALSE;
700
701     tp->worker_threads = (opj_worker_thread_t*) opj_calloc( (size_t)num_threads,
702                                                         sizeof(opj_worker_thread_t) );
703     if( tp->worker_threads == NULL )
704         return OPJ_FALSE;
705     tp->worker_threads_count = num_threads;
706
707     for(i=0;i<num_threads;i++)
708     {
709         tp->worker_threads[i].tp = tp;
710
711         tp->worker_threads[i].mutex = opj_mutex_create();
712         if( tp->worker_threads[i].mutex == NULL )
713         {
714             tp->worker_threads_count = i;
715             bRet = OPJ_FALSE;
716             break;
717         }
718
719         tp->worker_threads[i].cond = opj_cond_create();
720         if( tp->worker_threads[i].cond == NULL )
721         {
722             opj_mutex_destroy(tp->worker_threads[i].mutex);
723             tp->worker_threads_count = i;
724             bRet = OPJ_FALSE;
725             break;
726         }
727
728         tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
729
730         tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
731                                                          &(tp->worker_threads[i]));
732         if( tp->worker_threads[i].thread == NULL )
733         {
734             tp->worker_threads_count = i;
735             bRet = OPJ_FALSE;
736             break;
737         }
738     }
739
740     /* Wait all threads to be started */
741     /* printf("waiting for all threads to be started\n"); */
742     opj_mutex_lock(tp->mutex);
743     while( tp->waiting_worker_thread_count < num_threads )
744     {
745         opj_cond_wait(tp->cond, tp->mutex);
746     }
747     opj_mutex_unlock(tp->mutex);
748     /* printf("all threads started\n"); */
749
750     if( tp->state == OPJWTS_ERROR )
751         bRet = OPJ_FALSE;
752
753     return bRet;
754 }
755
756 /*
757 void opj_waiting()
758 {
759     printf("waiting!\n");
760 }
761 */
762
763 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
764                                                              opj_worker_thread_t* worker_thread,
765                                                              OPJ_BOOL signal_job_finished)
766 {
767     while( OPJ_TRUE )
768     {
769         opj_job_list_t* top_job_iter;
770
771         opj_mutex_lock(tp->mutex);
772
773         if( signal_job_finished )
774         {
775             signal_job_finished = OPJ_FALSE;
776             tp->pending_jobs_count --;
777             /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
778             if( tp->pending_jobs_count <= tp->signaling_threshold )
779                 opj_cond_signal(tp->cond);
780         }
781
782         if( tp->state == OPJWTS_STOP )
783         {
784             opj_mutex_unlock(tp->mutex);
785             return NULL;
786         }
787         top_job_iter = tp->job_queue;
788         if( top_job_iter )
789         {
790             opj_worker_thread_job_t* job;
791             tp->job_queue = top_job_iter->next;
792
793             job = top_job_iter->job;
794             opj_mutex_unlock(tp->mutex);
795             opj_free(top_job_iter);
796             return job;
797         }
798
799         /* opj_waiting(); */
800         if( !worker_thread->marked_as_waiting )
801         {
802             opj_worker_thread_list_t* item;
803
804             worker_thread->marked_as_waiting = OPJ_TRUE;
805             tp->waiting_worker_thread_count ++;
806             assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
807
808             item= (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
809             if( item == NULL )
810             {
811                 tp->state = OPJWTS_ERROR;
812                 opj_cond_signal(tp->cond);
813
814                 opj_mutex_unlock(tp->mutex);
815                 return NULL;
816             }
817
818             item->worker_thread = worker_thread;
819             item->next = tp->waiting_worker_thread_list;
820             tp->waiting_worker_thread_list = item;
821         }
822
823         /* printf("signaling that worker thread is ready\n"); */
824         opj_cond_signal(tp->cond);
825
826         opj_mutex_lock(worker_thread->mutex);
827         opj_mutex_unlock(tp->mutex);
828
829         /* printf("waiting for job\n"); */
830         opj_cond_wait( worker_thread->cond, worker_thread->mutex );
831
832         opj_mutex_unlock(worker_thread->mutex);
833         /* printf("got job\n"); */
834     }
835 }
836
837 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
838                                     opj_job_fn job_fn,
839                                     void* user_data)
840 {
841     opj_worker_thread_job_t* job;
842     opj_job_list_t* item;
843
844     if( tp->mutex == NULL )
845     {
846         job_fn( user_data, tp->tls );
847         return OPJ_TRUE;
848     }
849
850     job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
851     if( job == NULL )
852         return OPJ_FALSE;
853     job->job_fn = job_fn;
854     job->user_data = user_data;
855
856     item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
857     if( item == NULL )
858     {
859         opj_free(job);
860         return OPJ_FALSE;
861     }
862     item->job = job;
863
864     opj_mutex_lock(tp->mutex);
865
866     tp->signaling_threshold = 100 * tp->worker_threads_count;
867     while( tp->pending_jobs_count > tp->signaling_threshold )
868     {
869         /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
870         opj_cond_wait(tp->cond, tp->mutex);
871         /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
872     }
873
874     item->next = tp->job_queue;
875     tp->job_queue = item;
876     tp->pending_jobs_count ++;
877
878     if( tp->waiting_worker_thread_list )
879     {
880         opj_worker_thread_t* worker_thread;
881         opj_worker_thread_list_t* next;
882         opj_worker_thread_list_t* to_opj_free;
883
884         worker_thread = tp->waiting_worker_thread_list->worker_thread;
885
886         assert( worker_thread->marked_as_waiting );
887         worker_thread->marked_as_waiting = OPJ_FALSE;
888
889         next = tp->waiting_worker_thread_list->next;
890         to_opj_free = tp->waiting_worker_thread_list;
891         tp->waiting_worker_thread_list = next;
892         tp->waiting_worker_thread_count --;
893
894         opj_mutex_lock(worker_thread->mutex);
895         opj_mutex_unlock(tp->mutex);
896         opj_cond_signal(worker_thread->cond);
897         opj_mutex_unlock(worker_thread->mutex);
898
899         opj_free(to_opj_free);
900     }
901     else
902         opj_mutex_unlock(tp->mutex);
903
904     return OPJ_TRUE;
905 }
906
907 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, int max_remaining_jobs)
908 {
909     if( tp->mutex == NULL )
910     {
911         return;
912     }
913
914     if( max_remaining_jobs < 0 )
915         max_remaining_jobs = 0;
916     opj_mutex_lock(tp->mutex);
917     tp->signaling_threshold = max_remaining_jobs;
918     while( tp->pending_jobs_count > max_remaining_jobs )
919     {
920         /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
921         opj_cond_wait(tp->cond, tp->mutex);
922         /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
923     }
924     opj_mutex_unlock(tp->mutex);
925 }
926
927 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
928 {
929     return tp->worker_threads_count;
930 }
931
932 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
933 {
934     if( !tp ) return;
935     if( tp->cond )
936     {
937         int i;
938         opj_thread_pool_wait_completion(tp, 0);
939
940         opj_mutex_lock(tp->mutex);
941         tp->state = OPJWTS_STOP;
942         opj_mutex_unlock(tp->mutex);
943
944         for(i=0;i<tp->worker_threads_count;i++)
945         {
946             opj_mutex_lock(tp->worker_threads[i].mutex);
947             opj_cond_signal(tp->worker_threads[i].cond);
948             opj_mutex_unlock(tp->worker_threads[i].mutex);
949             opj_thread_join(tp->worker_threads[i].thread);
950             opj_cond_destroy(tp->worker_threads[i].cond);
951             opj_mutex_destroy(tp->worker_threads[i].mutex);
952         }
953
954         opj_free(tp->worker_threads);
955
956         while( tp->waiting_worker_thread_list != NULL )
957         {
958             opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
959             opj_free( tp->waiting_worker_thread_list );
960             tp->waiting_worker_thread_list = next;
961         }
962
963         opj_cond_destroy(tp->cond);
964     }
965     opj_mutex_destroy(tp->mutex);
966     opj_tls_destroy(tp->tls);
967     opj_free(tp);
968 }