March 2020

Using Postgres as a queue

A friend and I are building an RSS reader. A SaaS thing, like Google Reader (only better, of course). Every so often, say every hour, something needs to look at each feed and check whether it has new posts.

One way to do this is to have a process load all the feeds from the database into a list and then hand them out to workers that go off and do their thing with a URL and a feed id. This works, but it doesn't scale very well: now we have state on the machine, so we can't just spin up new machines and expect it to just work. And it wouldn't work, of course. Each machine would have the same queue of all the feeds and update them with complete disregard for what the other machines might have done.
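To make the problem concrete, here is a minimal sketch of that in-memory approach. The names Feed, runMachine and countFetches are made up for illustration, and fetch stands in for the real HTTP request; this is not the reader's actual code.

```go
package main

import "sync"

// Feed is a hypothetical stand-in for a row in the feeds table.
type Feed struct {
	ID  int
	URL string
}

// runMachine models one machine: it copies every feed into an in-memory
// queue and hands them out to a pool of workers. fetch stands in for the
// real HTTP request and post update.
func runMachine(feeds []Feed, workers int, fetch func(Feed)) {
	queue := make(chan Feed, len(feeds))
	for _, f := range feeds {
		queue <- f
	}
	close(queue)

	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for f := range queue {
				fetch(f)
			}
		}()
	}
	wg.Wait()
}

// countFetches runs the given number of machines over the same feeds and
// returns how many times each feed ID was fetched in total.
func countFetches(feeds []Feed, machines, workers int) map[int]int {
	var mu sync.Mutex
	counts := make(map[int]int)
	fetch := func(f Feed) {
		mu.Lock()
		counts[f.ID]++
		mu.Unlock()
	}

	var wg sync.WaitGroup
	for m := 0; m < machines; m++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			runMachine(feeds, workers, fetch)
		}()
	}
	wg.Wait()
	return counts
}
```

With one machine every feed is fetched once. With two machines every feed is fetched twice, because each machine has its own private copy of the queue and no idea what the other one has done.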

Instead, we need to keep the state, including the queue, in the database where state belongs.

To do this, let's first imagine how we'd do it with just one machine. Instead of loading all the feeds into memory, we just query the database for the feed with the oldest updated timestamp among those last updated more than an hour ago. In Go, it would look something like this.

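// Needs the "database/sql", "errors" and "time" imports.
// db is assumed to be a *sqlx.DB, whose Get scans a single row into a struct.
func feedUpdater() {
    for {
        feed := Feed{}
        err := db.Get(
            &feed,
            `select * from feeds
             where updated < NOW() - interval '1 hour'
             order by updated limit 1`,
        )

        // All feeds up to date, go to sleep
        if errors.Is(err, sql.ErrNoRows) {
            time.Sleep(5 * time.Second)
            continue
        }

        // More error checking then update feed
    }
}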

We get the feed out, we go fetch its posts, do our updates and update its updated field so we won't see it again for another hour.

This doesn't scale at all either. If we run two machines, they will almost certainly get the same feed out, since an HTTP request takes a long time. While the first machine is waiting for its request to return, the second will run the same query, get the same feed and make the exact same HTTP request.

We can solve part of this problem by doing it in a transaction and locking the row we select with for update, so nobody else can grab it while we work. We really should be doing this in a transaction anyway, because the database stores feeds and posts in separate tables, and we are updating both.

begin;

select * from feeds
    where updated < NOW() - interval '1 hour'
    order by updated limit 1
    for update;
    
-- do stuff

commit;

This does not solve our problem, though. The second machine's select now has to wait for the first transaction to finish before it can move on. In essence, we just serialized our workers. To fix this, Postgres 9.5 gave us skip locked.

begin;

select * from feeds
    where updated < NOW() - interval '1 hour'
    order by updated limit 1
    for update
    skip locked;
    
-- do stuff

commit;

So now the second (and third and fourth) machine ignores any feeds that are locked by another transaction and just gets the next one in line. This is exactly what we want. For our purposes here, the table now acts exactly like a queue.
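Putting it back into Go, the worker loop could look roughly like the sketch below. It uses only the standard database/sql package, so you still need to import a Postgres driver and open db yourself. The id and url column names, the processNext helper and the update callback are my assumptions for illustration, not the reader's real schema or code.

```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"time"
)

// Feed holds the two columns this sketch assumes exist: id and url.
type Feed struct {
	ID  int64
	URL string
}

// claimQuery is the query from above, selecting the assumed columns.
const claimQuery = `select id, url from feeds
    where updated < NOW() - interval '1 hour'
    order by updated limit 1
    for update skip locked`

// processNext claims one stale feed, runs update on it and commits.
// It returns false when no unlocked feed is due. The row stays locked
// until the transaction ends, so other workers skip it in the meantime.
func processNext(ctx context.Context, db *sql.DB, update func(Feed) error) (bool, error) {
	tx, err := db.BeginTx(ctx, nil)
	if err != nil {
		return false, err
	}
	// Releases the lock on any early return; has no effect after Commit.
	defer tx.Rollback()

	var feed Feed
	err = tx.QueryRowContext(ctx, claimQuery).Scan(&feed.ID, &feed.URL)
	if errors.Is(err, sql.ErrNoRows) {
		return false, nil
	}
	if err != nil {
		return false, err
	}

	// "do stuff": fetch the feed and store its posts.
	if err := update(feed); err != nil {
		return false, err
	}

	// Bump updated so the feed is not due again for another hour.
	_, err = tx.ExecContext(ctx, `update feeds set updated = NOW() where id = $1`, feed.ID)
	if err != nil {
		return false, err
	}
	return true, tx.Commit()
}

// feedUpdater loops until ctx is cancelled or an error occurs, sleeping
// for five seconds whenever every feed is up to date or already claimed.
func feedUpdater(ctx context.Context, db *sql.DB, update func(Feed) error) error {
	for {
		ok, err := processNext(ctx, db, update)
		if err != nil {
			return err
		}
		if !ok {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(5 * time.Second):
			}
		}
	}
}
```

Note that the transaction, and with it the row lock, stays open for the whole HTTP request, just like the "do stuff" part of the SQL above. If update fails, the rollback leaves updated untouched, so the feed simply shows up again on a later query.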

🐘🐘🐘