diff --git a/Makefile b/Makefile index 5ab4180..eee1e65 100644 --- a/Makefile +++ b/Makefile @@ -14,11 +14,13 @@ all: ; install: all $(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/ - $(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/ $(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/request/ + $(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/status/ + $(INSTALL) -d $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/ $(INSTALL) src/lua/api-gateway/cache/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/ - $(INSTALL) src/lua/api-gateway/cache/store/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/ $(INSTALL) src/lua/api-gateway/cache/request/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/request/ + $(INSTALL) src/lua/api-gateway/cache/status/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/status/ + $(INSTALL) src/lua/api-gateway/cache/store/*.lua $(DESTDIR)/$(LUA_LIB_DIR)/api-gateway/cache/store/ test: redis echo "Starting redis server on default port" @@ -46,6 +48,8 @@ redis: all .PHONY: pre-docker-test pre-docker-test: echo " pre-docker-test" + echo " cleaning up any test_redis docker image" + docker ps | grep test_redis | awk '{print $$1}' | xargs docker stop | xargs docker rm rm -rf $(BUILD_DIR)/* rm -rf ~/tmp/apiplatform/api-gateway-cachemanager/ mkdir -p $(BUILD_DIR) @@ -60,16 +64,22 @@ pre-docker-test: mkdir -p ~/tmp/apiplatform/api-gateway-cachemanager/target/test-logs ln -s ~/tmp/apiplatform/api-gateway-cachemanager/target/test-logs ./target/test-logs +.PHONY: get-redis-docker-ip +get-redis-docker-ip: + $(eval $@_IP := $(shell docker run --entrypoint=ifconfig alpine eth0 | grep "inet addr" | cut -d: -f2 | awk '{ print $$1}')) + @echo "Assuming the next IP for the docker container is:" $($@_IP) + sed -i '' 's/127\.0\.0\.1\:6379/$($@_IP)\:6379/g' ~/tmp/apiplatform/api-gateway-cachemanager/test/perl/api-gateway/cache/status/remoteCacheStatus.t + post-docker-test: echo " post-docker-test" - # cp -r ~/tmp/apiplatform/api-gateway-cachemanager/target/ ./target - # rm -rf ~/tmp/apiplatform/api-gateway-cachemanager + cp -r ~/tmp/apiplatform/api-gateway-cachemanager/target/ ./target + rm -rf ~/tmp/apiplatform/api-gateway-cachemanager run-docker-test: echo " run-docker-test" - cd ./test && docker-compose up --force-recreate -test-docker: pre-docker-test run-docker-test post-docker-test +test-docker: pre-docker-test get-redis-docker-ip run-docker-test post-docker-test echo "running tests with docker ..." test-docker-manual: pre-docker-test diff --git a/src/lua/api-gateway/cache/status/remoteCacheStatus.lua b/src/lua/api-gateway/cache/status/remoteCacheStatus.lua new file mode 100644 index 0000000..5eea394 --- /dev/null +++ b/src/lua/api-gateway/cache/status/remoteCacheStatus.lua @@ -0,0 +1,177 @@ +--[[ + Copyright 2016 Adobe Systems Incorporated. All rights reserved. + + This file is licensed to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR RESPRESENTATIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. + ]] +-- +-- Module for selecting a healthy server from an upstream. +-- It's best to be used with health-checks so that a peer is maked UP or DOWN +-- User: nramaswa +-- Date: 4/17/14 +-- Time: 7:38 PM + +local _M = {} +local DEFAULT_SHARED_DICT = "cachedkeys" + +function _M:new(o) + o = o or {} + setmetatable(o, self) + self.__index = self + if ( o ~= nil ) then + self.shared_dict = o.shared_dict or DEFAULT_SHARED_DICT + end + return o +end + +--- Reused from the "resty.upstream.healthcheck" module to get the +-- status of the upstream servers +local function gen_peers_status_info(peers, bits, idx) + local npeers = #peers + for i = 1, npeers do + local peer = peers[i] + bits[idx] = peer.name + if peer.down then + bits[idx + 1] = " DOWN\n" + else + bits[idx + 1] = " up\n" + end + idx = idx + 2 + end + return idx +end + +--- Returns the results of the health checks for the provided upstream_name +-- as found in the "resty.upstream.healthcheck" module. +-- @param upstream_name +-- +local function get_health_check_for_upstream(upstream_name) + local ok, upstream = pcall(require, "ngx.upstream") + if not ok then + error("ngx_upstream_lua module required") + end + + local get_primary_peers = upstream.get_primary_peers + local get_backup_peers = upstream.get_backup_peers + + local ok, new_tab = pcall(require, "table.new") + if not ok or type(new_tab) ~= "function" then + new_tab = function (narr, nrec) return {} end + end + + local n = 1 + local bits = new_tab(n * 20, 0) + local idx = 1 + + local peers, err = get_primary_peers(upstream_name) + if not peers then + return "failed to get primary peers in upstream " .. upstream_name .. ": " + .. err + end + + idx = gen_peers_status_info(peers, bits, idx) + + peers, err = get_backup_peers(upstream_name) + if not peers then + return "failed to get backup peers in upstream " .. upstream_name .. ": " + .. err + end + + idx = gen_peers_status_info(peers, bits, idx) + + return bits +end + +--- Returns a cached healthy upstream. +-- @param dict_name shared dict name +-- @param upstream_name the name of the upstream as defined in the config +-- +local function get_healthy_upstream_from_cache(dict_name, upstream_name) + local dict = ngx.shared[dict_name] + local healthy_upstream + local health_upstream_key = "healthy_upstream:" .. tostring(upstream_name) + if (nil ~= dict) then + healthy_upstream = dict:get(health_upstream_key) + end + return healthy_upstream +end + +local function update_healthy_upstream_in_cache(dict_name, upstream_name, healthy_upstream) + local dict = ngx.shared[dict_name]; + if (nil ~= dict) then + ngx.log(ngx.DEBUG, "Saving a healthy upstream:", healthy_upstream, " in cache:", dict_name, " for upstream:", upstream_name) + local exp_time_in_seconds = 5 + local health_upstream_key = "healthy_upstream:" .. tostring(upstream_name) + dict:set(health_upstream_key, healthy_upstream, exp_time_in_seconds) + return + end + + ngx.log(ngx.WARN, "Dictionary ", dict_name, " doesn't seem to be set. Did you define one ? ") +end + +--- Returns the host and port from an upstream like host:port +-- @param upstream_host +-- +local function get_host_and_port_in_upstream(upstream_host) + local p = {} + p.host = upstream_host + + local idx = string.find(upstream_host, ":", 1, true) + if idx then + p.host = string.sub(upstream_host, 1, idx - 1) + p.port = tonumber(string.sub(upstream_host, idx + 1)) + end + return p.host, p.port +end + + +function _M:getStatus(upstream_name) + return get_health_check_for_upstream(upstream_name) +end + +--- Returns the first healthy server found in the upstream_name +-- Returns 3 values: +-- The difference between upstream and is that the upstream may be just a string containing host:port +-- @param upstream_name +-- +function _M:getHealthyServer(upstream_name) + + -- get the host and port from the local cache first + local healthy_host = get_healthy_upstream_from_cache(self.shared_dict, upstream_name) + if ( nil ~= healthy_host) then + local host, port = get_host_and_port_in_upstream(healthy_host) + return healthy_host, host, port + end + + -- if the host is not in the local cache get it from the upstream configuration + ngx.log(ngx.DEBUG, "Looking up for a healthy peer in upstream:", upstream_name) + local upstream_health_result = get_health_check_for_upstream(upstream_name) + + if(upstream_health_result == nil) then + ngx.log(ngx.ERR, "\n No upstream results!") + return nil + end + + for key,value in ipairs(upstream_health_result) do + -- return the first peer found to be up. + -- TODO: save all the peers that are up and return them using round-robin alg + if(value == " up\n") then + healthy_host = upstream_health_result[key-1] + update_healthy_upstream_in_cache(self.shared_dict, upstream_name, healthy_host) + local host, port = get_host_and_port_in_upstream(healthy_host) + return healthy_host, host, port + end + if(value == " DOWN\n" and upstream_health_result[key-1] ~= nil ) then + ngx.log(ngx.WARN, "Peer ", tostring(upstream_health_result[key-1]), " is down! Checking for backup peers.") + end + end + + ngx.log(ngx.ERR, "All peers are down!") + return nil -- No peers are up +end + +return _M \ No newline at end of file diff --git a/src/lua/api-gateway/cache/store/redisCache.lua b/src/lua/api-gateway/cache/store/redisCache.lua index 5b4e7ba..24e90b5 100644 --- a/src/lua/api-gateway/cache/store/redisCache.lua +++ b/src/lua/api-gateway/cache/store/redisCache.lua @@ -21,7 +21,7 @@ -- local redis = require "resty.redis" -local RedisHealthCheck = require "api-gateway.redis.redisHealthCheck" +local RedisStatus = require "api-gateway.cache.status.remoteCacheStatus" local cjson = require "cjson" -- redis endpoints are assumed to be global per GW node and therefore are read here @@ -37,13 +37,13 @@ local REDIS_RW_UPSTREAM = "api-gateway-redis" -- Shared dictionary used by RedisHealthCheck local SHARED_DICT_NAME = "cachedkeys" -local redisHealthCheck = RedisHealthCheck:new({ +local redisStatus = RedisStatus:new({ shared_dict = SHARED_DICT_NAME }) local function getRedisUpstream(upstream_name) local n = upstream_name or REDIS_RO_UPSTREAM - local upstream, host, port = redisHealthCheck:getHealthyRedisNode(n) + local upstream, host, port = redisStatus:getHealthyServer(n) ngx.log(ngx.DEBUG, "Obtained Redis Host:" .. tostring(host) .. ":" .. tostring(port), " from upstream:", n) if (nil ~= host and nil ~= port) then return host, port diff --git a/test/perl/api-gateway/cache/status/remoteCacheStatus.t b/test/perl/api-gateway/cache/status/remoteCacheStatus.t new file mode 100644 index 0000000..9b9dba6 --- /dev/null +++ b/test/perl/api-gateway/cache/status/remoteCacheStatus.t @@ -0,0 +1,339 @@ +# vim:set ft= ts=4 sw=4 et fdm=marker: +use lib 'lib'; +use strict; +use warnings; +use Test::Nginx::Socket::Lua; +use Cwd qw(cwd); + +#worker_connections(1014); +#master_process_enabled(4); +#log_level('warn'); + +repeat_each(2); + +plan tests => repeat_each() * (blocks() * 3) - 6; + +my $pwd = cwd(); + +our $HttpConfig = <<_EOC_; + + lua_socket_log_errors off; + + lua_package_path "src/lua/?.lua;/usr/local/lib/lua/?.lua;;"; + + client_body_temp_path /tmp/; + proxy_temp_path /tmp/; + fastcgi_temp_path /tmp/; + + init_by_lua ' + local v = require "jit.v" + v.on("$Test::Nginx::Util::ErrLogFile") + require "resty.core" + '; + lua_shared_dict cachedkeys 10m; + include ../../api-gateway/redis-upstream.conf; # generated during test script + + init_worker_by_lua ' + local function loadrequire(module) + ngx.log(ngx.DEBUG, "Loading module [" .. tostring(module) .. "]") + local function requiref(module) + require(module) + end + + local res = pcall(requiref, module) + if not (res) then + ngx.log(ngx.WARN, "Could not load module [", module, "].") + return nil + end + return require(module) + end + + local function initRedisHealthCheck() + ngx.shared.cachedkeys:flush_all() + + local hc = loadrequire("resty.upstream.healthcheck") + if (hc == nil) then + return + end + + local ok, err = hc.spawn_checker{ + shm = "cachedkeys", -- defined by "lua_shared_dict" + upstream = "cache_read_only_backend", -- defined by "upstream" + type = "http", + http_req = "PING\\\\r\\\\n", -- raw HTTP request for checking + + interval = 2000, -- run the check cycle every X ms + timeout = 1500, -- timeout in ms for network operations + fall = 2, -- # of successive failures before turning a peer down + rise = 2, -- # of successive successes before turning a peer up + } + if not ok then + ngx.log(ngx.ERR, "failed to spawn health checker: ", err) + return + end + end + initRedisHealthCheck() + '; + + upstream cache_rw_backend { + server 127.0.0.1:6379; + } +_EOC_ + +#no_diff(); +no_long_string(); +run_tests(); + + +__DATA__ + +=== TEST 1: health check (good case), test with one server in upstream +--- http_config eval +"$::HttpConfig" +. q{ +upstream cache_read_only_backend { + server 127.0.0.1:6379; +} +} +--- config + location = /test1 { + access_log off; + content_by_lua ' + ngx.sleep(5.52) + + local hc = require "resty.upstream.healthcheck" + ngx.log(ngx.INFO,"[Test1]: Status is: \\n" .. hc.status_page()) + + local HealthCheckCls = require "api-gateway.cache.status.remoteCacheStatus" + local healthCheck = HealthCheckCls:new() + + local redisUpstreamHealthResult = healthCheck:getStatus("cache_read_only_backend") + + local responseString = "" + for key,value in ipairs(redisUpstreamHealthResult) do + responseString = responseString .. value + end + + ngx.log(ngx.INFO,"\\n responseString is ".. responseString) + ngx.print(responseString) + + local redisToRead = healthCheck:getHealthyServer("cache_read_only_backend") + ngx.log(ngx.INFO,"redis to read is ".. tostring(redisToRead)) + ngx.say("Selected Redis Node:", tostring(redisToRead)) + '; + } +--- timeout: 20s +--- request +GET /test1 + +--- response_body +127.0.0.1:6379 up +Selected Redis Node:127.0.0.1:6379 + +--- no_error_log +[error] + + +=== TEST 2: health check (bad case), test with a down backup +--- http_config eval +"$::HttpConfig" +. q{ + upstream cache_read_only_backend { + server 127.0.0.1:6379; + server localhost:12256 backup; + } +} +--- config + location = /test2 { + access_log off; + content_by_lua ' + ngx.sleep(5.52) + + local hc = require "resty.upstream.healthcheck" + ngx.log(ngx.INFO,"[Test2]: Status is: \\n" .. hc.status_page()) + + local HealthCheckCls = require "api-gateway.cache.status.remoteCacheStatus" + local healthCheck = HealthCheckCls:new() + + local redisUpstreamHealthResult = healthCheck:getStatus("cache_read_only_backend") + + local responseString = "" + for key,value in ipairs(redisUpstreamHealthResult) do + responseString = responseString .. value + end + + ngx.log(ngx.INFO,"\\n responseString is ".. responseString) + ngx.print(responseString) + + local redisToRead = healthCheck:getHealthyServer("cache_read_only_backend") + ngx.log(ngx.INFO,"redis to read is ".. tostring(redisToRead)) + ngx.say("Selected Redis Node:", tostring(redisToRead)) + '; + } +--- timeout: 20s +--- request +GET /test2 + +--- response_body +127.0.0.1:6379 up +127.0.0.1:12256 DOWN +Selected Redis Node:127.0.0.1:6379 +--- no_error_log + +=== TEST 3: health check (bad case), test that backup is selected in the end +--- http_config eval +"$::HttpConfig" +. q{ + upstream cache_read_only_backend { + server 127.0.0.1:12256; + server 127.0.0.1:12257; + server 127.0.0.1:6379 backup; + } +} +--- config + location = /test2 { + access_log off; + content_by_lua ' + ngx.sleep(5.52) + + local hc = require "resty.upstream.healthcheck" + ngx.log(ngx.INFO,"[Test3]: Status is: \\n" .. hc.status_page()) + + local HealthCheckCls = require "api-gateway.cache.status.remoteCacheStatus" + local healthCheck = HealthCheckCls:new() + + local redisUpstreamHealthResult = healthCheck:getStatus("cache_read_only_backend") + + local responseString = "" + for key,value in ipairs(redisUpstreamHealthResult) do + responseString = responseString .. value + end + + ngx.log(ngx.INFO,"\\n responseString is ".. responseString) + ngx.print(responseString) + + local redisToRead, redisHost, redisPort = healthCheck:getHealthyServer("cache_read_only_backend") + ngx.log(ngx.INFO,"redis to read is ".. tostring(redisToRead)) + ngx.say("Selected Redis Node:", tostring(redisToRead), ",port:", tostring(redisPort)) + '; + } +--- timeout: 20s +--- request +GET /test2 + +--- response_body +127.0.0.1:12256 DOWN +127.0.0.1:12257 DOWN +127.0.0.1:6379 up +Selected Redis Node:127.0.0.1:6379,port:6379 +--- no_error_log + + +=== TEST 4: health check (bad case), all nodes are down +--- http_config eval +"$::HttpConfig" +. q{ + upstream cache_read_only_backend { + server 127.0.0.1:12256; + server 127.0.0.1:12257; + server localhost:12258 backup; + } +} +--- config + location = /test2 { + access_log off; + content_by_lua ' + ngx.sleep(5.52) + + local hc = require "resty.upstream.healthcheck" + ngx.log(ngx.INFO,"[Test4]: Status is: \\n" .. hc.status_page()) + + local HealthCheckCls = require "api-gateway.cache.status.remoteCacheStatus" + local healthCheck = HealthCheckCls:new() + + local redisUpstreamHealthResult = healthCheck:getStatus("cache_read_only_backend") + + local responseString = "" + for key,value in ipairs(redisUpstreamHealthResult) do + responseString = responseString .. value + end + + ngx.log(ngx.INFO,"\\n responseString is ".. responseString) + ngx.print(responseString) + + local redisToRead, redisHost, redisPort = healthCheck:getHealthyServer("cache_read_only_backend") + ngx.log(ngx.INFO,"redis to read is ".. tostring(redisToRead)) + ngx.say("Selected Redis Node:", tostring(redisToRead), ",host:", tostring(redisHost), ",port:", tostring(redisPort)) + '; + } +--- timeout: 20s +--- request +GET /test2 + +--- response_body +127.0.0.1:12256 DOWN +127.0.0.1:12257 DOWN +127.0.0.1:12258 DOWN +Selected Redis Node:nil,host:nil,port:nil +--- no_error_log + + + +=== TEST 5: test that healthy node is stored in shared_dict +--- http_config eval +"$::HttpConfig" +. q{ +upstream cache_read_only_backend { + server 127.0.0.1:6379; +} +} +--- config + location = /test1 { + access_log off; + content_by_lua ' + ngx.sleep(5.52) + + local hc = require "resty.upstream.healthcheck" + ngx.log(ngx.INFO,"[Test1]: Status is: \\n" .. hc.status_page()) + + local HealthCheckCls = require "api-gateway.cache.status.remoteCacheStatus" + local healthCheck = HealthCheckCls:new({shared_dict = "cachedkeys" }) + + local redisUpstreamHealthResult = healthCheck:getStatus("cache_read_only_backend") + + local responseString = "" + for key,value in ipairs(redisUpstreamHealthResult) do + responseString = responseString .. value + end + + ngx.log(ngx.INFO,"\\n responseString is ".. responseString) + ngx.print(responseString) + + local redisToRead, redisHost, redisPort = healthCheck:getHealthyServer("cache_read_only_backend") + ngx.log(ngx.INFO,"redis to read is ".. tostring(redisToRead)) + ngx.say("Selected Redis Node:", tostring(redisToRead), ",port:", tostring(redisPort)) + + + -- make sure it is saved in the dictionary too + local dict_name = "cachedkeys" + local dict = ngx.shared[dict_name] + local health_upstream_key = "healthy_upstream:cache_read_only_backend" + local upstreamRedis = dict:get(health_upstream_key) + ngx.say("Selected Redis Node in shared_dict:", upstreamRedis) + + -- read the node again + redisToRead, redisHost, redisPort = healthCheck:getHealthyServer("cache_read_only_backend") + ngx.say("Selected Redis Node 2nd time:", tostring(redisToRead),",port:", tostring(redisPort)) + '; + } +--- timeout: 20s +--- request +GET /test1 + +--- response_body +127.0.0.1:6379 up +Selected Redis Node:127.0.0.1:6379,port:6379 +Selected Redis Node in shared_dict:127.0.0.1:6379 +Selected Redis Node 2nd time:127.0.0.1:6379,port:6379 +--- no_error_log +[error]