2 * $Id: msgqueue_manager.c 53 2011-05-09 16:55:39Z kaori $
4 * Copyright (c) 2002-2011, Communications and Remote Sensing Laboratory, Universite catholique de Louvain (UCL), Belgium
5 * Copyright (c) 2002-2011, Professor Benoit Macq
6 * Copyright (c) 2010-2011, Kaori Hagihara
7 * Copyright (c) 2011, Lucian Corlaciu, GSoC
10 * Redistribution and use in source and binary forms, with or without
11 * modification, are permitted provided that the following conditions
13 * 1. Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
19 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS `AS IS'
20 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
23 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29 * POSSIBILITY OF SUCH DAMAGE.
34 #include <sys/types.h>
38 #include "msgqueue_manager.h"
39 #include "metadata_manager.h"
40 #include "index_manager.h"
43 #include "fcgi_stdio.h"
44 #define logstream FCGI_stdout
46 #define FCGI_stdout stdout
47 #define FCGI_stderr stderr
48 #define logstream stderr
51 msgqueue_param_t * gene_msgqueue( bool stateless, cachemodel_param_t *cachemodel)
53 msgqueue_param_t *msgqueue;
55 msgqueue = (msgqueue_param_t *)malloc( sizeof(msgqueue_param_t));
57 msgqueue->first = NULL;
58 msgqueue->last = NULL;
60 msgqueue->stateless = stateless;
61 msgqueue->cachemodel = cachemodel;
/*
 * Delete a message queue.
 * The visible statements take the first message of the queue and, when the
 * queue is stateless and owns a cache model, delete that cache model.
 * NOTE(review): the embedded line numbers skip 67, 69-72, 74-79 and 82-85;
 * the NULL guard, the loop that walks ptr/next and the final release of the
 * queue are presumably in those missing lines - confirm against the full file.
 */
66 void delete_msgqueue( msgqueue_param_t **msgqueue)
68 message_param_t *ptr, *next;
73 ptr = (*msgqueue)->first;
// the cache model is only deleted here for a stateless queue
80 if( (*msgqueue)->stateless && (*msgqueue)->cachemodel)
81 delete_cachemodel( &((*msgqueue)->cachemodel));
86 void print_msgqueue( msgqueue_param_t *msgqueue)
89 char *message_class[] = { "Precinct", "Ext-Prec", "TileHead", "non", "Tile", "Ext-Tile", "Main", "non", "Meta"};
94 fprintf( logstream, "message queue:\n");
95 ptr = msgqueue->first;
98 fprintf( logstream, "\t class_id: %lld %s\n", ptr->class_id, message_class[ptr->class_id]);
99 fprintf( logstream, "\t in_class_id: %lld\n", ptr->in_class_id );
100 fprintf( logstream, "\t csn: %lld\n", ptr->csn );
101 fprintf( logstream, "\t bin_offset: %#llx\n", ptr->bin_offset );
102 fprintf( logstream, "\t length: %#llx\n", ptr->length );
104 fprintf( logstream, "\t aux: %lld\n", ptr->aux );
105 fprintf( logstream, "\t last_byte: %d\n", ptr->last_byte );
107 print_placeholder( ptr->phld);
109 fprintf( logstream, "\t res_offset: %#llx\n", ptr->res_offset );
110 fprintf( logstream, "\n");
/* Forward declaration: append a message to a queue (defined later in this file). */
116 void enqueue_message( message_param_t *msg, msgqueue_param_t *msgqueue);
/*
 * Enqueue a main-header message for the target of the queue's cache model.
 * The message gets class MAINHEADER_MSG, in-class id 0, the target's csn,
 * length codeidx->mhead_length and resource offset codeidx->offset; after
 * enqueueing, the cache model's mhead_model flag is set.
 * NOTE(review): the embedded line numbers skip 135 and 139-141, so further
 * field assignments (and possibly a guard) are missing from this listing -
 * confirm against the full file. The malloc result is not checked in the
 * visible lines.
 */
118 void enqueue_mainheader( msgqueue_param_t *msgqueue)
120 cachemodel_param_t *cachemodel;
121 target_param_t *target;
122 index_param_t *codeidx;
123 message_param_t *msg;
125 cachemodel = msgqueue->cachemodel;
126 target = cachemodel->target;
127 codeidx = target->codeidx;
129 msg = (message_param_t *)malloc( sizeof(message_param_t));
// the main header is sent as one complete message
131 msg->last_byte = true;
132 msg->in_class_id = 0;
133 msg->class_id = MAINHEADER_MSG;
134 msg->csn = target->csn;
136 msg->length = codeidx->mhead_length;
137 msg->aux = 0; // non exist
138 msg->res_offset = codeidx->offset;
142 enqueue_message( msg, msgqueue);
// remember in the cache model that the main header has been queued
144 cachemodel->mhead_model = true;
/*
 * Enqueue the tile-header message of tile tile_id unless the cache model
 * already marks it (th_model[tile_id]).
 * The message gets class TILE_HEADER_MSG, in-class id tile_id and the
 * target's csn; length and resource offset both skip 2 bytes, which the
 * original comments attribute to the SOT marker.
 * NOTE(review): tile_id is used as an array index without a visible range
 * check, and the embedded line numbers skip 164 and 168-170 (further field
 * assignments presumably missing from this listing) - confirm.
 */
147 void enqueue_tileheader( int tile_id, msgqueue_param_t *msgqueue)
149 cachemodel_param_t *cachemodel;
150 target_param_t *target;
151 index_param_t *codeidx;
152 message_param_t *msg;
154 cachemodel = msgqueue->cachemodel;
155 target = cachemodel->target;
156 codeidx = target->codeidx;
// only queue the header when the cache model does not hold it yet
158 if( !cachemodel->th_model[ tile_id]){
159 msg = (message_param_t *)malloc( sizeof(message_param_t));
160 msg->last_byte = true;
161 msg->in_class_id = tile_id;
162 msg->class_id = TILE_HEADER_MSG;
163 msg->csn = target->csn;
165 msg->length = codeidx->tileheader[tile_id]->tlen-2; // SOT marker segment is removed
166 msg->aux = 0; // non exist
167 msg->res_offset = codeidx->offset + get_elemOff( codeidx->tilepart, 0, tile_id) + 2; // skip SOT marker seg
171 enqueue_message( msg, msgqueue);
172 cachemodel->th_model[ tile_id] = true;
/*
 * Enqueue tile-part messages of tile tile_id.
 * The number of tile parts per tile is get_nmax(tilepart) and the number of
 * tiles is get_m(tilepart). Messages use class TILE_MSG when there is exactly
 * one tile part per tile and EXT_TILE_MSG otherwise. The loop covers tile
 * parts 0 .. numOftparts-level-1; binOffset accumulates the lengths of the
 * tile parts visited so far.
 * NOTE(review): the declarations of numOftiles, tp_model and i, the
 * initialisation of binOffset and the lines numbered 201-203, 205-206,
 * 209-210, 212 and 221-227 are not visible in this listing - confirm against
 * the full file. If Byte8_t is unsigned, numOftparts-level wraps when level
 * exceeds numOftparts - confirm.
 */
176 void enqueue_tile( int tile_id, int level, msgqueue_param_t *msgqueue)
178 cachemodel_param_t *cachemodel;
179 target_param_t *target;
181 Byte8_t numOftparts; // num of tile parts per tile
183 index_param_t *codeidx;
184 faixbox_param_t *tilepart;
185 message_param_t *msg;
186 Byte8_t binOffset, binLength, class_id;
189 cachemodel = msgqueue->cachemodel;
190 target = cachemodel->target;
191 codeidx = target->codeidx;
192 tilepart = codeidx->tilepart;
194 numOftparts = get_nmax( tilepart);
195 numOftiles = get_m( tilepart);
// a single tile part per tile uses the plain tile class, otherwise the extended one
197 class_id = (numOftparts==1) ? TILE_MSG : EXT_TILE_MSG;
199 if( tile_id < 0 || numOftiles <= tile_id){
200 fprintf( FCGI_stderr, "Error, Invalid tile-id %d\n", tile_id);
// per-tile slice of the tile-part cache model
204 tp_model = &cachemodel->tp_model[ tile_id*numOftparts];
207 for( i=0; i<numOftparts-level; i++){
208 binLength = get_elemLen( tilepart, i, tile_id);
211 msg = (message_param_t *)malloc( sizeof(message_param_t));
// only the final tile part of the tile completes the data-bin
213 msg->last_byte = (i==numOftparts-1);
214 msg->in_class_id = tile_id;
215 msg->class_id = class_id;
216 msg->csn = target->csn;
217 msg->bin_offset = binOffset;
218 msg->length = binLength;
219 msg->aux = numOftparts-i;
220 msg->res_offset = codeidx->offset+get_elemOff( tilepart, i, tile_id)/*-1*/;
224 enqueue_message( msg, msgqueue);
228 binOffset += binLength;
/*
 * Enqueue precinct messages for precinct seq_id of component comp_id in tile
 * tile_id, one message per quality layer below `layers`.
 * Packet lengths and offsets come from codeidx->precpacket[comp_id], indexed
 * by seq_id*numOflayers+layer_id. A message is only created when the matching
 * pp_model entry is not set, and that entry is set after enqueueing.
 * binOffset accumulates the lengths of the layers visited so far.
 * NOTE(review): the line numbered 247 is missing, so the condition under
 * which `layers` is replaced by numOflayers is not visible; the initialisation
 * of binOffset and the lines numbered 264 and 266-268 are also missing -
 * confirm against the full file.
 */
232 void enqueue_precinct( int seq_id, int tile_id, int comp_id, int layers, msgqueue_param_t *msgqueue)
234 cachemodel_param_t *cachemodel;
235 index_param_t *codeidx;
236 faixbox_param_t *precpacket;
237 message_param_t *msg;
238 Byte8_t nmax, binOffset, binLength;
239 int layer_id, numOflayers;
241 cachemodel = msgqueue->cachemodel;
242 codeidx = cachemodel->target->codeidx;
243 precpacket = codeidx->precpacket[ comp_id];
244 numOflayers = codeidx->COD.numOflayers;
246 nmax = get_nmax(precpacket);
248 layers = numOflayers;
251 for( layer_id = 0; layer_id < layers; layer_id++){
253 binLength = get_elemLen( precpacket, seq_id*numOflayers+layer_id, tile_id);
// skip packets the cache model already marks as sent
255 if( !cachemodel->pp_model[comp_id][ tile_id*nmax+seq_id*numOflayers+layer_id]){
257 msg = (message_param_t *)malloc( sizeof(message_param_t));
// the final quality layer completes the precinct data-bin
258 msg->last_byte = (layer_id == (numOflayers-1));
259 msg->in_class_id = comp_precinct_id( tile_id, comp_id, seq_id, codeidx->SIZ.Csiz, codeidx->SIZ.XTnum * codeidx->SIZ.YTnum);
260 msg->class_id = PRECINCT_MSG;
261 msg->csn = cachemodel->target->csn;
262 msg->bin_offset = binOffset;
263 msg->length = binLength;
265 msg->res_offset = codeidx->offset+get_elemOff( precpacket, seq_id*numOflayers+layer_id, tile_id);
269 enqueue_message( msg, msgqueue);
271 cachemodel->pp_model[comp_id][ tile_id*nmax+seq_id*numOflayers+layer_id] = true;
273 binOffset += binLength;
277 Byte8_t comp_precinct_id( int t, int c, int s, int num_components, int num_tiles)
279 return t + (c + s * num_components ) * num_tiles;
/* Forward declarations of the helpers that enqueue the parts of one metadata-bin. */
282 void enqueue_box( int meta_id, boxlist_param_t *boxlist, msgqueue_param_t *msgqueue, Byte8_t *binOffset);
283 void enqueue_phld( int meta_id, placeholderlist_param_t *phldlist, msgqueue_param_t *msgqueue, Byte8_t *binOffset);
284 void enqueue_boxcontents( int meta_id, boxcontents_param_t *boxcontents, msgqueue_param_t *msgqueue, Byte8_t *binOffset);
/*
 * Enqueue the messages of metadata-bin meta_id.
 * The metadata entry is looked up in the target's metadatalist; its box list,
 * placeholder list and box contents (each only when present) are enqueued in
 * that order while sharing one running binOffset. Finally the last message of
 * the queue is flagged as the last byte of the bin.
 * NOTE(review): the declaration/initialisation of binOffset and the condition
 * guarding the "not found" error are not visible in this listing (numbers
 * 290-291, 294-295, 297-300 are skipped). msgqueue->last is dereferenced
 * without a visible NULL check, which matters if nothing was enqueued - confirm.
 */
286 void enqueue_metadata( int meta_id, msgqueue_param_t *msgqueue)
288 metadatalist_param_t *metadatalist;
289 metadata_param_t *metadata;
292 metadatalist = msgqueue->cachemodel->target->codeidx->metadatalist;
293 metadata = search_metadata( meta_id, metadatalist);
296 fprintf( FCGI_stderr, "Error: metadata-bin %d not found\n", meta_id);
301 if( metadata->boxlist)
302 enqueue_box( meta_id, metadata->boxlist, msgqueue, &binOffset);
304 if( metadata->placeholderlist)
305 enqueue_phld( meta_id, metadata->placeholderlist, msgqueue, &binOffset);
307 if( metadata->boxcontents)
308 enqueue_boxcontents( meta_id, metadata->boxcontents, msgqueue, &binOffset);
// mark the final message as closing the metadata-bin
310 msgqueue->last->last_byte = true;
/* Forward declaration: build a metadata message (defined later in this file). */
313 message_param_t * gene_metamsg( int meta_id, Byte8_t binoffset, Byte8_t length, Byte8_t res_offset, placeholder_param_t *phld, Byte8_t csn);
/*
 * Enqueue metadata messages for boxes of boxlist, starting at boxlist->first.
 * Each message covers box->length bytes located at box->offset and is placed
 * at the current *binOffset, which is then advanced by the box length.
 * NOTE(review): the declaration of `box` and the loop that advances through
 * the list are not visible in this listing (numbers 316-317, 319, 321, 324,
 * 326-329 are skipped) - confirm against the full file.
 */
315 void enqueue_box( int meta_id, boxlist_param_t *boxlist, msgqueue_param_t *msgqueue, Byte8_t *binOffset)
318 message_param_t *msg;
320 box = boxlist->first;
// no placeholder is attached to a plain box message
322 msg = gene_metamsg( meta_id, *binOffset, box->length, box->offset, NULL, msgqueue->cachemodel->target->csn);
323 enqueue_message( msg, msgqueue);
325 *binOffset += box->length;
/*
 * Enqueue metadata messages for placeholders of phldlist, starting at
 * phldlist->first. Each message has length phld->LBox, resource offset 0 and
 * carries the placeholder itself; *binOffset is advanced by phld->LBox.
 * NOTE(review): the loop that advances through the list is not visible in
 * this listing (numbers 331, 334, 336, 339, 341-344 are skipped) - confirm
 * against the full file.
 */
330 void enqueue_phld( int meta_id, placeholderlist_param_t *phldlist, msgqueue_param_t *msgqueue, Byte8_t *binOffset)
332 placeholder_param_t *phld;
333 message_param_t *msg;
335 phld = phldlist->first;
// the placeholder is passed along with the message; the resource offset is 0
337 msg = gene_metamsg( meta_id, *binOffset, phld->LBox, 0, phld, msgqueue->cachemodel->target->csn);
338 enqueue_message( msg, msgqueue);
340 *binOffset += phld->LBox;
/*
 * Enqueue one metadata message covering boxcontents->length bytes located at
 * boxcontents->offset, placed at the current *binOffset, then advance
 * *binOffset by that length.
 */
345 void enqueue_boxcontents( int meta_id, boxcontents_param_t *boxcontents, msgqueue_param_t *msgqueue, Byte8_t *binOffset)
347 message_param_t *msg;
349 msg = gene_metamsg( meta_id, *binOffset, boxcontents->length, boxcontents->offset, NULL, msgqueue->cachemodel->target->csn);
350 enqueue_message( msg, msgqueue);
352 *binOffset += boxcontents->length;
/*
 * Allocate a metadata message (class METADATA_MSG) for metadata-bin meta_id
 * with the given bin offset, length and resource offset. last_byte starts as
 * false and aux is 0.
 * NOTE(review): the parameters phld and csn are not used in the visible
 * lines, and the embedded line numbers skip 364 and 369-372, so their
 * assignments and the return statement are presumably in the missing lines -
 * confirm against the full file. The malloc result is not checked in the
 * visible lines.
 */
355 message_param_t * gene_metamsg( int meta_id, Byte8_t binOffset, Byte8_t length, Byte8_t res_offset, placeholder_param_t *phld, Byte8_t csn)
357 message_param_t *msg;
359 msg = (message_param_t *)malloc( sizeof(message_param_t));
361 msg->last_byte = false;
362 msg->in_class_id = meta_id;
363 msg->class_id = METADATA_MSG;
365 msg->bin_offset = binOffset;
366 msg->length = length;
367 msg->aux = 0; // non exist
368 msg->res_offset = res_offset;
/*
 * Append msg to msgqueue: the visible statements link it after the current
 * last message or install it as the first message, then make it the last.
 * NOTE(review): the condition choosing between the two assignments (numbers
 * 377 and 379) is missing from this listing - presumably it tests whether the
 * queue is empty; confirm against the full file.
 */
375 void enqueue_message( message_param_t *msg, msgqueue_param_t *msgqueue)
378 msgqueue->last->next = msg;
380 msgqueue->first = msg;
382 msgqueue->last = msg;
/* Forward declarations of the stream-writing helpers defined later in this file. */
385 void add_bin_id_vbas_stream( Byte_t bb, Byte_t c, Byte8_t in_class_id, int tmpfd);
386 void add_vbas_stream( Byte8_t code, int tmpfd);
387 void add_body_stream( message_param_t *msg, int fd, int tmpfd);
388 void add_placeholder_stream( placeholder_param_t *phld, int tmpfd);
/*
 * Write the messages of msgqueue, starting at msgqueue->first, to the file
 * descriptor tmpfd.
 * Per message the visible calls emit: the Bin-ID VBAS (bb, c, in_class_id),
 * the class id and csn VBASs, the bin offset and length VBASs, the aux VBAS
 * when the class id is odd, and then the placeholder and/or the body bytes
 * read from the target's fd.
 * NOTE(review): the declarations of bb and c, the loop, the assignments to bb
 * and csn, and the conditions that select which header fields / payload are
 * written are not visible in this listing (e.g. numbers 394-398, 400-402,
 * 405-407, 409-412, 414-416, 420-421, 423, 431-432, 434 are skipped). The
 * comparisons at 403/404 suggest class id and csn are tracked from the
 * previous message - confirm against the full file.
 */
390 void recons_stream_from_msgqueue( msgqueue_param_t *msgqueue, int tmpfd)
392 message_param_t *msg;
393 Byte8_t class_id, csn;
399 msg = msgqueue->first;
403 if( msg->csn == csn){
404 if( msg->class_id == class_id)
408 class_id = msg->class_id;
413 class_id = msg->class_id;
// c carries the "last byte of the data-bin" flag of the message
417 c = msg->last_byte ? 1 : 0;
419 add_bin_id_vbas_stream( bb, c, msg->in_class_id, tmpfd);
422 add_vbas_stream( class_id, tmpfd);
424 add_vbas_stream( csn, tmpfd);
426 add_vbas_stream( msg->bin_offset, tmpfd);
427 add_vbas_stream (msg->length, tmpfd);
429 if( msg->class_id%2) // Aux is present only if the id is odd
430 add_vbas_stream( msg->aux, tmpfd);
433 add_placeholder_stream( msg->phld, tmpfd);
435 add_body_stream( msg, msgqueue->cachemodel->target->fd, tmpfd);
/* Forward declarations of helpers defined later in this file. */
441 void add_vbas_with_bytelen_stream( Byte8_t code, int bytelength, int tmpfd);
442 void print_binarycode( Byte8_t n, int segmentlen);
/*
 * Write the Bin-ID VBAS of a message header to tmpfd.
 * The two low bits of bb and the low bit of c are merged into the most
 * significant 7-bit group of the code (bb at bits 6-5, c at bit 4 of that
 * group), above the in-class identifier, and the result is written with
 * bytelength bytes.
 * NOTE(review): the declarations of bytelength and tmp and the code that
 * derives bytelength from tmp are not visible in this listing (numbers
 * 445-448, 451, 453-457, 459 are skipped) - confirm against the full file.
 * NOTE(review): the flag expression is evaluated as int before the shift by
 * (bytelength-1)*7, so it can overflow int once bytelength reaches 5;
 * widening to Byte8_t before shifting would avoid that - confirm.
 */
444 void add_bin_id_vbas_stream( Byte_t bb, Byte_t c, Byte8_t in_class_id, int tmpfd)
449 // A.2.3 In-class identifiers
450 // 7k-3bits, where k is the number of bytes in the VBAS
// the first byte only has 4 bits of room for the identifier
452 tmp = in_class_id >> 4;
458 in_class_id |= (((bb & 3) << 5) | (c & 1) << 4) << ((bytelength-1)*7);
460 add_vbas_with_bytelen_stream( in_class_id, bytelength, tmpfd);
/*
 * Write code to tmpfd as a VBAS by delegating to
 * add_vbas_with_bytelen_stream() with a byte length of bytelength.
 * NOTE(review): the declaration of bytelength and the code that computes it
 * from `code` are not visible in this listing (numbers 464-472 are skipped) -
 * confirm against the full file.
 */
463 void add_vbas_stream( Byte8_t code, int tmpfd)
473 add_vbas_with_bytelen_stream( code, bytelength, tmpfd);
/*
 * Write code to tmpfd as a VBAS of bytelength bytes: each output byte takes
 * one 7-bit group of code (group n is bits n*7 .. n*7+6). A failed one-byte
 * write is reported on FCGI_stderr.
 * NOTE(review): the declarations of n and seg, the loop over n and the
 * handling of the top bit of each byte are not visible in this listing
 * (numbers 477-482, 484-485, 488-492 are skipped) - confirm against the full
 * file.
 * NOTE(review): only the first byte stored at &seg is written; if seg is
 * wider than one byte this depends on the host byte order - confirm the type
 * of seg.
 */
476 void add_vbas_with_bytelen_stream( Byte8_t code, int bytelength, int tmpfd)
483 seg = ( code >> (n*7)) & 0x7f;
486 if( write( tmpfd, ( Byte4_t *)&seg, 1) != 1){
487 fprintf( FCGI_stderr, "Error: failed to write vbas\n");
494 void add_body_stream( message_param_t *msg, int fd, int tmpfd)
498 if( !(data = fetch_bytes( fd, msg->res_offset, msg->length))){
499 fprintf( FCGI_stderr, "Error: fetch_bytes in add_body_stream()\n");
503 if( write( tmpfd, data, msg->length) < 1){
505 fprintf( FCGI_stderr, "Error: fwrite in add_body_stream()\n");
/* Forward declaration: write an integer to a descriptor, one byte per write (defined later in this file). */
511 void add_bigendian_bytestream( Byte8_t code, int bytelength, int tmpfd);
/*
 * Write a placeholder to tmpfd in this order: LBox (4 bytes), TBox (4 bytes),
 * Flags (4 bytes), OrigID (8 bytes) and OrigBH (OrigBHlen bytes).
 * NOTE(review): both write() checks use `< 1`, so a short write is treated as
 * success and an OrigBHlen of 0 would be reported as an error - confirm
 * whether that is intended. The lines numbered 514, 518-519, 522 and 525-527
 * are missing from this listing.
 */
513 void add_placeholder_stream( placeholder_param_t *phld, int tmpfd)
515 add_bigendian_bytestream( phld->LBox, 4, tmpfd);
// the box type is written as 4 raw bytes
516 if( write( tmpfd, phld->TBox, 4) < 1){
517 fprintf( FCGI_stderr, "Error: fwrite in add_placeholder_stream()\n");
520 add_bigendian_bytestream( phld->Flags, 4, tmpfd);
521 add_bigendian_bytestream( phld->OrigID, 8, tmpfd);
523 if( write( tmpfd, phld->OrigBH, phld->OrigBHlen) < 1){
524 fprintf( FCGI_stderr, "Error: fwrite in add_placeholder_stream()\n");
/*
 * Write code to tmpfd one byte at a time; each byte is the 8-bit group n of
 * code (bits n*8 .. n*8+7). A failed one-byte write is reported on
 * FCGI_stderr.
 * NOTE(review): the declarations of n and seg and the loop over n are not
 * visible in this listing (numbers 530-535, 539-543 are skipped); the name
 * suggests n runs from bytelength-1 down to 0 - confirm against the full file.
 * NOTE(review): only the first byte stored at &seg is written; if seg is
 * wider than one byte this depends on the host byte order - confirm the type
 * of seg.
 */
529 void add_bigendian_bytestream( Byte8_t code, int bytelength, int tmpfd)
536 seg = ( code >> (n*8)) & 0xff;
537 if( write( tmpfd, ( Byte4_t *)&seg, 1) != 1){
538 fprintf( FCGI_stderr, "ERROR: failed to write bigendian_bytestream\n");
/*
 * Debug helper working on the binary digits of n in groups of segmentlen.
 * The visible lines store the digits of n in buf as '0'/'1' characters,
 * lowest bit first, then walk buf from index i-1 down to 0 and act on every
 * segmentlen-th digit.
 * NOTE(review): the declarations of buf, i, j and k, the loop that divides n,
 * the bodies of both for loops and the action taken at each group boundary
 * are not visible in this listing (numbers 546-550, 552-553, 555-556, 558,
 * 560-564 are skipped) - confirm against the full file, including the size
 * of buf.
 */
545 void print_binarycode( Byte8_t n, int segmentlen)
// digits are collected least-significant first
551 buf[i++] = n%2 ? '1' : '0';
554 for( j=segmentlen-1; j>=i; j--)
557 for( j=i-1, k=0; j>=0; j--, k++){
559 if( !((k+1)%segmentlen))
/* Forward declarations of the VBAS parsers defined later in this file. */
565 Byte_t * parse_bin_id_vbas( Byte_t *streamptr, Byte_t *bb, Byte_t *c, Byte8_t *in_class_id);
566 Byte_t * parse_vbas( Byte_t *streamptr, Byte8_t *elem);
/*
 * Parse the message headers in JPIPstream (streamlen bytes) and append one
 * message per header to msgqueue.
 * For each message the Bin-ID VBAS yields bb, c and in_class_id; class id,
 * csn, bin offset, length and (for odd class ids) aux are read as VBASs.
 * res_offset is the position just after the header, relative to the start of
 * JPIPstream, plus `offset`.
 * NOTE(review): the declarations of bb and c, the initialisation of ptr and
 * csn, the conditions under which class id and csn are read, the step that
 * skips the message body and the queue-empty test before linking are not
 * visible in this listing (e.g. numbers 572, 574, 576-577, 584-585, 589-590,
 * 592-593, 599-601, 603-606, 608 are skipped) - confirm against the full file.
 * NOTE(review): the visible parser calls receive no remaining-length
 * argument, so header parsing itself is not bounded by streamlen, and the
 * malloc result is not checked in the visible lines.
 */
568 void parse_JPIPstream( Byte_t *JPIPstream, Byte8_t streamlen, Byte8_t offset, msgqueue_param_t *msgqueue)
570 Byte_t *ptr; // stream pointer
571 message_param_t *msg;
573 Byte8_t class_id, csn;
575 class_id = -1; // dummy
578 while( ptr-JPIPstream < streamlen){
579 msg = (message_param_t *)malloc( sizeof(message_param_t));
581 ptr = parse_bin_id_vbas( ptr, &bb, &c, &msg->in_class_id);
583 msg->last_byte = c == 1 ? true : false;
586 ptr = parse_vbas( ptr, &class_id);
588 msg->class_id = class_id;
591 ptr = parse_vbas( ptr, &csn);
594 ptr = parse_vbas( ptr, &msg->bin_offset);
595 ptr = parse_vbas( ptr, &msg->length);
597 if( msg->class_id%2) // Aux is present only if the id is odd
598 ptr = parse_vbas( ptr, &msg->aux);
// body position: distance from the stream start plus the caller's base offset
602 msg->res_offset = ptr-JPIPstream+offset;
607 msgqueue->last->next = msg;
609 msgqueue->first = msg;
610 msgqueue->last = msg;
/* Forward declaration: parse the body of one metadata message (defined later in this file). */
616 void parse_metadata( metadata_param_t *metadata, message_param_t *msg, Byte_t *stream);
/*
 * Build metadata entries from the metadata messages of msgqueue.
 * Starting at msgqueue->first, each message of class METADATA_MSG gets a new
 * metadata entry (id = in_class_id) that is inserted into metadatalist and
 * then filled from the bytes at stream + msg->res_offset.
 * NOTE(review): the action taken when metadatalist is NULL and the loop over
 * the messages are not visible in this listing (numbers 619, 621, 623-624,
 * 626, 631-635 are skipped). streamlen is not used in the visible lines, so
 * res_offset is not checked against the stream length here - confirm against
 * the full file.
 */
618 void parse_metamsg( msgqueue_param_t *msgqueue, Byte_t *stream, Byte8_t streamlen, metadatalist_param_t *metadatalist)
620 message_param_t *msg;
622 if( metadatalist == NULL)
625 msg = msgqueue->first;
627 if( msg->class_id == METADATA_MSG){
628 metadata_param_t *metadata = gene_metadata( msg->in_class_id, NULL, NULL, NULL);
629 insert_metadata_into_list( metadata, metadatalist);
630 parse_metadata( metadata, msg, stream+msg->res_offset);
/* Forward declaration: build a placeholder from a serialized placeholder box (defined later in this file). */
636 placeholder_param_t * parse_phld( Byte_t *datastream, Byte8_t metalength);
/*
 * Classify the body of one metadata message by the 4-character box type found
 * at datastream+4 and record it in metadata:
 *  - type "phld": parse a placeholder and insert it into the placeholder list
 *    (created on first use);
 *  - a type whose characters pass the isalpha/isalnum/isblank tests below:
 *    generate a box and insert it into the box list (created on first use);
 *  - the remaining visible statement stores box contents described by the
 *    message's res_offset and length.
 * NOTE(review): the lines numbered 641-643, 647, 649, 651, 657 and 660-661
 * are missing from this listing, so the branch that leads to the final
 * assignment (presumably an else) is not visible - confirm against the full
 * file.
 * NOTE(review): boxtype is plain char; passing it to the <ctype.h> functions
 * without a cast to unsigned char is undefined for negative values - confirm.
 */
638 void parse_metadata( metadata_param_t *metadata, message_param_t *msg, Byte_t *datastream)
// the box type follows the 4-byte length field
640 char *boxtype = (char *)(datastream+4);
644 if( strncmp( boxtype, "phld", 4) == 0){
645 if( !metadata->placeholderlist)
646 metadata->placeholderlist = gene_placeholderlist();
648 placeholder_param_t *phld = parse_phld( datastream, msg->length);
650 insert_placeholder_into_list( phld, metadata->placeholderlist);
652 else if( isalpha(boxtype[0]) && isalpha(boxtype[1]) &&
653 (isalnum(boxtype[2])||isblank(boxtype[2])) &&
654 (isalpha(boxtype[3])||isblank(boxtype[3]))){
655 if( !metadata->boxlist)
656 metadata->boxlist = gene_boxlist();
658 box_param_t *box = gene_boxbyOffinStream( datastream, msg->res_offset);
659 insert_box_into_list( box, metadata->boxlist);
662 metadata->boxcontents = gene_boxcontents( msg->res_offset, msg->length);
665 placeholder_param_t * parse_phld( Byte_t *datastream, Byte8_t metalength)
667 placeholder_param_t *phld;
669 phld = (placeholder_param_t *)malloc( sizeof(placeholder_param_t));
671 phld->LBox = big4( datastream);
672 strcpy( phld->TBox, "phld");
673 phld->Flags = big4( datastream+8);
674 phld->OrigID = big8( datastream+12);
675 phld->OrigBHlen = metalength - 20;
676 phld->OrigBH = (Byte_t *)malloc(phld->OrigBHlen);
677 memcpy( phld->OrigBH, datastream+20, phld->OrigBHlen);
/*
 * Parse a Bin-ID VBAS starting at streamptr.
 * From the first byte, *bb receives bits 6-5, *c receives bit 4 and
 * *in_class_id starts with the low 4 bits; each further byte appends its low
 * 7 bits to *in_class_id.
 * NOTE(review): the declaration of code, the reads from the stream, the
 * continuation loop and the return value are not visible in this listing
 * (numbers 684-690, 693, 695-697, 699-702 are skipped); the forward
 * declaration's return type suggests the pointer just past the VBAS is
 * returned - confirm against the full file.
 */
683 Byte_t * parse_bin_id_vbas( Byte_t *streamptr, Byte_t *bb, Byte_t *c, Byte8_t *in_class_id)
691 *bb = (code >> 5) & 3;
692 *c = (code >> 4) & 1;
// only 4 identifier bits fit in the first byte
694 *in_class_id = code & 15;
698 *in_class_id = (*in_class_id << 7) | (code & 0x7f);
/*
 * Parse a VBAS starting at streamptr into *elem: every byte contributes its
 * low 7 bits, appended below the bits gathered so far.
 * NOTE(review): the declaration of code, the initialisation of *elem, the
 * loop with its termination test and the return value are not visible in
 * this listing (numbers 704-711, 713-717 are skipped) - confirm against the
 * full file.
 */
703 Byte_t * parse_vbas( Byte_t *streamptr, Byte8_t *elem)
712 *elem = (*elem << 7) | (code & 0x7f);
/*
 * Unlink *msg from msgqueue.
 * When *msg is the first message, the queue's first pointer moves to its
 * successor; the other visible statements search from the first message for
 * the predecessor of *msg, link that predecessor to the successor of *msg,
 * and make it the new last message when *msg was the last one.
 * NOTE(review): the lines numbered 719, 721-724, 727, 730-732 and 734 are
 * missing from this listing (presumably a NULL guard, the else branch and the
 * loop body), and the end of the function, where *msg would be released, is
 * not visible - confirm against the full file. The search loop has no visible
 * guard for a message that is not in the queue.
 */
718 void delete_message_in_msgqueue( message_param_t **msg, msgqueue_param_t *msgqueue)
720 message_param_t *ptr;
725 if( *msg == msgqueue->first)
726 msgqueue->first = (*msg)->next;
728 ptr = msgqueue->first;
// walk to the message directly in front of *msg
729 while( ptr->next != *msg){
733 ptr->next = (*msg)->next;
735 if( *msg == msgqueue->last)
736 msgqueue->last = ptr;