add metrics and jemalloc

MatthieuCoder 2023-01-15 17:03:29 +04:00
parent c06cf52e88
commit cbcaa3c01e
22 changed files with 427 additions and 83 deletions

Cargo.lock generated

@@ -86,9 +86,9 @@ checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]]
name = "async-nats"
-version = "0.25.1"
+version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
+checksum = "0b0e90b3cd41350d89a242b981fb888f0eb8e3cb81c0fcb4563338f7e96a1084"
dependencies = [
"base64",
"base64-url",
@@ -770,6 +770,12 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "fs_extra"
+version = "1.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394"
[[package]]
name = "futures"
version = "0.3.25"
@@ -873,6 +879,7 @@ dependencies = [
"serde",
"serde_json",
"shared",
+"tikv-jemallocator",
"tokio",
"tokio-stream",
"tracing",
@@ -1930,7 +1937,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"criterion",
-"env_logger 0.7.1",
+"env_logger 0.10.0",
"hyper",
"leash",
"opentelemetry",
@@ -1940,6 +1947,7 @@ dependencies = [
"serde",
"shared",
"test-log",
+"tikv-jemallocator",
"tokio",
"tokio-stream",
"tokio-test",
@@ -2059,6 +2067,7 @@ dependencies = [
"proto",
"serde",
"shared",
+"tikv-jemallocator",
"tokio",
"tokio-stream",
"tonic",
@@ -2354,6 +2363,8 @@ dependencies = [
"anyhow",
"async-nats",
"config",
+"opentelemetry",
+"opentelemetry-otlp",
"redis",
"serde",
"serde_json",
@@ -2563,6 +2574,27 @@ dependencies = [
"once_cell",
]
+[[package]]
+name = "tikv-jemalloc-sys"
+version = "0.5.2+5.3.0-patched"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ec45c14da997d0925c7835883e4d5c181f196fa142f8c19d7643d1e9af2592c3"
+dependencies = [
+"cc",
+"fs_extra",
+"libc",
+]
+[[package]]
+name = "tikv-jemallocator"
+version = "0.5.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "20612db8a13a6c06d57ec83953694185a367e16945f66565e8028d2c0bd76979"
+dependencies = [
+"libc",
+"tikv-jemalloc-sys",
+]
[[package]]
name = "time"
version = "0.3.17"
@@ -3264,6 +3296,7 @@ dependencies = [
"serde",
"serde_json",
"shared",
+"tikv-jemallocator",
"tokio",
"tracing",
"twilight-model",


@@ -1,7 +1,7 @@
FROM rust AS chef
USER root
RUN cargo install cargo-chef
RUN apt-get update && apt-get install -y protobuf-compiler
WORKDIR /app
# Planning install


@@ -1,7 +1,7 @@
 version: "3.3"
 services:
   nats:
-    image: bitnami/nats
+    image: nats
     restart: always
     ports:
       - 4222:4222
@@ -11,7 +11,8 @@ services:
     image: redis
     ports:
       - 6379:6379
+  mock:
+    image: nginx
   cache:
     image: ghcr.io/discordnova/nova/cache
     restart: always
@@ -23,7 +24,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - redis
@@ -40,7 +40,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - otelcol
@@ -56,7 +55,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - ratelimit
      - otelcol
@@ -76,7 +74,6 @@ services:
      - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - otelcol
@@ -94,7 +91,6 @@ services:
       - ./config/default.yml:/config/default.yml
     environment:
       - RUST_LOG=debug
-      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otelcol:4317
     depends_on:
       - nats
       - redis
@@ -108,8 +104,6 @@ services:
     image: jaegertracing/all-in-one
     container_name: jaeger
     command:
-      - "--memory.max-debugs"
-      - "10000"
      - "--query.base-path"
      - "/jaeger/ui"
      - "--prometheus.server-url"


@@ -19,7 +19,7 @@ tokio-stream = "0.1.11"
serde = { version = "1.0.8", features = ["derive"] }
serde_json = { version = "1.0" }
-async-nats = "0.25.1"
+async-nats = "0.26.0"
twilight-model = "0.14"
anyhow = "1.0.68"


@@ -23,8 +23,11 @@ serde_json = { version = "1.0" }
tracing = "0.1.37"
tracing-futures = "0.2.5"
-async-nats = "0.25.1"
+async-nats = "0.26.0"
tracing-opentelemetry = "0.18.0"
opentelemetry = "0.18.0"
opentelemetry-http = "0.7.0"
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"


@@ -24,7 +24,7 @@ use tokio_stream::StreamExt;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use twilight_gateway::{Event, Shard};
pub mod config;
-use tracing::{debug, error, info, trace_span};
+use tracing::{debug, error, info, info_span, instrument, Instrument};
use twilight_model::gateway::event::DispatchEvent;
struct MetadataMap<'a>(&'a mut HeaderMap);
@@ -84,13 +84,15 @@ impl Component for GatewayServer {
}
}
+#[instrument]
async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
if let Event::Ready(ready) = event {
-info!("Logged in as {}", ready.user.name);
+info!(username = ready.user.name, "logged in");
} else {
let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
-debug!("handling event {}", name.unwrap());
+let name = name.unwrap();
+debug!(event_name = name, "handling dispatch event");
let data = CachePayload {
data: DispatchEventTagged(dispatch_event),
@@ -98,7 +100,7 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
let value = serde_json::to_string(&data)?;
let bytes = bytes::Bytes::from(value);
-let span = trace_span!("nats send");
+let span = info_span!("nats send");
let mut header_map = HeaderMap::new();
let context = span.context();
@@ -106,12 +108,9 @@ async fn handle_event(event: Event, nats: &Client) -> anyhow::Result<()> {
propagator.inject_context(&context, &mut MetadataMap(&mut header_map));
});
-nats.publish_with_headers(
-format!("nova.cache.dispatch.{}", name.unwrap()),
-header_map,
-bytes,
-)
-.await?;
+nats.publish_with_headers(format!("nova.cache.dispatch.{name}"), header_map, bytes)
+.instrument(info_span!("sending to nats"))
+.await?;
}
}


@@ -1,4 +1,11 @@
use gateway::GatewayServer;
use leash::ignite;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
ignite!(GatewayServer);


@@ -39,3 +39,6 @@ test-log = { version = "0.2.11", features = ["log", "trace"] }
[[bench]]
name = "bucket"
harness = false
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"


@@ -1,4 +1,11 @@
use leash::ignite;
use ratelimit::RatelimiterServerComponent;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
ignite!(RatelimiterServerComponent);


@@ -29,4 +29,7 @@ tokio-stream = "0.1.11"
dns-lookup = "1.0.8"
opentelemetry = "0.18.0"
opentelemetry-http = "0.7.0"
tracing-opentelemetry = "0.18.0"
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"


@@ -28,4 +28,11 @@ pub struct ReverseProxy {
pub discord: Discord,
pub ratelimiter_address: String,
pub ratelimiter_port: u16,
+#[serde(default = "default_upstream")]
+pub upstream: Option<String>,
+}
+#[allow(clippy::unnecessary_wraps)]
+fn default_upstream() -> Option<String> {
+Some("https://discord.com".to_string())
}


@@ -1,23 +1,78 @@
-use anyhow::bail;
+use anyhow::{bail, Context};
+use futures_util::FutureExt;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
HeaderValue, Method as HttpMethod, Request, Response, Uri,
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_rustls::HttpsConnector;
+use opentelemetry::{
+global,
+metrics::{Counter, Histogram},
+Context as OpenTelemetryContext, KeyValue,
+};
use std::{
collections::hash_map::DefaultHasher,
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
sync::Arc,
+time::SystemTime,
};
-use tracing::{debug_span, error, info_span, instrument, Instrument};
+use tracing::{debug_span, error, info_span, log::trace, Instrument};
use twilight_http_ratelimiting::{Method, Path};
-use crate::ratelimit_client::RemoteRatelimiter;
+use crate::{config::ReverseProxy, ratelimit_client::RemoteRatelimiter};
+use lazy_static::lazy_static;
+lazy_static! {
+static ref METER_NAME: &'static str = "";
+static ref REQUESTS: Counter<u64> = {
+global::meter(&METER_NAME)
+.u64_counter("rest.http_requests_total")
+.with_description("Amount of requests processed by the rest reverse proxy")
+.init()
+};
+static ref UPSTREAM_CALLS: Counter<u64> = {
+global::meter(&METER_NAME)
+.u64_counter("rest.upstream_http_requests_total")
+.with_description("Amount of requests sent to discord")
+.init()
+};
+static ref TICKET_CALLS: Counter<u64> = {
+global::meter(&METER_NAME)
+.u64_counter("rest.ticket_http_requests_total")
+.with_description("Amount of requests sent to the ratelimiter")
+.init()
+};
+static ref HEADERS_SUBMIT_CALLS: Counter<u64> = {
+global::meter(&METER_NAME)
+.u64_counter("rest.header_submit_http_requests_total")
+.with_description("Amount of requests sent to the ratelimiter")
+.init()
+};
+static ref UPSTREAM_TIMES: Histogram<f64> = {
+global::meter(&METER_NAME)
+.f64_histogram("rest.upstream_http_request_duration_seconds")
+.with_description("Time took to request discord")
+.init()
+};
+static ref TICKET_TIMES: Histogram<f64> = {
+global::meter(&METER_NAME)
+.f64_histogram("rest.ticket_http_request_duration_seconds")
+.with_description("Time took to get a ticket from the ratelimiter")
+.init()
+};
+static ref HEADERS_SUBMIT_TIMES: Histogram<f64> = {
+global::meter(&METER_NAME)
+.f64_histogram("rest.header_submit_http_request_duration_seconds")
+.with_description("Time took to get a ticket from the ratelimiter")
+.init()
+};
+}
/// Normalizes the path
+#[inline]
fn normalize_path(request_path: &str) -> (&str, &str) {
if let Some(trimmed_path) = request_path.strip_prefix("/api") {
if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
@@ -35,14 +90,18 @@ fn normalize_path(request_path: &str) -> (&str, &str) {
}
}
-#[instrument]
+#[inline]
+#[allow(clippy::too_many_lines)]
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: Arc<RemoteRatelimiter>,
+config: ReverseProxy,
token: String,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
-let (hash, uri_string) = {
+let cx = OpenTelemetryContext::current();
+let (bucket, uri_string) = {
let method = match *request.method() {
HttpMethod::DELETE => Method::Delete,
HttpMethod::GET => Method::Get,
@@ -50,20 +109,25 @@ pub async fn handle_request(
HttpMethod::POST => Method::Post,
HttpMethod::PUT => Method::Put,
_ => {
-error!("Unsupported HTTP method in request, {}", request.method());
+error!(method =? request.method(), "unsupported HTTP method in request");
bail!("unsupported method");
}
};
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(request_path);
+trace!("normalized path to {trimmed_path}");
-let mut uri_string = format!("https://discord.com{api_path}{trimmed_path}");
+let mut uri_string = format!(
+"{}{api_path}{trimmed_path}",
+config.upstream.expect("no upstream")
+);
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
}
+trace!("full request uri is {uri_string}");
let mut hash = DefaultHasher::new();
match Path::try_from((method, trimmed_path)) {
Ok(path) => path,
@@ -76,14 +140,36 @@ pub async fn handle_request(
}
}
.hash(&mut hash);
+let bucket = hash.finish().to_string();
+trace!("Request bucket is {}", bucket);
-(hash.finish().to_string(), uri_string)
+(bucket, uri_string)
};
+REQUESTS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+let ticket_start = SystemTime::now();
+TICKET_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
// waits for the request to be authorized
-ratelimiter
-.ticket(hash.clone())
+match ratelimiter
+.ticket(bucket.clone())
.instrument(debug_span!("ticket validation request"))
-.await?;
+.then(|v| async {
+TICKET_TIMES.record(
+&cx,
+ticket_start.elapsed()?.as_secs_f64(),
+&[KeyValue::new("bucket", bucket.clone())],
+);
+v
+})
+.await
+{
+Ok(_) => {}
+Err(e) => {
+error!("Error when requesting the ratelimiter: {:?}", e);
+bail!("failed to request the ratelimiter");
+}
+}
request
.headers_mut()
@@ -121,7 +207,21 @@ pub async fn handle_request(
};
*request.uri_mut() = uri;
let span = debug_span!("upstream request to discord");
-let resp = match client.request(request).instrument(span).await {
+let upstream_start = SystemTime::now();
+UPSTREAM_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
+let resp = match client
+.request(request)
+.instrument(span)
+.then(|v| async {
+UPSTREAM_TIMES.record(
+&cx,
+upstream_start.elapsed()?.as_secs_f64(),
+&[KeyValue::new("bucket", bucket.clone())],
+);
+v.context("")
+})
+.await
+{
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
@@ -132,14 +232,28 @@ pub async fn handle_request(
let headers = resp
.headers()
.into_iter()
-.map(|(k, v)| (k.to_string(), v.to_str().map(std::string::ToString::to_string)))
+.map(|(k, v)| {
+(
+k.to_string(),
+v.to_str().map(std::string::ToString::to_string),
+)
+})
.filter(|f| f.1.is_ok())
.map(|f| (f.0, f.1.expect("errors should be filtered")))
.collect();
+HEADERS_SUBMIT_CALLS.add(&cx, 1, &[KeyValue::new("bucket", bucket.clone())]);
let _submit_headers = ratelimiter
-.submit_headers(hash, headers)
+.submit_headers(bucket.clone(), headers)
.instrument(info_span!("submitting headers"))
+.then(|v| async {
+HEADERS_SUBMIT_TIMES.record(
+&cx,
+upstream_start.elapsed()?.as_secs_f64(),
+&[KeyValue::new("bucket", bucket.clone())],
+);
+v
+})
.await;
Ok(resp)
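The instrumentation added above repeats one pattern per call site: a counter incremented before the call and a histogram recording the elapsed seconds afterwards, both tagged with the ratelimit bucket. A minimal standalone sketch of that pattern, with illustrative metric names, assuming the opentelemetry 0.18 API used in this commit, an anyhow dependency, and a meter provider already installed:

// Illustrative sketch only, not part of the commit. It mirrors the
// counter + duration-histogram pattern used in the reverse proxy above.
use std::time::SystemTime;
use opentelemetry::{global, Context, KeyValue};

fn timed_call(bucket: &str) -> anyhow::Result<()> {
    let cx = Context::current();
    // Empty meter name, matching METER_NAME in the code above.
    let meter = global::meter("");
    // Counter incremented once per call, tagged with the bucket.
    let calls = meter.u64_counter("rest.example_http_requests_total").init();
    // Histogram recording how long the call took, in seconds.
    let times = meter
        .f64_histogram("rest.example_http_request_duration_seconds")
        .init();

    let start = SystemTime::now();
    calls.add(&cx, 1, &[KeyValue::new("bucket", bucket.to_string())]);
    // ... perform the actual request here ...
    times.record(
        &cx,
        start.elapsed()?.as_secs_f64(),
        &[KeyValue::new("bucket", bucket.to_string())],
    );
    Ok(())
}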


@@ -53,11 +53,12 @@ impl Component for ReverseProxyServer {
let client: Client<_, hyper::Body> = Client::builder().build(https);
let token = settings.config.discord.token.clone();
+let config = settings.config.clone();
let service_fn = make_service_fn(move |_: &AddrStream| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
let token = token.clone();
+let config = config.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
let token = token.clone();
@@ -71,10 +72,11 @@ impl Component for ReverseProxyServer {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
+let config = config.clone();
async move {
let token = token.clone();
let ratelimiter = ratelimiter.clone();
-handle_request(client, ratelimiter, token, request).await
+handle_request(client, ratelimiter, config, token, request).await
}
}))
}


@@ -1,4 +1,11 @@
use leash::ignite;
use rest::ReverseProxyServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
ignite!(ReverseProxyServer);


@@ -19,4 +19,7 @@ ed25519-dalek = "1"
twilight-model = { version = "0.14" }
anyhow = "1.0.68"
-async-nats = "0.25.1"
+async-nats = "0.26.0"
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.5"


@@ -1,4 +1,11 @@
use leash::ignite;
use webhook::WebhookServer;
+#[cfg(not(target_env = "msvc"))]
+use tikv_jemallocator::Jemalloc;
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: Jemalloc = Jemalloc;
ignite!(WebhookServer);


@@ -16,4 +16,4 @@ tracing = "0.1.37"
env_logger = "0.10.0"
tracing-opentelemetry = "0.18.0"
opentelemetry = { version ="0.18.0", features = ["rt-tokio"] }
-opentelemetry-otlp = { version = "0.11.0" }
+opentelemetry-otlp = { version = "0.11.0", features = ["metrics"] }


@@ -6,10 +6,13 @@
clippy::complexity,
clippy::perf,
clippy::pedantic,
-clippy::nursery,
+clippy::nursery
)]
use anyhow::Result;
+use opentelemetry::global::shutdown_tracer_provider;
+use opentelemetry::sdk::export::metrics::aggregation::stateless_temporality_selector;
+use opentelemetry::sdk::metrics::selectors;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry::sdk::trace::{self};
use opentelemetry::sdk::Resource;
@@ -18,8 +21,10 @@ use opentelemetry_otlp::WithExportConfig;
use serde::de::DeserializeOwned;
use shared::config::Settings;
use std::str::FromStr;
+use std::time::Duration;
use std::{future::Future, pin::Pin};
use tokio::sync::oneshot;
+use tracing::log::error;
use tracing::{info, log::trace};
use tracing_subscriber::filter::Directive;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -35,54 +40,111 @@ pub trait Component: Send + Sync + 'static + Sized {
stop: oneshot::Receiver<()>,
) -> AnyhowResultFuture<()>;
fn new() -> Self;
+}
-fn _internal_start(self) -> AnyhowResultFuture<()> {
-Box::pin(async move {
-global::set_text_map_propagator(TraceContextPropagator::new());
+/// # Panics
+/// Panics in case of an invalid `RUST_LOG` variable.
+pub fn start_component<Y, C>(component: Y) -> AnyhowResultFuture<()>
+where
+Y: Component<Config = C>,
+C: Default + Clone + DeserializeOwned + Send,
+{
+Box::pin(async move {
+let settings = Settings::<Y::Config>::new(Y::SERVICE_NAME)?;
+if let Some(meter_config) = settings
+.opentelemetry
+.as_ref()
+.and_then(|f| f.metrics.clone())
+{
+let meter = opentelemetry_otlp::new_pipeline()
+.metrics(
+selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
+stateless_temporality_selector(),
+opentelemetry::runtime::Tokio,
+)
+.with_exporter(
+opentelemetry_otlp::new_exporter()
+.tonic()
+.with_export_config(meter_config.into()),
+)
+.with_period(Duration::from_secs(3))
+.with_timeout(Duration::from_secs(10))
+.build()?;
+// Using the opentelemetry_otlp meter
+global::set_meter_provider(meter);
+}
+// Use the text propagator
+global::set_text_map_propagator(TraceContextPropagator::new());
+// Print debug errors
+global::set_error_handler(|error| {
+error!("OpenTelemetry error: {}", error);
+})?;
+if let Some(tracer_config) = settings
+.opentelemetry
+.as_ref()
+.and_then(|f| f.traces.clone())
+{
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(trace::config().with_resource(Resource::new(vec![
-KeyValue::new("service.name", Self::SERVICE_NAME),
+KeyValue::new("service.name", Y::SERVICE_NAME),
])))
-.with_exporter(opentelemetry_otlp::new_exporter().tonic().with_env())
+.with_exporter(
+opentelemetry_otlp::new_exporter()
+.tonic()
+.with_export_config(tracer_config.into()),
+)
.install_batch(opentelemetry::runtime::Tokio)?;
-let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
+let otel_layer = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
-.with(fmt::layer())
-.with(telemetry)
+.with(otel_layer)
.with(
+// Use the info level as default
EnvFilter::builder()
.with_default_directive(Directive::from_str("info").unwrap())
.from_env()?,
)
.init();
+} else {
+// Setup tracing
+tracing_subscriber::registry()
+.with(fmt::layer())
+.with(
+// Use the info level as default
+EnvFilter::builder()
+.with_default_directive(Directive::from_str("info").unwrap())
+.from_env()?,
+)
+.init();
+}
-info!("Starting nova");
-let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
+// Finally starting nova
+info!("Starting nova component {}", Y::SERVICE_NAME);
let (stop, stop_channel) = oneshot::channel();
tokio::spawn(async move {
trace!("started signal watching");
#[cfg(unix)]
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.unwrap()
.recv()
.await;
#[cfg(not(unix))]
return tokio::signal::ctrl_c().await.unwrap();
stop.send(()).unwrap();
+shutdown_tracer_provider();
});
trace!(
"Starting component {component}",
-component = Self::SERVICE_NAME
+component = Y::SERVICE_NAME
);
-self.start(settings?, stop_channel).await
+component.start(settings, stop_channel).await
})
-}
}
#[macro_export]
@@ -90,9 +152,9 @@ macro_rules! ignite {
($c:ty) => {
#[allow(dead_code)]
fn main() -> anyhow::Result<()> {
-use leash::Component;
+use $crate::Component;
let rt = tokio::runtime::Runtime::new()?;
-rt.block_on(<$c as Component>::new()._internal_start())?;
+rt.block_on($crate::start_component(<$c as Component>::new()))?;
Ok(())
}
};
@@ -128,7 +190,5 @@ mod test {
}
}
ignite!(TestComponent);
}


@@ -10,7 +10,7 @@ serde_repr = "0.1"
config = { version = "0.13", default-features = false, features = ["json", "yaml-rust", "ini"] }
-async-nats = "0.25.1"
+async-nats = "0.26.0"
redis = { version = "0.22.1", features = ["cluster", "connection-manager", "tokio-comp"] }
tokio = { version = "1", features = ["signal", "rt"] }
@@ -21,4 +21,6 @@ anyhow = "1.0.68"
serde_test = "1.0.152"
tracing = "0.1.37"
+opentelemetry-otlp = "0.11.0"
+opentelemetry = "0.18.0"


@@ -10,6 +10,7 @@ pub struct Settings<T: Clone + DeserializeOwned> {
pub config: T,
pub nats: crate::nats::Configuration,
pub redis: crate::redis::Configuration,
+pub opentelemetry: Option<crate::opentelemetry::Configuration>,
}
impl<T: Clone + DeserializeOwned + Default> Settings<T> {


@@ -13,3 +13,4 @@ pub mod config;
pub mod nats;
pub mod payloads;
pub mod redis;
+pub mod opentelemetry;


@@ -0,0 +1,91 @@
use std::ops::{Deref, DerefMut};
use opentelemetry_otlp::{ExportConfig, Protocol};
use serde::{de::Visitor, Deserialize};
#[derive(Debug, Default)]
#[repr(transparent)]
pub struct ExportConfigDeserialize(ExportConfig);
impl Clone for ExportConfigDeserialize {
fn clone(&self) -> Self {
Self(ExportConfig {
endpoint: self.0.endpoint.clone(),
protocol: self.0.protocol,
timeout: self.0.timeout,
})
}
}
impl From<ExportConfigDeserialize> for ExportConfig {
fn from(val: ExportConfigDeserialize) -> Self {
val.0
}
}
impl Deref for ExportConfigDeserialize {
type Target = ExportConfig;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for ExportConfigDeserialize {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl<'de> Deserialize<'de> for ExportConfigDeserialize {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(Deserialize, Debug)]
#[serde(field_identifier, rename_all = "lowercase")]
enum Fields {
Endpoint,
Timeout,
}
struct OpenTelemetryExportConfigVisitor;
impl<'de> Visitor<'de> for OpenTelemetryExportConfigVisitor {
type Value = ExportConfigDeserialize;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("struct OpenTelemetryExportConfig")
}
fn visit_map<A>(self, mut map: A) -> std::result::Result<Self::Value, A::Error>
where
A: serde::de::MapAccess<'de>,
{
let mut export_config = ExportConfigDeserialize::default();
export_config.0.protocol = Protocol::Grpc;
while let Some(name) = map.next_key::<Fields>()? {
match name {
Fields::Endpoint => {
export_config.0.endpoint = map.next_value()?;
}
Fields::Timeout => {
export_config.0.timeout = map.next_value()?;
}
}
}
Ok(export_config)
}
}
deserializer.deserialize_struct(
"OpenTelemetryExportConfig",
&["endpoint", "protocol", "timeout"],
OpenTelemetryExportConfigVisitor,
)
}
}
#[derive(Debug, Clone, Deserialize)]
pub struct Configuration {
pub traces: Option<ExportConfigDeserialize>,
pub metrics: Option<ExportConfigDeserialize>,
}
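For reference, a hypothetical usage sketch (not part of the commit) of what this new optional settings block accepts once wired into Settings: each exporter section takes an endpoint, the protocol is forced to gRPC by the visitor, and either section can be omitted. The module path shared::opentelemetry and the use of serde_json here are assumptions made purely for illustration.

// Hypothetical sketch, assuming the module is exposed as
// shared::opentelemetry and that serde_json/anyhow are available.
use shared::opentelemetry::Configuration;

fn parse_example() -> anyhow::Result<()> {
    // Endpoint-only blocks: protocol defaults to gRPC and the timeout keeps
    // the opentelemetry-otlp default, as set by the visitor above.
    let cfg: Configuration = serde_json::from_value(serde_json::json!({
        "traces": { "endpoint": "http://otelcol:4317" },
        "metrics": { "endpoint": "http://otelcol:4317" }
    }))?;
    assert!(cfg.traces.is_some() && cfg.metrics.is_some());
    Ok(())
}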