/* API
* ------------------------------------------------------------------------------------------------------------------ */
-void vg_db_open( vg_db *db, const char *path )
+static void vg_db_commit( vg_db *db, FILE *fwal )
+{
+ void *temp_page = malloc( VG_PAGE_SIZE );
+
+ vg_info( "Commiting WAL...\n" );
+ u64 last_good_checkpoint = 0,
+ last_good_log = 0;
+
+ if( fseek( fwal, 0, SEEK_SET ) )
+ {
+ vg_warn( "WAL file was empty (assuming db is atomic)\n" );
+ return;
+ }
+
+ vg_db_wal log;
+ if( !fread( &log, sizeof(log), 1, fwal ) )
+ {
+ vg_warn( "No checkpoints in WAL file (assuming db is atomic)\n" );
+ return;
+ }
+
+ if( log.type != k_wal_log_checkpoint )
+ vg_db_abort( db, "First log in WAL file was not a checkpoint. Invalid WAL file?\n" );
+
+ /* replay until we run off the end */
+ while(1)
+ {
+ u64 log_pos = ftell( fwal );
+
+ /* read one log */
+ if( !fread( &log, sizeof(log), 1, fwal ) )
+ {
+ vg_info( "Reached end of log\n" );
+ break;
+ }
+
+ if( log.type == k_wal_log_data )
+ {
+ if( !fseek( fwal, log.data_size, SEEK_CUR ) )
+ {
+ if( fread( temp_page, log.data_size, 1, fwal ) )
+ {
+ if( fseek( db->fp, log.db_file_offset, SEEK_SET ) )
+ vg_db_abort( db, "While playing back WAL, SEEK_SET(0x%lx) failed\n", log.db_file_offset );
+ if( !fwrite( temp_page, log.data_size, 1, db->fp ) )
+ vg_db_abort( db, "While playing back WAL, fwrite failed (0x%lx bytes)\n", log.data_size );
+ last_good_log = log_pos;
+ }
+ else
+ {
+ vg_warn( "Truncated log entry while playing back WAL (not enough redo data)\n" );
+ break;
+ }
+ }
+ else
+ {
+ vg_warn( "Truncated log entry while playing back WAL (didn't make it to redo data)\n" );
+ break;
+ }
+ }
+ else
+ {
+ last_good_log = log_pos;
+ last_good_checkpoint = log_pos;
+ }
+ }
+
+ /* Rewind until we get back to a good checkpoint (if needed) */
+ if( last_good_log != last_good_checkpoint )
+ vg_warn( "Rewinding...\n" );
+
+ u64 rewind_cur = last_good_log;
+ while( rewind_cur != last_good_checkpoint )
+ {
+ if( fseek( fwal, rewind_cur, SEEK_SET ) )
+ vg_db_abort( db, "While rewinding WAL, SEEK_SET(0x%lx) failed\n", rewind_cur );
+ if( !fread( &log, sizeof(log), 1, fwal ) )
+ vg_db_abort( db, "While rewinding WAL, fread failed\n" );
+ if( fread( temp_page, log.data_size, 1, fwal ) )
+ {
+ if( fseek( db->fp, log.db_file_offset, SEEK_SET ) )
+ vg_db_abort( db, "While rewinding WAL, SEEK_SET(0x%lx) failed\n", log.db_file_offset );
+ if( !fwrite( temp_page, log.data_size, 1, db->fp ) )
+ vg_db_abort( db, "While rewinding WAL, fwrite failed (0x%lx bytes)\n", log.data_size );
+ rewind_cur = log.previous_log_offset;
+ }
+ else
+ {
+ vg_warn( "Truncated log entry while playing back WAL (not enough redo data)\n" );
+ break;
+ }
+ }
+
+ free( temp_page );
+}
+
+void vg_db_open( vg_db *db, const char *path, const char *wal_path )
{
u32 k_ident = 0xf32b1a00;
vg_rand_seed( &db->rand, k_ident + time(NULL) );
+
db->fp = fopen( path, "rb+" );
+ db->fp_wal = NULL;
+ db->wal_path = wal_path;
db->page_data = malloc( VG_PAGE_SIZE*VG_MAX_CACHED_PAGES );
db->cache_count = 0;
db->lru_old = 0;
db->lru_young = 0;
if( db->fp )
{
+ FILE *fwal = fopen( wal_path, "rb" );
+ if( fwal )
+ {
+ vg_info( "WAL file found (%s), need to apply.\n", wal_path );
+ vg_db_commit( db, fwal );
+ fclose( fwal );
+ remove( wal_path );
+ }
+
u32 ident;
vg_db_read( db, 0, &ident, 4 );
if( ident != k_ident )
db->userdata_address = vg_db_virtual_allocate( db, VG_1GB );
vg_db_write( db, offsetof(vg_db_header,userdata_address), &db->userdata_address, 8 );
vg_db_tree_init( db, offsetof(vg_db_header,address_tree) );
+ vg_db_checkpoint( db );
}
}
void vg_db_close( vg_db *db )
{
- for( u32 i=0; i<VG_MAX_CACHED_PAGES; i ++ )
- vg_db_sync_page( db, i+1 );
+ vg_db_checkpoint( db );
+ if( db->fp_wal )
+ {
+ fclose( db->fp_wal );
+ db->fp_wal = NULL;
+ }
fclose( db->fp );
db->fp = NULL;
+#if 0
+ if( db->fp_wal )
+ {
+ fclose( db->fp_wal );
+ remove( db->wal_path ); /* dont need anymore */
+ }
+#endif
free( db->page_data );
db->page_data = NULL;
}
return page_data + inner_offset;
}
+void vg_db_checkpoint( vg_db *db )
+{
+ if( db->fp_wal == NULL )
+ return;
+
+ u64 file_pos = ftell( db->fp_wal );
+ vg_db_wal log = {
+ .type = k_wal_log_checkpoint,
+ .previous_log_offset = db->previous_wal_log,
+ .db_file_offset = 0,
+ .data_size = 0
+ };
+ if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) )
+ vg_db_abort( db, "fwrite failed into WAL\n" );
+ db->previous_wal_log = file_pos;
+
+ if( db->wal_writes > 1000 )
+ {
+ vg_info( "WAL reached %u writes (>1000), flushing...\n", db->wal_writes );
+ vg_db_commit( db, db->fp_wal );
+ fclose( db->fp_wal );
+ remove( db->wal_path );
+ db->wal_writes = 0;
+ db->previous_wal_log = 0;
+ db->fp_wal = NULL;
+ }
+}
+
void vg_db_xch( vg_db *db, u64 base_address, void *buf, u32 length, bool write )
{
u64 address = base_address,
*user_buffer = buf + (address-base_address);
if( write )
{
+ if( db->fp_wal == NULL )
+ {
+ db->fp_wal = fopen( db->wal_path, "wb+" );
+ vg_db_wal log = {
+ .type = k_wal_log_checkpoint,
+ .previous_log_offset = 0,
+ .db_file_offset = 0,
+ .data_size = 0
+ };
+ if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) )
+ vg_db_abort( db, "fwrite failed into WAL\n" );
+ }
+
+ u64 file_pos = ftell( db->fp_wal );
+ vg_db_wal log = {
+ .type = k_wal_log_data,
+ .previous_log_offset = db->previous_wal_log,
+ .db_file_offset = physical_address,
+ .data_size = byte_count
+ };
+ if( !fwrite( &log, sizeof(log), 1, db->fp_wal ) )
+ vg_db_abort( db, "fwrite failed into WAL\n" );
+ if( !fwrite( cache_buffer, byte_count, 1, db->fp_wal ) )
+ vg_db_abort( db, "fwrite failed into WAL\n" );
+ if( !fwrite( user_buffer, byte_count, 1, db->fp_wal ) )
+ vg_db_abort( db, "fwrite failed into WAL\n" );
+
+ db->previous_wal_log = file_pos;
+ db->wal_writes ++;
memcpy( cache_buffer, user_buffer, byte_count );
}
else