In the last post, we figured out how we can find what pages we should flush to disk. In this one, I want to talk about how we can actually do the flushing. Once a page is marked as ready for flushing, we need to start an async process in which we write it to the disk. While that process is running, we need to ensure that there are no further writes happening on that page. We want to be able to handle flushing writes in the background with as little fuss as we possibly can, while at the same time maximizing parallelism. That turns out to be a significant challenge. Our previous write strategy simply gathered all the writes in memory and then flushed them to disk. Now we need to have an ongoing process. In addition, we also want to handle the scenario where we flush everything in memory for certain scenarios, such as checkpoints or orderly shutdown.
In short, the approach we previously took isn’t going to be sufficient for our needs. For that reason, I want to clear the board and start from scratch. As a reminder, here is the API that we provide:
The rules for this API are fairly simple. You acquire the writer and then may call the write() method any number of times. The writer will aggregate the writes in memory, and will use write behind to write the data to disk in the background. In other words, we aren’t necessarily going to write to the disk when write() is called, only to our own buffer.
Note that the writer is working using pages, not chunks, even though the buffer pool we use operates on chunks. In addition to the write() method, we have a flush() which pushes everything that is currently in the writer to the disk. That one is meant to be called when we want to create a checkpoint or when we do an orderly shutdown. The flush() call starts an asynchronous process that we can wait upon, so we aren’t going to stall on checkpoints. While we write a page to disk, we’ll reject any writes to that page. This is important, because to do otherwise will mean that we may send messed up data to disk.
We also need to handle any and all sequences of writes. For example, consider:
- write(2, [1 page]); // write a single page #2
- write(4, [4 pages]);// write 4 pages from page #4 (so pages: 4,5,6,7)
Now, we may actually get additional writes as well:
- write(6, [3 pages]);
- write(4, [1 page]);
In other words, there are no requirements from the caller about what should be called and in what order. The caller may write a set of pages and then repurpose them again. In this case, we expect that the end result is that we’ll issue the following writes to the disk:
- write(2, 1 page);
- write(4, 1 page);
- write(6, 3 pages);
Note that we have page #5, it was written in the first call to page #4 (when it used 4 pages), however, with the update of page #4 to be a single page, the data in page #5 is no longer relevant and we can skip writing it to disk. This is an effect of working with pages. The data doesn’t make sense outside of a page boundary, after all, so we should only write meaningful data.
After spending a lot of time thinking about it, I decided to go with the following behavior: The Pager’s writer will manage the write buffers, flushing to the disk when it is time. Callers can decide to let writes happen in the background or to wait for them. Here is the new state of the Writer:
There are a few interesting fields here:
- The lock is there to ensure that we have only a single writer. As a reminder, the actual I/O is asynchronous.
- The cache implements an Lru with a twist, I improved on that since the last post, more on this later.
- The parent field is the owning pager, obviously.
- The last three fields (inflight, completed and backgroundWrites) are all about handling the asynchronous writes for us.
Let’s take a look at how we handle the state for a particular write and then we’ll be able to discuss the implementation in depth:
On the surface, the WriteState is pretty simple. It contains the page we are writing and how many pages are involved in a particular write, but there are also quite a few additional fields that are… strange. The writer field is used when we are returning from a callback, the chunks are used to record which chunks are relevant for this particular write (remember, the Writer operates at the page level, while the Pager operates at the chunk level). Because we deal with individual pages, we have to ensure that each one of them has its own reference for the chunk in question. That also has the effect of pinning any dirty pages in memory until they are flushed to disk. The err field holds whether there was an error (and what was it) when writing to the disk. The list field is used in conjunction with the write behind policy and will be discussed shortly. We use the next field to create a single linked list with the head being the completed field on the Writer.
With that in mind, let’s look at how the write() method is implemented:
We start by draining any completed writes. It is important to detect any errors and to release the references from the chunks we locked during the write portion. We treat any error during the write process as fatal and abort immediately. At this point, we don’t know what the state of the system is. The only safe way to handle this is to shutdown and run recovery.
The next step is ensureNotWrittenTo() call. This is required because we may be in the process of writing a page to the disk, and we cannot modify the memory for that page (or pages) while they are being written to the disk. This is a recoverable error, by the way. The caller should wait until the pending writes are complete and retry.
We then check in the cache if the page is already there. If it is, we have to consider the potential for a mismatch in the sizes (described earlier in this post), when we have a previous write that spanned three pages but the current one only applies to a single page. Since the page boundary is important, we can just discard the previous write and continue. For the common case of a write to a page that was previously written to, however, we can simply copy the data from the provided buffer to the right page in memory and move on. Most of the hot pages should fit into this code path.
We then allocate the state for the current write, get the page (or pages) and copy the data from the provided buffer to our own memory. We also, crucially, push the new state into the writer’s cache. In the previous post that action would evict a page that we need to handle. In the current code, that is handled at the epilog of the method.
We iterate over the cache and evict any pages that don’t fit into the cache any longer. This is because a single write may evict multiple pages. If we have space for 32KB, and we currently have 4 separate pages in the cache, we are full. Adding a write for 2 pages, however, will need to evict two items out of the cache. Another aspect that I changed from the last post is that I’m no longer requiring that the current page will not be evicted immediately.
I mentioned that I changed the Lru’s implementation significantly. Previously it was fairly generic, but it turns out that it is better to specialize it specifically for our needs. Here is the current version:
This Lru is implemented using a generational approach. The idea is that we can specify the sizes of the young and old generations independently. A new page that is added to the cache is written to the young generation. If there is another write to the page, it will be moved to the old generation. If the size of the data in each generation is too big, we use a simple Least Recently Used to evict items. An item that expired on the old generation will be moved to the young generation, while an item that expires from the young generation will be evicted completely. Here is how this looks like in code:
This is the same rough algorithm used by MySQL. The advantage here is that there is O(1) cost for the entire process, but at the same time, busy pages actually have a good chance of not being written to the disk (which is ideal, since if they are busy, they keep changing). To give some context to the sizes involved, we may decide that the old generation contains 192 MB and the young generation is 64 MB. A page write will remain in the young generation until we have another 64 MB (8,192 individual page writes), if there were any writes to it again in that time frame, it will go to the old generation. Only if we didn't have any writes to it in that time frame will we flush it to disk. Once a page was moved to the old generation, it has to go through a buffer of 256 MB (32K page writes) before it will go to the disk. Hot pages should be safe from spurious writes, especially since we’ll write them to the disk anyway whenever we do a checkpoint.
When a page is evicted from the cache, the writePageToDisk() method is called on it, there is a bit of work going on here:
An important observation about writing to the disk is that coalescing writes can have a huge impact on performance. Instead of having to go to the disk multiple times, we can schlepp the data there just once. The amount of the data we write is actually far less significant than what you may expect. You can see that the first thing that we do is to try to merge the pages (both forward and backward). If we find a candidate, we expand our own WriteState and release the one that subsumed. This is an important aspect, because the writes we send to the disk don’t necessarily match the old / young generations division. If we evicted a page that is adjacent to another modified page, we’ll write them both to the disk. That is even if the second page is in the old generation (and that will evict the page from the write cache). In practice, databases exhibit a lot of locality, so I don’t expect that to be an actual issue.
The last thing we do before actually submitting the I/O operation is to register the write in the inflight record. This is basically a sorted list (by the first page for that write) which we’ll check on any future writes. We also increment the number of background writes we have so we’ll know to wait for them to complete. Note that if we fail to submit the I/O, we decrement the number of backgroundWrites, we use this value to wait for async operations (for example, during shutdown). And since such errors are probably fatal, we are heading that way soon.
The actual I/O happens in the background and we’ll call the callback function when that is done (either successfully or with an error). So far, all the code we saw was single threaded and protected by the lock mutex. The callback, on the other hand, is running without any such protections. Furthermore, the callback thread is running all the notifications we have. We want to do as little as possible there. Here is what I ended up with:
We register the completed write in the completed linked list and wake any pending waiters. Any actual processing, of course, is moved to the part of the process where we actually run under lock and don’t have to worry about concurrency and multi threading. The actual behavior around completing writes is implemented in the drainCompleted() call that is part of the write() call.
Even though we are under lock, the I/O completion may compete with us on the completed list, so we’re using lock free methods to iterate through the list of completed actions. The actual behavior is straightforward, we remove the write from the list of inflight writes and return an error if the write had a problem. Note that we have to deal with potentially nested errors as well. Aside from calling this as part of the write call(), we may also want to explicitly wait for all the pending operations to complete. This is done using the waitForWrites() call:
We drain all the completed writes and then wait for any pending ones to complete as well. We repeat this until there are no more backgroundWrites. Note that we run this under the lock, so we know that there can never be additional writes happening.
In special circumstances, we may want to ask the writer to push everything to the disk, we do that by calling the checkpoint() method:
There is almost nothing there, I’m happy to say, we are almost done. We evict pages from the cache from the youngest to the oldest, using the same write coalescing as before. Given write locality, that is likely to produce good results. We complete the process by waiting for all those writes to complete, and we are done.
There is just one last topic that I want to cover: Shutting down:
Shutting down turns out to be a fairly involved process. This is especially the case if we are shutting down because of an error. We need to wait until all the pending writes have been completed (to do otherwise is to invite Use-After-Free bugs). That is why we call the waitForWrites() until it completes successfully. At worst, each time this is called will process a single write.
On shutdown, we can’t really start writing to the disk (after all, we may be shutting down because of the disk). We just discard the data in this case. That is something that we are okay to do, because we assume that any important data can be restored when we run recovery. This is one of those cases where the fact that we are building a database makes our life easier. We don’t need to try very hard to persist to disk, we can assume that as a given.
I’m pretty happy with how all this turned out. The full code for the pager is now at around 1,500 lines of code. I think that at this point we are pretty much at or near what you’ll need to replace the mmap() based pager that I demonstrated earlier in this series. This is the sort of code that needs a metric ton of tests, of course, and a lot of actual experience throwing stuff at it. The nice thing about this approach is that this is entirely in our hands. We can now implement whatever policies we want. The actual behavior is more or less done, but we can play with the exact mechanics and what will trigger what at will. In some cases, that can give us a great deal of power and flexibility.