Using Postgres as a queue
A friend and I are building an RSS reader. A SaaS thing, like Google Reader (only better, of course). Every so often, say every hour, it needs to look at each feed and go check whether there are new posts for it.
One way to do this is to have a process load all the feeds from the database into a list and hand them out to workers that go off and do their thing with a URL and a feed id. This works, but it doesn't scale well: the queue now lives in memory on one machine, so we can't just spin up new machines and expect things to work. Each machine would build the same queue of all the feeds and update them in complete disregard of what the other machines might have done.
Instead, we need to keep the state, including the queue, in the database where state belongs.
To do this, let's first imagine how we'd do it with just one machine. Instead of loading all the feeds into memory, we just query the database for the feed with the oldest `updated` timestamp among those that haven't been updated in the last hour. In Go, it would look something like this.
```go
func feedUpdater() {
	for {
		feed := Feed{}
		err := db.Get(&feed, `
			select * from feeds
			where updated < NOW() - interval '1 hour'
			order by updated
			limit 1`)
		// All feeds up to date, go to sleep
		if err == sql.ErrNoRows {
			time.Sleep(5 * time.Second)
			continue
		}
		// More error checking, then update the feed
	}
}
```
We get the feed out, go fetch its posts, do our updates, and bump its `updated` field so we won't see it again for another hour.
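That bump can be a single statement. A sketch, assuming the `feeds` table has an integer `id` primary key (the `id` column is an assumption; the rest matches the query above):

```sql
-- Mark the feed as fresh so the polling query skips it for an hour.
update feeds set updated = NOW() where id = $1;
```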
This doesn't scale at all either. If we run two machines, they will almost certainly pull out the same feed: an HTTP request takes a long time, so while the first machine is waiting for its request to return, the other will run the same query, get the same feed, and make the exact same HTTP request.
We can solve part of this problem with a transaction. We really should be using one anyway: the database stores feeds and posts in separate tables, and we are updating both.
```sql
begin;

select * from feeds
where updated < NOW() - interval '1 hour'
order by updated
limit 1
for update;

-- do stuff

commit;
```
This does not solve our problem though, since now the second machine has to wait for that transaction to finish before it can move on. In essence, we've just serialized our workers. To fix this, Postgres 9.5 gave us `skip locked`.
```sql
begin;

select * from feeds
where updated < NOW() - interval '1 hour'
order by updated
limit 1
for update skip locked;

-- do stuff

commit;
```
So now the second (and third and fourth) machine skips over any feeds locked by another transaction and just grabs the next one in line. This is exactly what we want: for our purposes, this behaves exactly like a queue.
🐘🐘🐘