From: hgn Date: Fri, 2 May 2025 17:18:42 +0000 (+0100) Subject: WAL for db X-Git-Url: https://harrygodden.com/git/?a=commitdiff_plain;h=b94a22a7d75213c86ca4fb756e97bc9f1091205d;p=vg.git WAL for db --- diff --git a/vg_db.c b/vg_db.c index 258e120..bac495b 100644 --- a/vg_db.c +++ b/vg_db.c @@ -32,17 +32,125 @@ static void vg_db_sync_page( vg_db *db, u16 cache_id ); /* API * ------------------------------------------------------------------------------------------------------------------ */ -void vg_db_open( vg_db *db, const char *path ) +static void vg_db_commit( vg_db *db, FILE *fwal ) +{ + void *temp_page = malloc( VG_PAGE_SIZE ); + + vg_info( "Commiting WAL...\n" ); + u64 last_good_checkpoint = 0, + last_good_log = 0; + + if( fseek( fwal, 0, SEEK_SET ) ) + { + vg_warn( "WAL file was empty (assuming db is atomic)\n" ); + return; + } + + vg_db_wal log; + if( !fread( &log, sizeof(log), 1, fwal ) ) + { + vg_warn( "No checkpoints in WAL file (assuming db is atomic)\n" ); + return; + } + + if( log.type != k_wal_log_checkpoint ) + vg_db_abort( db, "First log in WAL file was not a checkpoint. Invalid WAL file?\n" ); + + /* replay until we run off the end */ + while(1) + { + u64 log_pos = ftell( fwal ); + + /* read one log */ + if( !fread( &log, sizeof(log), 1, fwal ) ) + { + vg_info( "Reached end of log\n" ); + break; + } + + if( log.type == k_wal_log_data ) + { + if( !fseek( fwal, log.data_size, SEEK_CUR ) ) + { + if( fread( temp_page, log.data_size, 1, fwal ) ) + { + if( fseek( db->fp, log.db_file_offset, SEEK_SET ) ) + vg_db_abort( db, "While playing back WAL, SEEK_SET(0x%lx) failed\n", log.db_file_offset ); + if( !fwrite( temp_page, log.data_size, 1, db->fp ) ) + vg_db_abort( db, "While playing back WAL, fwrite failed (0x%lx bytes)\n", log.data_size ); + last_good_log = log_pos; + } + else + { + vg_warn( "Truncated log entry while playing back WAL (not enough redo data)\n" ); + break; + } + } + else + { + vg_warn( "Truncated log entry while playing back WAL (didn't make it to redo data)\n" ); + break; + } + } + else + { + last_good_log = log_pos; + last_good_checkpoint = log_pos; + } + } + + /* Rewind until we get back to a good checkpoint (if needed) */ + if( last_good_log != last_good_checkpoint ) + vg_warn( "Rewinding...\n" ); + + u64 rewind_cur = last_good_log; + while( rewind_cur != last_good_checkpoint ) + { + if( fseek( fwal, rewind_cur, SEEK_SET ) ) + vg_db_abort( db, "While rewinding WAL, SEEK_SET(0x%lx) failed\n", rewind_cur ); + if( !fread( &log, sizeof(log), 1, fwal ) ) + vg_db_abort( db, "While rewinding WAL, fread failed\n" ); + if( fread( temp_page, log.data_size, 1, fwal ) ) + { + if( fseek( db->fp, log.db_file_offset, SEEK_SET ) ) + vg_db_abort( db, "While rewinding WAL, SEEK_SET(0x%lx) failed\n", log.db_file_offset ); + if( !fwrite( temp_page, log.data_size, 1, db->fp ) ) + vg_db_abort( db, "While rewinding WAL, fwrite failed (0x%lx bytes)\n", log.data_size ); + rewind_cur = log.previous_log_offset; + } + else + { + vg_warn( "Truncated log entry while playing back WAL (not enough redo data)\n" ); + break; + } + } + + free( temp_page ); +} + +void vg_db_open( vg_db *db, const char *path, const char *wal_path ) { u32 k_ident = 0xf32b1a00; vg_rand_seed( &db->rand, k_ident + time(NULL) ); + db->fp = fopen( path, "rb+" ); + db->fp_wal = NULL; + db->wal_path = wal_path; db->page_data = malloc( VG_PAGE_SIZE*VG_MAX_CACHED_PAGES ); db->cache_count = 0; db->lru_old = 0; db->lru_young = 0; if( db->fp ) { + FILE *fwal = fopen( wal_path, "rb" ); + if( fwal ) + { + vg_info( "WAL file found (%s), need to apply.\n", wal_path ); + vg_db_commit( db, fwal ); + fclose( fwal ); + remove( wal_path ); + } + u32 ident; vg_db_read( db, 0, &ident, 4 ); if( ident != k_ident ) @@ -59,15 +167,27 @@ void vg_db_open( vg_db *db, const char *path ) db->userdata_address = vg_db_virtual_allocate( db, VG_1GB ); vg_db_write( db, offsetof(vg_db_header,userdata_address), &db->userdata_address, 8 ); vg_db_tree_init( db, offsetof(vg_db_header,address_tree) ); + vg_db_checkpoint( db ); } } void vg_db_close( vg_db *db ) { - for( u32 i=0; ifp_wal ) + { + fclose( db->fp_wal ); + db->fp_wal = NULL; + } fclose( db->fp ); db->fp = NULL; +#if 0 + if( db->fp_wal ) + { + fclose( db->fp_wal ); + remove( db->wal_path ); /* dont need anymore */ + } +#endif free( db->page_data ); db->page_data = NULL; } @@ -204,6 +324,34 @@ static void *vg_db_devirtualize( vg_db *db, u64 address, bool write, u64 *out_ph return page_data + inner_offset; } +void vg_db_checkpoint( vg_db *db ) +{ + if( db->fp_wal == NULL ) + return; + + u64 file_pos = ftell( db->fp_wal ); + vg_db_wal log = { + .type = k_wal_log_checkpoint, + .previous_log_offset = db->previous_wal_log, + .db_file_offset = 0, + .data_size = 0 + }; + if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) ) + vg_db_abort( db, "fwrite failed into WAL\n" ); + db->previous_wal_log = file_pos; + + if( db->wal_writes > 1000 ) + { + vg_info( "WAL reached %u writes (>1000), flushing...\n", db->wal_writes ); + vg_db_commit( db, db->fp_wal ); + fclose( db->fp_wal ); + remove( db->wal_path ); + db->wal_writes = 0; + db->previous_wal_log = 0; + db->fp_wal = NULL; + } +} + void vg_db_xch( vg_db *db, u64 base_address, void *buf, u32 length, bool write ) { u64 address = base_address, @@ -220,6 +368,35 @@ void vg_db_xch( vg_db *db, u64 base_address, void *buf, u32 length, bool write ) *user_buffer = buf + (address-base_address); if( write ) { + if( db->fp_wal == NULL ) + { + db->fp_wal = fopen( db->wal_path, "wb+" ); + vg_db_wal log = { + .type = k_wal_log_checkpoint, + .previous_log_offset = 0, + .db_file_offset = 0, + .data_size = 0 + }; + if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) ) + vg_db_abort( db, "fwrite failed into WAL\n" ); + } + + u64 file_pos = ftell( db->fp_wal ); + vg_db_wal log = { + .type = k_wal_log_data, + .previous_log_offset = db->previous_wal_log, + .db_file_offset = physical_address, + .data_size = byte_count + }; + if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) ) + vg_db_abort( db, "fwrite failed into WAL\n" ); + if( !fwrite( cache_buffer, byte_count, 1, db->fp_wal ) ) + vg_db_abort( db, "fwrite failed into WAL\n" ); + if( !fwrite( user_buffer, byte_count, 1, db->fp_wal ) ) + vg_db_abort( db, "fwrite failed into WAL\n" ); + + db->previous_wal_log = file_pos; + db->wal_writes ++; memcpy( cache_buffer, user_buffer, byte_count ); } else diff --git a/vg_db.h b/vg_db.h index 498bbbd..e72666f 100644 --- a/vg_db.h +++ b/vg_db.h @@ -73,7 +73,10 @@ struct vg_db_page struct vg_db { - FILE *fp; + FILE *fp, *fp_wal; + const char *wal_path; + u64 previous_wal_log; + u32 wal_writes; vg_db_page page_cache[ VG_MAX_CACHED_PAGES ]; u16 lru_old, lru_young, cache_count; @@ -84,7 +87,21 @@ struct vg_db vg_rand rand; }; -void vg_db_open( vg_db *db, const char *path ); +enum wal_log_type +{ + k_wal_log_data = 0x07, + k_wal_log_checkpoint = 0x09 +}; + +typedef struct vg_db_wal vg_db_wal; +struct vg_db_wal +{ + u8 type; + u64 previous_log_offset, db_file_offset, data_size; +}; + +void vg_db_checkpoint( vg_db *db ); +void vg_db_open( vg_db *db, const char *path, const char *wal_path ); void vg_db_close( vg_db *db ); #define vg_db_read( DB, BASE, BUF, LEN ) vg_db_xch( DB, BASE, BUF, LEN, 0 ) #define vg_db_write( DB, BASE, BUF, LEN ) vg_db_xch( DB, BASE, BUF, LEN, 1 )