マストドンはワーカーが最も処理のウェイトを占めていると言っても過言ではありません。 これほどまでにワーカーを積極的に使ったアプリケーションはなかなかお目にかかれないでしょう。 ActiveJob は Rails 4.2 から登場した新しめの機能で、いまだに定跡のようなものが確立しているとは言えません。 新時代のRailsアプリケーションの設計例として、マストドンのワーカーは非常に参考になるのではないでしょうか。
Sidekiqは、リトライ機能を備えたワーカーシステムで、処理の中で例外が発生すると自動でリトライしてくれます。 そのため、例外が発生してリトライされたとしても正しく動作するため、ひとつのジョブには適切な範囲を設定し、また例外設計は十分考慮しなければなりません。
具体的に、フォロワー全員に何かをする、という処理の場合を考えます。
問題のある設計では、ジョブの引数にユーザIDだけをとり、その中で user.followers.each {|f| do_something(f) }
と処理を呼び出します。
この時、 たった1人のフォロワーに対する do_something
が失敗しただけで、ジョブ全体がまるごとリトライされてしまい、既に成功したフォロワーに対して処理が2重に発生してしまいます。
正しい設計では、 ユーザIDとターゲットユーザIDを引数にとるジョブを作り、 フォロワーの数だけジョブを登録します。 この場合、一部のフォロワーに対する処理が失敗しても、他の成功したジョブはリトライされることはありません。
ワーカーの引数は、オブジェクトではなくIDを渡し、ジョブの中でデータベースを参照するべきです。 なぜなら、ジョブの実行待ちやリトライの間に、データベースの中身が変わる可能性があるからです。 リトライするということは順番が変わる可能性もあるので、差分のような形でデータを持つこともできません。 常にジョブ実行時の最新の情報をデータベースから引かなければならないのです。
データベースの中身が変わるということは、当然レコードの削除も含まれるので、
あらゆる例外、特に RecordNotFound
のケアを適切にしなければなりません。
もしこのケアを怠ると、無限にリトライされ続けるジョブが生まれることでしょう。
user = User.find(1)
# bad job
class BadJob1
def perfom(user_id)
user = User.find(id)
user.followers.each {|target| do_something(user, target) }
end
end
BadJob1.perform_async(user.id)
# bad job
class BadJob2
def perform(user, target)
do_something(user, target)
end
end
user.followers.each {|target| BadJob2.perform_async(user, follower) }
# good job
class GoodJob
def perform(user_id, target_id)
user = User.find(user_id)
target = User.find(target_id)
do_something(user, target)
rescue ActiveRecord::RecordNotFound
true
end
end
user.followers.each {|target| GoodJob.perform_async(user.id, follower.id) }
サンプルコードを見てわかるように、 個別のワーカーが毎回 User.find
をしていて非効率に見えるかもしれません。
しかしこれは、もうそういうものとして覚悟しなくてはいけません。
どのような処理をジョブ化しているか見てみましょう。 簡単に言うと、リアルタイム1にユーザに結果を返さないでいい部分は積極的にジョブにしているようです。
リアルタイムに結果を返すというのは、例としてコントローラのレスポンスのようなものです。
投稿の内容を、ユーザIDとParamsを引数にしてジョブにすれば、投稿の新規作成 (Status.create
)自体をワーカーに任せ、ユーザにはさっさとレスポンスをかえすことはできます。
しかしそうしてしまうと、ユーザは投稿が成功したのか、いつ投稿が実際に反映されるのか、全くわからないので普通はそのような実装はしません。
一方、誰かが投稿した内容が自分のタイムラインに表示されることは、リアルタイム性を必要としません。 時系列はそれほど重要ではないし、そもそも誰かが投稿したことを知る術がないからです。
例として、トゥート投稿する時の処理を一通り見ていきましょう。
送信ボタンを押すと、まずフロントのJavaScriptがAPIにリクエストを行います。
リクエストがサーバに到達すると、 app/controllers/api/v1/statuses_controller#create
で処理されます。
コントローラは処理を PostStatusService
に丸投げし、新しい Status
を作らせます。
いったん PostStatusService
の中身は置いておいて、コントローラは作られた Status
をJSONにしてレスポンスを返します。
JavaScriptはレスポンスを受け取り、自分のホームやローカルタイムラインにそのトゥートを表示します。 ここまで、自分が送信ボタンを押し、自分のホームに表示されるまでは完全にリアルタイムの処理になっています。 トゥートを投稿した本人には全く違和感がないことでしょう。
さて、PostStatusService
の中身を見ていきましょう。
まず、新しい Status
が作られ、 ProcessMentionService
や ProcessHashtagService
が呼ばれます。 これらは、本文を解析しメンションの相手やハッシュタグなどをステータスに関連付ける処理です。
これでステータスは完成し、 あとはOEmbedを取得する LinkCrawlWorker
や、他のユーザにトゥートを配信する DistributionWorker
などのジョブが登録されます。
これはつまり、メンション相手を関連付けたりするのはリアルタイム性が必要だけど、
OEmbedを取得したり他のユーザに配信するのは後回しでもよいということです。
登録されたジョブは、ワーカーによって処理さるのを待ちます2。
次は DistributionWorker
を見てみましょう。
DistributionWorker
は単純に FanOutOnWriteService
を呼び出すだけです。
FanOutOnWriteService
では、 deliver_to_xxx
というメソッドで作られたステータスを様々なユーザに配信しています。
その中で、フォロワーに配信するメソッドである deliver_to_followers
だけは FeedInsertWorker
という別のジョブをさらに登録しています。
これは、リアルタイム性というよりも、単純に数が多く(フォロワーは無限に増える)、実行時間が読めないのでワーカー化しているのではないかと思います。
本来すべての配信メソッドをワーカー化したほう良さそうですが、めんどくささ に耐え切れなくなりそうです。