if( sem_init( &queue->blocking_memory_signal, 0, 0 ) )
goto e2;
+ if( sem_init( &queue->work_semaphore, 0, 0 ) )
+ goto e3;
+
queue->queue.buffer = malloc( queue->buffer_size );
queue->queue.size = queue->buffer_size;
if( !queue->queue.buffer )
goto e2;
return 1;
+e3: sem_destroy( &queue->blocking_memory_signal );
e2: pthread_mutex_destroy( &queue->data_lock );
e1: pthread_mutex_destroy( &queue->lock );
e0: return 0;
vg_queue *ring = &queue->queue;
vg_async_task *task = vg_queue_alloc( ring, total_size, NULL );
- if( !task )
+ while( blocking && !task )
{
- if( blocking )
- {
- queue->requested_bytes = total_size;
- pthread_mutex_unlock( &queue->lock );
- sem_wait( &queue->blocking_memory_signal );
- pthread_mutex_lock( &queue->lock );
- task = vg_queue_alloc( ring, total_size, NULL );
- VG_ASSERT( task );
- }
+ pthread_mutex_unlock( &queue->lock );
+ sem_wait( &queue->blocking_memory_signal );
+ pthread_mutex_lock( &queue->lock );
+ task = vg_queue_alloc( ring, total_size, NULL );
}
if( task )
- {
- task->dispatched = 0;
task->handler = NULL;
- }
pthread_mutex_unlock( &queue->lock );
return task;
}
-void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task )
+void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task, void (*handler)( vg_async_task *task ) )
{
- pthread_mutex_lock( &queue->lock );
- task->dispatched = 1;
- pthread_mutex_unlock( &queue->lock );
pthread_mutex_unlock( &queue->data_lock );
+
+ task->handler = handler;
+ sem_post( &queue->work_semaphore );
}
-bool vg_async_consume( vg_async_queue *queue, u32 limit )
+bool vg_async_has_work( vg_async_queue *queue )
{
- if( limit == 0 )
- limit = 0xffffffff;
+ int work_count = 0;
+ sem_getvalue( &queue->work_semaphore, &work_count );
+ return work_count > 0? 1: 0;
+}
- pthread_mutex_lock( &queue->lock );
+bool vg_async_process_next_task( vg_async_queue *queue )
+{
+ sem_wait( &queue->work_semaphore );
+ pthread_mutex_lock( &queue->lock );
if( queue->quit == k_async_quit_immediate )
{
pthread_mutex_unlock( &queue->lock );
- return 1;
- }
-
- bool head_of_line_blocking = 0;
- for( u32 i=0; i<limit; i ++ )
- {
- vg_async_task *task = vg_queue_tail_data( &queue->queue );
-
- if( task )
- {
- if( task->dispatched )
- {
- pthread_mutex_unlock( &queue->lock );
- task->handler( task );
- pthread_mutex_lock( &queue->lock );
- vg_queue_pop( &queue->queue );
- }
- else
- {
- head_of_line_blocking = 1;
- break;
- }
- }
- else
- break;
- }
-
- if( queue->requested_bytes )
- {
- if( head_of_line_blocking )
- {
- vg_error( "You lost.\n" );
- abort();
- }
-
- /* currently wait until the whole queue is cleared. This could maybe use a better strategy */
- if( queue->queue.allocation_count == 0 )
- {
- queue->requested_bytes = 0;
- sem_post( &queue->blocking_memory_signal );
- }
+ return 0;
}
-
- if( queue->quit == k_async_quit_when_empty )
+ else if( queue->quit == k_async_quit_when_empty )
{
if( queue->queue.allocation_count == 0 )
{
pthread_mutex_unlock( &queue->lock );
- return 1;
+ return 0;
}
}
-
+ vg_async_task *task = vg_queue_tail_data( &queue->queue );
pthread_mutex_unlock( &queue->lock );
- return 0;
-}
-struct single_function_async_data
-{
- void (*function)(void *, u32);
- void *userdata;
- u32 usersize;
-};
+ if( task )
+ {
+ task->handler( task );
-static void single_function_call_handler( vg_async_task *task )
-{
- struct single_function_async_data *data = (void *)task->data;
- data->function( data->userdata, data->usersize );
-}
+ pthread_mutex_lock( &queue->lock );
+ vg_queue_pop( &queue->queue );
+ pthread_mutex_unlock( &queue->lock );
+ }
-void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize )
-{
- vg_async_task *task = vg_allocate_async_task( queue, sizeof(struct single_function_async_data), 1 );
- struct single_function_async_data *data = (void *)task->data;
- data->function = function;
- data->userdata = userdata;
- data->usersize = usersize;
- vg_async_task_dispatch( queue, task );
+ int status = 1;
+ sem_getvalue( &queue->blocking_memory_signal, &status );
+ if( status <= 0 )
+ sem_post( &queue->blocking_memory_signal );
+
+ return 1;
}
void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit )
pthread_mutex_lock( &queue->lock );
queue->quit = quit;
pthread_mutex_unlock( &queue->lock );
+ sem_post( &queue->work_semaphore );
}
u32 requested_bytes;
sem_t blocking_memory_signal;
+ sem_t work_semaphore;
pthread_mutex_t lock, data_lock;
vg_queue queue;
struct vg_async_task
{
- bool dispatched;
void (*handler)( vg_async_task *task );
-
u8 data[];
};
/* returns NULL if out of memory, or if blocking is set, wait for memory to become availible */
vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool blocking );
-void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task );
-bool vg_async_consume( vg_async_queue *queue, u32 limit );
+void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task, void (*handler)( vg_async_task *task ) );
void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize );
void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit );
+
+bool vg_async_has_work( vg_async_queue *queue );
+bool vg_async_process_next_task( vg_async_queue *queue );