Merge pull request #786 from rouault/tier1_optimizations_multithreading
[openjpeg.git] / src / lib / openjp2 / thread.c
1 /*
2  * The copyright in this software is being made available under the 2-clauses 
3  * BSD License, included below. This software may be subject to other third 
4  * party and contributor rights, including patent rights, and no such rights
5  * are granted under this license.
6  *
7  * Copyright (c) 2016, Even Rouault
8  * All rights reserved.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31
32 #include "opj_includes.h"
33
34 #include "thread.h"
35 #include <assert.h>
36
37 #ifdef MUTEX_win32
38
39 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
40 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
41 /* a rather unlikely race, skip it */
42 #if !(defined(__MINGW32__) && defined(__i386__))
43 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
44 #endif
45
46 #include <windows.h>
47 #include <process.h>
48
49 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
50 {
51     return OPJ_TRUE;
52 }
53
54 int OPJ_CALLCONV opj_get_num_cpus(void)
55 {
56     SYSTEM_INFO info;
57     DWORD dwNum;
58     GetSystemInfo(&info);
59     dwNum = info.dwNumberOfProcessors;
60     if( dwNum < 1 )
61         return 1;
62     return (int)dwNum;
63 }
64
65 struct opj_mutex_t
66 {
67     CRITICAL_SECTION cs;
68 };
69
70 opj_mutex_t* opj_mutex_create(void)
71 {
72     opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
73     if( !mutex )
74         return NULL;
75     InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
76     return mutex;
77 }
78
79 void opj_mutex_lock(opj_mutex_t* mutex)
80 {
81     EnterCriticalSection( &(mutex->cs) );
82 }
83
84 void opj_mutex_unlock(opj_mutex_t* mutex)
85 {
86     LeaveCriticalSection( &(mutex->cs) );
87 }
88
89 void opj_mutex_destroy(opj_mutex_t* mutex)
90 {
91     if( !mutex ) return;
92     DeleteCriticalSection( &(mutex->cs) );
93     opj_free( mutex );
94 }
95
96 struct opj_cond_waiter_list_t
97 {
98     HANDLE hEvent;
99     struct opj_cond_waiter_list_t* next;
100 };
101 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
102
103 struct opj_cond_t
104 {
105     opj_mutex_t             *internal_mutex;
106     opj_cond_waiter_list_t  *waiter_list;
107 };
108
109 static DWORD TLSKey = 0;
110 static volatile LONG inTLSLockedSection = 0;
111 static volatile int TLSKeyInit = OPJ_FALSE;
112
113 opj_cond_t* opj_cond_create(void)
114 {
115     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
116     if( !cond )
117         return NULL;
118
119     /* Make sure that the TLS key is allocated in a thread-safe way */
120     /* We cannot use a global mutex/critical section since its creation itself would not be */
121     /* thread-safe, so use InterlockedCompareExchange trick */
122     while( OPJ_TRUE )
123     {
124
125 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
126         if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
127 #endif
128         {
129             if( !TLSKeyInit )
130             {
131                 TLSKey = TlsAlloc();
132                 TLSKeyInit = OPJ_TRUE;
133             }
134 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
135             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
136 #endif
137             break;
138         }
139     }
140
141     if( TLSKey == TLS_OUT_OF_INDEXES )
142     {
143         opj_free(cond);
144         return NULL;
145     }
146     cond->internal_mutex = opj_mutex_create();
147     if (cond->internal_mutex == NULL)
148     {
149         opj_free(cond);
150         return NULL;
151     }
152     cond->waiter_list = NULL;
153     return cond;
154 }
155
156 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
157 {
158     opj_cond_waiter_list_t* item;
159     HANDLE hEvent = (HANDLE) TlsGetValue( TLSKey );
160     if (hEvent == NULL)
161     {
162         hEvent = CreateEvent(NULL, /* security attributes */
163                              0,    /* manual reset = no */
164                              0,    /* initial state = unsignaled */
165                              NULL  /* no name */);
166         assert(hEvent);
167
168         TlsSetValue( TLSKey, hEvent );
169     }
170
171     /* Insert the waiter into the waiter list of the condition */
172     opj_mutex_lock(cond->internal_mutex);
173
174     item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
175     assert(item != NULL);
176
177     item->hEvent = hEvent;
178     item->next = cond->waiter_list;
179
180     cond->waiter_list = item;
181
182     opj_mutex_unlock(cond->internal_mutex);
183
184     /* Release the client mutex before waiting for the event being signaled */
185     opj_mutex_unlock(mutex);
186
187     /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
188     /* to report a failure. */
189     WaitForSingleObject(hEvent, INFINITE);
190
191     /* Reacquire the client mutex */
192     opj_mutex_lock(mutex);
193 }
194
195 void opj_cond_signal(opj_cond_t* cond)
196 {
197     opj_cond_waiter_list_t* psIter;
198
199     /* Signal the first registered event, and remove it from the list */
200     opj_mutex_lock(cond->internal_mutex);
201
202     psIter = cond->waiter_list;
203     if (psIter != NULL)
204     {
205         SetEvent(psIter->hEvent);
206         cond->waiter_list = psIter->next;
207         opj_free(psIter);
208     }
209
210     opj_mutex_unlock(cond->internal_mutex);
211 }
212
213 void opj_cond_destroy(opj_cond_t* cond)
214 {
215     if( !cond ) return;
216     opj_mutex_destroy(cond->internal_mutex);
217     assert(cond->waiter_list == NULL);
218     opj_free(cond);
219 }
220
221 struct opj_thread_t
222 {
223     opj_thread_fn thread_fn;
224     void* user_data;
225     HANDLE hThread;
226 };
227
228 unsigned int __stdcall opj_thread_callback_adapter( void *info )
229 {
230     opj_thread_t* thread = (opj_thread_t*) info;
231     HANDLE hEvent = NULL;
232
233     thread->thread_fn( thread->user_data );
234
235     /* Free the handle possible allocated by a cond */
236     while( OPJ_TRUE )
237     {
238         /* Make sure TLSKey is not being created just at that moment... */
239 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
240         if( InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0 )
241 #endif
242         {
243             if( TLSKeyInit )
244             {
245                 hEvent = (HANDLE) TlsGetValue( TLSKey );
246             }
247 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
248             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
249 #endif
250             break;
251         }
252     }
253     if( hEvent )
254         CloseHandle(hEvent);
255
256     return 0;
257 }
258
259 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
260 {
261     opj_thread_t* thread;
262
263     assert( thread_fn );
264
265     thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
266     if( !thread )
267         return NULL;
268     thread->thread_fn = thread_fn;
269     thread->user_data = user_data;
270
271     thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
272                                     opj_thread_callback_adapter, thread, 0, NULL);
273
274     if( thread->hThread == NULL )
275     {
276         opj_free( thread );
277         return NULL;
278     }
279     return thread;
280 }
281
282 void opj_thread_join( opj_thread_t* thread )
283 {
284     WaitForSingleObject(thread->hThread, INFINITE);
285     CloseHandle( thread->hThread );
286
287     opj_free(thread);
288 }
289
290 #elif MUTEX_pthread
291
292 #include <pthread.h>
293 #include <stdlib.h>
294 #include <unistd.h>
295
296 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
297 {
298     return OPJ_TRUE;
299 }
300
301 int OPJ_CALLCONV opj_get_num_cpus(void)
302 {
303 #ifdef _SC_NPROCESSORS_ONLN
304     return (int)sysconf(_SC_NPROCESSORS_ONLN);
305 #else
306     return 1;
307 #endif
308 }
309
310 struct opj_mutex_t
311 {
312     pthread_mutex_t mutex;
313 };
314
315 opj_mutex_t* opj_mutex_create(void)
316 {
317     opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
318     if( !mutex )
319         return NULL;
320     pthread_mutex_t pthr_mutex = PTHREAD_MUTEX_INITIALIZER;
321     mutex->mutex = pthr_mutex;
322     return mutex;
323 }
324
325 void opj_mutex_lock(opj_mutex_t* mutex)
326 {
327     pthread_mutex_lock(&(mutex->mutex));
328 }
329
330 void opj_mutex_unlock(opj_mutex_t* mutex)
331 {
332     pthread_mutex_unlock(&(mutex->mutex));
333 }
334
335 void opj_mutex_destroy(opj_mutex_t* mutex)
336 {
337     if( !mutex ) return;
338     pthread_mutex_destroy(&(mutex->mutex));
339     opj_free(mutex);
340 }
341
342 struct opj_cond_t
343 {
344     pthread_cond_t cond;
345 };
346
347 opj_cond_t* opj_cond_create(void)
348 {
349     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
350     if( !cond )
351         return NULL;
352     if( pthread_cond_init(&(cond->cond), NULL) != 0 )
353     {
354         opj_free(cond);
355         return NULL;
356     }
357     return cond;
358 }
359
360 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
361 {
362     pthread_cond_wait(&(cond->cond), &(mutex->mutex));
363 }
364
365 void opj_cond_signal(opj_cond_t* cond)
366 {
367     int ret = pthread_cond_signal(&(cond->cond));
368     (void)ret;
369     assert(ret == 0);
370 }
371
372 void opj_cond_destroy(opj_cond_t* cond)
373 {
374     if( !cond ) return;
375     pthread_cond_destroy(&(cond->cond));
376     opj_free(cond);
377 }
378
379
380 struct opj_thread_t
381 {
382     opj_thread_fn thread_fn;
383     void* user_data;
384     pthread_t thread;
385 };
386
387 static void* opj_thread_callback_adapter( void* info )
388 {
389     opj_thread_t* thread = (opj_thread_t*) info;
390     thread->thread_fn( thread->user_data );
391     return NULL;
392 }
393
394 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
395 {
396     pthread_attr_t attr;
397     opj_thread_t* thread;
398
399     assert( thread_fn );
400
401     thread = (opj_thread_t*) opj_malloc( sizeof(opj_thread_t) );
402     if( !thread )
403         return NULL;
404     thread->thread_fn = thread_fn;
405     thread->user_data = user_data;
406
407     pthread_attr_init( &attr );
408     pthread_attr_setdetachstate( &attr, PTHREAD_CREATE_JOINABLE );
409     if( pthread_create( &(thread->thread), &attr,
410                         opj_thread_callback_adapter, (void *) thread ) != 0 )
411     {
412         opj_free( thread );
413         return NULL;
414     }
415     return thread;
416 }
417
418 void opj_thread_join( opj_thread_t* thread )
419 {
420     void* status;
421     pthread_join( thread->thread, &status);
422
423     opj_free(thread);
424 }
425
426 #else
427 /* Stub implementation */
428
429 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
430 {
431     return OPJ_FALSE;
432 }
433
434 int OPJ_CALLCONV opj_get_num_cpus(void)
435 {
436     return 1;
437 }
438
439 opj_mutex_t* opj_mutex_create(void)
440 {
441     return NULL;
442 }
443
444 void opj_mutex_lock(opj_mutex_t* mutex)
445 {
446     (void) mutex;
447 }
448
449 void opj_mutex_unlock(opj_mutex_t* mutex)
450 {
451     (void) mutex;
452 }
453
454 void opj_mutex_destroy(opj_mutex_t* mutex)
455 {
456     (void) mutex;
457 }
458
459 opj_cond_t* opj_cond_create(void)
460 {
461     return NULL;
462 }
463
464 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
465 {
466     (void) cond;
467     (void) mutex;
468 }
469
470 void opj_cond_signal(opj_cond_t* cond)
471 {
472     (void) cond;
473 }
474
475 void opj_cond_destroy(opj_cond_t* cond)
476 {
477     (void) cond;
478 }
479
480 opj_thread_t* opj_thread_create( opj_thread_fn thread_fn, void* user_data )
481 {
482     (void) thread_fn; 
483     (void) user_data;
484     return NULL;
485 }
486
487 void opj_thread_join( opj_thread_t* thread )
488 {
489     (void) thread;
490 }
491
492 #endif
493
494 typedef struct
495 {
496     int key;
497     void* value;
498     opj_tls_free_func opj_free_func;
499 } opj_tls_key_val_t;
500
501 struct opj_tls_t
502 {
503     opj_tls_key_val_t* key_val;
504     int                key_val_count;
505 };
506
507 static opj_tls_t* opj_tls_new(void)
508 {
509     return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
510 }
511
512 static void opj_tls_destroy(opj_tls_t* tls)
513 {
514     int i;
515     if( !tls ) return;
516     for(i=0;i<tls->key_val_count;i++)
517     {
518         if( tls->key_val[i].opj_free_func )
519             tls->key_val[i].opj_free_func(tls->key_val[i].value);
520     }
521     opj_free(tls->key_val);
522     opj_free(tls);
523 }
524
525 void* opj_tls_get(opj_tls_t* tls, int key)
526 {
527     int i;
528     for(i=0;i<tls->key_val_count;i++)
529     {
530         if( tls->key_val[i].key == key )
531             return tls->key_val[i].value;
532     }
533     return NULL;
534 }
535
536 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value, opj_tls_free_func opj_free_func)
537 {
538     opj_tls_key_val_t* new_key_val;
539     int i;
540     for(i=0;i<tls->key_val_count;i++)
541     {
542         if( tls->key_val[i].key == key )
543         {
544             if( tls->key_val[i].opj_free_func )
545                 tls->key_val[i].opj_free_func(tls->key_val[i].value);
546             tls->key_val[i].value = value;
547             tls->key_val[i].opj_free_func = opj_free_func;
548             return OPJ_TRUE;
549         }
550     }
551     new_key_val = (opj_tls_key_val_t*) opj_realloc( tls->key_val,
552                         (tls->key_val_count + 1) * sizeof(opj_tls_key_val_t) );
553     if( !new_key_val )
554         return OPJ_FALSE;
555     tls->key_val = new_key_val;
556     new_key_val[tls->key_val_count].key = key;
557     new_key_val[tls->key_val_count].value = value;
558     new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
559     tls->key_val_count ++;
560     return OPJ_TRUE;
561 }
562
563
564 typedef struct
565 {
566     opj_job_fn          job_fn;
567     void               *user_data;
568 } opj_worker_thread_job_t;
569
570 typedef struct
571 {
572     opj_thread_pool_t   *tp;
573     opj_thread_t        *thread;
574     int                  marked_as_waiting;
575
576     opj_mutex_t         *mutex;
577     opj_cond_t          *cond;
578 } opj_worker_thread_t;
579
580 typedef enum
581 {
582     OPJWTS_OK,
583     OPJWTS_STOP,
584     OPJWTS_ERROR
585 } opj_worker_thread_state;
586
587 struct opj_job_list_t
588 {
589     opj_worker_thread_job_t* job;
590     struct opj_job_list_t* next;
591 };
592 typedef struct opj_job_list_t opj_job_list_t;
593
594 struct opj_worker_thread_list_t
595 {
596     opj_worker_thread_t* worker_thread;
597     struct opj_worker_thread_list_t* next;
598 };
599 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
600
601 struct opj_thread_pool_t
602 {
603     opj_worker_thread_t*             worker_threads;
604     int                              worker_threads_count;
605     opj_cond_t*                      cond;
606     opj_mutex_t*                     mutex;
607     volatile opj_worker_thread_state state;
608     opj_job_list_t*                  job_queue;
609     volatile int                     pending_jobs_count;
610     opj_worker_thread_list_t*        waiting_worker_thread_list;
611     int                              waiting_worker_thread_count;
612     opj_tls_t*                       tls;
613     int                              signaling_threshold;
614 };
615
616 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
617 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
618                                                              opj_worker_thread_t* worker_thread,
619                                                              OPJ_BOOL signal_job_finished);
620
621 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
622 {
623     opj_thread_pool_t* tp;
624
625     tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
626     if( !tp )
627         return NULL;
628     tp->state = OPJWTS_OK;
629
630     if( num_threads <= 0 )
631     {
632         tp->tls = opj_tls_new();
633         if( !tp->tls )
634         {
635             opj_free(tp);
636             tp = NULL;
637         }
638         return tp;
639     }
640
641     tp->mutex = opj_mutex_create();
642     if( !tp->mutex )
643     {
644         opj_free(tp);
645         return NULL;
646     }
647     if( !opj_thread_pool_setup(tp, num_threads) )
648     {
649         opj_thread_pool_destroy(tp);
650         return NULL;
651     }
652     return tp;
653 }
654
655 static void opj_worker_thread_function(void* user_data)
656 {
657     opj_worker_thread_t* worker_thread;
658     opj_thread_pool_t* tp;
659     opj_tls_t* tls;
660     OPJ_BOOL job_finished = OPJ_FALSE;
661
662     worker_thread = (opj_worker_thread_t* ) user_data;
663     tp = worker_thread->tp;
664     tls = opj_tls_new();
665
666     while( OPJ_TRUE )
667     {
668         opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread, job_finished);
669         if( job == NULL )
670             break;
671
672         if( job->job_fn )
673         {
674             job->job_fn(job->user_data, tls);
675         }
676         opj_free(job);
677         job_finished = OPJ_TRUE;
678     }
679
680     opj_tls_destroy(tls);
681 }
682
683 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
684 {
685     int i;
686     OPJ_BOOL bRet = OPJ_TRUE;
687
688     assert( num_threads > 0 );
689
690     tp->cond = opj_cond_create();
691     if( tp->cond == NULL )
692         return OPJ_FALSE;
693
694     tp->worker_threads = (opj_worker_thread_t*) opj_calloc( num_threads,
695                                                         sizeof(opj_worker_thread_t) );
696     if( tp->worker_threads == NULL )
697         return OPJ_FALSE;
698     tp->worker_threads_count = num_threads;
699
700     for(i=0;i<num_threads;i++)
701     {
702         tp->worker_threads[i].tp = tp;
703
704         tp->worker_threads[i].mutex = opj_mutex_create();
705         if( tp->worker_threads[i].mutex == NULL )
706         {
707             tp->worker_threads_count = i;
708             bRet = OPJ_FALSE;
709             break;
710         }
711
712         tp->worker_threads[i].cond = opj_cond_create();
713         if( tp->worker_threads[i].cond == NULL )
714         {
715             opj_mutex_destroy(tp->worker_threads[i].mutex);
716             tp->worker_threads_count = i;
717             bRet = OPJ_FALSE;
718             break;
719         }
720
721         tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
722
723         tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
724                                                          &(tp->worker_threads[i]));
725         if( tp->worker_threads[i].thread == NULL )
726         {
727             tp->worker_threads_count = i;
728             bRet = OPJ_FALSE;
729             break;
730         }
731     }
732
733     /* Wait all threads to be started */
734     /* printf("waiting for all threads to be started\n"); */
735     opj_mutex_lock(tp->mutex);
736     while( tp->waiting_worker_thread_count < num_threads )
737     {
738         opj_cond_wait(tp->cond, tp->mutex);
739     }
740     opj_mutex_unlock(tp->mutex);
741     /* printf("all threads started\n"); */
742
743     if( tp->state == OPJWTS_ERROR )
744         bRet = OPJ_FALSE;
745
746     return bRet;
747 }
748
749 /*
750 void opj_waiting()
751 {
752     printf("waiting!\n");
753 }
754 */
755
756 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(opj_thread_pool_t* tp,
757                                                              opj_worker_thread_t* worker_thread,
758                                                              OPJ_BOOL signal_job_finished)
759 {
760     while( OPJ_TRUE )
761     {
762         opj_job_list_t* top_job_iter;
763
764         opj_mutex_lock(tp->mutex);
765
766         if( signal_job_finished )
767         {
768             signal_job_finished = OPJ_FALSE;
769             tp->pending_jobs_count --;
770             /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
771             if( tp->pending_jobs_count <= tp->signaling_threshold )
772                 opj_cond_signal(tp->cond);
773         }
774
775         if( tp->state == OPJWTS_STOP )
776         {
777             opj_mutex_unlock(tp->mutex);
778             return NULL;
779         }
780         top_job_iter = tp->job_queue;
781         if( top_job_iter )
782         {
783             opj_worker_thread_job_t* job;
784             tp->job_queue = top_job_iter->next;
785
786             job = top_job_iter->job;
787             opj_mutex_unlock(tp->mutex);
788             opj_free(top_job_iter);
789             return job;
790         }
791
792         /* opj_waiting(); */
793         if( !worker_thread->marked_as_waiting )
794         {
795             opj_worker_thread_list_t* item;
796
797             worker_thread->marked_as_waiting = OPJ_TRUE;
798             tp->waiting_worker_thread_count ++;
799             assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
800
801             item= (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
802             if( item == NULL )
803             {
804                 tp->state = OPJWTS_ERROR;
805                 opj_cond_signal(tp->cond);
806
807                 opj_mutex_unlock(tp->mutex);
808                 return NULL;
809             }
810
811             item->worker_thread = worker_thread;
812             item->next = tp->waiting_worker_thread_list;
813             tp->waiting_worker_thread_list = item;
814         }
815
816         /* printf("signaling that worker thread is ready\n"); */
817         opj_cond_signal(tp->cond);
818
819         opj_mutex_lock(worker_thread->mutex);
820         opj_mutex_unlock(tp->mutex);
821
822         /* printf("waiting for job\n"); */
823         opj_cond_wait( worker_thread->cond, worker_thread->mutex );
824
825         opj_mutex_unlock(worker_thread->mutex);
826         /* printf("got job\n"); */
827     }
828 }
829
830 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
831                                     opj_job_fn job_fn,
832                                     void* user_data)
833 {
834     opj_worker_thread_job_t* job;
835     opj_job_list_t* item;
836
837     if( tp->mutex == NULL )
838     {
839         job_fn( user_data, tp->tls );
840         return OPJ_TRUE;
841     }
842
843     job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
844     if( job == NULL )
845         return OPJ_FALSE;
846     job->job_fn = job_fn;
847     job->user_data = user_data;
848
849     item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
850     if( item == NULL )
851     {
852         opj_free(job);
853         return OPJ_FALSE;
854     }
855     item->job = job;
856
857     opj_mutex_lock(tp->mutex);
858
859     tp->signaling_threshold = 100 * tp->worker_threads_count;
860     while( tp->pending_jobs_count > tp->signaling_threshold )
861     {
862         /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
863         opj_cond_wait(tp->cond, tp->mutex);
864         /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
865     }
866
867     item->next = tp->job_queue;
868     tp->job_queue = item;
869     tp->pending_jobs_count ++;
870
871     if( tp->waiting_worker_thread_list )
872     {
873         opj_worker_thread_t* worker_thread;
874         opj_worker_thread_list_t* next;
875         opj_worker_thread_list_t* to_opj_free;
876
877         worker_thread = tp->waiting_worker_thread_list->worker_thread;
878
879         assert( worker_thread->marked_as_waiting );
880         worker_thread->marked_as_waiting = OPJ_FALSE;
881
882         next = tp->waiting_worker_thread_list->next;
883         to_opj_free = tp->waiting_worker_thread_list;
884         tp->waiting_worker_thread_list = next;
885         tp->waiting_worker_thread_count --;
886
887         opj_mutex_lock(worker_thread->mutex);
888         opj_mutex_unlock(tp->mutex);
889         opj_cond_signal(worker_thread->cond);
890         opj_mutex_unlock(worker_thread->mutex);
891
892         opj_free(to_opj_free);
893     }
894     else
895         opj_mutex_unlock(tp->mutex);
896
897     return OPJ_TRUE;
898 }
899
900 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp, int max_remaining_jobs)
901 {
902     if( tp->mutex == NULL )
903     {
904         return;
905     }
906
907     if( max_remaining_jobs < 0 )
908         max_remaining_jobs = 0;
909     opj_mutex_lock(tp->mutex);
910     tp->signaling_threshold = max_remaining_jobs;
911     while( tp->pending_jobs_count > max_remaining_jobs )
912     {
913         /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
914         opj_cond_wait(tp->cond, tp->mutex);
915         /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
916     }
917     opj_mutex_unlock(tp->mutex);
918 }
919
920 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
921 {
922     return tp->worker_threads_count;
923 }
924
925 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
926 {
927     if( !tp ) return;
928     if( tp->cond )
929     {
930         int i;
931         opj_thread_pool_wait_completion(tp, 0);
932
933         opj_mutex_lock(tp->mutex);
934         tp->state = OPJWTS_STOP;
935         opj_mutex_unlock(tp->mutex);
936
937         for(i=0;i<tp->worker_threads_count;i++)
938         {
939             opj_mutex_lock(tp->worker_threads[i].mutex);
940             opj_cond_signal(tp->worker_threads[i].cond);
941             opj_mutex_unlock(tp->worker_threads[i].mutex);
942             opj_thread_join(tp->worker_threads[i].thread);
943             opj_cond_destroy(tp->worker_threads[i].cond);
944             opj_mutex_destroy(tp->worker_threads[i].mutex);
945         }
946
947         opj_free(tp->worker_threads);
948
949         while( tp->waiting_worker_thread_list != NULL )
950         {
951             opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
952             opj_free( tp->waiting_worker_thread_list );
953             tp->waiting_worker_thread_list = next;
954         }
955
956         opj_cond_destroy(tp->cond);
957     }
958     opj_mutex_destroy(tp->mutex);
959     opj_tls_destroy(tp->tls);
960     opj_free(tp);
961 }