WAL for db
authorhgn <hgodden00@gmail.com>
Fri, 2 May 2025 17:18:42 +0000 (18:18 +0100)
committerhgn <hgodden00@gmail.com>
Fri, 2 May 2025 17:18:42 +0000 (18:18 +0100)
vg_db.c
vg_db.h

diff --git a/vg_db.c b/vg_db.c
index 258e1209d2c9d760ee16e7c9d02766912c931f09..bac495badbf4671a29c276b66e1cb0bd04851378 100644 (file)
--- a/vg_db.c
+++ b/vg_db.c
@@ -32,17 +32,125 @@ static void vg_db_sync_page( vg_db *db, u16 cache_id );
 /* API
  * ------------------------------------------------------------------------------------------------------------------ */
 
-void vg_db_open( vg_db *db, const char *path )
+static void vg_db_commit( vg_db *db, FILE *fwal )
+{
+   void *temp_page = malloc( VG_PAGE_SIZE );
+
+   vg_info( "Commiting WAL...\n" );
+   u64 last_good_checkpoint = 0,
+       last_good_log = 0;
+
+   if( fseek( fwal, 0, SEEK_SET ) )
+   {
+      vg_warn( "WAL file was empty (assuming db is atomic)\n" );
+      return;
+   }
+
+   vg_db_wal log;
+   if( !fread( &log, sizeof(log), 1, fwal ) )
+   {
+      vg_warn( "No checkpoints in WAL file (assuming db is atomic)\n" );
+      return;
+   }
+
+   if( log.type != k_wal_log_checkpoint )
+      vg_db_abort( db, "First log in WAL file was not a checkpoint. Invalid WAL file?\n" );
+
+   /* replay until we run off the end */
+   while(1)
+   {
+      u64 log_pos = ftell( fwal );
+
+      /* read one log */
+      if( !fread( &log, sizeof(log), 1, fwal ) )
+      {
+         vg_info( "Reached end of log\n" );
+         break;
+      }
+
+      if( log.type == k_wal_log_data )
+      {
+         if( !fseek( fwal, log.data_size, SEEK_CUR ) )
+         {
+            if( fread( temp_page, log.data_size, 1, fwal ) )
+            {
+               if( fseek( db->fp, log.db_file_offset, SEEK_SET ) )
+                  vg_db_abort( db, "While playing back WAL, SEEK_SET(0x%lx) failed\n", log.db_file_offset );
+               if( !fwrite( temp_page, log.data_size, 1, db->fp ) )
+                  vg_db_abort( db, "While playing back WAL, fwrite failed (0x%lx bytes)\n", log.data_size );
+               last_good_log = log_pos;
+            }
+            else
+            {
+               vg_warn( "Truncated log entry while playing back WAL (not enough redo data)\n" );
+               break;
+            }
+         }
+         else
+         {
+            vg_warn( "Truncated log entry while playing back WAL (didn't make it to redo data)\n" );
+            break;
+         }
+      }
+      else
+      {
+         last_good_log = log_pos;
+         last_good_checkpoint = log_pos;
+      }
+   }
+
+   /* Rewind until we get back to a good checkpoint (if needed) */
+   if( last_good_log != last_good_checkpoint )
+      vg_warn( "Rewinding...\n" );
+
+   u64 rewind_cur = last_good_log;
+   while( rewind_cur != last_good_checkpoint )
+   {
+      if( fseek( fwal, rewind_cur, SEEK_SET ) )
+         vg_db_abort( db, "While rewinding WAL, SEEK_SET(0x%lx) failed\n", rewind_cur );
+      if( !fread( &log, sizeof(log), 1, fwal ) )
+         vg_db_abort( db, "While rewinding WAL, fread failed\n" );
+      if( fread( temp_page, log.data_size, 1, fwal ) )
+      {
+         if( fseek( db->fp, log.db_file_offset, SEEK_SET ) )
+            vg_db_abort( db, "While rewinding WAL, SEEK_SET(0x%lx) failed\n", log.db_file_offset );
+         if( !fwrite( temp_page, log.data_size, 1, db->fp ) )
+            vg_db_abort( db, "While rewinding WAL, fwrite failed (0x%lx bytes)\n", log.data_size );
+         rewind_cur = log.previous_log_offset;
+      }
+      else
+      {
+         vg_warn( "Truncated log entry while playing back WAL (not enough redo data)\n" );
+         break;
+      }
+   }
+
+   free( temp_page );
+}
+
+void vg_db_open( vg_db *db, const char *path, const char *wal_path )
 {
    u32 k_ident = 0xf32b1a00;
    vg_rand_seed( &db->rand, k_ident + time(NULL) );
+
    db->fp = fopen( path, "rb+" );
+   db->fp_wal = NULL;
+   db->wal_path = wal_path;
    db->page_data = malloc( VG_PAGE_SIZE*VG_MAX_CACHED_PAGES );
    db->cache_count = 0;
    db->lru_old = 0;
    db->lru_young = 0;
    if( db->fp )
    {
+      FILE *fwal = fopen( wal_path, "rb" );
+      if( fwal )
+      {
+         vg_info( "WAL file found (%s), need to apply.\n", wal_path );
+         vg_db_commit( db, fwal );
+         fclose( fwal );
+         remove( wal_path );
+      }
+
       u32 ident;
       vg_db_read( db, 0, &ident, 4 );
       if( ident != k_ident )
@@ -59,15 +167,27 @@ void vg_db_open( vg_db *db, const char *path )
       db->userdata_address = vg_db_virtual_allocate( db, VG_1GB );
       vg_db_write( db, offsetof(vg_db_header,userdata_address), &db->userdata_address, 8 );
       vg_db_tree_init( db, offsetof(vg_db_header,address_tree) );
+      vg_db_checkpoint( db );
    }
 }
 
 void vg_db_close( vg_db *db )
 {
-   for( u32 i=0; i<VG_MAX_CACHED_PAGES; i ++ )
-      vg_db_sync_page( db, i+1 );
+   vg_db_checkpoint( db );
+   if( db->fp_wal )
+   {
+      fclose( db->fp_wal );
+      db->fp_wal = NULL;
+   }
    fclose( db->fp );
    db->fp = NULL;
+#if 0
+   if( db->fp_wal )
+   {
+      fclose( db->fp_wal );
+      remove( db->wal_path ); /* dont need anymore */
+   }
+#endif
    free( db->page_data );
    db->page_data = NULL;
 }
@@ -204,6 +324,34 @@ static void *vg_db_devirtualize( vg_db *db, u64 address, bool write, u64 *out_ph
    return page_data + inner_offset;
 }
 
+void vg_db_checkpoint( vg_db *db )
+{
+   if( db->fp_wal == NULL )
+      return;
+
+   u64 file_pos = ftell( db->fp_wal );
+   vg_db_wal log = {
+      .type = k_wal_log_checkpoint,
+      .previous_log_offset = db->previous_wal_log,
+      .db_file_offset = 0,
+      .data_size = 0
+   };
+   if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) )
+      vg_db_abort( db, "fwrite failed into WAL\n" );
+   db->previous_wal_log = file_pos;
+
+   if( db->wal_writes > 1000 )
+   {
+      vg_info( "WAL reached %u writes (>1000), flushing...\n", db->wal_writes );
+      vg_db_commit( db, db->fp_wal );
+      fclose( db->fp_wal );
+      remove( db->wal_path );
+      db->wal_writes = 0;
+      db->previous_wal_log = 0;
+      db->fp_wal = NULL;
+   }
+}
+
 void vg_db_xch( vg_db *db, u64 base_address, void *buf, u32 length, bool write )
 {
    u64 address = base_address,
@@ -220,6 +368,35 @@ void vg_db_xch( vg_db *db, u64 base_address, void *buf, u32 length, bool write )
            *user_buffer  = buf + (address-base_address);
       if( write ) 
       {
+         if( db->fp_wal == NULL )
+         {
+            db->fp_wal = fopen( db->wal_path, "wb+" );
+            vg_db_wal log = {
+               .type = k_wal_log_checkpoint,
+               .previous_log_offset = 0,
+               .db_file_offset = 0,
+               .data_size = 0
+            };
+            if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) )
+               vg_db_abort( db, "fwrite failed into WAL\n" );
+         }
+
+         u64 file_pos = ftell( db->fp_wal );
+         vg_db_wal log = {
+            .type = k_wal_log_data,
+            .previous_log_offset = db->previous_wal_log,
+            .db_file_offset = physical_address,
+            .data_size = byte_count
+         };
+         if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) )
+            vg_db_abort( db, "fwrite failed into WAL\n" );
+         if( !fwrite( cache_buffer, byte_count, 1, db->fp_wal ) )
+            vg_db_abort( db, "fwrite failed into WAL\n" );
+         if( !fwrite( user_buffer, byte_count, 1, db->fp_wal ) )
+            vg_db_abort( db, "fwrite failed into WAL\n" );
+
+         db->previous_wal_log = file_pos;
+         db->wal_writes ++;
          memcpy( cache_buffer, user_buffer, byte_count );
       }
       else
diff --git a/vg_db.h b/vg_db.h
index 498bbbdee7a8e4818fc08f1f4027bd396d4727ac..e72666fdaddc1121aa9b06fafc693c22f394671b 100644 (file)
--- a/vg_db.h
+++ b/vg_db.h
@@ -73,7 +73,10 @@ struct vg_db_page
 
 struct vg_db
 {
-   FILE *fp;
+   FILE *fp, *fp_wal;
+   const char *wal_path;
+   u64 previous_wal_log;
+   u32 wal_writes;
 
    vg_db_page page_cache[ VG_MAX_CACHED_PAGES ];
    u16 lru_old, lru_young, cache_count;
@@ -84,7 +87,21 @@ struct vg_db
    vg_rand rand;
 };
 
-void vg_db_open( vg_db *db, const char *path );
+enum wal_log_type
+{
+   k_wal_log_data = 0x07,
+   k_wal_log_checkpoint = 0x09
+};
+
+typedef struct vg_db_wal vg_db_wal;
+struct vg_db_wal
+{
+   u8 type;
+   u64 previous_log_offset, db_file_offset, data_size;
+};
+
+void vg_db_checkpoint( vg_db *db );
+void vg_db_open( vg_db *db, const char *path, const char *wal_path );
 void vg_db_close( vg_db *db );
 #define vg_db_read( DB, BASE, BUF, LEN ) vg_db_xch( DB, BASE, BUF, LEN, 0 )
 #define vg_db_write( DB, BASE, BUF, LEN ) vg_db_xch( DB, BASE, BUF, LEN, 1 )