OpenResty による Layer 4 トラフィックの処理と Memcached サーバーの実装

API7.ai

November 10, 2022

OpenResty (NGINX + Lua)

以前のいくつかの記事で、リクエストを処理するためのいくつかのLua APIを紹介しましたが、それらはすべてレイヤー7に関連するものでした。さらに、OpenRestyはレイヤー4からのトラフィックを処理するためにstream-lua-nginx-moduleモジュールを提供しています。これはlua-nginx-moduleと基本的に同じ指示とAPIを提供します。

今日は、OpenRestyを使ってMemcachedサーバーを実装する方法について話します。これは約100行のコードで実現できます。この小さなハンズオンでは、以前学んだ多くのことを使用し、後のテストとパフォーマンス最適化の章からの内容も取り入れます。

そして、この記事のポイントは、すべてのコード行の機能を理解することではなく、OpenRestyがプロジェクトをゼロから開発する方法を、要件、テスト、開発などの観点から全体像を理解することです。

元の要件と技術的解決策

HTTPSトラフィックが主流になりつつありますが、一部の古いブラウザはsession ticketsをサポートしていないため、サーバー側にセッションIDを保存する必要があります。ローカルのストレージスペースが不足している場合、クラスタを利用して保存する必要があり、データは破棄できるため、Memcachedが適しています。

この時点で、Memcachedを導入することが最も直接的な解決策であるべきです。しかし、この記事では、以下の理由からOpenRestyを使って車輪の再発明を選択します。

  • 第一に、Memcachedを直接導入すると、追加のプロセスが導入され、デプロイとメンテナンスのコストが増加します。
  • 第二に、要件は非常にシンプルで、getset操作のみをサポートし、有効期限をサポートする必要があります。
  • 第三に、OpenRestyにはstreamモジュールがあり、この要件を迅速に実装できます。

Memcachedサーバーを実装したいので、まずそのプロトコルを理解する必要があります。MemcachedプロトコルはTCPとUDPをサポートしています。ここではTCPを使用します。以下はgetsetコマンドの具体的なプロトコルです。

Get
get value with key
Telnet command: get <key>*\r\n

Example:
get key
VALUE key 0 4 data END
Set
Save key-value to memcached
Telnet command:set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n

Example:
set key 0 900 4 data
STORED

また、getset以外に、Memcachedプロトコルの「エラーハンドリング」がどのように実装されているかを知る必要があります。「エラーハンドリング」はサーバーサイドプログラムにとって非常に重要であり、通常のリクエストだけでなく例外も処理するプログラムを書く必要があります。例えば、以下のようなシナリオです:

  • Memcachedがgetset以外のリクエストを送信した場合、どのように処理するか?
  • サーバー側でエラーが発生した場合、Memcachedクライアントにどのようなフィードバックを返すか?

また、Memcached互換のクライアントアプリケーションを書きたいと考えています。これにより、ユーザーは公式のMemcachedバージョンとOpenRestyの実装を区別する必要がなくなります。

Memcachedのドキュメントからの以下の図は、エラーが発生した場合に何を返すべきか、その正確な形式を示しています。これを参考にしてください。

error format

さて、技術的解決策を定義しましょう。OpenRestyのshared dictworker間で使用でき、shared dictにデータを格納することはMemcachedにデータを格納することと非常に似ています。どちらもgetset操作をサポートし、プロセスが再起動するとデータは失われます。したがって、shared dictを使用してMemcachedをエミュレートすることは適切です。なぜなら、それらの原理と動作は同じだからです。

テスト駆動開発

次のステップは、実際に作業を開始することです。しかし、テスト駆動開発の考え方に基づいて、具体的なコードを書く前に最もシンプルなテストケースを構築しましょう。test::nginxフレームワークは開始が難しいことで知られているため、restyを使用した手動テストから始めます。

$ resty -e 'local memcached = require "resty.memcached"
    local memc, err = memcached:new()

    memc:set_timeout(1000) -- 1 sec
    local ok, err = memc:connect("127.0.0.1", 11212)
    local ok, err = memc:set("dog", 32)
    if not ok then
        ngx.say("failed to set dog: ", err)
        return
    end

    local res, flags, err = memc:get("dog")
    ngx.say("dog: ", res)'

このテストコードは、lua-rety-memcachedクライアントライブラリを使用してconnectset操作を開始し、Memcachedサーバーがローカルマシンのポート11212でリッスンしていると仮定しています。

これはうまく動作するように見えます。このコードをマシンで実行すると、予想通り、サービスが開始されていないためfailed to set dog: closedのようなエラーが返されます。

この時点で、技術的解決策は明確です:streamモジュールを使用してデータを受信および送信し、shared dictを使用してデータを保存します。

要件の完了を測定するための指標は明確です:上記のコードを実行し、dogの実際の値を出力します。

フレームワークの構築

では、何を待っているのでしょうか?コードを書き始めましょう!

私の習慣は、まず最小限の実行可能なコードフレームワークを構築し、その後徐々にコードを埋めていくことです。この方法の利点は、コーディングプロセス中に多くの小さな目標を設定でき、小さな目標を達成するたびにテストケースがポジティブなフィードバックを提供することです。

まず、NGINXの設定ファイルを設定します。streamshared dictは事前に設定する必要があります。以下は私が設定した設定ファイルです。

stream {
    lua_shared_dict memcached 100m;
    lua_package_path 'lib/?.lua;;';
    server {
        listen 11212;
        content_by_lua_block {
            local m = require("resty.memcached.server")
            m.run()
        }
    }
}

この設定ファイルにはいくつかの重要な情報が含まれています。

  • 第一に、コードはNGINXのstreamコンテキストで実行され、HTTPコンテキストではなく、ポート11212でリッスンしています。
  • 第二に、shared dictの名前はmemcachedで、サイズは100Mであり、実行時に変更できません。
  • さらに、コードはlib/resty/memcachedディレクトリにあり、ファイル名はserver.luaで、エントリ関数はrun()です。これはlua_package_pathcontent_by_lua_blockから見つけることができます。

次に、コードフレームワークを構築します。自分で試してみて、それから私のフレームワークコードを見てみましょう。

local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached

local _M = { _VERSION = '0.01' }

local function parse_args(s, start)
end

function _M.get(tcpsock, keys)
end

function _M.set(tcpsock, res)
end

function _M.run()
    local tcpsock = assert(ngx.req.socket(true))

    while true do
        tcpsock:settimeout(60000) -- 60 seconds
        local data, err = tcpsock:receive("*l")

        local command, args
        if data then
            local from, to, err = re_find(data, [[(\S+)]], "jo")
            if from then
                command = str_sub(data, from, to)
                args = parse_args(data, to + 1)
            end
        end

        if args then
            local args_len = #args
            if command == 'get' and args_len > 0 then
                _M.get(tcpsock, args)
            elseif command == "set" and args_len == 4 then
                _M.set(tcpsock, args)
            end
        end
    end
end

return _M

このコードスニペットは、エントリ関数run()の主要なロジックを実装しています。例外処理はまだ行っておらず、依存関係parse_argsgetsetはすべて空の関数ですが、このフレームワークはすでにMemcachedサーバーのロジックを完全に表現しています。

コードの埋め込み

次に、コードが実行される順序に従って、これらの空の関数を実装していきましょう。

まず、Memcachedプロトコルのドキュメントに従ってMemcachedコマンドのパラメータを解析します。

local function parse_args(s, start)
    local arr = {}

    while true do
        local from, to = re_find(s, [[\S+]], "jo", {pos = start})
        if not from then
            break
        end

        table.insert(arr, str_sub(s, from, to))

        start = to + 1
    end

    return arr
end

私のアドバイスは、まず最も直感的なバージョンを実装し、パフォーマンスの最適化は考えないことです。結局のところ、完璧さよりも完成が重要であり、完成に基づいて段階的に最適化することが完璧に近づく唯一の方法です。

次に、get関数を実装します。これは一度に複数のキーをクエリできるため、以下のコードではforループを使用しています。

function _M.get(tcpsock, keys)
    local reply = ""

    for i = 1, #keys do
        local key = keys[i]
        local value, flags = mc_shdict:get(key)
        if value then
            local flags  = flags or 0
            reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
        end
    end
    reply = reply ..  "END\r\n"

    tcpsock:settimeout(1000)  -- one second timeout
    local bytes, err = tcpsock:send(reply)
end

ここでのコアコードは1行だけです:local value, flags = mc_shdict:get(key)、つまりshared dictからデータをクエリすることです。残りのコードは、Memcachedプロトコルに従って文字列を結合し、最終的にクライアントに送信します。

最後に、set関数を見てみましょう。これは受信したパラメータをshared dict APIの形式に変換し、データを保存し、エラーが発生した場合にはMemcachedのプロトコルに従って処理します。

function _M.set(tcpsock, res)
    local reply =  ""

    local key = res[1]
    local flags = res[2]
    local exptime = res[3]
    local bytes = res[4]

    local value, err = tcpsock:receive(tonumber(bytes) + 2)

    if str_sub(value, -2, -1) == "\r\n" then
        local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
        if succ then
            reply = reply .. “STORED\r\n"
        else
            reply = reply .. "SERVER_ERROR " .. err .. “\r\n”
        end
    else
        reply = reply .. "ERROR\r\n"
    end

    tcpsock:settimeout(1000)  -- one second timeout
    local bytes, err = tcpsock:send(reply)
end

さらに、上記の関数を埋めながら、ngx.logを使用してテストケースでチェックおよびデバッグすることができます。残念ながら、OpenRestyにはブレークポイントデバッガーがないため、ngx.sayngx.logを使用してデバッグしています。これはまだ開拓待ちの時代です。

まとめ

このハンズオンプロジェクトはこれで終了です。最後に、質問を残します:上記のMemcachedサーバー実装コードを完全に実行し、テストケースを通過させることができますか?

今日の質問はおそらく多くの努力を必要としますが、これはまだ原始的なバージョンです。エラーハンドリング、パフォーマンス最適化、自動化テストはまだ改善されていません。

今日の説明や実践について疑問がある場合は、コメントを残して私たちと議論してください。また、この記事を同僚や友人と共有して、一緒に実践し、進歩しましょう。