Transactions for SQLite on Cloudflare Durable Objects

tl;dr: Adding long-lived connections (via WebSockets) to my previously built SQLite on Cloudflare Durable Objects enables the use of SQL transactions.

This is a follow-up to Store SQLite in Cloudflare Durable Objects. Since then, Cloudflare announced D1, their official SQLite offering. I am happy to see an official SQLite product from Cloudflare. For my own closure, and since I still consider it a fun exercise, I had to finish the second part of this endeavor of running SQLite in a Durable Object nonetheless. This second part is about extending the prototype with transaction support.

As a small disclaimer, I don't have insights into how Cloudflare D1 is built – this is simply me further exploring how I could use the public Cloudflare platform.

Sneak peek of the result:

In this recording, I use websocat to establish a WebSockets connection to an SQLite database running inside of a Cloudflare Durable Object. The protocol of this connection is JSON-based. At first, I create a new table, insert a new row into it and then select the table's contents to confirm that the row got persisted. Then I start a transaction, in which I insert another row and confirm that the row exists - at least inside of the transaction. Finally, I roll back the transaction and make sure that the changes made inside of the transaction did not persist (the new row does not exist anymore).

WebSockets - Long-lived Connections to a Durable Object

The previous post ended with a prototype that had a simple POST /query endpoint. This approach doesn't work well with transactions: it is stateless and doesn't know whether queries should be executed together in a transaction. It could be made stateful, e.g. with a header identifying the transaction a query belongs to. However, this just invites stale transactions that are never committed or rolled back, not only through user error, but also through crashing programs or network errors. Transactions are better served by long-lived connections. That way, there is always a clear signal that a transaction has ended: the connection being closed (no matter whether deliberately or due to an error).

While one could keep an HTTP request open for a while and keep sending data using e.g. Transfer-Encoding: chunked, Cloudflare, like most servers in general, limits how long an HTTP request can be kept open. For Cloudflare Durable Objects, the limit is 30s of CPU time.

Fortunately, Durable Objects also support WebSockets. With WebSockets, the limit of 30s of CPU time applies to each WebSockets message.

A bare-bones Durable Object that accepts WebSockets connections looks as follows:

export default {
  async fetch(req: Request, env: Env) {
    const id = env.DATABASE.idFromName("main");
    const stub = env.DATABASE.get(id);
    return stub.fetch(`http://sqlite/websocket`, req);
  },
};

export class Database {
  async fetch(req: Request) {
    if (req.method !== "GET") {
      return new Response("Only GET requests are allowed", {
        status: 405 /* method not allowed */,
      });
    }

    const url = new URL(req.url);
    if (url.pathname !== "/websocket") {
      return new Response("Not found, try GET /websocket", {
        status: 404 /* not found */,
      });
    }

    if (req.headers.get("Upgrade") != "websocket") {
      return new Response("Expected websocket connection", {
        status: 426 /* upgrade required */,
      });
    }

    const { 0: tx, 1: rx } = new WebSocketPair();
    rx.accept();

    rx.addEventListener("message", (msg) => {
      // handle message
    });
    rx.addEventListener("close", () => {
      // handle close
    });
    rx.addEventListener("error", () => {
      // handle error
    });

    return new Response(null, { status: 101, webSocket: tx });
  }
}

This example accepts requests, gets an instance of the Durable Object DATABASE for the hard-coded name main and forwards the request to the path /websocket of the Durable Object. The Durable Object ensures that incoming requests are GET requests to /websocket and that they are trying to upgrade to a WebSockets connection. If all checks pass, it upgrades to a WebSockets connection and adds listeners for message, close and error.

Locks - Handle Concurrent Connections

Even though the previous implementation had only a single /query endpoint, its storage access was already asynchronous, which could in theory cause the storage access of concurrent requests to overlap. While this could be solved with the state.blockConcurrencyWhile() API for the single /query endpoint, I want to allow concurrent WebSockets connections. This requires a proper implementation of SQLite locking in the underlying VFS (Virtual File System).

I spent quite some time over the last couple of months working on sqlite-vfs. I managed to get most of SQLite's TCL test harness green when run against SQLite with a custom VFS built with sqlite-vfs. As a result, the previously written Vfs Rust trait changed quite a bit and now also includes methods to implement the locking for the VFS.

The locking of SQLite's default file systems is quite involved. This is due to the complexity of SQLite having multiple locking states and SQLite potentially running concurrently across multiple processes and/or threads. In a nutshell, the following needs to be considered:

  1. Locks must work across multiple independent processes (without a central coordinator).
  2. Locks must be released when a process/thread crashes.
  3. Locks must work when multiple concurrent processes/threads try to get a lock at the same time.

I'd argue that implementing locking for a single database daemon is a lot easier, as:

  1. You have a central coordinator.
  2. You don't care about a client crashing as this will close the connection which is a simple indicator to release the lock.
  3. When the coordination happens in the same process, you can simply rely on mutexes to coordinate.

Thus, locking our SQLite that runs in a Durable Object is quite easy. A Durable Object is:

  1. Unique (per database at least),
  2. A single service that clients connect to, and
  3. Single-threaded.

With that, coordinating a lock is as simple as having a global. The global wouldn't even need a mutex as the Durable Object is single-threaded.

In code, this means that the custom VFS built for SQLite in Durable Objects has a property for its lock state:

pub struct PagesVfs<const PAGE_SIZE: usize> {
    lock_state: Arc<Mutex<LockState>>,
}

Per Durable Object instance, i.e. per database, this struct is only ever instantiated once:

#[no_mangle]
extern "C" fn sqlite3_os_init() -> i32 {
    match register("cfdo", /* -> */ PagesVfs::<4096>::default() /* <- */, true) {
        Ok(_) => SQLITE_OK,
        Err(RegisterError::Nul(_)) => SQLITE_ERROR,
        Err(RegisterError::Register(code)) => code,
    }
}

This makes lock_state basically a global. It is still wrapped in a Mutex here, because I've decided (for now) to require custom Vfs trait implementations to be Sync (trait Vfs: Sync). A connection to this VFS then simply gets a reference to this state (and keeps track of its own current lock), which is enough to implement the locking.

pub struct Connection<const PAGE_SIZE: usize> {
    lock_state: Arc<Mutex<LockState>>, // <-- handle to the same `lock_state` as above
    lock: LockKind, // <-- lock currently held by this connection
}

The locking implementation isn't worth posting here as it is just some logic looking at the current lock state, at the current lock of the connection and the requested lock to determine if the requested lock can be granted.
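
To give a rough idea of what "some logic" means, here is a minimal sketch of such a grant check, written in TypeScript rather than Rust to match the other examples in this post. It is not the actual sqlite-vfs implementation: the names LockManager, tryLock, unlock and drop are made up for illustration, SQLite's internal PENDING state is left out, and it does not enforce that a connection holds a shared lock before asking for a stronger one.

```typescript
// Hypothetical sketch of SQLite-style lock arbitration inside a single,
// single-threaded coordinator. All names here are illustrative only.
type LockKind = "none" | "shared" | "reserved" | "exclusive";

// Higher rank = stronger lock. SQLite's PENDING state is omitted on purpose.
const RANK: Record<LockKind, number> = {
  none: 0,
  shared: 1,
  reserved: 2,
  exclusive: 3,
};

class LockManager {
  // Lock currently held per connection id (absent = "none").
  private held = new Map<number, LockKind>();

  current(conn: number): LockKind {
    return this.held.get(conn) ?? "none";
  }

  // Returns true if `requested` was granted (or is already held).
  tryLock(conn: number, requested: LockKind): boolean {
    if (RANK[requested] <= RANK[this.current(conn)]) return true;

    // Strongest lock held by any *other* connection.
    let strongestOther = RANK.none;
    this.held.forEach((kind, id) => {
      if (id !== conn) strongestOther = Math.max(strongestOther, RANK[kind]);
    });

    let granted: boolean;
    if (requested === "shared") {
      // Readers can coexist with other readers and one reserved writer.
      granted = strongestOther <= RANK.reserved;
    } else if (requested === "reserved") {
      // Only one connection at a time may announce its intent to write.
      granted = strongestOther <= RANK.shared;
    } else {
      // Exclusive: no other connection may hold any lock at all.
      granted = strongestOther === RANK.none;
    }

    if (granted) this.held.set(conn, requested);
    return granted;
  }

  // Downgrade to `to` (typically "shared" or "none"); never upgrades.
  unlock(conn: number, to: LockKind): void {
    if (RANK[to] >= RANK[this.current(conn)]) return;
    if (to === "none") this.held.delete(conn);
    else this.held.set(conn, to);
  }

  // Called when a connection closes or errors: release whatever it held.
  drop(conn: number): void {
    this.held.delete(conn);
  }
}
```

Because everything runs on a single thread, each check-and-set in tryLock cannot be interleaved with another connection's request, which is why no further synchronization shows up in the sketch.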

According to the docs, Durable Objects might get evicted from memory. This probably happens if there are no requests to it for a certain amount of time (I don't know the specifics here). In this case, the global lock state would get disposed, too. This however is perfectly fine, as it will only ever happen if there are no active connections, i.e. if there are no active locks. If it did happen while there are active connections (e.g. due to an error/crash), it would still be fine, as this would also drop all active connections.

Here is a simple demo showcasing the locking in action for concurrent connections. The first connection opens an exclusive transaction. This acquires an exclusive lock, which prevents other connections from even reading the database. This is shown by opening another connection and trying to query the database, which fails after 5 seconds with a database is locked error. After increasing the busy timeout (the time a connection waits for a lock) and retrying the query, the transaction of the first connection is committed, which releases the lock. As a result, when going back to the second connection, it was able to acquire a shared lock and thus finally execute the query.

WASI methods required by Locks

In part 1, I mentioned that I prefer to provide my own WASI imports and usually throw a not implemented error for all imports at first, and only implement them when I see them being used, e.g.:

  // "wasi_snapshot_preview1"."clock_time_get": [I32, I64, I32] -> [I32]
  clock_time_get() {
    throw new Error("clock_time_get not implemented");
  },

With the locking in place, there are two WASI imports I didn't implement yet that are now called: clock_time_get and poll_oneoff. I assume that clock_time_get is now called due to the busy timeout when trying to acquire a lock, which probably takes some time measurements to coordinate the timeout. My simplified implementation of clock_time_get for Cloudflare Workers looks like the following:

  // "wasi_snapshot_preview1"."clock_time_get": [I32, I64, I32] -> [I32]
  clock_time_get(_id: number, _precision: bigint, offset: number) {
    const memoryView = new DataView(exports.memory.buffer);
    const time = BigInt(Date.now()) * BigInt(1e6);
    memoryView.setBigUint64(offset, time, true);

    return ERRNO_SUCCESS;
  },

If you want to see a better one, check out Deno's implementation. Since there is no performance.now() on Cloudflare Workers, the simpler implementation must suffice.

The other import (poll_oneoff) was called as a result of std::thread::sleep(), which is called when the connection waits a bit before checking again whether it can acquire a lock. poll_oneoff looked a bit too complicated for the time I was willing to invest in it, so instead of implementing it, I got rid of the std::thread::sleep() in favor of allowing the Vfs to bring its own sleep implementation. With that, the sleep implementation looks like:

extern "C" {
    pub fn conn_sleep(ms: u32);
}
fn sleep(&self, duration: Duration) -> Duration {
    let now = Instant::now();
    unsafe { crate::conn_sleep((duration.as_millis() as u32).max(1)) };
    now.elapsed()
}

I am asyncifying conn_sleep the same way I do for get_page and put_page (see part 1) and implement it with a setTimeout. While this only allows sleeps of at least one millisecond, it seems to be good enough here.

async conn_sleep(ms: number) {
  await new Promise<void>((resolve) => setTimeout(resolve, ms));
},
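
To illustrate how a sleep function and a busy timeout play together, here is a simplified retry loop. This is only a sketch of the general idea, not the code SQLite or sqlite-vfs actually runs: lockWithBusyTimeout, its parameters and the 10ms retry interval are assumptions made for this example. The sleep callback is synchronous and returns the time it actually slept, mirroring the shape of the Rust sleep method above.

```typescript
// Hypothetical busy-timeout loop: retry `tryLock` until it succeeds or
// until roughly `busyTimeoutMs` have been spent sleeping.
function lockWithBusyTimeout(
  tryLock: () => boolean,
  sleep: (ms: number) => number, // returns the time actually slept, in ms
  busyTimeoutMs: number,
  intervalMs: number = 10 // assumed retry interval, chosen for illustration
): boolean {
  let waited = 0;
  while (true) {
    if (tryLock()) return true;
    // Timeout exhausted: this is where "database is locked" would surface.
    if (waited >= busyTimeoutMs) return false;

    // Never sleep past the timeout; count at least 1ms per round so the
    // loop terminates even if `sleep` reports zero elapsed time.
    const wait = Math.min(intervalMs, busyTimeoutMs - waited);
    waited += Math.max(sleep(wait), 1);
  }
}
```

With a busy timeout of 5000ms and a lock that never becomes available, the loop gives up after about five seconds, which matches the behavior seen in the demo above.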

Durable Object Implementation Details

Starting with the bare-bones Durable Object from above, the first thing to add is support for multiple databases. I use a separate Durable Object instance per database. This is done by using different names for the call to env.DATABASE.idFromName(name). I am retrieving the name from the URL and thus expect the URL pathname to match the pattern /:database. The resulting code is:

export default {
  async fetch(req: Request, env: Env) {
    if (req.method !== "GET") {
      return new Response(null, { status: 405 /* method not allowed */ });
    }

    // Expected pattern: /:database
    const url = new URL(req.url);
    const segments = url.pathname.slice(1).split("/");
    if (segments.length !== 1) {
      return new Response("not found", {
        status: 404 /* not found */,
      });
    }
    const [name] = segments;

    const id = env.DATABASE.idFromName(name);
    const stub = env.DATABASE.get(id);
    return stub.fetch(`http://sqlite/websocket`, req);
  },
};
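
One detail worth noting: with the check above, a request to / passes, because "".split("/") yields [""], so the database name ends up being an empty string. That may or may not be intended. A stricter variant could look like the following sketch, where databaseNameFromUrl and the allowed character set are assumptions made for this example and not part of the prototype:

```typescript
// Hypothetical helper: extract the database name from a URL that is
// expected to match the pattern /:database. Returns null if it doesn't.
function databaseNameFromUrl(rawUrl: string): string | null {
  const { pathname } = new URL(rawUrl);
  const segments = pathname.slice(1).split("/");
  if (segments.length !== 1) return null;

  // Assumed naming rule: 1-64 characters of letters, digits, "_" or "-".
  // This also rejects the empty name produced by a request to "/".
  const name = segments[0] ?? "";
  if (!/^[A-Za-z0-9_-]{1,64}$/.test(name)) return null;

  return name;
}
```

In the Worker, a null result would map to the same 404 response as before.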

Previously, I was instantiating the WASM module for each request. I've now moved this into the constructor of the DO to reduce overall overhead and memory usage. As the instantiation is asynchronous, an important part is to call blockConcurrencyWhile, to not accept any concurrent request until the instantiation is done. As part of the instantiation, I am also extracting the current page count out of the first database page. I keep this value as a property on the DO to make sure that all requests to the DO see the same up-to-date value. Finally, I provide pageCount, getPage, putPage and delPage implementations to wasm-sqlite. The methods pageCount and delPage are new additions compared to part 1 and are a result of the more mature sqlite-vfs. In a nutshell, pageCount is used to calculate the database size, and delPage is used to truncate the database.

export class Database {
  private pageCount: number;
  private sqlite: Sqlite;

  constructor(state: DurableObjectState, _env: Env) {
    this.pageCount = 0;

    const storage = state.storage;
    const self = this;

    // Block concurrency until DO is completely initialized
    state.blockConcurrencyWhile(async () => {
      // Read the page count from page 0 (if page 0 already exists)
      const page: Array<number> | undefined = await storage.get<Array<number>>(
        String(0)
      );
      if (page) {
        const view = new DataView(new Uint8Array(page).buffer);
        this.pageCount = view.getUint32(28, false);
      }

      this.sqlite = await Sqlite.instantiate({
        pageCount(): number {
          return self.pageCount;
        },

        async getPage(ix: number): Promise<Uint8Array> {
          const page: Array<number> =
            (await storage.get<Array<number>>(String(ix))) ?? new Array(4096);
          return new Uint8Array(page);
        },

        async putPage(ix: number, page: Uint8Array): Promise<void> {
          await storage.put(String(ix), Array.from(page), {});
          self.pageCount = Math.max(self.pageCount, ix + 1);
        },

        async delPage(ix: number): Promise<void> {
          await storage.delete(String(ix));
          if (ix + 1 >= self.pageCount) {
            self.pageCount = ix;
          }
        },
      });
    });
  }
}
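
The page count lookup in the constructor relies on the SQLite database header, which stores the database size in pages as a big-endian 32-bit integer at byte offset 28 of the first page. Pulled out into a standalone function, that step could be sketched as follows (readPageCount is a name made up for this example):

```typescript
// Byte offset of the "database size in pages" field in the SQLite header.
const PAGE_COUNT_OFFSET = 28;

// Hypothetical helper: read the page count from the first database page.
// Returns 0 if the page does not exist yet or is too short.
function readPageCount(firstPage: Uint8Array | undefined): number {
  if (!firstPage || firstPage.byteLength < PAGE_COUNT_OFFSET + 4) return 0;

  // Respect byteOffset/byteLength in case the array is a view into a
  // larger buffer (e.g. a slice of WASM memory).
  const view = new DataView(
    firstPage.buffer,
    firstPage.byteOffset,
    firstPage.byteLength
  );
  return view.getUint32(PAGE_COUNT_OFFSET, false /* big-endian */);
}
```

For example, a first page whose bytes 28 to 31 are 0x00 0x00 0x01 0x02 yields a page count of 258.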

Since the instantiation happens in the DO constructor, opening a connection is a separate step now, which happens for each WebSockets connection. The database connection is kept open for the duration of the WebSockets connection:

const conn = await this.sqlite.connect();

The final pieces of the DO implementation are the WebSockets event handlers. The easy ones are error and close. Both just have to drop the connection to clean up memory and release open locks if there are any.

rx.addEventListener("close", () => {
  conn.drop();
});
rx.addEventListener("error", () => {
  conn.drop();
});
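
If both an error and a close event fire for the same WebSocket, conn.drop() would run twice. Whether that is a problem depends on how drop() is implemented, which isn't shown here. A small guard makes the cleanup idempotent either way; once is a helper introduced for this sketch only:

```typescript
// Hypothetical helper: wrap a function so it runs at most once,
// no matter how many times the returned function is invoked.
function once(fn: () => void): () => void {
  let done = false;
  return () => {
    if (done) return;
    done = true;
    fn();
  };
}

// Possible usage with the handlers above (not executed here):
//   const dropOnce = once(() => conn.drop());
//   rx.addEventListener("close", dropOnce);
//   rx.addEventListener("error", dropOnce);
```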

The message event is also quite straightforward. It just validates the message content and executes the query on the database connection.

rx.addEventListener("message", (msg) => {
  let data: unknown;
  try {
    data = JSON.parse(msg.data);
  } catch (err) {
    rx.send(JSON.stringify({ error: `Expected JSON: ${err}` }));
    return;
  }
  if (!data || typeof data !== "object") {
    rx.send(JSON.stringify({ error: "Expected JSON object" }));
    return;
  }

  // validate data
  if (!isQuery(data)) {
    rx.send(JSON.stringify({ error: "Expected `sql` property" }));
    return;
  }
  if (!isParams(data.params)) {
    rx.send(
      JSON.stringify({
        error:
          "Expected `params` to be an array of `string | number | boolean | null`",
      })
    );
    return;
  }

  conn
    .queryRaw(data.sql, data.params)
    .then((json) => rx.send(json))
    .catch((err: unknown) =>
      rx.send(
        JSON.stringify({ error: String(err), stack: (err as Error).stack })
      )
    );
});
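
The handler relies on two type guards, isQuery and isParams, which are not shown above. The following is a guess at what they could look like, based purely on the error messages in the handler. Treating a missing params property as "no parameters" is an assumption of this sketch.

```typescript
// Values that can be bound as SQL parameters, per the error message above.
type Param = string | number | boolean | null;

interface Query {
  sql: string;
  params?: unknown; // validated separately by isParams
}

// A message is a query if it has a string `sql` property.
function isQuery(data: object): data is Query {
  return typeof (data as { sql?: unknown }).sql === "string";
}

// Assumption: `params` is optional; if present, it must be an array of
// strings, numbers, booleans or nulls.
function isParams(params: unknown): params is Param[] | undefined {
  if (params === undefined) return true;
  return (
    Array.isArray(params) &&
    params.every(
      (p) =>
        p === null ||
        typeof p === "string" ||
        typeof p === "number" ||
        typeof p === "boolean"
    )
  );
}
```

With these guards, a message such as {"sql": "INSERT INTO vals (val) VALUES (?)", "params": ["test"]} passes validation (table and column names here are placeholders), while {"sql": "SELECT 1", "params": [{}]} is rejected.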

Conclusion

One of the open points / concerns from my previous post was:

I am pleased that I got this working as I cannot imagine using an SQL database without transactions.

The only remaining thing I'd want for a great developer experience is a modified SQLite CLI that connects to SQLite in a DO. But I am not diving head first into that rabbit hole and rather conclude this experiment here in favor of the upcoming Cloudflare D1, which I am very much looking forward to. I had a lot of fun and am even a bit glad that with Cloudflare D1 around the corner, I don't feel the need to move this from a prototype state into something that can be trusted with production data (not sure if I would have wanted that responsibility).

The code can be found here: