第3部:OpenRestyを使用してマイクロサービスAPIゲートウェイを構築する方法

API7.ai

February 3, 2023

OpenResty (NGINX + Lua)

この記事では、マイクロサービスAPIゲートウェイの構築が完了します。これまでに選択したコンポーネントを最小限の例で組み合わせ、設計した青図に従って実行してみましょう!

NGINXの設定と初期化

APIゲートウェイはトラフィックのエントリーポイントを処理するために使用されるため、まずnginx.confに簡単な設定を行い、すべてのトラフィックがゲートウェイのLuaコードを通じて処理されるようにします。

server {
    listen 9080;

    init_worker_by_lua_block {
        apisix.http_init_worker()
    }

    location / {
        access_by_lua_block {
            apisix.http_access_phase()
        }
        header_filter_by_lua_block {
            apisix.http_header_filter_phase()
        }
        body_filter_by_lua_block {
            apisix.http_body_filter_phase()
        }
        log_by_lua_block {
            apisix.http_log_phase()
        }
    }
}

ここでは、オープンソースのAPIゲートウェイApache APISIXを例として使用しているため、上記のコード例にはapisixというキーワードが含まれています。この例では、ポート9080をリッスンし、location /でこのポートへのすべてのリクエストをインターセプトし、accessrewriteheader filterbody filterlogフェーズを通じて処理し、各フェーズで対応するプラグイン関数を呼び出します。rewriteフェーズはapisix.http_access_phase関数に組み込まれています。

システムの初期化はinit_workerフェーズで行われ、設定パラメータの読み取り、etcdへのディレクトリの事前設定、etcdからのプラグインリストの取得、プラグインの優先順位によるソートなどが含まれます。ここではコードの主要部分をリストアップして説明しましたが、より完全な初期化関数はGitHubで確認できます。

function _M.http_init_worker()
    -- ルーティング、サービス、プラグインの初期化 - 最も重要な3つの部分
    router.init_worker()
    require("apisix.http.service").init_worker()
    require("apisix.plugins.ext-plugin.init").init_worker()
end

このコードからわかるように、ルーターとプラグインの初期化は少し複雑で、主に設定パラメータの読み取りとそれに応じた選択が含まれます。etcdからのデータ読み取りが含まれるため、init_workerフェーズでcosocketを使用できない制限を回避するためにngx.timerを使用しています。この部分に興味がある場合は、ソースコードを読んで理解を深めることをお勧めします。

ルートのマッチング

accessフェーズの最初に、リクエストに含まれるurihostargscookiesなどに基づいて、設定されたルーティングルールにマッチするルートを見つける必要があります。

router.router_http.match(api_ctx)

公開されている唯一のコードは上記の行で、api_ctxにはリクエストのurihostargscookie情報が格納されています。マッチ関数の具体的な実装では、前述のlua-resty-radixtreeを使用しています。ルートがマッチしない場合、リクエストに対応するアップストリームが存在しないため、404を返します。

local router = require("resty.radixtree")

local match_opts = {}

function _M.match(api_ctx)
    -- リクエストのパラメータをctxから取得し、ルートの判断条件として使用
    match_opts.method = api_ctx.var.method
    match_opts.host = api_ctx.var.host
    match_opts.remote_addr = api_ctx.var.remote_addr
    match_opts.vars = api_ctx.var
    -- ルートの判断関数を呼び出す
    local ok = uri_router:dispatch(api_ctx.var.uri, match_opts, api_ctx)
    -- ルートがマッチしない場合、404を返す
    if not ok then
        core.log.info("not find any matched route")
        return core.response.exit(404)
    end

    return true
end

プラグインのロード

ルートがヒットした場合、プラグインのフィルタリングとロードのステップに進みます。これはAPIゲートウェイの核心部分です。以下のコードから始めましょう。

local plugins = core.tablepool.fetch("plugins", 32, 0)
-- etcdのプラグインリストとローカル設定ファイルのプラグインリストを交差させる
api_ctx.plugins = plugin.filter(route, plugins)

-- rewriteとaccessフェーズにマウントされたプラグインの関数を順番に実行
run_plugin("rewrite", plugins, api_ctx)
run_plugin("access", plugins, api_ctx)

このコードでは、まずテーブルプールから長さ32のテーブルをリクエストします。これは前述のパフォーマンス最適化技術です。次にプラグインのフィルタ関数が実行されます。なぜこのステップが必要なのか疑問に思うかもしれません。プラグインのinit workerフェーズで、etcdからプラグインリストを取得し、ソートしているのではないでしょうか?

ここでのフィルタリングは、ローカル設定と比較して行われます。その理由は以下の2つです:

  1. 新しく開発されたプラグインはカナリアリリースが必要です。この時、新しいプラグインはetcdのリストに存在しますが、一部のゲートウェイノードでのみ有効になっています。そのため、追加の交差操作が必要です。
  2. デバッグモードをサポートするためです。クライアントのリクエストがどのプラグインによって処理されたか?これらのプラグインのロード順序は?この情報はデバッグ時に有用であるため、フィルタ関数はデバッグモードかどうかを判断し、この情報をレスポンスヘッダーに記録します。

したがって、accessフェーズの終わりに、これらのフィルタリングされたプラグインを優先順位に従って順番に実行します。以下のコードで示されています。

local function run_plugin(phase, plugins, api_ctx)
    for i = 1, #plugins, 2 do
        local phase_fun = plugins[i][phase]
        if phase_fun then
            -- コアとなる呼び出しコード
            phase_fun(plugins[i + 1], api_ctx)
        end
    end

    return api_ctx
end

プラグインをイテレートする際、2間隔で行っていることがわかります。これは、各プラグインにはプラグインオブジェクトとプラグインの設定パラメータの2つのコンポーネントがあるためです。次に、上記のサンプルコードのコアとなる行を見てみましょう。

phase_fun(plugins[i + 1], api_ctx)

この行のコードが少し抽象的であれば、具体的なlimit_countプラグインに置き換えてみると、より明確になります。

limit_count_plugin_rewrite_function(conf_of_plugin, api_ctx)

この時点で、APIゲートウェイの全体的な流れはほぼ完了です。このコードはすべて同じファイルに含まれており、400行以上のコードがありますが、その核心は上記で説明した数十行です。

プラグインの作成

完全なデモを実行する前に、プラグインを作成する必要があります。limit-countプラグインを例として取り上げます。完全な実装は60行以上のコードで、リンクをクリックして確認できます。ここでは、主要なコード行を詳しく説明します:

まず、リクエスト数を制限するための基本ライブラリとしてlua-resty-limit-trafficを導入します。

local limit_count_new = require("resty.limit.count").new

次に、rapidjsonjson schemaを使用して、このプラグインのパラメータを定義します:

local schema = {
    type = "object",
    properties = {
        count = {type = "integer", minimum = 0},
        time_window = {type = "integer", minimum = 0},
        key = {type = "string",
        enum = {"remote_addr", "server_addr"},
        },
        rejected_code = {type = "integer", minimum = 200, maximum = 600},
    },
    additionalProperties = false,
    required = {"count", "time_window", "key", "rejected_code"},
}

これらのプラグインパラメータは、resty.limit.countのパラメータのほとんどに対応しており、制限のキー、時間ウィンドウのサイズ、制限するリクエスト数が含まれます。さらに、プラグインはrejected_codeというパラメータを追加しています。これは、リクエストが制限された場合に指定されたステータスコードを返します。

最後のステップで、プラグインのハンドラ関数をrewriteフェーズにマウントします:

function _M.rewrite(conf, ctx)
    -- キャッシュからlimit countオブジェクトを取得し、存在しない場合は`create_limit_obj`関数を使用して新しいオブジェクトを作成し、キャッシュする
    local lim, err = core.lrucache.plugin_ctx(plugin_name, ctx,  create_limit_obj, conf)

    -- `ctx.var`からキーの値を取得し、設定タイプと設定バージョン番号と組み合わせて新しいキーを作成
    local key = (ctx.var[conf.key] or "") .. ctx.conf_type .. ctx.conf_version

    -- 制限が適用されるかどうかを判断する関数
    local delay, remaining = lim:incoming(key, true)
    if not delay then
        local err = remaining
        -- 閾値を超えた場合、指定されたステータスコードを返す
        if err == "rejected" then
            return conf.rejected_code
        end

        core.log.error("failed to limit req: ", err)
        return 500
    end

    -- 閾値を超えていない場合、リクエストを許可し、対応するレスポンスヘッダーを設定
    core.response.set_header("X-RateLimit-Limit", conf.count,
                             "X-RateLimit-Remaining", remaining)
end

上記のコードでは、制限を判断するロジックは1行だけです。残りは準備作業とレスポンスヘッダーの設定です。閾値を超えていない場合、優先順位に従って次のプラグインを実行します。

まとめ

最後に、考えさせられる質問を残します。APIゲートウェイはレイヤー7のトラフィックだけでなく、レイヤー4のトラフィックも処理できることを知っています。これに基づいて、いくつかの使用シナリオを考えてみてください。コメントを残して、この記事を共有し、より多くの人と学び、交流してください。

前回の記事: Part 1: OpenRestyを使用してマイクロサービスAPIゲートウェイを構築する方法 Part 2: OpenRestyを使用してマイクロサービスAPIゲートウェイを構築する方法