Handling Layer 4 of traffic and Implementing Memcached Server by OpenResty

API7.ai

November 10, 2022

OpenResty (NGINX + Lua)

Em alguns artigos anteriores, introduzimos algumas APIs Lua para lidar com solicitações, todas relacionadas à Camada 7. Além disso, o OpenResty fornece o módulo stream-lua-nginx-module para lidar com o tráfego da Camada 4. Ele oferece instruções e APIs que são basicamente as mesmas do lua-nginx-module.

Hoje, vamos falar sobre a implementação de um servidor Memcached com o OpenResty, que requer apenas cerca de 100 linhas de código. Neste pequeno exercício prático, usaremos muito do que aprendemos anteriormente e também introduziremos alguns conteúdos dos capítulos de teste e otimização de desempenho mais adiante.

E devemos estar cientes de que o objetivo deste artigo não é entender as funções de cada linha de código, mas compreender a visão completa de como o OpenResty desenvolve um projeto do zero, desde a perspectiva de requisitos, testes, desenvolvimento, etc.

Requisitos originais e soluções técnicas

Sabemos que o tráfego HTTPS está se tornando predominante, mas alguns navegadores mais antigos não suportam session tickets, então precisamos armazenar o ID da sessão no lado do servidor. Se o espaço de armazenamento local for insuficiente, precisamos de um cluster para armazenamento, e os dados podem ser descartados, então o Memcached é mais adequado.

Neste ponto, introduzir o Memcached deveria ser a solução mais direta. No entanto, neste artigo, escolheremos usar o OpenResty para construir uma solução personalizada pelos seguintes motivos.

  • Primeiro, introduzir o Memcached diretamente adicionaria um processo extra, aumentando os custos de implantação e manutenção.
  • Segundo, o requisito é simples o suficiente, exigindo apenas operações de get e set, e suporte a expiração.
  • Terceiro, o OpenResty possui um módulo stream, que pode implementar rapidamente esse requisito.

Como queremos implementar um servidor Memcached, precisamos primeiro entender seu protocolo. O protocolo Memcached pode suportar TCP e UDP. Aqui usaremos TCP. Abaixo está o protocolo específico para os comandos get e set.

Get
obter valor com chave
Comando Telnet: get <key>*\r\n

Exemplo:
get key
VALUE key 0 4 data END
Set
Salvar chave-valor no memcached
Comando Telnet:set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n

Exemplo:
set key 0 900 4 data
STORED

Também precisamos saber como o "tratamento de erros" do protocolo Memcached é implementado, além de get e set. O "tratamento de erros" é muito importante para programas do lado do servidor, e precisamos escrever programas que lidem não apenas com solicitações normais, mas também com exceções. Por exemplo, em um cenário como o seguinte:

  • O Memcached envia uma solicitação diferente de get ou set, como devo lidar com isso?
  • Que tipo de feedback devo dar ao cliente Memcached quando há um erro no lado do servidor?

Além disso, queremos escrever um aplicativo cliente compatível com Memcached. Dessa forma, os usuários não precisam distinguir entre a versão oficial do Memcached e a implementação do OpenResty.

A figura abaixo da documentação do Memcached descreve o que deve ser retornado em caso de erro e o formato exato, que você pode usar como referência.

formato de erro

Agora, vamos definir a solução técnica. Sabemos que o shared dict do OpenResty pode ser usado entre workers e que colocar dados em um shared dict é muito semelhante a colocá-los no Memcached. Ambos suportam operações de get e set, e os dados são perdidos quando o processo é reiniciado. Portanto, é apropriado usar um shared dict para emular o Memcached, pois seus princípios e comportamentos são os mesmos.

Desenvolvimento Orientado a Testes

O próximo passo é começar a trabalhar nisso. No entanto, com base na ideia de desenvolvimento orientado a testes, vamos construir o caso de teste mais simples antes de escrever o código específico. Em vez de usar o framework test::nginx, que é notoriamente difícil de começar, vamos começar com um teste manual usando o resty.

$ resty -e 'local memcached = require "resty.memcached"
    local memc, err = memcached:new()

    memc:set_timeout(1000) -- 1 segundo
    local ok, err = memc:connect("127.0.0.1", 11212)
    local ok, err = memc:set("dog", 32)
    if not ok then
        ngx.say("failed to set dog: ", err)
        return
    end

    local res, flags, err = memc:get("dog")
    ngx.say("dog: ", res)'

Este código de teste usa a biblioteca cliente lua-rety-memcached para iniciar operações de connect e set e assume que o servidor Memcached está escutando na porta 11212 na máquina local.

Parece que deveria funcionar bem. Você pode executar este código em sua máquina e, sem surpresa, ele retornará um erro como failed to set dog: closed, já que o serviço não foi iniciado neste momento.

Neste ponto, sua solução técnica está clara: use o módulo stream para receber e enviar dados e use o shared dict para armazená-los.

A métrica para medir a conclusão do requisito é clara: execute o código acima e imprima o valor real de dog.

Construindo o framework

Então, o que você está esperando? Comece a escrever o código!

Meu hábito é construir primeiro um framework de código mínimo executável e, em seguida, preencher gradualmente o código. A vantagem disso é que você pode definir muitas pequenas metas durante o processo de codificação, e os casos de teste lhe darão feedback positivo quando você atingir uma pequena meta.

Vamos começar configurando o arquivo de configuração do NGINX, já que stream e shared dict devem ser pré-configurados nele. Aqui está o arquivo de configuração que eu configurei.

stream {
    lua_shared_dict memcached 100m;
    lua_package_path 'lib/?.lua;;';
    server {
        listen 11212;
        content_by_lua_block {
            local m = require("resty.memcached.server")
            m.run()
        }
    }
}

Como você pode ver, várias informações-chave estão neste arquivo de configuração.

  • Primeiro, o código é executado no contexto stream do NGINX, não no contexto HTTP, e está escutando na porta 11212.
  • Segundo, o nome do shared dict é memcached, e o tamanho é 100M, que não pode ser alterado em tempo de execução.
  • Além disso, o código está localizado no diretório lib/resty/memcached, o nome do arquivo é server.lua, e a função de entrada é run(), que você pode encontrar em lua_package_path e content_by_lua_block.

Em seguida, é hora de construir o framework do código. Você pode tentar por conta própria, e então vamos ver meu framework de código juntos.

local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached

local _M = { _VERSION = '0.01' }

local function parse_args(s, start)
end

function _M.get(tcpsock, keys)
end

function _M.set(tcpsock, res)
end

function _M.run()
    local tcpsock = assert(ngx.req.socket(true))

    while true do
        tcpsock:settimeout(60000) -- 60 segundos
        local data, err = tcpsock:receive("*l")

        local command, args
        if data then
            local from, to, err = re_find(data, [[(\S+)]], "jo")
            if from then
                command = str_sub(data, from, to)
                args = parse_args(data, to + 1)
            end
        end

        if args then
            local args_len = #args
            if command == 'get' and args_len > 0 then
                _M.get(tcpsock, args)
            elseif command == "set" and args_len == 4 then
                _M.set(tcpsock, args)
            end
        end
    end
end

return _M

Este trecho de código implementa a lógica principal da função de entrada run(). Embora eu não tenha feito nenhum tratamento de exceções e as dependências parse_args, get e set sejam todas funções vazias, este framework já expressa completamente a lógica do servidor Memcached.

Preenchendo o código

Em seguida, vamos implementar essas funções vazias na ordem em que o código é executado.

Primeiro, podemos analisar os parâmetros do comando Memcached de acordo com a documentação do protocolo Memcached.

local function parse_args(s, start)
    local arr = {}

    while true do
        local from, to = re_find(s, [[\S+]], "jo", {pos = start})
        if not from then
            break
        end

        table.insert(arr, str_sub(s, from, to))

        start = to + 1
    end

    return arr
end

Meu conselho é implementar primeiro uma versão mais intuitiva, sem pensar em nenhuma otimização de desempenho. Afinal, a conclusão é sempre mais importante que a perfeição, e a otimização incremental com base na conclusão é a única maneira de se aproximar da perfeição.

Em seguida, vamos implementar a função get. Ela pode consultar várias chaves de uma vez, então uso um loop for no código a seguir.

function _M.get(tcpsock, keys)
    local reply = ""

    for i = 1, #keys do
        local key = keys[i]
        local value, flags = mc_shdict:get(key)
        if value then
            local flags  = flags or 0
            reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
        end
    end
    reply = reply ..  "END\r\n"

    tcpsock:settimeout(1000)  -- timeout de um segundo
    local bytes, err = tcpsock:send(reply)
end

Há apenas uma linha de código central aqui: local value, flags = mc_shdict:get(key), ou seja, consultar os dados do shared dict; quanto ao resto do código, ele segue o protocolo Memcached para montar a string e finalmente enviá-la ao cliente.

Por fim, vamos ver a função set. Ela converte os parâmetros recebidos no formato da API do shared dict, armazena os dados e, em caso de erros, os trata de acordo com o protocolo do Memcached.

function _M.set(tcpsock, res)
    local reply =  ""

    local key = res[1]
    local flags = res[2]
    local exptime = res[3]
    local bytes = res[4]

    local value, err = tcpsock:receive(tonumber(bytes) + 2)

    if str_sub(value, -2, -1) == "\r\n" then
        local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
        if succ then
            reply = reply .. “STORED\r\n"
        else
            reply = reply .. "SERVER_ERROR " .. err .. “\r\n”
        end
    else
        reply = reply .. "ERROR\r\n"
    end

    tcpsock:settimeout(1000)  -- timeout de um segundo
    local bytes, err = tcpsock:send(reply)
end

Além disso, você pode usar casos de teste para verificar e depurar com ngx.log enquanto preenche as funções acima. Infelizmente, estamos usando ngx.say e ngx.log para depurar, já que não há um depurador com pontos de interrupção no OpenResty, que ainda está em uma era rudimentar esperando por mais exploração.

Resumo

Este projeto prático está terminando agora e, finalmente, gostaria de deixar uma pergunta: Você poderia pegar o código de implementação do servidor Memcached acima, executá-lo completamente e passar no caso de teste?

A pergunta de hoje provavelmente exigirá muito esforço, mas esta ainda é uma versão primitiva. Não há tratamento de erros, otimização de desempenho e testes automatizados, que serão melhorados posteriormente.

Se você tiver alguma dúvida sobre a explicação de hoje ou sua prática, sinta-se à vontade para deixar um comentário e discutir conosco. Você também pode compartilhar este artigo com seus colegas e amigos para que possamos praticar e progredir juntos.