OpenResty による Layer 4 トラフィックの処理と Memcached サーバーの実装
API7.ai
November 10, 2022
以前のいくつかの記事で、リクエストを処理するためのいくつかのLua APIを紹介しましたが、それらはすべてレイヤー7に関連するものでした。さらに、OpenRestyはレイヤー4からのトラフィックを処理するためにstream-lua-nginx-module
モジュールを提供しています。これはlua-nginx-module
と基本的に同じ指示とAPIを提供します。
今日は、OpenRestyを使ってMemcachedサーバーを実装する方法について話します。これは約100行のコードで実現できます。この小さなハンズオンでは、以前学んだ多くのことを使用し、後のテストとパフォーマンス最適化の章からの内容も取り入れます。
そして、この記事のポイントは、すべてのコード行の機能を理解することではなく、OpenRestyがプロジェクトをゼロから開発する方法を、要件、テスト、開発などの観点から全体像を理解することです。
元の要件と技術的解決策
HTTPSトラフィックが主流になりつつありますが、一部の古いブラウザはsession tickets
をサポートしていないため、サーバー側にセッションIDを保存する必要があります。ローカルのストレージスペースが不足している場合、クラスタを利用して保存する必要があり、データは破棄できるため、Memcachedが適しています。
この時点で、Memcachedを導入することが最も直接的な解決策であるべきです。しかし、この記事では、以下の理由からOpenRestyを使って車輪の再発明を選択します。
- 第一に、Memcachedを直接導入すると、追加のプロセスが導入され、デプロイとメンテナンスのコストが増加します。
- 第二に、要件は非常にシンプルで、
get
とset
操作のみをサポートし、有効期限をサポートする必要があります。 - 第三に、OpenRestyには
stream
モジュールがあり、この要件を迅速に実装できます。
Memcachedサーバーを実装したいので、まずそのプロトコルを理解する必要があります。MemcachedプロトコルはTCPとUDPをサポートしています。ここではTCPを使用します。以下はget
とset
コマンドの具体的なプロトコルです。
Get
get value with key
Telnet command: get <key>*\r\n
Example:
get key
VALUE key 0 4 data END
Set
Save key-value to memcached
Telnet command:set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n
Example:
set key 0 900 4 data
STORED
また、get
とset
以外に、Memcachedプロトコルの「エラーハンドリング」がどのように実装されているかを知る必要があります。「エラーハンドリング」はサーバーサイドプログラムにとって非常に重要であり、通常のリクエストだけでなく例外も処理するプログラムを書く必要があります。例えば、以下のようなシナリオです:
- Memcachedが
get
やset
以外のリクエストを送信した場合、どのように処理するか? - サーバー側でエラーが発生した場合、Memcachedクライアントにどのようなフィードバックを返すか?
また、Memcached互換のクライアントアプリケーションを書きたいと考えています。これにより、ユーザーは公式のMemcachedバージョンとOpenRestyの実装を区別する必要がなくなります。
Memcachedのドキュメントからの以下の図は、エラーが発生した場合に何を返すべきか、その正確な形式を示しています。これを参考にしてください。
さて、技術的解決策を定義しましょう。OpenRestyのshared dict
はworker
間で使用でき、shared dict
にデータを格納することはMemcachedにデータを格納することと非常に似ています。どちらもget
とset
操作をサポートし、プロセスが再起動するとデータは失われます。したがって、shared dict
を使用してMemcachedをエミュレートすることは適切です。なぜなら、それらの原理と動作は同じだからです。
テスト駆動開発
次のステップは、実際に作業を開始することです。しかし、テスト駆動開発の考え方に基づいて、具体的なコードを書く前に最もシンプルなテストケースを構築しましょう。test::nginx
フレームワークは開始が難しいことで知られているため、resty
を使用した手動テストから始めます。
$ resty -e 'local memcached = require "resty.memcached"
local memc, err = memcached:new()
memc:set_timeout(1000) -- 1 sec
local ok, err = memc:connect("127.0.0.1", 11212)
local ok, err = memc:set("dog", 32)
if not ok then
ngx.say("failed to set dog: ", err)
return
end
local res, flags, err = memc:get("dog")
ngx.say("dog: ", res)'
このテストコードは、lua-rety-memcached
クライアントライブラリを使用してconnect
とset
操作を開始し、Memcachedサーバーがローカルマシンのポート11212
でリッスンしていると仮定しています。
これはうまく動作するように見えます。このコードをマシンで実行すると、予想通り、サービスが開始されていないためfailed to set dog: closed
のようなエラーが返されます。
この時点で、技術的解決策は明確です:stream
モジュールを使用してデータを受信および送信し、shared dict
を使用してデータを保存します。
要件の完了を測定するための指標は明確です:上記のコードを実行し、dog
の実際の値を出力します。
フレームワークの構築
では、何を待っているのでしょうか?コードを書き始めましょう!
私の習慣は、まず最小限の実行可能なコードフレームワークを構築し、その後徐々にコードを埋めていくことです。この方法の利点は、コーディングプロセス中に多くの小さな目標を設定でき、小さな目標を達成するたびにテストケースがポジティブなフィードバックを提供することです。
まず、NGINXの設定ファイルを設定します。stream
とshared dict
は事前に設定する必要があります。以下は私が設定した設定ファイルです。
stream {
lua_shared_dict memcached 100m;
lua_package_path 'lib/?.lua;;';
server {
listen 11212;
content_by_lua_block {
local m = require("resty.memcached.server")
m.run()
}
}
}
この設定ファイルにはいくつかの重要な情報が含まれています。
- 第一に、コードはNGINXの
stream
コンテキストで実行され、HTTP
コンテキストではなく、ポート11212
でリッスンしています。 - 第二に、
shared dict
の名前はmemcached
で、サイズは100M
であり、実行時に変更できません。 - さらに、コードは
lib/resty/memcached
ディレクトリにあり、ファイル名はserver.lua
で、エントリ関数はrun()
です。これはlua_package_path
とcontent_by_lua_block
から見つけることができます。
次に、コードフレームワークを構築します。自分で試してみて、それから私のフレームワークコードを見てみましょう。
local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached
local _M = { _VERSION = '0.01' }
local function parse_args(s, start)
end
function _M.get(tcpsock, keys)
end
function _M.set(tcpsock, res)
end
function _M.run()
local tcpsock = assert(ngx.req.socket(true))
while true do
tcpsock:settimeout(60000) -- 60 seconds
local data, err = tcpsock:receive("*l")
local command, args
if data then
local from, to, err = re_find(data, [[(\S+)]], "jo")
if from then
command = str_sub(data, from, to)
args = parse_args(data, to + 1)
end
end
if args then
local args_len = #args
if command == 'get' and args_len > 0 then
_M.get(tcpsock, args)
elseif command == "set" and args_len == 4 then
_M.set(tcpsock, args)
end
end
end
end
return _M
このコードスニペットは、エントリ関数run()
の主要なロジックを実装しています。例外処理はまだ行っておらず、依存関係parse_args
、get
、set
はすべて空の関数ですが、このフレームワークはすでにMemcachedサーバーのロジックを完全に表現しています。
コードの埋め込み
次に、コードが実行される順序に従って、これらの空の関数を実装していきましょう。
まず、Memcachedプロトコルのドキュメントに従ってMemcachedコマンドのパラメータを解析します。
local function parse_args(s, start)
local arr = {}
while true do
local from, to = re_find(s, [[\S+]], "jo", {pos = start})
if not from then
break
end
table.insert(arr, str_sub(s, from, to))
start = to + 1
end
return arr
end
私のアドバイスは、まず最も直感的なバージョンを実装し、パフォーマンスの最適化は考えないことです。結局のところ、完璧さよりも完成が重要であり、完成に基づいて段階的に最適化することが完璧に近づく唯一の方法です。
次に、get
関数を実装します。これは一度に複数のキーをクエリできるため、以下のコードではfor
ループを使用しています。
function _M.get(tcpsock, keys)
local reply = ""
for i = 1, #keys do
local key = keys[i]
local value, flags = mc_shdict:get(key)
if value then
local flags = flags or 0
reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
end
end
reply = reply .. "END\r\n"
tcpsock:settimeout(1000) -- one second timeout
local bytes, err = tcpsock:send(reply)
end
ここでのコアコードは1行だけです:local value, flags = mc_shdict:get(key)
、つまりshared dict
からデータをクエリすることです。残りのコードは、Memcachedプロトコルに従って文字列を結合し、最終的にクライアントに送信します。
最後に、set
関数を見てみましょう。これは受信したパラメータをshared dict
APIの形式に変換し、データを保存し、エラーが発生した場合にはMemcachedのプロトコルに従って処理します。
function _M.set(tcpsock, res)
local reply = ""
local key = res[1]
local flags = res[2]
local exptime = res[3]
local bytes = res[4]
local value, err = tcpsock:receive(tonumber(bytes) + 2)
if str_sub(value, -2, -1) == "\r\n" then
local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
if succ then
reply = reply .. “STORED\r\n"
else
reply = reply .. "SERVER_ERROR " .. err .. “\r\n”
end
else
reply = reply .. "ERROR\r\n"
end
tcpsock:settimeout(1000) -- one second timeout
local bytes, err = tcpsock:send(reply)
end
さらに、上記の関数を埋めながら、ngx.log
を使用してテストケースでチェックおよびデバッグすることができます。残念ながら、OpenRestyにはブレークポイントデバッガーがないため、ngx.say
とngx.log
を使用してデバッグしています。これはまだ開拓待ちの時代です。
まとめ
このハンズオンプロジェクトはこれで終了です。最後に、質問を残します:上記のMemcachedサーバー実装コードを完全に実行し、テストケースを通過させることができますか?
今日の質問はおそらく多くの努力を必要としますが、これはまだ原始的なバージョンです。エラーハンドリング、パフォーマンス最適化、自動化テストはまだ改善されていません。
今日の説明や実践について疑問がある場合は、コメントを残して私たちと議論してください。また、この記事を同僚や友人と共有して、一緒に実践し、進歩しましょう。