Layer 4 트래픽 처리 및 OpenResty를 통한 Memcached 서버 구현
API7.ai
November 10, 2022
이전 몇 편의 글에서 우리는 Layer 7과 관련된 요청을 처리하기 위한 몇 가지 Lua API를 소개했습니다. 또한 OpenResty는 Layer 4의 트래픽을 처리하기 위해 stream-lua-nginx-module
모듈을 제공합니다. 이 모듈은 lua-nginx-module
과 기본적으로 동일한 지시어와 API를 제공합니다.
오늘은 OpenResty를 사용하여 약 100줄의 코드로 Memcached 서버를 구현하는 방법에 대해 이야기하겠습니다. 이 작은 실습에서는 이전에 배운 많은 내용을 사용할 것이며, 나중에 테스트 및 성능 최적화 챕터의 일부 내용도 포함할 것입니다.
그리고 이 글의 요점은 모든 코드의 기능을 이해하는 것이 아니라, 요구사항, 테스트, 개발 등의 관점에서 OpenResty가 프로젝트를 처음부터 어떻게 개발하는지 전체적인 그림을 이해하는 것임을 명확히 해야 합니다.
원래 요구사항과 기술적 해결책
우리는 HTTPS 트래픽이 주류가 되고 있지만, 일부 오래된 브라우저는 session tickets
를 지원하지 않기 때문에 서버 측에 세션 ID를 저장해야 한다는 것을 알고 있습니다. 로컬 저장 공간이 충분하지 않다면 클러스터를 사용하여 저장해야 하며, 데이터는 버려질 수 있으므로 Memcached가 더 적합합니다.
이 시점에서 Memcached를 도입하는 것이 가장 직관적인 해결책일 것입니다. 그러나 이 글에서는 다음과 같은 이유로 OpenResty를 사용하여 바퀴를 다시 발명하기로 선택했습니다.
- 첫째, Memcached를 직접 도입하면 추가 프로세스가 도입되어 배포 및 유지 관리 비용이 증가합니다.
- 둘째, 요구사항이 매우 간단하여
get
및set
작업만 필요하며, 만료를 지원해야 합니다. - 셋째, OpenResty에는
stream
모듈이 있어 이 요구사항을 빠르게 구현할 수 있습니다.
Memcached 서버를 구현하려면 먼저 그 프로토콜을 이해해야 합니다. Memcached 프로토콜은 TCP와 UDP를 지원합니다. 여기서는 TCP를 사용합니다. 아래는 get
및 set
명령의 구체적인 프로토콜입니다.
Get
get value with key
Telnet command: get <key>*\r\n
Example:
get key
VALUE key 0 4 data END
Set
Save key-value to memcached
Telnet command:set <key> <flags> <exptime> <bytes> [noreply]\r\n<value>\r\n
Example:
set key 0 900 4 data
STORED
또한 get
및 set
외에 Memcached 프로토콜의 "오류 처리"가 어떻게 구현되는지 알아야 합니다. "오류 처리"는 서버 측 프로그램에서 매우 중요하며, 우리는 정상적인 요청뿐만 아니라 예외도 처리할 수 있는 프로그램을 작성해야 합니다. 예를 들어, 다음과 같은 시나리오에서:
- Memcached가
get
또는set
이외의 요청을 보내면 어떻게 처리할까요? - 서버 측에서 오류가 발생했을 때 Memcached 클라이언트에게 어떤 피드백을 제공해야 할까요?
또한, 우리는 Memcached와 호환되는 클라이언트 애플리케이션을 작성하고 싶습니다. 이렇게 하면 사용자는 공식 Memcached 버전과 OpenResty 구현을 구분할 필요가 없습니다.
Memcached 문서의 다음 그림은 오류 발생 시 반환해야 할 내용과 정확한 형식을 설명하며, 이를 참조할 수 있습니다.
이제 기술적 해결책을 정의해 보겠습니다. 우리는 OpenResty의 shared dict
가 worker
간에 사용될 수 있으며, shared dict
에 데이터를 넣는 것이 Memcached에 데이터를 넣는 것과 매우 유사하다는 것을 알고 있습니다. 둘 다 get
및 set
작업을 지원하며, 프로세스가 재시작되면 데이터가 손실됩니다. 따라서 shared dict
를 사용하여 Memcached를 에뮬레이트하는 것이 적절합니다. 왜냐하면 그들의 원리와 동작이 동일하기 때문입니다.
테스트 주도 개발
다음 단계는 작업을 시작하는 것입니다. 그러나 테스트 주도 개발의 아이디어에 기반하여 구체적인 코드를 작성하기 전에 가장 간단한 테스트 케이스를 구성해 보겠습니다. 시작하기 어려운 test::nginx
프레임워크를 사용하는 대신, resty
를 사용하여 수동 테스트를 시작해 보겠습니다.
$ resty -e 'local memcached = require "resty.memcached"
local memc, err = memcached:new()
memc:set_timeout(1000) -- 1 sec
local ok, err = memc:connect("127.0.0.1", 11212)
local ok, err = memc:set("dog", 32)
if not ok then
ngx.say("failed to set dog: ", err)
return
end
local res, flags, err = memc:get("dog")
ngx.say("dog: ", res)'
이 테스트 코드는 lua-rety-memcached
클라이언트 라이브러리를 사용하여 connect
및 set
작업을 시작하고, Memcached 서버가 로컬 머신의 11212
포트에서 수신 대기한다고 가정합니다.
이 코드는 잘 작동할 것처럼 보입니다. 이 코드를 여러분의 머신에서 실행하면, 서비스가 아직 시작되지 않았기 때문에 failed to set dog: closed
와 같은 오류가 반환될 것입니다.
이 시점에서 여러분의 기술적 해결책은 명확합니다: stream
모듈을 사용하여 데이터를 수신 및 전송하고, shared dict
를 사용하여 데이터를 저장합니다.
요구사항의 완료를 측정하는 지표는 명확합니다: 위의 코드를 실행하고 dog
의 실제 값을 출력합니다.
프레임워크 구축
그럼 무엇을 기다리고 있나요? 코드 작성을 시작하세요!
저의 습관은 최소한의 실행 가능한 코드 프레임워크를 먼저 구축한 다음 점차 코드를 채워 나가는 것입니다. 이 방법의 장점은 코딩 과정에서 많은 작은 목표를 설정할 수 있고, 작은 목표를 달성할 때 테스트 케이스가 긍정적인 피드백을 제공한다는 것입니다.
먼저 NGINX 설정 파일을 설정해 보겠습니다. stream
과 shared dict
는 미리 설정되어야 합니다. 다음은 제가 설정한 설정 파일입니다.
stream {
lua_shared_dict memcached 100m;
lua_package_path 'lib/?.lua;;';
server {
listen 11212;
content_by_lua_block {
local m = require("resty.memcached.server")
m.run()
}
}
}
보시다시피, 이 설정 파일에는 몇 가지 중요한 정보가 포함되어 있습니다.
- 첫째, 코드는 NGINX의
stream
컨텍스트에서 실행되며,HTTP
컨텍스트가 아니며,11212
포트에서 수신 대기합니다. - 둘째,
shared dict
의 이름은memcached
이며, 크기는100M
이며, 런타임에 변경할 수 없습니다. - 또한, 코드는
lib/resty/memcached
디렉토리에 위치하며, 파일 이름은server.lua
이고, 진입 함수는run()
입니다. 이는lua_package_path
와content_by_lua_block
에서 찾을 수 있습니다.
다음으로 코드 프레임워크를 구축할 차례입니다. 여러분도 직접 시도해 보시고, 그런 다음 제 프레임워크 코드를 함께 살펴보겠습니다.
local new_tab = require "table.new"
local str_sub = string.sub
local re_find = ngx.re.find
local mc_shdict = ngx.shared.memcached
local _M = { _VERSION = '0.01' }
local function parse_args(s, start)
end
function _M.get(tcpsock, keys)
end
function _M.set(tcpsock, res)
end
function _M.run()
local tcpsock = assert(ngx.req.socket(true))
while true do
tcpsock:settimeout(60000) -- 60 seconds
local data, err = tcpsock:receive("*l")
local command, args
if data then
local from, to, err = re_find(data, [[(\S+)]], "jo")
if from then
command = str_sub(data, from, to)
args = parse_args(data, to + 1)
end
end
if args then
local args_len = #args
if command == 'get' and args_len > 0 then
_M.get(tcpsock, args)
elseif command == "set" and args_len == 4 then
_M.set(tcpsock, args)
end
end
end
end
return _M
이 코드 조각은 진입 함수 run()
의 주요 로직을 구현합니다. 아직 예외 처리를 하지 않았고, 의존성 parse_args
, get
, set
이 모두 빈 함수이지만, 이 프레임워크는 이미 Memcached 서버의 로직을 완전히 표현하고 있습니다.
코드 채우기
다음으로, 코드가 실행되는 순서대로 이러한 빈 함수를 구현해 보겠습니다.
먼저, Memcached 프로토콜 문서에 따라 Memcached 명령의 매개변수를 파싱할 수 있습니다.
local function parse_args(s, start)
local arr = {}
while true do
local from, to = re_find(s, [[\S+]], "jo", {pos = start})
if not from then
break
end
table.insert(arr, str_sub(s, from, to))
start = to + 1
end
return arr
end
제 조언은 가장 직관적인 버전을 먼저 구현하는 것입니다. 성능 최적화를 고려하지 않고 말이죠. 결국 완성은 완벽보다 항상 더 중요하며, 완성을 기반으로 점진적인 최적화를 하는 것이 완벽에 가까워지는 유일한 방법입니다.
다음으로 get
함수를 구현해 보겠습니다. 이 함수는 한 번에 여러 키를 쿼리할 수 있으므로, 다음 코드에서 for
루프를 사용합니다.
function _M.get(tcpsock, keys)
local reply = ""
for i = 1, #keys do
local key = keys[i]
local value, flags = mc_shdict:get(key)
if value then
local flags = flags or 0
reply = reply .. "VALUE" .. key .. " " .. flags .. " " .. #value .. "\r\n" .. value .. "\r\n"
end
end
reply = reply .. "END\r\n"
tcpsock:settimeout(1000) -- one second timeout
local bytes, err = tcpsock:send(reply)
end
여기서는 단 한 줄의 핵심 코드가 있습니다: local value, flags = mc_shdict:get(key)
, 즉 shared dict
에서 데이터를 쿼리하는 것입니다. 나머지 코드는 Memcached 프로토콜에 따라 문자열을 조합하고 최종적으로 클라이언트에게 전송하는 것입니다.
마지막으로 set
함수를 살펴보겠습니다. 이 함수는 수신된 매개변수를 shared dict
API 형식으로 변환하고, 데이터를 저장하며, 오류가 발생하면 Memcached 프로토콜에 따라 처리합니다.
function _M.set(tcpsock, res)
local reply = ""
local key = res[1]
local flags = res[2]
local exptime = res[3]
local bytes = res[4]
local value, err = tcpsock:receive(tonumber(bytes) + 2)
if str_sub(value, -2, -1) == "\r\n" then
local succ, err, forcible = mc_shdict:set(key, str_sub(value, 1, bytes), exptime, flags)
if succ then
reply = reply .. “STORED\r\n"
else
reply = reply .. "SERVER_ERROR " .. err .. “\r\n”
end
else
reply = reply .. "ERROR\r\n"
end
tcpsock:settimeout(1000) -- one second timeout
local bytes, err = tcpsock:send(reply)
end
또한, 위의 함수를 채우는 동안 ngx.log
를 사용하여 테스트 케이스를 확인하고 디버깅할 수 있습니다. 안타깝게도 OpenResty에는 중단점 디버거가 없기 때문에 ngx.say
와 ngx.log
를 사용하여 디버깅해야 합니다. 이는 아직 더 탐구할 여지가 있는 원시적인 시대입니다.
요약
이 실습 프로젝트는 이제 끝나가고 있습니다. 마지막으로 질문을 남기겠습니다: 위의 Memcached 서버 구현 코드를 완전히 실행하고 테스트 케이스를 통과할 수 있나요?
오늘의 질문은 많은 노력이 필요할 것입니다. 그러나 이것은 여전히 원시적인 버전입니다. 오류 처리, 성능 최적화, 자동화 테스트가 없으며, 이는 나중에 개선될 것입니다.
오늘의 설명이나 실습에 대해 궁금한 점이 있다면, 언제든지 댓글을 남겨 우리와 함께 논의해 주세요. 또한 이 글을 동료나 친구들과 공유하여 함께 실습하고 발전할 수 있도록 해주세요.