if( sem_init( &queue->blocking_memory_signal, 0, 0 ) )
goto e2;
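+ /* the ring buffer is now a single fixed-size allocation made up front;
+  * the queue no longer grows at runtime */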
+ queue->queue.buffer = malloc( queue->buffer_size );
+ queue->queue.size = queue->buffer_size;
+ if( !queue->queue.buffer )
+ goto e3;
+
return 1;
+ e3: sem_destroy( &queue->blocking_memory_signal );
e2: pthread_mutex_destroy( &queue->data_lock );
e1: pthread_mutex_destroy( &queue->lock );
vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool blocking )
{
- if( queue->upper_memory_limit )
- VG_ASSERT( bytes <= queue->upper_memory_limit );
-
+ /* the fixed buffer must fit the aligned task plus the ring item header,
+  * or this allocation can never succeed */
+ VG_ASSERT( vg_align8( sizeof(vg_async_task) + bytes ) + sizeof(vg_queue_item) <= queue->queue.size );
u32 total_size = sizeof(vg_async_task) + bytes;
pthread_mutex_lock( &queue->data_lock );
pthread_mutex_lock( &queue->lock );
vg_queue *ring = &queue->queue;
vg_async_task *task = vg_queue_alloc( ring, total_size, NULL );
- if( !task )
- {
- bool reallocate = 1;
- if( queue->upper_memory_limit )
- {
- if( ring->size >= queue->upper_memory_limit )
- reallocate = 0;
- }
-
- if( reallocate )
- {
- u32 min_size = vg_align8( total_size ) + sizeof(vg_queue_item) + vg_queue_usage( ring ),
- grow_size = ring->size * 2,
- max_size = queue->upper_memory_limit? queue->upper_memory_limit: 0xffffffff;
-
- u32 new_size = min_size;
- if( grow_size > new_size )
- new_size = grow_size;
-
- if( new_size > max_size )
- new_size = max_size;
-
- vg_queue new_q = { .buffer = malloc( new_size ), .size = new_size };
- if( new_q.buffer )
- {
- vg_queue_copy_upgrade( ring, &new_q );
- free( ring->buffer );
- *ring = new_q;
- task = vg_queue_alloc( ring, total_size, NULL );
- }
- }
- }
-
if( !task )
{
if( blocking )
{
if( task->dispatched )
{
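+ /* run the already-dispatched task in place to free ring space before
+  * retrying; the lock is dropped around the handler call, presumably so
+  * the handler can itself use the queue without deadlocking */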
+ pthread_mutex_unlock( &queue->lock );
task->handler( task );
+ pthread_mutex_lock( &queue->lock );
vg_queue_pop( &queue->queue );
}
else
struct vg_async_queue
{
- u32 upper_memory_limit;
+ u32 buffer_size; /* fixed capacity of queue.buffer, set at creation */
u32 requested_bytes;
sem_t blocking_memory_signal;
memcpy( dst, q->buffer + start, size );
}
-/*
- * Copy q1 to q2, and reorganize the memory to correct for q2's new size
- */
-void vg_queue_copy_upgrade( vg_queue *q1, vg_queue *q2 )
-{
- if( q1->allocation_count == 0 )
- {
- q2->head_offset = 0;
- q2->tail_offset = 0;
- q2->allocation_count = 0;
- return;
- }
-
- vg_queue_item *head = q1->buffer + q1->head_offset;
- u32 end = q1->head_offset + head->alloc_size,
- start = q1->tail_offset;
-
- q2->y0 = start;
- q2->allocation_count = q1->allocation_count;
- q2->tail_offset = 0;
- if( start < end )
- {
- u32 r0 = end-start;
- VG_ASSERT( q2->size >= r0 );
- memcpy( q2->buffer, q1->buffer+start, r0 );
- q2->head_offset = r0 - head->alloc_size;
- q2->z0 = r0;
- }
- else
- {
- u32 r0 = q1->size - start;
- VG_ASSERT( q2->size >= (r0+end) );
- memcpy( q2->buffer, q1->buffer+start, r0 );
- memcpy( q2->buffer + r0, q1->buffer, end );
- q2->head_offset = r0 + end - head->alloc_size;
- q2->z0 = r0;
- }
-}
-
-u32 vg_queue_offset_upgrade( vg_queue *q, u32 offset )
-{
- if( offset > q->y0 ) return offset - q->y0;
- else return offset + q->z0;
-}
-
void *vg_queue_data( vg_queue *q, u32 offset )
{
vg_queue_item *item = q->buffer + offset;
u32 size;
u32 head_offset, tail_offset;
u32 allocation_count;
- u32 y0, z0;
};
void *vg_queue_alloc( vg_queue *q, u32 size, u32 *out_offset );
bool vg_queue_next( vg_queue *q, u32 item_id, u32 *out_next );
bool vg_queue_previous( vg_queue *q, u32 item_id, u32 *out_prev );
void vg_queue_pop( vg_queue *q );
-void vg_queue_copy_upgrade( vg_queue *q1, vg_queue *q2 );
-u32 vg_queue_offset_upgrade( vg_queue *q, u32 offset );
void vg_queue_memcpy( vg_queue *q, void *dst, u32 start, u32 size );
u32 vg_queue_usage( vg_queue *q );
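+ /* traversal sketch (assumed usage): walk forward from the oldest item at
+  * q->tail_offset with vg_queue_next( q, id, &id ), reading each payload
+  * via vg_queue_data( q, id ) */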
}
/* write a sized type */
-void vg_msg_wkvnum( vg_msg *msg, const char *key,
- u8 type, u8 count, void *data )
+void vg_msg_wkvnum( vg_msg *msg, const char *key, u8 type, u8 count, void *data )
{
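+ /* record layout: one code byte packing the type with the count bits,
+  * then the key string, then vg_msg_cmd_bytecount(code) payload bytes */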
u8 code = type | vg_msg_count_bits(count);
-
vg_msg_wbuf( msg, &code, 1 );
vg_msg_wstr( msg, key );
vg_msg_wbuf( msg, data, vg_msg_cmd_bytecount(code) );