Verarbeitung von Layer 4-Datenverkehr und Implementierung eines Memcached-Servers mit OpenResty

API7.ai

November 10, 2022

OpenResty (NGINX + Lua)

In einigen früheren Artikeln haben wir einige Lua-APIs zur Behandlung von Anfragen vorgestellt, die alle mit Layer 7 zusammenhängen. Darüber hinaus bietet OpenResty das Modul stream-lua-nginx-module an, um den Datenverkehr von Layer 4 zu verarbeiten. Es stellt Anweisungen und APIs bereit, die im Wesentlichen mit dem lua-nginx-module identisch sind.

Heute werden wir darüber sprechen, wie man einen Memcached-Server mit OpenResty implementiert, was nur etwa 100 Codezeilen erfordert. In dieser kleinen praktischen Übung werden wir viel von dem verwenden, was wir zuvor gelernt haben, und wir werden auch einige Inhalte aus den Kapiteln über Tests und Leistungsoptimierung einbringen.

Und wir sollten uns darüber im Klaren sein, dass der Punkt dieses Artikels nicht darin besteht, die Funktionen jeder Codezeile zu verstehen, sondern die Gesamtansicht zu verstehen, wie OpenResty ein Projekt von Grund auf entwickelt, aus der Perspektive von Anforderungen, Tests, Entwicklung usw.

Ursprüngliche Anforderungen und technische Lösungen

Wir wissen, dass HTTPS-Datenverkehr immer mehr zum Mainstream wird, aber einige ältere Browser unterstützen session tickets nicht, daher müssen wir die Sitzungs-ID serverseitig speichern. Wenn der lokale Speicherplatz nicht ausreicht, benötigen wir einen Cluster zur Speicherung, und die Daten können verworfen werden, daher ist Memcached besser geeignet.

An diesem Punkt wäre die Einführung von Memcached die einfachste Lösung. In diesem Artikel werden wir jedoch aus den folgenden Gründen OpenResty verwenden, um ein Rad neu zu erfinden.

  • Erstens würde die direkte Einführung von Memcached einen zusätzlichen Prozess einführen, was die Bereitstellungs- und Wartungskosten erhöht.
  • Zweitens ist die Anforderung einfach genug, es werden nur get- und set-Operationen benötigt, und die Unterstützung für Ablaufzeiten.
  • Drittens hat OpenResty ein stream-Modul, das diese Anforderung schnell umsetzen kann.

Da wir einen Memcached-Server implementieren möchten, müssen wir zuerst sein Protokoll verstehen. Das Memcached-Protokoll kann TCP und UDP unterstützen. Hier verwenden wir TCP. Unten ist das spezifische Protokoll der get- und set-Befehle.

Get
get value with key
Telnet-Befehl: get <key>*\r\n

Beispiel:
get key
VALUE key 0 4 data END
Set
Speichern Sie Schlüssel-Wert-Paare in Memcached
Telnet-Befehl:set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n

Beispiel:
set key 0 900 4 data
STORED

Wir müssen auch wissen, wie die "Fehlerbehandlung" des Memcached-Protokolls neben get und set implementiert ist. "Fehlerbehandlung" ist sehr wichtig für serverseitige Programme, und wir müssen Programme schreiben, die nicht nur normale Anfragen, sondern auch Ausnahmen behandeln. Zum Beispiel in einem Szenario wie dem folgenden:

  • Memcached sendet eine Anfrage, die weder get noch set ist, wie gehe ich damit um?
  • Welches Feedback gebe ich dem Memcached-Client, wenn auf der Serverseite ein Fehler auftritt?

Außerdem möchten wir eine mit Memcached kompatible Client-Anwendung schreiben. Auf diese Weise müssen Benutzer nicht zwischen der offiziellen Memcached-Version und der OpenResty-Implementierung unterscheiden.

Die folgende Abbildung aus der Memcached-Dokumentation beschreibt, was im Fehlerfall zurückgegeben werden sollte und das genaue Format, das Sie als Referenz verwenden können.

Fehlerformat

Lassen Sie uns nun die technische Lösung definieren. Wir wissen, dass OpenResty's shared dict über workers hinweg verwendet werden kann und dass das Speichern von Daten in einem shared dict dem Speichern in Memcached sehr ähnlich ist. Beide unterstützen get- und set-Operationen, und die Daten gehen verloren, wenn der Prozess neu gestartet wird. Daher ist es angemessen, ein shared dict zu verwenden, um Memcached zu emulieren, da ihre Prinzipien und Verhaltensweisen gleich sind.

Testgetriebene Entwicklung

Der nächste Schritt besteht darin, mit der Arbeit zu beginnen. Basierend auf der Idee der testgetriebenen Entwicklung konstruieren wir jedoch den einfachsten Testfall, bevor wir den spezifischen Code schreiben. Anstatt das test::nginx-Framework zu verwenden, das bekanntlich schwer zu starten ist, beginnen wir mit einem manuellen Test mit resty.

$ resty -e 'local memcached = require "resty.memcached"
    local memc, err = memcached:new()

    memc:set_timeout(1000) -- 1 Sekunde
    local ok, err = memc:connect("127.0.0.1", 11212)
    local ok, err = memc:set("dog", 32)
    if not ok then
        ngx.say("failed to set dog: ", err)
        return
    end

    local res, flags, err = memc:get("dog")
    ngx.say("dog: ", res)'

Dieser Testcode verwendet die lua-rety-memcached-Client-Bibliothek, um connect- und set-Operationen zu initiieren, und nimmt an, dass der Memcached-Server auf Port 11212 auf dem lokalen Rechner lauscht.

Es sieht so aus, als ob es gut funktionieren sollte. Sie können diesen Code auf Ihrem Rechner ausführen, und es wird, nicht überraschend, einen Fehler wie failed to set dog: closed zurückgeben, da der Dienst zu diesem Zeitpunkt nicht gestartet ist.

An diesem Punkt ist Ihre technische Lösung klar: Verwenden Sie das stream-Modul, um Daten zu empfangen und zu senden, und verwenden Sie das shared dict, um sie zu speichern.

Das Maß für die Erfüllung der Anforderung ist klar: Führen Sie den obigen Code aus und geben Sie den tatsächlichen Wert von dog aus.

Aufbau des Rahmens

Also, worauf warten Sie noch? Beginnen Sie mit dem Schreiben von Code!

Meine Gewohnheit ist es, zuerst einen minimalen lauffähigen Code-Rahmen zu erstellen und dann schrittweise den Code zu füllen. Der Vorteil dabei ist, dass Sie während des Codierungsprozesses viele kleine Ziele setzen können, und die Testfälle geben Ihnen positives Feedback, wenn Sie ein kleines Ziel erreichen.

Beginnen wir mit der Einrichtung der NGINX-Konfigurationsdatei, da stream und shared dict darin voreingestellt sein sollten. Hier ist die Konfigurationsdatei, die ich eingerichtet habe.

stream {
    lua_shared_dict memcached 100m;
    lua_package_path 'lib/?.lua;;';
    server {
        listen 11212;
        content_by_lua_block {
            local m = require("resty.memcached.server")
            m.run()
        }
    }
}

Wie Sie sehen können, sind in dieser Konfigurationsdatei mehrere Schlüsselinformationen enthalten.

  • Erstens läuft der Code im stream-Kontext von NGINX, nicht im HTTP-Kontext, und lauscht auf Port 11212.
  • Zweitens ist der Name des shared dict memcached, und die Größe ist 100M, die zur Laufzeit nicht geändert werden kann.
  • Darüber hinaus befindet sich der Code im Verzeichnis lib/resty/memcached, der Dateiname ist server.lua, und die Einstiegsfunktion ist run(), die Sie aus lua_package_path und content_by_lua_block entnehmen können.

Als nächstes ist es an der Zeit, den Code-Rahmen zu erstellen. Sie können es selbst versuchen, und dann schauen wir uns meinen Rahmen-Code gemeinsam an.

local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached

local _M = { _VERSION = '0.01' }

local function parse_args(s, start)
end

function _M.get(tcpsock, keys)
end

function _M.set(tcpsock, res)
end

function _M.run()
    local tcpsock = assert(ngx.req.socket(true))

    while true do
        tcpsock:settimeout(60000) -- 60 Sekunden
        local data, err = tcpsock:receive("*l")

        local command, args
        if data then
            local from, to, err = re_find(data, [[(\S+)]], "jo")
            if from then
                command = str_sub(data, from, to)
                args = parse_args(data, to + 1)
            end
        end

        if args then
            local args_len = #args
            if command == 'get' and args_len > 0 then
                _M.get(tcpsock, args)
            elseif command == "set" and args_len == 4 then
                _M.set(tcpsock, args)
            end
        end
    end
end

return _M

Dieser Codeausschnitt implementiert die Hauptlogik der Einstiegsfunktion run(). Obwohl ich noch keine Fehlerbehandlung durchgeführt habe und die Abhängigkeiten parse_args, get und set alle leere Funktionen sind, drückt dieser Rahmen bereits die gesamte Logik des Memcached-Servers aus.

Code füllen

Als nächstes implementieren wir diese leeren Funktionen in der Reihenfolge, in der der Code ausgeführt wird.

Zuerst können wir die Parameter des Memcached-Befehls gemäß der Memcached-Protokoll-Dokumentation parsen.

local function parse_args(s, start)
    local arr = {}

    while true do
        local from, to = re_find(s, [[\S+]], "jo", {pos = start})
        if not from then
            break
        end

        table.insert(arr, str_sub(s, from, to))

        start = to + 1
    end

    return arr
end

Mein Rat ist, zuerst eine Version zu implementieren, die am intuitivsten ist, ohne an Leistungsoptimierung zu denken. Schließlich ist die Fertigstellung immer wichtiger als Perfektion, und inkrementelle Optimierung basierend auf Fertigstellung ist der einzige Weg, sich der Perfektion zu nähern.

Als nächstes implementieren wir die get-Funktion. Sie kann mehrere Schlüssel auf einmal abfragen, daher verwende ich in dem folgenden Code eine for-Schleife.

function _M.get(tcpsock, keys)
    local reply = ""

    for i = 1, #keys do
        local key = keys[i]
        local value, flags = mc_shdict:get(key)
        if value then
            local flags  = flags or 0
            reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
        end
    end
    reply = reply ..  "END\r\n"

    tcpsock:settimeout(1000)  -- ein Sekunde Timeout
    local bytes, err = tcpsock:send(reply)
end

Hier gibt es nur eine Zeile Kerncode: local value, flags = mc_shdict:get(key), das heißt, die Daten aus dem shared dict abfragen; der Rest des Codes folgt dem Memcached-Protokoll, um den String zu erstellen und schließlich an den Client zu senden.

Schließlich schauen wir uns die set-Funktion an. Sie konvertiert die empfangenen Parameter in das shared dict-API-Format, speichert die Daten und behandelt Fehler gemäß dem Memcached-Protokoll.

function _M.set(tcpsock, res)
    local reply =  ""

    local key = res[1]
    local flags = res[2]
    local exptime = res[3]
    local bytes = res[4]

    local value, err = tcpsock:receive(tonumber(bytes) + 2)

    if str_sub(value, -2, -1) == "\r\n" then
        local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
        if succ then
            reply = reply .. “STORED\r\n"
        else
            reply = reply .. "SERVER_ERROR " .. err .. “\r\n”
        end
    else
        reply = reply .. "ERROR\r\n"
    end

    tcpsock:settimeout(1000)  -- ein Sekunde Timeout
    local bytes, err = tcpsock:send(reply)
end

Darüber hinaus können Sie Testfälle verwenden, um mit ngx.log zu überprüfen und zu debuggen, während Sie die obigen Funktionen füllen. Leider verwenden wir ngx.say und ngx.log zum Debuggen, da es in OpenResty keinen Breakpoint-Debugger gibt, was immer noch eine Pionierzeit ist, die auf weitere Erkundungen wartet.

Zusammenfassung

Dieses praktische Projekt endet nun, und schließlich möchte ich eine Frage hinterlassen: Könnten Sie den obigen Memcached-Server-Implementierungscode nehmen, ihn vollständig ausführen und den Testfall bestehen?

Die heutige Frage wird wahrscheinlich viel Aufwand erfordern, aber dies ist immer noch eine primitive Version. Es gibt keine Fehlerbehandlung, Leistungsoptimierung und automatisierten Tests, die später verbessert werden.

Wenn Sie Zweifel an der heutigen Erklärung oder Ihrer Praxis haben, sind Sie herzlich eingeladen, einen Kommentar zu hinterlassen und mit uns zu diskutieren. Sie sind auch eingeladen, diesen Artikel mit Ihren Kollegen und Freunden zu teilen, damit wir gemeinsam üben und Fortschritte machen können.