From: hgn Date: Fri, 11 Apr 2025 21:45:39 +0000 (+0100) Subject: change the timing model in async2 X-Git-Url: https://harrygodden.com/git/?a=commitdiff_plain;h=8fadcfaf17bc5953df19faaaba66812b4e6acab6;p=vg.git change the timing model in async2 --- diff --git a/vg_async2.c b/vg_async2.c index 76bb850..925166c 100644 --- a/vg_async2.c +++ b/vg_async2.c @@ -11,12 +11,16 @@ bool vg_init_async_queue( vg_async_queue *queue ) if( sem_init( &queue->blocking_memory_signal, 0, 0 ) ) goto e2; + if( sem_init( &queue->work_semaphore, 0, 0 ) ) + goto e3; + queue->queue.buffer = malloc( queue->buffer_size ); queue->queue.size = queue->buffer_size; if( !queue->queue.buffer ) goto e2; return 1; +e3: sem_destroy( &queue->blocking_memory_signal ); e2: pthread_mutex_destroy( &queue->data_lock ); e1: pthread_mutex_destroy( &queue->lock ); e0: return 0; @@ -38,124 +42,72 @@ vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool bl vg_queue *ring = &queue->queue; vg_async_task *task = vg_queue_alloc( ring, total_size, NULL ); - if( !task ) + while( blocking && !task ) { - if( blocking ) - { - queue->requested_bytes = total_size; - pthread_mutex_unlock( &queue->lock ); - sem_wait( &queue->blocking_memory_signal ); - pthread_mutex_lock( &queue->lock ); - task = vg_queue_alloc( ring, total_size, NULL ); - VG_ASSERT( task ); - } + pthread_mutex_unlock( &queue->lock ); + sem_wait( &queue->blocking_memory_signal ); + pthread_mutex_lock( &queue->lock ); + task = vg_queue_alloc( ring, total_size, NULL ); } if( task ) - { - task->dispatched = 0; task->handler = NULL; - } pthread_mutex_unlock( &queue->lock ); return task; } -void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task ) +void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task, void (*handler)( vg_async_task *task ) ) { - pthread_mutex_lock( &queue->lock ); - task->dispatched = 1; - pthread_mutex_unlock( &queue->lock ); pthread_mutex_unlock( &queue->data_lock ); + + task->handler = handler; + sem_post( &queue->work_semaphore ); } -bool vg_async_consume( vg_async_queue *queue, u32 limit ) +bool vg_async_has_work( vg_async_queue *queue ) { - if( limit == 0 ) - limit = 0xffffffff; + int work_count = 0; + sem_getvalue( &queue->work_semaphore, &work_count ); + return work_count > 0? 1: 0; +} - pthread_mutex_lock( &queue->lock ); +bool vg_async_process_next_task( vg_async_queue *queue ) +{ + sem_wait( &queue->work_semaphore ); + pthread_mutex_lock( &queue->lock ); if( queue->quit == k_async_quit_immediate ) { pthread_mutex_unlock( &queue->lock ); - return 1; - } - - bool head_of_line_blocking = 0; - for( u32 i=0; iqueue ); - - if( task ) - { - if( task->dispatched ) - { - pthread_mutex_unlock( &queue->lock ); - task->handler( task ); - pthread_mutex_lock( &queue->lock ); - vg_queue_pop( &queue->queue ); - } - else - { - head_of_line_blocking = 1; - break; - } - } - else - break; - } - - if( queue->requested_bytes ) - { - if( head_of_line_blocking ) - { - vg_error( "You lost.\n" ); - abort(); - } - - /* currently wait until the whole queue is cleared. This could maybe use a better strategy */ - if( queue->queue.allocation_count == 0 ) - { - queue->requested_bytes = 0; - sem_post( &queue->blocking_memory_signal ); - } + return 0; } - - if( queue->quit == k_async_quit_when_empty ) + else if( queue->quit == k_async_quit_when_empty ) { if( queue->queue.allocation_count == 0 ) { pthread_mutex_unlock( &queue->lock ); - return 1; + return 0; } } - + vg_async_task *task = vg_queue_tail_data( &queue->queue ); pthread_mutex_unlock( &queue->lock ); - return 0; -} -struct single_function_async_data -{ - void (*function)(void *, u32); - void *userdata; - u32 usersize; -}; + if( task ) + { + task->handler( task ); -static void single_function_call_handler( vg_async_task *task ) -{ - struct single_function_async_data *data = (void *)task->data; - data->function( data->userdata, data->usersize ); -} + pthread_mutex_lock( &queue->lock ); + vg_queue_pop( &queue->queue ); + pthread_mutex_unlock( &queue->lock ); + } -void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize ) -{ - vg_async_task *task = vg_allocate_async_task( queue, sizeof(struct single_function_async_data), 1 ); - struct single_function_async_data *data = (void *)task->data; - data->function = function; - data->userdata = userdata; - data->usersize = usersize; - vg_async_task_dispatch( queue, task ); + int status = 1; + sem_getvalue( &queue->blocking_memory_signal, &status ); + if( status <= 0 ) + sem_post( &queue->blocking_memory_signal ); + + return 1; } void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit ) @@ -163,4 +115,5 @@ void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit ) pthread_mutex_lock( &queue->lock ); queue->quit = quit; pthread_mutex_unlock( &queue->lock ); + sem_post( &queue->work_semaphore ); } diff --git a/vg_async2.h b/vg_async2.h index 1852e20..1e6f2d0 100644 --- a/vg_async2.h +++ b/vg_async2.h @@ -14,6 +14,7 @@ struct vg_async_queue u32 requested_bytes; sem_t blocking_memory_signal; + sem_t work_semaphore; pthread_mutex_t lock, data_lock; vg_queue queue; @@ -28,9 +29,7 @@ struct vg_async_queue struct vg_async_task { - bool dispatched; void (*handler)( vg_async_task *task ); - u8 data[]; }; @@ -39,7 +38,9 @@ void vg_free_async_queue( vg_async_queue *queue ); /* returns NULL if out of memory, or if blocking is set, wait for memory to become availible */ vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool blocking ); -void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task ); -bool vg_async_consume( vg_async_queue *queue, u32 limit ); +void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task, void (*handler)( vg_async_task *task ) ); void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize ); void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit ); + +bool vg_async_has_work( vg_async_queue *queue ); +bool vg_async_process_next_task( vg_async_queue *queue );