change the timing model in async2
author     hgn <hgodden00@gmail.com>
Fri, 11 Apr 2025 21:45:39 +0000 (22:45 +0100)
committer  hgn <hgodden00@gmail.com>
Fri, 11 Apr 2025 21:45:39 +0000 (22:45 +0100)
vg_async2.c
vg_async2.h

index 76bb850fec88217190146aa060251a7fa84af183..925166ccb9cc73fc951abdd875025da6c28c7010 100644 (file)
@@ -11,12 +11,16 @@ bool vg_init_async_queue( vg_async_queue *queue )
    if( sem_init( &queue->blocking_memory_signal, 0, 0 ) )
       goto e2;
 
+   if( sem_init( &queue->work_semaphore, 0, 0 ) )
+      goto e3;
+
    queue->queue.buffer = malloc( queue->buffer_size );
    queue->queue.size = queue->buffer_size;
    if( !queue->queue.buffer ) 
       goto e2;
 
    return 1;
+e3: sem_destroy( &queue->blocking_memory_signal );
 e2: pthread_mutex_destroy( &queue->data_lock );
 e1: pthread_mutex_destroy( &queue->lock );
 e0: return 0;
@@ -38,124 +42,72 @@ vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool bl
    vg_queue *ring = &queue->queue;
    vg_async_task *task = vg_queue_alloc( ring, total_size, NULL );
 
-   if( !task )
+   while( blocking && !task )
    {
-      if( blocking )
-      {
-         queue->requested_bytes = total_size;
-         pthread_mutex_unlock( &queue->lock );
-         sem_wait( &queue->blocking_memory_signal );
-         pthread_mutex_lock( &queue->lock );
-         task = vg_queue_alloc( ring, total_size, NULL );
-         VG_ASSERT( task );
-      }
+      pthread_mutex_unlock( &queue->lock );
+      sem_wait( &queue->blocking_memory_signal );
+      pthread_mutex_lock( &queue->lock );
+      task = vg_queue_alloc( ring, total_size, NULL );
    }
 
    if( task )
-   {
-      task->dispatched = 0;
       task->handler = NULL;
-   }
 
    pthread_mutex_unlock( &queue->lock );
    return task;
 }
 
-void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task )
+void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task, void (*handler)( vg_async_task *task ) )
 {
-   pthread_mutex_lock( &queue->lock );
-   task->dispatched = 1;
-   pthread_mutex_unlock( &queue->lock );
    pthread_mutex_unlock( &queue->data_lock );
+
+   task->handler = handler;
+   sem_post( &queue->work_semaphore );
 }
 
-bool vg_async_consume( vg_async_queue *queue, u32 limit )
+bool vg_async_has_work( vg_async_queue *queue )
 {
-   if( limit == 0 )
-      limit = 0xffffffff;
+   int work_count = 0;
+   sem_getvalue( &queue->work_semaphore, &work_count );
+   return work_count > 0? 1: 0;
+}
 
-   pthread_mutex_lock( &queue->lock );
+bool vg_async_process_next_task( vg_async_queue *queue )
+{
+   sem_wait( &queue->work_semaphore );
 
+   pthread_mutex_lock( &queue->lock );
    if( queue->quit == k_async_quit_immediate )
    {
       pthread_mutex_unlock( &queue->lock );
-      return 1;
-   }
-
-   bool head_of_line_blocking = 0;
-   for( u32 i=0; i<limit; i ++ )
-   {
-      vg_async_task *task = vg_queue_tail_data( &queue->queue );
-
-      if( task )
-      {
-         if( task->dispatched )
-         {
-            pthread_mutex_unlock( &queue->lock );
-            task->handler( task );
-            pthread_mutex_lock( &queue->lock );
-            vg_queue_pop( &queue->queue );
-         }
-         else 
-         {
-            head_of_line_blocking = 1;
-            break;
-         }
-      }
-      else
-         break;
-   }
-
-   if( queue->requested_bytes )
-   {
-      if( head_of_line_blocking )
-      {
-         vg_error( "You lost.\n" );
-         abort();
-      }
-
-      /* currently wait until the whole queue is cleared. This could maybe use a better strategy */
-      if( queue->queue.allocation_count == 0 )
-      {
-         queue->requested_bytes = 0;
-         sem_post( &queue->blocking_memory_signal );
-      }
+      return 0;
    }
-
-   if( queue->quit == k_async_quit_when_empty )
+   else if( queue->quit == k_async_quit_when_empty )
    {
       if( queue->queue.allocation_count == 0 )
       {
          pthread_mutex_unlock( &queue->lock );
-         return 1;
+         return 0;
       }
    }
-
+   vg_async_task *task = vg_queue_tail_data( &queue->queue );
    pthread_mutex_unlock( &queue->lock );
-   return 0;
-}
 
-struct single_function_async_data
-{
-   void (*function)(void *, u32);
-   void *userdata;
-   u32 usersize;
-};
+   if( task )
+   {
+      task->handler( task );
 
-static void single_function_call_handler( vg_async_task *task )
-{
-   struct single_function_async_data *data = (void *)task->data;
-   data->function( data->userdata, data->usersize );
-}
+      pthread_mutex_lock( &queue->lock );
+      vg_queue_pop( &queue->queue );
+      pthread_mutex_unlock( &queue->lock );
+   }
 
-void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize )
-{
-   vg_async_task *task = vg_allocate_async_task( queue, sizeof(struct single_function_async_data), 1 );
-   struct single_function_async_data *data = (void *)task->data;
-   data->function = function;
-   data->userdata = userdata;
-   data->usersize = usersize;
-   vg_async_task_dispatch( queue, task );
+   int status = 1;
+   sem_getvalue( &queue->blocking_memory_signal, &status );
+   if( status <= 0 )
+      sem_post( &queue->blocking_memory_signal );
+
+   return 1;
 }
 
 void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit )
@@ -163,4 +115,5 @@ void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit )
    pthread_mutex_lock( &queue->lock );
    queue->quit = quit;
    pthread_mutex_unlock( &queue->lock );
+   sem_post( &queue->work_semaphore );
 }
index 1852e201dbcafb0166789939517186568b05e6bb..1e6f2d0478c54e667e61b01b5e984f535f2fb0e6 100644 (file)
@@ -14,6 +14,7 @@ struct vg_async_queue
    u32 requested_bytes;
 
    sem_t blocking_memory_signal;
+   sem_t work_semaphore;
    pthread_mutex_t lock, data_lock;
    vg_queue queue;
 
@@ -28,9 +29,7 @@ struct vg_async_queue
 
 struct vg_async_task
 {
-   bool dispatched;
    void (*handler)( vg_async_task *task );
-
    u8 data[];
 };
 
@@ -39,7 +38,9 @@ void vg_free_async_queue( vg_async_queue *queue );
 
 /* returns NULL if out of memory or, if blocking is set, waits for memory to become available */
 vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool blocking );
-void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task );
-bool vg_async_consume( vg_async_queue *queue, u32 limit );
+void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task, void (*handler)( vg_async_task *task ) );
 void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize );
 void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit );
+
+bool vg_async_has_work( vg_async_queue *queue );
+bool vg_async_process_next_task( vg_async_queue *queue );
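
Usage sketch for the new timing model (not part of the commit): the polled vg_async_consume()/dispatched-flag scheme is replaced by a counting work_semaphore. A producer allocates a task, fills task->data, and passes the handler at dispatch time; a consumer either blocks in vg_async_process_next_task() until work or a quit request arrives, or checks vg_async_has_work() first to avoid sleeping on the semaphore. Everything below other than the declared vg_async_* calls (the handler name, payload size, and where the loops run) is illustrative, not taken from this commit.

#include "vg_async2.h"

/* hypothetical handler: runs on the consumer thread when the task is popped */
static void example_handler( vg_async_task *task )
{
   /* read whatever the producer wrote into task->data */
}

/* producer side: the handler is now an argument to vg_async_task_dispatch()
 * rather than being stored on the task behind a separate 'dispatched' flag */
static void producer_example( vg_async_queue *queue )
{
   vg_async_task *task = vg_allocate_async_task( queue, 128, 1 /* block until ring memory frees up */ );
   /* ... write payload into task->data ... */
   vg_async_task_dispatch( queue, task, example_handler );
}

/* consumer side, dedicated thread: block until the next task arrives or a
 * quit request is posted by vg_async_queue_end() */
static void consumer_thread_loop( vg_async_queue *queue )
{
   while( vg_async_process_next_task( queue ) )
      ;
}

/* consumer side, per-frame variant: only process tasks that are already
 * queued, so the calling thread never waits on the work semaphore */
static void consumer_per_frame( vg_async_queue *queue )
{
   while( vg_async_has_work( queue ) )
   {
      if( !vg_async_process_next_task( queue ) )
         break;
   }
}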