Verarbeitung von Layer 4-Datenverkehr und Implementierung eines Memcached-Servers mit OpenResty
API7.ai
November 10, 2022
In einigen früheren Artikeln haben wir einige Lua-APIs zur Behandlung von Anfragen vorgestellt, die alle mit Layer 7 zusammenhängen. Darüber hinaus bietet OpenResty das Modul stream-lua-nginx-module
an, um den Datenverkehr von Layer 4 zu verarbeiten. Es stellt Anweisungen und APIs bereit, die im Wesentlichen mit dem lua-nginx-module
identisch sind.
Heute werden wir darüber sprechen, wie man einen Memcached-Server mit OpenResty implementiert, was nur etwa 100 Codezeilen erfordert. In dieser kleinen praktischen Übung werden wir viel von dem verwenden, was wir zuvor gelernt haben, und wir werden auch einige Inhalte aus den Kapiteln über Tests und Leistungsoptimierung einbringen.
Und wir sollten uns darüber im Klaren sein, dass der Punkt dieses Artikels nicht darin besteht, die Funktionen jeder Codezeile zu verstehen, sondern die Gesamtansicht zu verstehen, wie OpenResty ein Projekt von Grund auf entwickelt, aus der Perspektive von Anforderungen, Tests, Entwicklung usw.
Ursprüngliche Anforderungen und technische Lösungen
Wir wissen, dass HTTPS-Datenverkehr immer mehr zum Mainstream wird, aber einige ältere Browser unterstützen session tickets
nicht, daher müssen wir die Sitzungs-ID serverseitig speichern. Wenn der lokale Speicherplatz nicht ausreicht, benötigen wir einen Cluster zur Speicherung, und die Daten können verworfen werden, daher ist Memcached besser geeignet.
An diesem Punkt wäre die Einführung von Memcached die einfachste Lösung. In diesem Artikel werden wir jedoch aus den folgenden Gründen OpenResty verwenden, um ein Rad neu zu erfinden.
- Erstens würde die direkte Einführung von Memcached einen zusätzlichen Prozess einführen, was die Bereitstellungs- und Wartungskosten erhöht.
- Zweitens ist die Anforderung einfach genug, es werden nur
get
- undset
-Operationen benötigt, und die Unterstützung für Ablaufzeiten. - Drittens hat OpenResty ein
stream
-Modul, das diese Anforderung schnell umsetzen kann.
Da wir einen Memcached-Server implementieren möchten, müssen wir zuerst sein Protokoll verstehen. Das Memcached-Protokoll kann TCP und UDP unterstützen. Hier verwenden wir TCP. Unten ist das spezifische Protokoll der get
- und set
-Befehle.
Get
get value with key
Telnet-Befehl: get <key>*\r\n
Beispiel:
get key
VALUE key 0 4 data END
Set
Speichern Sie Schlüssel-Wert-Paare in Memcached
Telnet-Befehl:set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n
Beispiel:
set key 0 900 4 data
STORED
Wir müssen auch wissen, wie die "Fehlerbehandlung" des Memcached-Protokolls neben get
und set
implementiert ist. "Fehlerbehandlung" ist sehr wichtig für serverseitige Programme, und wir müssen Programme schreiben, die nicht nur normale Anfragen, sondern auch Ausnahmen behandeln. Zum Beispiel in einem Szenario wie dem folgenden:
- Memcached sendet eine Anfrage, die weder
get
nochset
ist, wie gehe ich damit um? - Welches Feedback gebe ich dem Memcached-Client, wenn auf der Serverseite ein Fehler auftritt?
Außerdem möchten wir eine mit Memcached kompatible Client-Anwendung schreiben. Auf diese Weise müssen Benutzer nicht zwischen der offiziellen Memcached-Version und der OpenResty-Implementierung unterscheiden.
Die folgende Abbildung aus der Memcached-Dokumentation beschreibt, was im Fehlerfall zurückgegeben werden sollte und das genaue Format, das Sie als Referenz verwenden können.
Lassen Sie uns nun die technische Lösung definieren. Wir wissen, dass OpenResty's shared dict
über worker
s hinweg verwendet werden kann und dass das Speichern von Daten in einem shared dict
dem Speichern in Memcached sehr ähnlich ist. Beide unterstützen get
- und set
-Operationen, und die Daten gehen verloren, wenn der Prozess neu gestartet wird. Daher ist es angemessen, ein shared dict
zu verwenden, um Memcached zu emulieren, da ihre Prinzipien und Verhaltensweisen gleich sind.
Testgetriebene Entwicklung
Der nächste Schritt besteht darin, mit der Arbeit zu beginnen. Basierend auf der Idee der testgetriebenen Entwicklung konstruieren wir jedoch den einfachsten Testfall, bevor wir den spezifischen Code schreiben. Anstatt das test::nginx
-Framework zu verwenden, das bekanntlich schwer zu starten ist, beginnen wir mit einem manuellen Test mit resty
.
$ resty -e 'local memcached = require "resty.memcached"
local memc, err = memcached:new()
memc:set_timeout(1000) -- 1 Sekunde
local ok, err = memc:connect("127.0.0.1", 11212)
local ok, err = memc:set("dog", 32)
if not ok then
ngx.say("failed to set dog: ", err)
return
end
local res, flags, err = memc:get("dog")
ngx.say("dog: ", res)'
Dieser Testcode verwendet die lua-rety-memcached
-Client-Bibliothek, um connect
- und set
-Operationen zu initiieren, und nimmt an, dass der Memcached-Server auf Port 11212
auf dem lokalen Rechner lauscht.
Es sieht so aus, als ob es gut funktionieren sollte. Sie können diesen Code auf Ihrem Rechner ausführen, und es wird, nicht überraschend, einen Fehler wie failed to set dog: closed
zurückgeben, da der Dienst zu diesem Zeitpunkt nicht gestartet ist.
An diesem Punkt ist Ihre technische Lösung klar: Verwenden Sie das stream
-Modul, um Daten zu empfangen und zu senden, und verwenden Sie das shared dict
, um sie zu speichern.
Das Maß für die Erfüllung der Anforderung ist klar: Führen Sie den obigen Code aus und geben Sie den tatsächlichen Wert von dog
aus.
Aufbau des Rahmens
Also, worauf warten Sie noch? Beginnen Sie mit dem Schreiben von Code!
Meine Gewohnheit ist es, zuerst einen minimalen lauffähigen Code-Rahmen zu erstellen und dann schrittweise den Code zu füllen. Der Vorteil dabei ist, dass Sie während des Codierungsprozesses viele kleine Ziele setzen können, und die Testfälle geben Ihnen positives Feedback, wenn Sie ein kleines Ziel erreichen.
Beginnen wir mit der Einrichtung der NGINX-Konfigurationsdatei, da stream
und shared dict
darin voreingestellt sein sollten. Hier ist die Konfigurationsdatei, die ich eingerichtet habe.
stream {
lua_shared_dict memcached 100m;
lua_package_path 'lib/?.lua;;';
server {
listen 11212;
content_by_lua_block {
local m = require("resty.memcached.server")
m.run()
}
}
}
Wie Sie sehen können, sind in dieser Konfigurationsdatei mehrere Schlüsselinformationen enthalten.
- Erstens läuft der Code im
stream
-Kontext von NGINX, nicht imHTTP
-Kontext, und lauscht auf Port11212
. - Zweitens ist der Name des
shared dict
memcached
, und die Größe ist100M
, die zur Laufzeit nicht geändert werden kann. - Darüber hinaus befindet sich der Code im Verzeichnis
lib/resty/memcached
, der Dateiname istserver.lua
, und die Einstiegsfunktion istrun()
, die Sie auslua_package_path
undcontent_by_lua_block
entnehmen können.
Als nächstes ist es an der Zeit, den Code-Rahmen zu erstellen. Sie können es selbst versuchen, und dann schauen wir uns meinen Rahmen-Code gemeinsam an.
local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached
local _M = { _VERSION = '0.01' }
local function parse_args(s, start)
end
function _M.get(tcpsock, keys)
end
function _M.set(tcpsock, res)
end
function _M.run()
local tcpsock = assert(ngx.req.socket(true))
while true do
tcpsock:settimeout(60000) -- 60 Sekunden
local data, err = tcpsock:receive("*l")
local command, args
if data then
local from, to, err = re_find(data, [[(\S+)]], "jo")
if from then
command = str_sub(data, from, to)
args = parse_args(data, to + 1)
end
end
if args then
local args_len = #args
if command == 'get' and args_len > 0 then
_M.get(tcpsock, args)
elseif command == "set" and args_len == 4 then
_M.set(tcpsock, args)
end
end
end
end
return _M
Dieser Codeausschnitt implementiert die Hauptlogik der Einstiegsfunktion run()
. Obwohl ich noch keine Fehlerbehandlung durchgeführt habe und die Abhängigkeiten parse_args
, get
und set
alle leere Funktionen sind, drückt dieser Rahmen bereits die gesamte Logik des Memcached-Servers aus.
Code füllen
Als nächstes implementieren wir diese leeren Funktionen in der Reihenfolge, in der der Code ausgeführt wird.
Zuerst können wir die Parameter des Memcached-Befehls gemäß der Memcached-Protokoll-Dokumentation parsen.
local function parse_args(s, start)
local arr = {}
while true do
local from, to = re_find(s, [[\S+]], "jo", {pos = start})
if not from then
break
end
table.insert(arr, str_sub(s, from, to))
start = to + 1
end
return arr
end
Mein Rat ist, zuerst eine Version zu implementieren, die am intuitivsten ist, ohne an Leistungsoptimierung zu denken. Schließlich ist die Fertigstellung immer wichtiger als Perfektion, und inkrementelle Optimierung basierend auf Fertigstellung ist der einzige Weg, sich der Perfektion zu nähern.
Als nächstes implementieren wir die get
-Funktion. Sie kann mehrere Schlüssel auf einmal abfragen, daher verwende ich in dem folgenden Code eine for
-Schleife.
function _M.get(tcpsock, keys)
local reply = ""
for i = 1, #keys do
local key = keys[i]
local value, flags = mc_shdict:get(key)
if value then
local flags = flags or 0
reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
end
end
reply = reply .. "END\r\n"
tcpsock:settimeout(1000) -- ein Sekunde Timeout
local bytes, err = tcpsock:send(reply)
end
Hier gibt es nur eine Zeile Kerncode: local value, flags = mc_shdict:get(key)
, das heißt, die Daten aus dem shared dict
abfragen; der Rest des Codes folgt dem Memcached-Protokoll, um den String zu erstellen und schließlich an den Client zu senden.
Schließlich schauen wir uns die set
-Funktion an. Sie konvertiert die empfangenen Parameter in das shared dict
-API-Format, speichert die Daten und behandelt Fehler gemäß dem Memcached-Protokoll.
function _M.set(tcpsock, res)
local reply = ""
local key = res[1]
local flags = res[2]
local exptime = res[3]
local bytes = res[4]
local value, err = tcpsock:receive(tonumber(bytes) + 2)
if str_sub(value, -2, -1) == "\r\n" then
local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
if succ then
reply = reply .. “STORED\r\n"
else
reply = reply .. "SERVER_ERROR " .. err .. “\r\n”
end
else
reply = reply .. "ERROR\r\n"
end
tcpsock:settimeout(1000) -- ein Sekunde Timeout
local bytes, err = tcpsock:send(reply)
end
Darüber hinaus können Sie Testfälle verwenden, um mit ngx.log
zu überprüfen und zu debuggen, während Sie die obigen Funktionen füllen. Leider verwenden wir ngx.say
und ngx.log
zum Debuggen, da es in OpenResty keinen Breakpoint-Debugger gibt, was immer noch eine Pionierzeit ist, die auf weitere Erkundungen wartet.
Zusammenfassung
Dieses praktische Projekt endet nun, und schließlich möchte ich eine Frage hinterlassen: Könnten Sie den obigen Memcached-Server-Implementierungscode nehmen, ihn vollständig ausführen und den Testfall bestehen?
Die heutige Frage wird wahrscheinlich viel Aufwand erfordern, aber dies ist immer noch eine primitive Version. Es gibt keine Fehlerbehandlung, Leistungsoptimierung und automatisierten Tests, die später verbessert werden.
Wenn Sie Zweifel an der heutigen Erklärung oder Ihrer Praxis haben, sind Sie herzlich eingeladen, einen Kommentar zu hinterlassen und mit uns zu diskutieren. Sie sind auch eingeladen, diesen Artikel mit Ihren Kollegen und Freunden zu teilen, damit wir gemeinsam üben und Fortschritte machen können.