From 06afdaace5136cea7cdc868c1e3cea3947574442 Mon Sep 17 00:00:00 2001 From: Andriy Zavada Date: Sun, 5 Nov 2023 17:38:03 +0200 Subject: [PATCH] queue stats logging refinements * New tunable, queue_manager_log_frequency, setting the interval, in seconds, between logging of queue stats in replrtq_{snk,src}. * New tunable, queue_manager_log_suppress_zero_stats, optionally to avoid logging of zero stats in replrtq_src and overflow_queue. * Logging in replrtq_* is now done with queue_name in lager:info metadata, to allow operators to filter such entries into separate lager sinks. --- priv/riak_kv.schema | 15 ++++++++++- src/riak_kv_overflow_queue.erl | 47 +++++++++++++++++++--------------- src/riak_kv_replrtq_snk.erl | 12 ++++++--- src/riak_kv_replrtq_src.erl | 18 +++++++------ 4 files changed, 59 insertions(+), 33 deletions(-) diff --git a/priv/riak_kv.schema b/priv/riak_kv.schema index 9e5ac234d..bd5b3804b 100644 --- a/priv/riak_kv.schema +++ b/priv/riak_kv.schema @@ -1411,6 +1411,19 @@ {commented, 900} ]}. +%% @doc Periodically log replrtq queue stats (queue size, reap/delete +%% attempts and aborts) at this interval (seconds) +{mapping, "queue_manager_log_frequency", "riak_kv.queue_manager_log_frequency", [ + {datatype, integer}, + {default, 30} +]}. + +%% @doc Suppress logging of queue stats when all items are 0 +{mapping, "queue_manager_log_suppress_zero_stats", "riak_kv.queue_manager_log_suppress_zero_stats", [ + {datatype, flag}, + {default, off} +]}. + %% @doc Enable the `recalc` compaction strategy within the leveled backend in %% riak. The default (when disabled) is `retain`, but this will leave %% uncollected garbage within the, journal. @@ -1508,4 +1521,4 @@ {mapping, "handoff_deletes", "riak_kv.handoff_deletes", [ {datatype, {flag, enabled, disabled}}, {default, disabled} -]}. \ No newline at end of file +]}. 
diff --git a/src/riak_kv_overflow_queue.erl b/src/riak_kv_overflow_queue.erl index 444a3c995..01888c01f 100644 --- a/src/riak_kv_overflow_queue.erl +++ b/src/riak_kv_overflow_queue.erl @@ -105,32 +105,39 @@ new(Priorities, FilePath, QueueLimit, OverflowLimit) -> non_neg_integer(), non_neg_integer(), overflowq()) -> overflowq(). log(Type, JobID, Attempts, Aborts, Queue) -> - QueueLengths = - lists:foldl(fun({P, L}, Acc) -> - [Acc, io_lib:format("queue_p~w=~w ", [P, L])] + {QueueLengths, QLCount} = + lists:foldl(fun({P, L}, {AccS, AccN}) -> + {[AccS, io_lib:format("queue_p~w=~w ", [P, L])], AccN + L} end, - "Queue lengths ", + {"Queue lengths ", 0}, Queue#overflowq.mqueue_lengths), - OverflowLengths = - lists:foldl(fun({P, L}, Acc) -> - [Acc, io_lib:format("overflow_p~w=~w ", [P, L])] + {OverflowLengths, OLCount} = + lists:foldl(fun({P, L}, {AccS, AccN}) -> + {[AccS, io_lib:format("overflow_p~w=~w ", [P, L])], AccN + L} end, - "Overflow lengths ", + {"Overflow lengths ", 0}, Queue#overflowq.overflow_lengths), - DiscardCounts = - lists:foldl(fun({P, L}, Acc) -> - [Acc, io_lib:format("discard_p~w=~w ", [P, L])] + {DiscardCounts, DCCount} = + lists:foldl(fun({P, L}, {AccS, AccN}) -> + {[AccS, io_lib:format("discard_p~w=~w ", [P, L])], AccN + L} end, - "Discard counts ", + {"Discard counts ", 0}, Queue#overflowq.overflow_discards), - _ = lager:info(lists:flatten(["~p job_id=~p has ", - "attempts=~w aborts=~w ", - QueueLengths, - OverflowLengths, - DiscardCounts]), - [Type, JobID, Attempts, Aborts]), - + case app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false) of + true when QLCount == 0, + OLCount == 0, + DCCount == 0, + Attempts == 0, + Aborts == 0 -> + ok; + _ -> + lager:info(lists:flatten(["~p job_id=~p has attempts=~w aborts=~w ", + QueueLengths, + OverflowLengths, + DiscardCounts]), + [Type, JobID, Attempts, Aborts]) + end, ResetDiscards = lists:map(fun({P, _L}) -> {P, 0} end, Queue#overflowq.overflow_discards), @@ -579,4 +586,4 @@ 
underover_overflow_test() -> close(RootPath, FlowQ15). --endif. \ No newline at end of file +-endif. diff --git a/src/riak_kv_replrtq_snk.erl b/src/riak_kv_replrtq_snk.erl index cd026b8e2..4661e94eb 100644 --- a/src/riak_kv_replrtq_snk.erl +++ b/src/riak_kv_replrtq_snk.erl @@ -415,10 +415,12 @@ handle_cast({requeue_work, WorkItem}, State) -> handle_info(timeout, State) -> prompt_work(), - erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats), + LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS), + erlang:send_after(LogFreq * 1000, self(), log_stats), {noreply, State}; handle_info(log_stats, State) -> - erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats), + LogFreq = app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS), + erlang:send_after(LogFreq * 1000, self(), log_stats), SinkWork0 = case State#state.enabled of true -> @@ -854,7 +856,8 @@ log_mapfun({QueueName, Iteration, SinkWork}) -> {replmod_time, RT}, {modified_time, MTS, MTM, MTH, MTD, MTL}} = SinkWork#sink_work.queue_stats, - lager:info("Queue=~w success_count=~w error_count=~w" ++ + lager:info([{queue_name, QueueName}], + "Queue=~w success_count=~w error_count=~w" ++ " mean_fetchtime_ms=~s" ++ " mean_pushtime_ms=~s" ++ " mean_repltime_ms=~s" ++ @@ -868,7 +871,8 @@ log_mapfun({QueueName, Iteration, SinkWork}) -> end, PeerDelays = lists:foldl(FoldPeerInfoFun, "", SinkWork#sink_work.peer_list), - lager:info("Queue=~w has peer delays of~s", [QueueName, PeerDelays]), + lager:info([{queue_name, QueueName}], + "Queue=~w has peer delays of~s", [QueueName, PeerDelays]), {QueueName, Iteration, SinkWork#sink_work{queue_stats = ?ZERO_STATS}}. 
-spec log_queue_addition( diff --git a/src/riak_kv_replrtq_src.erl b/src/riak_kv_replrtq_src.erl index fa15ff0db..b372e77e0 100644 --- a/src/riak_kv_replrtq_src.erl +++ b/src/riak_kv_replrtq_src.erl @@ -325,11 +325,7 @@ init([FilePath]) -> end, QO = lists:map(MapToQOverflow, QFM), QC = lists:map(MaptoQCache, QFM), - LogFreq = - app_helper:get_env( - riak_kv, - replrtq_logfrequency, - ?LOG_TIMER_SECONDS * 1000), + LogFreq = 1000 * app_helper:get_env(riak_kv, queue_manager_log_frequency, ?LOG_TIMER_SECONDS), erlang:send_after(LogFreq, self(), log_queue), {ok, #state{queue_filtermap = QFM, @@ -670,9 +666,15 @@ handle_info(log_queue, State) -> lists:map( MapFun, [?FLD_PRIORITY, ?AAE_PRIORITY, ?RTQ_PRIORITY]), - lager:info( - "QueueName=~w has queue sizes p1=~w p2=~w p3=~w", - [QueueName, P1L, P2L, P3L]) + case app_helper:get_env(riak_kv, queue_manager_log_suppress_zero_stats, false) of + true when P1L == 0, P2L == 0, P3L == 0 -> + ok; + _ -> + lager:info( + [{queue_name, QueueName}], + "QueueName=~w has queue sizes p1=~w p2=~w p3=~w", + [QueueName, P1L, P2L, P3L]) + end end, lists:foreach(LogFun, State#state.queue_filtermap), erlang:send_after(State#state.log_frequency_in_ms, self(), log_queue),