From c377438581c1c4921a9e904ec2536abf4bc7533b Mon Sep 17 00:00:00 2001 From: hgn Date: Sun, 30 Mar 2025 07:45:38 +0100 Subject: [PATCH] async2 --- vg_async2.c | 194 +++++++++++++++++++++++++++++++++ vg_async2.h | 45 ++++++++ vg_log.c | 19 ---- vg_mem_queue.c | 17 +++ vg_mem_queue.h | 2 + vg_steam_networking.h | 246 ++++++++++++++++++++++++------------------ 6 files changed, 401 insertions(+), 122 deletions(-) create mode 100644 vg_async2.c create mode 100644 vg_async2.h diff --git a/vg_async2.c b/vg_async2.c new file mode 100644 index 0000000..55d45fe --- /dev/null +++ b/vg_async2.c @@ -0,0 +1,194 @@ +#include "vg/vg_async2.h" + +bool vg_init_async_queue( vg_async_queue *queue ) +{ + if( pthread_mutex_init( &queue->lock, NULL ) ) + goto e0; + + if( pthread_mutex_init( &queue->data_lock, NULL ) ) + goto e1; + + if( sem_init( &queue->blocking_memory_signal, 0, 0 ) ) + goto e2; + + return 1; +e2: pthread_mutex_destroy( &queue->data_lock ); +e1: pthread_mutex_destroy( &queue->lock ); +e0: return 0; +} + +void vg_free_async_queue( vg_async_queue *queue ) +{ + pthread_mutex_destroy( &queue->data_lock ); + pthread_mutex_destroy( &queue->lock ); + sem_destroy( &queue->blocking_memory_signal ); +} + +vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool blocking ) +{ + if( queue->upper_memory_limit ) + VG_ASSERT( bytes <= queue->upper_memory_limit ); + + u32 total_size = sizeof(vg_async_task) + bytes; + pthread_mutex_lock( &queue->data_lock ); + pthread_mutex_lock( &queue->lock ); + vg_queue *ring = &queue->queue; + vg_async_task *task = vg_queue_alloc( ring, total_size, NULL ); + + if( !task ) + { + bool reallocate = 1; + if( queue->upper_memory_limit ) + { + if( ring->size >= queue->upper_memory_limit ) + reallocate = 0; + } + + if( reallocate ) + { + u32 min_size = vg_align8( total_size ) + sizeof(vg_queue_item) + vg_queue_usage( ring ), + grow_size = ring->size * 2, + max_size = queue->upper_memory_limit? queue->upper_memory_limit: 0xffffffff; + + u32 new_size = min_size; + if( grow_size > new_size ) + new_size = grow_size; + + if( new_size > max_size ) + new_size = max_size; + + vg_queue new_q = { .buffer = malloc( new_size ), .size = new_size }; + if( new_q.buffer ) + { + vg_queue_copy_upgrade( ring, &new_q ); + free( ring->buffer ); + *ring = new_q; + task = vg_queue_alloc( ring, total_size, NULL ); + } + } + } + + if( !task ) + { + if( blocking ) + { + queue->requested_bytes = total_size; + pthread_mutex_unlock( &queue->lock ); + sem_wait( &queue->blocking_memory_signal ); + pthread_mutex_lock( &queue->lock ); + task = vg_queue_alloc( ring, total_size, NULL ); + VG_ASSERT( task ); + } + } + + if( task ) + { + task->dispatched = 0; + task->handler = NULL; + } + + pthread_mutex_unlock( &queue->lock ); + return task; +} + +void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task ) +{ + pthread_mutex_lock( &queue->lock ); + task->dispatched = 1; + pthread_mutex_unlock( &queue->lock ); + pthread_mutex_unlock( &queue->data_lock ); +} + +bool vg_async_consume( vg_async_queue *queue, u32 limit ) +{ + if( limit == 0 ) + limit = 0xffffffff; + + pthread_mutex_lock( &queue->lock ); + + if( queue->quit == k_async_quit_immediate ) + { + pthread_mutex_unlock( &queue->lock ); + return 1; + } + + bool head_of_line_blocking = 0; + for( u32 i=0; iqueue ); + + if( task ) + { + if( task->dispatched ) + { + task->handler( task ); + vg_queue_pop( &queue->queue ); + } + else + { + head_of_line_blocking = 1; + break; + } + } + else + break; + } + + if( queue->requested_bytes ) + { + if( head_of_line_blocking ) + { + vg_error( "You lost.\n" ); + abort(); + } + + /* currently wait until the whole queue is cleared. This could maybe use a better strategy */ + if( queue->queue.allocation_count == 0 ) + { + queue->requested_bytes = 0; + sem_post( &queue->blocking_memory_signal ); + } + } + + if( queue->quit == k_async_quit_when_empty ) + { + if( queue->queue.allocation_count == 0 ) + { + pthread_mutex_unlock( &queue->lock ); + return 1; + } + } + + pthread_mutex_unlock( &queue->lock ); + return 0; +} + +struct single_function_async_data +{ + void (*function)(void *, u32); + void *userdata; + u32 usersize; +}; + +static void single_function_call_handler( vg_async_task *task ) +{ + struct single_function_async_data *data = (void *)task->data; + data->function( data->userdata, data->usersize ); +} + +void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize ) +{ + vg_async_task *task = vg_allocate_async_task( queue, sizeof(struct single_function_async_data), 1 ); + struct single_function_async_data *data = (void *)task->data; + data->function = function; + data->userdata = userdata; + data->usersize = usersize; + vg_async_task_dispatch( queue, task ); +} + +void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit ) +{ + pthread_mutex_lock( &queue->lock ); + queue->quit = quit; + pthread_mutex_unlock( &queue->lock ); +} diff --git a/vg_async2.h b/vg_async2.h new file mode 100644 index 0000000..2e8a98a --- /dev/null +++ b/vg_async2.h @@ -0,0 +1,45 @@ +#pragma once +#include "vg/vg_mem.h" +#include "vg/vg_mem_queue.h" + +#include +#include + +typedef struct vg_async_queue vg_async_queue; +typedef struct vg_async_task vg_async_task; + +struct vg_async_queue +{ + u32 upper_memory_limit; + u32 requested_bytes; + + sem_t blocking_memory_signal; + pthread_mutex_t lock, data_lock; + vg_queue queue; + + enum async_quit + { + k_async_no_quit, + k_async_quit_immediate, + k_async_quit_when_empty + } + quit; +}; + +struct vg_async_task +{ + bool dispatched; + void (*handler)( vg_async_task *task ); + + u8 data[]; +}; + +bool vg_init_async_queue( vg_async_queue *queue ); +void vg_free_async_queue( vg_async_queue *queue ); + +/* returns NULL if out of memory, or if blocking is set, wait for memory to become availible */ +vg_async_task *vg_allocate_async_task( vg_async_queue *queue, u32 bytes, bool blocking ); +void vg_async_task_dispatch( vg_async_queue *queue, vg_async_task *task ); +bool vg_async_consume( vg_async_queue *queue, u32 limit ); +void vg_async_call_function( vg_async_queue *queue, void (*function)(void *, u32), void *userdata, u32 usersize ); +void vg_async_queue_end( vg_async_queue *queue, enum async_quit quit ); diff --git a/vg_log.c b/vg_log.c index 242fbb1..06df992 100644 --- a/vg_log.c +++ b/vg_log.c @@ -50,28 +50,9 @@ void _vg_logx_va( FILE *file, const char *location, const char *prefix, char buffer[4096], line[96]; static char last_buffer[4096]; - static int dupe_counter = 1; vsnprintf( buffer, VG_ARRAY_LEN(buffer), fmt, args ); int line_length = snprintf( line, 90, "%s%7s" KNRM "|%s ", colour, prefix, colour ); - - if( !strcmp( last_buffer, buffer ) ) - { - dupe_counter ++; - fprintf( file, "repeat x%d\r", dupe_counter ); -#ifdef VG_ENGINE - if( SDL_UnlockMutex( vg_log.mutex ) ) - vg_fatal_error( "" ); -#endif - return; - } - - if( dupe_counter > 1 ) - { - fprintf( file, "\n" ); - dupe_counter = 1; - } - strcpy( last_buffer, buffer ); for( u32 i=0; itail_offset = start; } + +u32 vg_queue_usage( vg_queue *q ) +{ + if( q->allocation_count ) + { + vg_queue_item *head = q->buffer + q->head_offset; + u32 end = q->head_offset + head->alloc_size, + start = q->tail_offset; + + if( start < end ) + return end - start; + else + return (q->size - start) + end; + } + else + return 0; +} diff --git a/vg_mem_queue.h b/vg_mem_queue.h index 3e14ae5..22b8750 100644 --- a/vg_mem_queue.h +++ b/vg_mem_queue.h @@ -26,3 +26,5 @@ void *vg_queue_tail_data( vg_queue *q ); void vg_queue_pop( vg_queue *q ); void vg_queue_copy_upgrade( vg_queue *q1, vg_queue *q2 ); + +u32 vg_queue_usage( vg_queue *q ); diff --git a/vg_steam_networking.h b/vg_steam_networking.h index 5abb7b6..3ea715a 100644 --- a/vg_steam_networking.h +++ b/vg_steam_networking.h @@ -382,9 +382,15 @@ void SteamAPI_SteamNetworkingConfigValue_t_SetString( const char * data ); typedef void ISteamNetworkingSockets; +typedef void ISteamNetworkingUtils; typedef struct SteamNetworkingMessage_t SteamNetworkingMessage_t; typedef struct SteamNetConnectionInfo_t SteamNetConnectionInfo_t; +ISteamNetworkingUtils *SteamAPI_SteamNetworkingUtils_SteamAPI_v004(); +static inline ISteamNetworkingUtils *SteamAPI_SteamNetworkingUtils_SteamAPI() +{ + return SteamAPI_SteamNetworkingUtils_SteamAPI_v004(); +} /* * Handle used to identify a poll group, used to query many @@ -393,19 +399,14 @@ typedef struct SteamNetConnectionInfo_t SteamNetConnectionInfo_t; typedef u32 HSteamNetPollGroup; static HSteamNetPollGroup const k_HSteamNetPollGroup_Invalid = 0; -ISteamNetworkingSockets -*SteamAPI_SteamGameServerNetworkingSockets_SteamAPI_v012(void); - -static inline ISteamNetworkingSockets -*SteamAPI_SteamGameServerNetworkingSockets_SteamAPI(void) +ISteamNetworkingSockets *SteamAPI_SteamGameServerNetworkingSockets_SteamAPI_v012(void); +static inline ISteamNetworkingSockets *SteamAPI_SteamGameServerNetworkingSockets_SteamAPI(void) { return SteamAPI_SteamGameServerNetworkingSockets_SteamAPI_v012(); } - ISteamNetworkingSockets *SteamAPI_SteamNetworkingSockets_SteamAPI_v012(); -static inline -ISteamNetworkingSockets *SteamAPI_SteamNetworkingSockets_SteamAPI() +static inline ISteamNetworkingSockets *SteamAPI_SteamNetworkingSockets_SteamAPI() { return SteamAPI_SteamNetworkingSockets_SteamAPI_v012(); } @@ -511,101 +512,6 @@ static const int k_nSteamNetworkingSend_Reliable = 8; static const int k_nSteamNetworkingSend_ReliableNoNagle = k_nSteamNetworkingSend_Reliable | k_nSteamNetworkingSend_NoNagle; - - -HSteamListenSocket SteamAPI_ISteamNetworkingSockets_CreateListenSocketIP( - ISteamNetworkingSockets *self, - SteamNetworkingIPAddr *localAddress, int nOptions, - SteamNetworkingConfigValue_t *pOptions ); - -HSteamNetConnection SteamAPI_ISteamNetworkingSockets_ConnectByIPAddress( - ISteamNetworkingSockets *self, - SteamNetworkingIPAddr *address, int nOptions, - SteamNetworkingConfigValue_t *pOptions ); - - -EResult SteamAPI_ISteamNetworkingSockets_AcceptConnection( - ISteamNetworkingSockets *self, - HSteamNetConnection hConn ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_CloseConnection( - ISteamNetworkingSockets *self, - HSteamNetConnection hPeer, int nReason, const char *pszDebug, - steamapi_bool bEnableLinger ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_GetListenSocketAddress( - ISteamNetworkingSockets *self, HSteamListenSocket hSocket, - SteamNetworkingIPAddr *address ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_CloseConnection( - ISteamNetworkingSockets *self, - HSteamNetConnection hPeer, int nReason, const char *pszDebug, - steamapi_bool bEnableLinger ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_CloseListenSocket( - ISteamNetworkingSockets *self, HSteamListenSocket hSocket ); - -EResult SteamAPI_ISteamNetworkingSockets_SendMessageToConnection( - ISteamNetworkingSockets* self, - HSteamNetConnection hConn, const void *pData, u32 cbData, int nSendFlags, - i64 * pOutMessageNumber ); - -void SteamAPI_ISteamNetworkingSockets_SendMessages( - ISteamNetworkingSockets* self, - int nMessages, SteamNetworkingMessage_t *const *pMessages, - i64 *pOutMessageNumberOrResult ); - -EResult SteamAPI_ISteamNetworkingSockets_FlushMessagesOnConnection( - ISteamNetworkingSockets* self, - HSteamNetConnection hConn ); - -int SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnConnection( - ISteamNetworkingSockets* self, - HSteamNetConnection hConn, - SteamNetworkingMessage_t **ppOutMessages, - int nMaxMessages ); - -/* - * Poll Groups - */ - -HSteamNetPollGroup SteamAPI_ISteamNetworkingSockets_CreatePollGroup( - ISteamNetworkingSockets *self ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_DestroyPollGroup( - ISteamNetworkingSockets *self, - HSteamNetPollGroup hPollGroup ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_SetConnectionPollGroup( - ISteamNetworkingSockets *self, - HSteamNetConnection hConn, HSteamNetPollGroup hPollGroup ); - -int SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnPollGroup( - ISteamNetworkingSockets *self, - HSteamNetPollGroup hPollGroup, SteamNetworkingMessage_t **ppOutMessages, - int nMaxMessages ); -/* - * Returns basic information about the high-level state of the connection. - * Returns false if the connection handle is invalid. - */ -steamapi_bool SteamAPI_ISteamNetworkingSockets_GetConnectionInfo( - ISteamNetworkingSockets* self, - HSteamNetConnection hConn, SteamNetConnectionInfo_t * pInfo ); - -int SteamAPI_ISteamNetworkingSockets_GetDetailedConnectionStatus( - ISteamNetworkingSockets* self, - HSteamNetConnection hConn, char *pszBuf, int cbBuf ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_SetConnectionUserData( - ISteamNetworkingSockets* self, HSteamNetConnection hPeer, i64 nUserData ); - -i64 SteamAPI_ISteamNetworkingSockets_GetConnectionUserData( - ISteamNetworkingSockets* self, HSteamNetConnection hPeer ); - -steamapi_bool SteamAPI_ISteamNetworkingSockets_GetListenSocketAddress( - ISteamNetworkingSockets* self, - HSteamListenSocket hSocket, SteamNetworkingIPAddr *address ); - enum{ k_cchSteamNetworkingMaxConnectionCloseReason = 128 }; enum{ k_cchSteamNetworkingMaxConnectionDescription = 128 }; enum{ k_cchSteamNetworkingMaxConnectionAppName = 32 }; @@ -690,6 +596,7 @@ struct SteamNetConnectionInfo_t * Quick connection state, pared down to something you could call * more frequently without it being too big of a perf hit. */ +typedef struct SteamNetConnectionRealTimeStatus_t SteamNetConnectionRealTimeStatus_t; struct SteamNetConnectionRealTimeStatus_t { /* High level state of the connection */ @@ -778,6 +685,26 @@ struct SteamNetConnectionRealTimeStatus_t u32 reserved[16]; }; +/* Quick status of a particular lane */ +typedef struct SteamNetConnectionRealTimeLaneStatus_t SteamNetConnectionRealTimeLaneStatus_t; +struct SteamNetConnectionRealTimeLaneStatus_t +{ + /* Counters for this particular lane. See the corresponding variables + * in SteamNetConnectionRealTimeStatus_t */ + int m_cbPendingUnreliable; + int m_cbPendingReliable; + int m_cbSentUnackedReliable; + int _reservePad1; // Reserved for future use + + /* Lane-specific queue time. This value takes into consideration lane priorities + * and weights, and how much data is queued in each lane, and attempts to predict + * how any data currently queued will be sent out. */ + SteamNetworkingMicroseconds m_usecQueueTime; + + // Internal stuff, room to change API easily + u32 reserved[10]; +}; + /* * Callbacks */ @@ -1009,3 +936,116 @@ static inline const char *string_ESteamNetworkingAvailability( break; } } + + +HSteamListenSocket SteamAPI_ISteamNetworkingSockets_CreateListenSocketIP( + ISteamNetworkingSockets *self, + SteamNetworkingIPAddr *localAddress, int nOptions, + SteamNetworkingConfigValue_t *pOptions ); + +HSteamNetConnection SteamAPI_ISteamNetworkingSockets_ConnectByIPAddress( + ISteamNetworkingSockets *self, + SteamNetworkingIPAddr *address, int nOptions, + SteamNetworkingConfigValue_t *pOptions ); + + +EResult SteamAPI_ISteamNetworkingSockets_AcceptConnection( + ISteamNetworkingSockets *self, + HSteamNetConnection hConn ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_CloseConnection( + ISteamNetworkingSockets *self, + HSteamNetConnection hPeer, int nReason, const char *pszDebug, + steamapi_bool bEnableLinger ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_GetListenSocketAddress( + ISteamNetworkingSockets *self, HSteamListenSocket hSocket, + SteamNetworkingIPAddr *address ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_CloseConnection( + ISteamNetworkingSockets *self, + HSteamNetConnection hPeer, int nReason, const char *pszDebug, + steamapi_bool bEnableLinger ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_CloseListenSocket( + ISteamNetworkingSockets *self, HSteamListenSocket hSocket ); + +EResult SteamAPI_ISteamNetworkingSockets_SendMessageToConnection( + ISteamNetworkingSockets* self, + HSteamNetConnection hConn, const void *pData, u32 cbData, int nSendFlags, + i64 * pOutMessageNumber ); + +SteamNetworkingMessage_t *SteamAPI_ISteamNetworkingUtils_AllocateMessage( + ISteamNetworkingUtils *self, + int cbAllocateBuffer ); + +void SteamAPI_ISteamNetworkingSockets_SendMessages( + ISteamNetworkingSockets* self, + int nMessages, SteamNetworkingMessage_t *const *pMessages, + i64 *pOutMessageNumberOrResult ); + +EResult SteamAPI_ISteamNetworkingSockets_FlushMessagesOnConnection( + ISteamNetworkingSockets* self, + HSteamNetConnection hConn ); + +int SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnConnection( + ISteamNetworkingSockets* self, + HSteamNetConnection hConn, + SteamNetworkingMessage_t **ppOutMessages, + int nMaxMessages ); + +EResult SteamAPI_ISteamNetworkingSockets_GetConnectionRealTimeStatus( + ISteamNetworkingSockets *self, + HSteamNetConnection hConn, + SteamNetConnectionRealTimeStatus_t *pStatus, + int nLanes, + SteamNetConnectionRealTimeLaneStatus_t *pLanes ); + +EResult SteamAPI_ISteamNetworkingSockets_ConfigureConnectionLanes( + ISteamNetworkingSockets *self, + HSteamNetConnection hConn, + int nNumLanes, + const int *pLanePriorities, + const u16 *pLaneWeights ); + +/* + * Poll Groups + */ + +HSteamNetPollGroup SteamAPI_ISteamNetworkingSockets_CreatePollGroup( + ISteamNetworkingSockets *self ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_DestroyPollGroup( + ISteamNetworkingSockets *self, + HSteamNetPollGroup hPollGroup ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_SetConnectionPollGroup( + ISteamNetworkingSockets *self, + HSteamNetConnection hConn, HSteamNetPollGroup hPollGroup ); + +int SteamAPI_ISteamNetworkingSockets_ReceiveMessagesOnPollGroup( + ISteamNetworkingSockets *self, + HSteamNetPollGroup hPollGroup, SteamNetworkingMessage_t **ppOutMessages, + int nMaxMessages ); +/* + * Returns basic information about the high-level state of the connection. + * Returns false if the connection handle is invalid. + */ +steamapi_bool SteamAPI_ISteamNetworkingSockets_GetConnectionInfo( + ISteamNetworkingSockets* self, + HSteamNetConnection hConn, SteamNetConnectionInfo_t * pInfo ); + +int SteamAPI_ISteamNetworkingSockets_GetDetailedConnectionStatus( + ISteamNetworkingSockets* self, + HSteamNetConnection hConn, char *pszBuf, int cbBuf ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_SetConnectionUserData( + ISteamNetworkingSockets* self, HSteamNetConnection hPeer, i64 nUserData ); + +i64 SteamAPI_ISteamNetworkingSockets_GetConnectionUserData( + ISteamNetworkingSockets* self, HSteamNetConnection hPeer ); + +steamapi_bool SteamAPI_ISteamNetworkingSockets_GetListenSocketAddress( + ISteamNetworkingSockets* self, + HSteamListenSocket hSocket, SteamNetworkingIPAddr *address ); + -- 2.25.1