Merge pull request #919 from rouault/reformat
[openjpeg.git] / src / lib / openjp2 / thread.c
1 /*
2  * The copyright in this software is being made available under the 2-clauses
3  * BSD License, included below. This software may be subject to other third
4  * party and contributor rights, including patent rights, and no such rights
5  * are granted under this license.
6  *
7  * Copyright (c) 2016, Even Rouault
8  * All rights reserved.
9  *
10  * Redistribution and use in source and binary forms, with or without
11  * modification, are permitted provided that the following conditions
12  * are met:
13  * 1. Redistributions of source code must retain the above copyright
14  *    notice, this list of conditions and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  *
19  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22  * ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29  * POSSIBILITY OF SUCH DAMAGE.
30  */
31
32 #include "opj_includes.h"
33
34 #include "thread.h"
35 #include <assert.h>
36
37 #ifdef MUTEX_win32
38
39 /* Some versions of x86_64-w64-mingw32-gc -m32 resolve InterlockedCompareExchange() */
40 /* as __sync_val_compare_and_swap_4 but fails to link it. As this protects against */
41 /* a rather unlikely race, skip it */
42 #if !(defined(__MINGW32__) && defined(__i386__))
43 #define HAVE_INTERLOCKED_COMPARE_EXCHANGE 1
44 #endif
45
46 #include <windows.h>
47 #include <process.h>
48
49 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
50 {
51     return OPJ_TRUE;
52 }
53
54 int OPJ_CALLCONV opj_get_num_cpus(void)
55 {
56     SYSTEM_INFO info;
57     DWORD dwNum;
58     GetSystemInfo(&info);
59     dwNum = info.dwNumberOfProcessors;
60     if (dwNum < 1) {
61         return 1;
62     }
63     return (int)dwNum;
64 }
65
66 struct opj_mutex_t {
67     CRITICAL_SECTION cs;
68 };
69
70 opj_mutex_t* opj_mutex_create(void)
71 {
72     opj_mutex_t* mutex = (opj_mutex_t*) opj_malloc(sizeof(opj_mutex_t));
73     if (!mutex) {
74         return NULL;
75     }
76     InitializeCriticalSectionAndSpinCount(&(mutex->cs), 4000);
77     return mutex;
78 }
79
80 void opj_mutex_lock(opj_mutex_t* mutex)
81 {
82     EnterCriticalSection(&(mutex->cs));
83 }
84
85 void opj_mutex_unlock(opj_mutex_t* mutex)
86 {
87     LeaveCriticalSection(&(mutex->cs));
88 }
89
90 void opj_mutex_destroy(opj_mutex_t* mutex)
91 {
92     if (!mutex) {
93         return;
94     }
95     DeleteCriticalSection(&(mutex->cs));
96     opj_free(mutex);
97 }
98
99 struct opj_cond_waiter_list_t {
100     HANDLE hEvent;
101     struct opj_cond_waiter_list_t* next;
102 };
103 typedef struct opj_cond_waiter_list_t opj_cond_waiter_list_t;
104
105 struct opj_cond_t {
106     opj_mutex_t             *internal_mutex;
107     opj_cond_waiter_list_t  *waiter_list;
108 };
109
110 static DWORD TLSKey = 0;
111 static volatile LONG inTLSLockedSection = 0;
112 static volatile int TLSKeyInit = OPJ_FALSE;
113
114 opj_cond_t* opj_cond_create(void)
115 {
116     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
117     if (!cond) {
118         return NULL;
119     }
120
121     /* Make sure that the TLS key is allocated in a thread-safe way */
122     /* We cannot use a global mutex/critical section since its creation itself would not be */
123     /* thread-safe, so use InterlockedCompareExchange trick */
124     while (OPJ_TRUE) {
125
126 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
127         if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
128 #endif
129         {
130             if (!TLSKeyInit) {
131                 TLSKey = TlsAlloc();
132                 TLSKeyInit = OPJ_TRUE;
133             }
134 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
135             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
136 #endif
137             break;
138         }
139     }
140
141     if (TLSKey == TLS_OUT_OF_INDEXES) {
142         opj_free(cond);
143         return NULL;
144     }
145     cond->internal_mutex = opj_mutex_create();
146     if (cond->internal_mutex == NULL) {
147         opj_free(cond);
148         return NULL;
149     }
150     cond->waiter_list = NULL;
151     return cond;
152 }
153
154 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
155 {
156     opj_cond_waiter_list_t* item;
157     HANDLE hEvent = (HANDLE) TlsGetValue(TLSKey);
158     if (hEvent == NULL) {
159         hEvent = CreateEvent(NULL, /* security attributes */
160                              0,    /* manual reset = no */
161                              0,    /* initial state = unsignaled */
162                              NULL  /* no name */);
163         assert(hEvent);
164
165         TlsSetValue(TLSKey, hEvent);
166     }
167
168     /* Insert the waiter into the waiter list of the condition */
169     opj_mutex_lock(cond->internal_mutex);
170
171     item = (opj_cond_waiter_list_t*)opj_malloc(sizeof(opj_cond_waiter_list_t));
172     assert(item != NULL);
173
174     item->hEvent = hEvent;
175     item->next = cond->waiter_list;
176
177     cond->waiter_list = item;
178
179     opj_mutex_unlock(cond->internal_mutex);
180
181     /* Release the client mutex before waiting for the event being signaled */
182     opj_mutex_unlock(mutex);
183
184     /* Ideally we would check that we do not get WAIT_FAILED but it is hard */
185     /* to report a failure. */
186     WaitForSingleObject(hEvent, INFINITE);
187
188     /* Reacquire the client mutex */
189     opj_mutex_lock(mutex);
190 }
191
192 void opj_cond_signal(opj_cond_t* cond)
193 {
194     opj_cond_waiter_list_t* psIter;
195
196     /* Signal the first registered event, and remove it from the list */
197     opj_mutex_lock(cond->internal_mutex);
198
199     psIter = cond->waiter_list;
200     if (psIter != NULL) {
201         SetEvent(psIter->hEvent);
202         cond->waiter_list = psIter->next;
203         opj_free(psIter);
204     }
205
206     opj_mutex_unlock(cond->internal_mutex);
207 }
208
209 void opj_cond_destroy(opj_cond_t* cond)
210 {
211     if (!cond) {
212         return;
213     }
214     opj_mutex_destroy(cond->internal_mutex);
215     assert(cond->waiter_list == NULL);
216     opj_free(cond);
217 }
218
219 struct opj_thread_t {
220     opj_thread_fn thread_fn;
221     void* user_data;
222     HANDLE hThread;
223 };
224
225 unsigned int __stdcall opj_thread_callback_adapter(void *info)
226 {
227     opj_thread_t* thread = (opj_thread_t*) info;
228     HANDLE hEvent = NULL;
229
230     thread->thread_fn(thread->user_data);
231
232     /* Free the handle possible allocated by a cond */
233     while (OPJ_TRUE) {
234         /* Make sure TLSKey is not being created just at that moment... */
235 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
236         if (InterlockedCompareExchange(&inTLSLockedSection, 1, 0) == 0)
237 #endif
238         {
239             if (TLSKeyInit) {
240                 hEvent = (HANDLE) TlsGetValue(TLSKey);
241             }
242 #if HAVE_INTERLOCKED_COMPARE_EXCHANGE
243             InterlockedCompareExchange(&inTLSLockedSection, 0, 1);
244 #endif
245             break;
246         }
247     }
248     if (hEvent) {
249         CloseHandle(hEvent);
250     }
251
252     return 0;
253 }
254
255 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
256 {
257     opj_thread_t* thread;
258
259     assert(thread_fn);
260
261     thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
262     if (!thread) {
263         return NULL;
264     }
265     thread->thread_fn = thread_fn;
266     thread->user_data = user_data;
267
268     thread->hThread = (HANDLE)_beginthreadex(NULL, 0,
269                       opj_thread_callback_adapter, thread, 0, NULL);
270
271     if (thread->hThread == NULL) {
272         opj_free(thread);
273         return NULL;
274     }
275     return thread;
276 }
277
278 void opj_thread_join(opj_thread_t* thread)
279 {
280     WaitForSingleObject(thread->hThread, INFINITE);
281     CloseHandle(thread->hThread);
282
283     opj_free(thread);
284 }
285
286 #elif MUTEX_pthread
287
288 #include <pthread.h>
289 #include <stdlib.h>
290 #include <unistd.h>
291
292 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
293 {
294     return OPJ_TRUE;
295 }
296
297 int OPJ_CALLCONV opj_get_num_cpus(void)
298 {
299 #ifdef _SC_NPROCESSORS_ONLN
300     return (int)sysconf(_SC_NPROCESSORS_ONLN);
301 #else
302     return 1;
303 #endif
304 }
305
306 struct opj_mutex_t {
307     pthread_mutex_t mutex;
308 };
309
310 opj_mutex_t* opj_mutex_create(void)
311 {
312     opj_mutex_t* mutex = (opj_mutex_t*) opj_calloc(1U, sizeof(opj_mutex_t));
313     if (mutex != NULL) {
314         if (pthread_mutex_init(&mutex->mutex, NULL) != 0) {
315             opj_free(mutex);
316             mutex = NULL;
317         }
318     }
319     return mutex;
320 }
321
322 void opj_mutex_lock(opj_mutex_t* mutex)
323 {
324     pthread_mutex_lock(&(mutex->mutex));
325 }
326
327 void opj_mutex_unlock(opj_mutex_t* mutex)
328 {
329     pthread_mutex_unlock(&(mutex->mutex));
330 }
331
332 void opj_mutex_destroy(opj_mutex_t* mutex)
333 {
334     if (!mutex) {
335         return;
336     }
337     pthread_mutex_destroy(&(mutex->mutex));
338     opj_free(mutex);
339 }
340
341 struct opj_cond_t {
342     pthread_cond_t cond;
343 };
344
345 opj_cond_t* opj_cond_create(void)
346 {
347     opj_cond_t* cond = (opj_cond_t*) opj_malloc(sizeof(opj_cond_t));
348     if (!cond) {
349         return NULL;
350     }
351     if (pthread_cond_init(&(cond->cond), NULL) != 0) {
352         opj_free(cond);
353         return NULL;
354     }
355     return cond;
356 }
357
358 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
359 {
360     pthread_cond_wait(&(cond->cond), &(mutex->mutex));
361 }
362
363 void opj_cond_signal(opj_cond_t* cond)
364 {
365     int ret = pthread_cond_signal(&(cond->cond));
366     (void)ret;
367     assert(ret == 0);
368 }
369
370 void opj_cond_destroy(opj_cond_t* cond)
371 {
372     if (!cond) {
373         return;
374     }
375     pthread_cond_destroy(&(cond->cond));
376     opj_free(cond);
377 }
378
379
380 struct opj_thread_t {
381     opj_thread_fn thread_fn;
382     void* user_data;
383     pthread_t thread;
384 };
385
386 static void* opj_thread_callback_adapter(void* info)
387 {
388     opj_thread_t* thread = (opj_thread_t*) info;
389     thread->thread_fn(thread->user_data);
390     return NULL;
391 }
392
393 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
394 {
395     pthread_attr_t attr;
396     opj_thread_t* thread;
397
398     assert(thread_fn);
399
400     thread = (opj_thread_t*) opj_malloc(sizeof(opj_thread_t));
401     if (!thread) {
402         return NULL;
403     }
404     thread->thread_fn = thread_fn;
405     thread->user_data = user_data;
406
407     pthread_attr_init(&attr);
408     pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
409     if (pthread_create(&(thread->thread), &attr,
410                        opj_thread_callback_adapter, (void *) thread) != 0) {
411         opj_free(thread);
412         return NULL;
413     }
414     return thread;
415 }
416
417 void opj_thread_join(opj_thread_t* thread)
418 {
419     void* status;
420     pthread_join(thread->thread, &status);
421
422     opj_free(thread);
423 }
424
425 #else
426 /* Stub implementation */
427
428 OPJ_BOOL OPJ_CALLCONV opj_has_thread_support(void)
429 {
430     return OPJ_FALSE;
431 }
432
433 int OPJ_CALLCONV opj_get_num_cpus(void)
434 {
435     return 1;
436 }
437
438 opj_mutex_t* opj_mutex_create(void)
439 {
440     return NULL;
441 }
442
443 void opj_mutex_lock(opj_mutex_t* mutex)
444 {
445     (void) mutex;
446 }
447
448 void opj_mutex_unlock(opj_mutex_t* mutex)
449 {
450     (void) mutex;
451 }
452
453 void opj_mutex_destroy(opj_mutex_t* mutex)
454 {
455     (void) mutex;
456 }
457
458 opj_cond_t* opj_cond_create(void)
459 {
460     return NULL;
461 }
462
463 void opj_cond_wait(opj_cond_t* cond, opj_mutex_t* mutex)
464 {
465     (void) cond;
466     (void) mutex;
467 }
468
469 void opj_cond_signal(opj_cond_t* cond)
470 {
471     (void) cond;
472 }
473
474 void opj_cond_destroy(opj_cond_t* cond)
475 {
476     (void) cond;
477 }
478
479 opj_thread_t* opj_thread_create(opj_thread_fn thread_fn, void* user_data)
480 {
481     (void) thread_fn;
482     (void) user_data;
483     return NULL;
484 }
485
486 void opj_thread_join(opj_thread_t* thread)
487 {
488     (void) thread;
489 }
490
491 #endif
492
493 typedef struct {
494     int key;
495     void* value;
496     opj_tls_free_func opj_free_func;
497 } opj_tls_key_val_t;
498
499 struct opj_tls_t {
500     opj_tls_key_val_t* key_val;
501     int                key_val_count;
502 };
503
504 static opj_tls_t* opj_tls_new(void)
505 {
506     return (opj_tls_t*) opj_calloc(1, sizeof(opj_tls_t));
507 }
508
509 static void opj_tls_destroy(opj_tls_t* tls)
510 {
511     int i;
512     if (!tls) {
513         return;
514     }
515     for (i = 0; i < tls->key_val_count; i++) {
516         if (tls->key_val[i].opj_free_func) {
517             tls->key_val[i].opj_free_func(tls->key_val[i].value);
518         }
519     }
520     opj_free(tls->key_val);
521     opj_free(tls);
522 }
523
524 void* opj_tls_get(opj_tls_t* tls, int key)
525 {
526     int i;
527     for (i = 0; i < tls->key_val_count; i++) {
528         if (tls->key_val[i].key == key) {
529             return tls->key_val[i].value;
530         }
531     }
532     return NULL;
533 }
534
535 OPJ_BOOL opj_tls_set(opj_tls_t* tls, int key, void* value,
536                      opj_tls_free_func opj_free_func)
537 {
538     opj_tls_key_val_t* new_key_val;
539     int i;
540
541     if (tls->key_val_count == INT_MAX) {
542         return OPJ_FALSE;
543     }
544     for (i = 0; i < tls->key_val_count; i++) {
545         if (tls->key_val[i].key == key) {
546             if (tls->key_val[i].opj_free_func) {
547                 tls->key_val[i].opj_free_func(tls->key_val[i].value);
548             }
549             tls->key_val[i].value = value;
550             tls->key_val[i].opj_free_func = opj_free_func;
551             return OPJ_TRUE;
552         }
553     }
554     new_key_val = (opj_tls_key_val_t*) opj_realloc(tls->key_val,
555                   ((size_t)tls->key_val_count + 1U) * sizeof(opj_tls_key_val_t));
556     if (!new_key_val) {
557         return OPJ_FALSE;
558     }
559     tls->key_val = new_key_val;
560     new_key_val[tls->key_val_count].key = key;
561     new_key_val[tls->key_val_count].value = value;
562     new_key_val[tls->key_val_count].opj_free_func = opj_free_func;
563     tls->key_val_count ++;
564     return OPJ_TRUE;
565 }
566
567
568 typedef struct {
569     opj_job_fn          job_fn;
570     void               *user_data;
571 } opj_worker_thread_job_t;
572
573 typedef struct {
574     opj_thread_pool_t   *tp;
575     opj_thread_t        *thread;
576     int                  marked_as_waiting;
577
578     opj_mutex_t         *mutex;
579     opj_cond_t          *cond;
580 } opj_worker_thread_t;
581
582 typedef enum {
583     OPJWTS_OK,
584     OPJWTS_STOP,
585     OPJWTS_ERROR
586 } opj_worker_thread_state;
587
588 struct opj_job_list_t {
589     opj_worker_thread_job_t* job;
590     struct opj_job_list_t* next;
591 };
592 typedef struct opj_job_list_t opj_job_list_t;
593
594 struct opj_worker_thread_list_t {
595     opj_worker_thread_t* worker_thread;
596     struct opj_worker_thread_list_t* next;
597 };
598 typedef struct opj_worker_thread_list_t opj_worker_thread_list_t;
599
600 struct opj_thread_pool_t {
601     opj_worker_thread_t*             worker_threads;
602     int                              worker_threads_count;
603     opj_cond_t*                      cond;
604     opj_mutex_t*                     mutex;
605     volatile opj_worker_thread_state state;
606     opj_job_list_t*                  job_queue;
607     volatile int                     pending_jobs_count;
608     opj_worker_thread_list_t*        waiting_worker_thread_list;
609     int                              waiting_worker_thread_count;
610     opj_tls_t*                       tls;
611     int                              signaling_threshold;
612 };
613
614 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads);
615 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
616     opj_thread_pool_t* tp,
617     opj_worker_thread_t* worker_thread,
618     OPJ_BOOL signal_job_finished);
619
620 opj_thread_pool_t* opj_thread_pool_create(int num_threads)
621 {
622     opj_thread_pool_t* tp;
623
624     tp = (opj_thread_pool_t*) opj_calloc(1, sizeof(opj_thread_pool_t));
625     if (!tp) {
626         return NULL;
627     }
628     tp->state = OPJWTS_OK;
629
630     if (num_threads <= 0) {
631         tp->tls = opj_tls_new();
632         if (!tp->tls) {
633             opj_free(tp);
634             tp = NULL;
635         }
636         return tp;
637     }
638
639     tp->mutex = opj_mutex_create();
640     if (!tp->mutex) {
641         opj_free(tp);
642         return NULL;
643     }
644     if (!opj_thread_pool_setup(tp, num_threads)) {
645         opj_thread_pool_destroy(tp);
646         return NULL;
647     }
648     return tp;
649 }
650
651 static void opj_worker_thread_function(void* user_data)
652 {
653     opj_worker_thread_t* worker_thread;
654     opj_thread_pool_t* tp;
655     opj_tls_t* tls;
656     OPJ_BOOL job_finished = OPJ_FALSE;
657
658     worker_thread = (opj_worker_thread_t*) user_data;
659     tp = worker_thread->tp;
660     tls = opj_tls_new();
661
662     while (OPJ_TRUE) {
663         opj_worker_thread_job_t* job = opj_thread_pool_get_next_job(tp, worker_thread,
664                                        job_finished);
665         if (job == NULL) {
666             break;
667         }
668
669         if (job->job_fn) {
670             job->job_fn(job->user_data, tls);
671         }
672         opj_free(job);
673         job_finished = OPJ_TRUE;
674     }
675
676     opj_tls_destroy(tls);
677 }
678
679 static OPJ_BOOL opj_thread_pool_setup(opj_thread_pool_t* tp, int num_threads)
680 {
681     int i;
682     OPJ_BOOL bRet = OPJ_TRUE;
683
684     assert(num_threads > 0);
685
686     tp->cond = opj_cond_create();
687     if (tp->cond == NULL) {
688         return OPJ_FALSE;
689     }
690
691     tp->worker_threads = (opj_worker_thread_t*) opj_calloc((size_t)num_threads,
692                          sizeof(opj_worker_thread_t));
693     if (tp->worker_threads == NULL) {
694         return OPJ_FALSE;
695     }
696     tp->worker_threads_count = num_threads;
697
698     for (i = 0; i < num_threads; i++) {
699         tp->worker_threads[i].tp = tp;
700
701         tp->worker_threads[i].mutex = opj_mutex_create();
702         if (tp->worker_threads[i].mutex == NULL) {
703             tp->worker_threads_count = i;
704             bRet = OPJ_FALSE;
705             break;
706         }
707
708         tp->worker_threads[i].cond = opj_cond_create();
709         if (tp->worker_threads[i].cond == NULL) {
710             opj_mutex_destroy(tp->worker_threads[i].mutex);
711             tp->worker_threads_count = i;
712             bRet = OPJ_FALSE;
713             break;
714         }
715
716         tp->worker_threads[i].marked_as_waiting = OPJ_FALSE;
717
718         tp->worker_threads[i].thread = opj_thread_create(opj_worker_thread_function,
719                                        &(tp->worker_threads[i]));
720         if (tp->worker_threads[i].thread == NULL) {
721             tp->worker_threads_count = i;
722             bRet = OPJ_FALSE;
723             break;
724         }
725     }
726
727     /* Wait all threads to be started */
728     /* printf("waiting for all threads to be started\n"); */
729     opj_mutex_lock(tp->mutex);
730     while (tp->waiting_worker_thread_count < num_threads) {
731         opj_cond_wait(tp->cond, tp->mutex);
732     }
733     opj_mutex_unlock(tp->mutex);
734     /* printf("all threads started\n"); */
735
736     if (tp->state == OPJWTS_ERROR) {
737         bRet = OPJ_FALSE;
738     }
739
740     return bRet;
741 }
742
743 /*
744 void opj_waiting()
745 {
746     printf("waiting!\n");
747 }
748 */
749
750 static opj_worker_thread_job_t* opj_thread_pool_get_next_job(
751     opj_thread_pool_t* tp,
752     opj_worker_thread_t* worker_thread,
753     OPJ_BOOL signal_job_finished)
754 {
755     while (OPJ_TRUE) {
756         opj_job_list_t* top_job_iter;
757
758         opj_mutex_lock(tp->mutex);
759
760         if (signal_job_finished) {
761             signal_job_finished = OPJ_FALSE;
762             tp->pending_jobs_count --;
763             /*printf("tp=%p, remaining jobs: %d\n", tp, tp->pending_jobs_count);*/
764             if (tp->pending_jobs_count <= tp->signaling_threshold) {
765                 opj_cond_signal(tp->cond);
766             }
767         }
768
769         if (tp->state == OPJWTS_STOP) {
770             opj_mutex_unlock(tp->mutex);
771             return NULL;
772         }
773         top_job_iter = tp->job_queue;
774         if (top_job_iter) {
775             opj_worker_thread_job_t* job;
776             tp->job_queue = top_job_iter->next;
777
778             job = top_job_iter->job;
779             opj_mutex_unlock(tp->mutex);
780             opj_free(top_job_iter);
781             return job;
782         }
783
784         /* opj_waiting(); */
785         if (!worker_thread->marked_as_waiting) {
786             opj_worker_thread_list_t* item;
787
788             worker_thread->marked_as_waiting = OPJ_TRUE;
789             tp->waiting_worker_thread_count ++;
790             assert(tp->waiting_worker_thread_count <= tp->worker_threads_count);
791
792             item = (opj_worker_thread_list_t*) opj_malloc(sizeof(opj_worker_thread_list_t));
793             if (item == NULL) {
794                 tp->state = OPJWTS_ERROR;
795                 opj_cond_signal(tp->cond);
796
797                 opj_mutex_unlock(tp->mutex);
798                 return NULL;
799             }
800
801             item->worker_thread = worker_thread;
802             item->next = tp->waiting_worker_thread_list;
803             tp->waiting_worker_thread_list = item;
804         }
805
806         /* printf("signaling that worker thread is ready\n"); */
807         opj_cond_signal(tp->cond);
808
809         opj_mutex_lock(worker_thread->mutex);
810         opj_mutex_unlock(tp->mutex);
811
812         /* printf("waiting for job\n"); */
813         opj_cond_wait(worker_thread->cond, worker_thread->mutex);
814
815         opj_mutex_unlock(worker_thread->mutex);
816         /* printf("got job\n"); */
817     }
818 }
819
820 OPJ_BOOL opj_thread_pool_submit_job(opj_thread_pool_t* tp,
821                                     opj_job_fn job_fn,
822                                     void* user_data)
823 {
824     opj_worker_thread_job_t* job;
825     opj_job_list_t* item;
826
827     if (tp->mutex == NULL) {
828         job_fn(user_data, tp->tls);
829         return OPJ_TRUE;
830     }
831
832     job = (opj_worker_thread_job_t*)opj_malloc(sizeof(opj_worker_thread_job_t));
833     if (job == NULL) {
834         return OPJ_FALSE;
835     }
836     job->job_fn = job_fn;
837     job->user_data = user_data;
838
839     item = (opj_job_list_t*) opj_malloc(sizeof(opj_job_list_t));
840     if (item == NULL) {
841         opj_free(job);
842         return OPJ_FALSE;
843     }
844     item->job = job;
845
846     opj_mutex_lock(tp->mutex);
847
848     tp->signaling_threshold = 100 * tp->worker_threads_count;
849     while (tp->pending_jobs_count > tp->signaling_threshold) {
850         /* printf("%d jobs enqueued. Waiting\n", tp->pending_jobs_count); */
851         opj_cond_wait(tp->cond, tp->mutex);
852         /* printf("...%d jobs enqueued.\n", tp->pending_jobs_count); */
853     }
854
855     item->next = tp->job_queue;
856     tp->job_queue = item;
857     tp->pending_jobs_count ++;
858
859     if (tp->waiting_worker_thread_list) {
860         opj_worker_thread_t* worker_thread;
861         opj_worker_thread_list_t* next;
862         opj_worker_thread_list_t* to_opj_free;
863
864         worker_thread = tp->waiting_worker_thread_list->worker_thread;
865
866         assert(worker_thread->marked_as_waiting);
867         worker_thread->marked_as_waiting = OPJ_FALSE;
868
869         next = tp->waiting_worker_thread_list->next;
870         to_opj_free = tp->waiting_worker_thread_list;
871         tp->waiting_worker_thread_list = next;
872         tp->waiting_worker_thread_count --;
873
874         opj_mutex_lock(worker_thread->mutex);
875         opj_mutex_unlock(tp->mutex);
876         opj_cond_signal(worker_thread->cond);
877         opj_mutex_unlock(worker_thread->mutex);
878
879         opj_free(to_opj_free);
880     } else {
881         opj_mutex_unlock(tp->mutex);
882     }
883
884     return OPJ_TRUE;
885 }
886
887 void opj_thread_pool_wait_completion(opj_thread_pool_t* tp,
888                                      int max_remaining_jobs)
889 {
890     if (tp->mutex == NULL) {
891         return;
892     }
893
894     if (max_remaining_jobs < 0) {
895         max_remaining_jobs = 0;
896     }
897     opj_mutex_lock(tp->mutex);
898     tp->signaling_threshold = max_remaining_jobs;
899     while (tp->pending_jobs_count > max_remaining_jobs) {
900         /*printf("tp=%p, jobs before wait = %d, max_remaining_jobs = %d\n", tp, tp->pending_jobs_count, max_remaining_jobs);*/
901         opj_cond_wait(tp->cond, tp->mutex);
902         /*printf("tp=%p, jobs after wait = %d\n", tp, tp->pending_jobs_count);*/
903     }
904     opj_mutex_unlock(tp->mutex);
905 }
906
907 int opj_thread_pool_get_thread_count(opj_thread_pool_t* tp)
908 {
909     return tp->worker_threads_count;
910 }
911
912 void opj_thread_pool_destroy(opj_thread_pool_t* tp)
913 {
914     if (!tp) {
915         return;
916     }
917     if (tp->cond) {
918         int i;
919         opj_thread_pool_wait_completion(tp, 0);
920
921         opj_mutex_lock(tp->mutex);
922         tp->state = OPJWTS_STOP;
923         opj_mutex_unlock(tp->mutex);
924
925         for (i = 0; i < tp->worker_threads_count; i++) {
926             opj_mutex_lock(tp->worker_threads[i].mutex);
927             opj_cond_signal(tp->worker_threads[i].cond);
928             opj_mutex_unlock(tp->worker_threads[i].mutex);
929             opj_thread_join(tp->worker_threads[i].thread);
930             opj_cond_destroy(tp->worker_threads[i].cond);
931             opj_mutex_destroy(tp->worker_threads[i].mutex);
932         }
933
934         opj_free(tp->worker_threads);
935
936         while (tp->waiting_worker_thread_list != NULL) {
937             opj_worker_thread_list_t* next = tp->waiting_worker_thread_list->next;
938             opj_free(tp->waiting_worker_thread_list);
939             tp->waiting_worker_thread_list = next;
940         }
941
942         opj_cond_destroy(tp->cond);
943     }
944     opj_mutex_destroy(tp->mutex);
945     opj_tls_destroy(tp->tls);
946     opj_free(tp);
947 }