restructure project

This commit is contained in:
MatthieuCoder 2023-01-02 18:59:03 +04:00
parent 46fd26962e
commit f8c2a144e2
40 changed files with 1316 additions and 594 deletions

View file

@ -7,8 +7,6 @@ RUN apt update -y && apt install libssl-dev pkg-config apt-transport-https curl
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg && \ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg && \
# Add docker repository apt source # Add docker repository apt source
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list && \ echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | tee /etc/apt/sources.list.d/docker.list && \
# Add bazel repository apt source
echo "deb [arch=amd64] https://storage.googleapis.com/bazel-apt stable jdk1.8" | tee /etc/apt/sources.list.d/bazel.list && \
# Install docker # Install docker
apt update -y && apt install docker-ce-cli -y apt update -y && apt install docker-ce-cli -y

1
.gitignore vendored
View file

@ -3,3 +3,4 @@ target/
**/local* **/local*
.ijwb .ijwb
.idea .idea
config.yml

465
Cargo.lock generated
View file

@ -37,6 +37,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "anyhow"
version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb2f989d18dd141ab8ae82f64d1a8cdd37e0840f73a406896cf5e99502fab61"
[[package]] [[package]]
name = "arc-swap" name = "arc-swap"
version = "1.5.1" version = "1.5.1"
@ -77,6 +83,27 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "async-stream"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dad5c83079eae9969be7fadefe640a1c566901f05ff91ab221de4b6f68d9507e"
dependencies = [
"async-stream-impl",
"futures-core",
]
[[package]]
name = "async-stream-impl"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10f203db73a71dfa2fb6dd22763990fa26f3d2625a6da2da900d23b87d26be27"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.60" version = "0.1.60"
@ -105,6 +132,52 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "axum"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08b108ad2665fa3f6e6a517c3d80ec3e77d224c47d605167aefaa5d7ef97fa48"
dependencies = [
"async-trait",
"axum-core",
"bitflags",
"bytes",
"futures-util",
"http",
"http-body",
"hyper",
"itoa",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper",
"tower",
"tower-http",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79b8558f5a0581152dc94dcd289132a1d377494bdeafcd41869b3258e3e2ad92"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http",
"http-body",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]] [[package]]
name = "base64" name = "base64"
version = "0.13.1" version = "0.13.1"
@ -183,9 +256,11 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
name = "cache" name = "cache"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"async-nats", "async-nats",
"futures-util", "futures-util",
"log", "log",
"proto",
"redis", "redis",
"serde", "serde",
"serde_json", "serde_json",
@ -321,16 +396,6 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "ctor"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096"
dependencies = [
"quote",
"syn",
]
[[package]] [[package]]
name = "curve25519-dalek" name = "curve25519-dalek"
version = "3.2.0" version = "3.2.0"
@ -464,6 +529,18 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257" checksum = "0688c2a7f92e427f44895cd63841bff7b29f8d7a1648b9e7e07a4a365b2e1257"
[[package]]
name = "dns-lookup"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53ecafc952c4528d9b51a458d1a8904b81783feff9fde08ab6ed2545ff396872"
dependencies = [
"cfg-if",
"libc",
"socket2",
"winapi",
]
[[package]] [[package]]
name = "ed25519" name = "ed25519"
version = "1.5.2" version = "1.5.2"
@ -557,6 +634,12 @@ dependencies = [
"instant", "instant",
] ]
[[package]]
name = "fixedbitset"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]] [[package]]
name = "flate2" name = "flate2"
version = "1.0.25" version = "1.0.25"
@ -691,8 +774,11 @@ dependencies = [
name = "gateway" name = "gateway"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"bytes", "bytes",
"futures", "futures",
"leash",
"proto",
"serde", "serde",
"serde_json", "serde_json",
"shared", "shared",
@ -733,6 +819,12 @@ dependencies = [
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
] ]
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.15" version = "0.3.15"
@ -761,6 +853,21 @@ dependencies = [
"ahash", "ahash",
] ]
[[package]]
name = "hashring"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd0ddd025eccd8a2fff9865e82ef4c8ce00c4a67709036847d95cf3ccffd07a8"
dependencies = [
"siphasher",
]
[[package]]
name = "heck"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]] [[package]]
name = "hermit-abi" name = "hermit-abi"
version = "0.1.19" version = "0.1.19"
@ -816,6 +923,12 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
] ]
[[package]]
name = "http-range-header"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29"
[[package]] [[package]]
name = "httparse" name = "httparse"
version = "1.8.0" version = "1.8.0"
@ -875,6 +988,18 @@ dependencies = [
"tokio-rustls", "tokio-rustls",
] ]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]] [[package]]
name = "hyper-tls" name = "hyper-tls"
version = "0.5.0" version = "0.5.0"
@ -1013,6 +1138,16 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "leash"
version = "0.1.0"
dependencies = [
"anyhow",
"serde",
"shared",
"tokio",
]
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.139" version = "0.2.139"
@ -1070,12 +1205,24 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "matchit"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.5.0" version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]] [[package]]
name = "minimal-lexical" name = "minimal-lexical"
version = "0.2.1" version = "0.2.1"
@ -1103,6 +1250,12 @@ dependencies = [
"windows-sys 0.42.0", "windows-sys 0.42.0",
] ]
[[package]]
name = "multimap"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
[[package]] [[package]]
name = "native-tls" name = "native-tls"
version = "0.2.11" version = "0.2.11"
@ -1349,6 +1502,16 @@ dependencies = [
"sha1", "sha1",
] ]
[[package]]
name = "petgraph"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5014253a1331579ce62aa67443b4a658c5e7dd03d4bc6d302b94474888143"
dependencies = [
"fixedbitset",
"indexmap",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.0.12" version = "1.0.12"
@ -1415,6 +1578,16 @@ dependencies = [
"log", "log",
] ]
[[package]]
name = "prettyplease"
version = "0.1.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c8992a85d8e93a28bdf76137db888d3874e3b230dee5ed8bebac4c9f7617773"
dependencies = [
"proc-macro2",
"syn",
]
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.49" version = "1.0.49"
@ -1454,9 +1627,70 @@ dependencies = [
"thiserror", "thiserror",
] ]
[[package]]
name = "prost"
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c01db6702aa05baa3f57dec92b8eeeeb4cb19e894e73996b32a4093289e54592"
dependencies = [
"bytes",
"prost-derive",
]
[[package]]
name = "prost-build"
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb5320c680de74ba083512704acb90fe00f28f79207286a848e730c45dd73ed6"
dependencies = [
"bytes",
"heck",
"itertools",
"lazy_static",
"log",
"multimap",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"regex",
"syn",
"tempfile",
"which",
]
[[package]]
name = "prost-derive"
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8842bad1a5419bca14eac663ba798f6bc19c413c2fdceb5f3ba3b0932d96720"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "prost-types"
version = "0.11.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "017f79637768cde62820bc2d4fe0e45daaa027755c323ad077767c6c5f173091"
dependencies = [
"bytes",
"prost",
]
[[package]] [[package]]
name = "proto" name = "proto"
version = "0.1.0" version = "0.1.0"
dependencies = [
"glob",
"prost",
"tonic",
"tonic-build",
]
[[package]] [[package]]
name = "protobuf" name = "protobuf"
@ -1550,6 +1784,24 @@ dependencies = [
"rand_core 0.5.1", "rand_core 0.5.1",
] ]
[[package]]
name = "ratelimit"
version = "0.1.0"
dependencies = [
"anyhow",
"futures-util",
"hyper",
"proto",
"serde",
"serde_json",
"shared",
"tokio",
"tokio-stream",
"tonic",
"tracing",
"twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
]
[[package]] [[package]]
name = "redis" name = "redis"
version = "0.22.1" version = "0.22.1"
@ -1613,13 +1865,25 @@ dependencies = [
name = "rest" name = "rest"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"dns-lookup",
"futures-util", "futures-util",
"hashring",
"http",
"hyper", "hyper",
"hyper-tls", "hyper-tls",
"lazy_static", "lazy_static",
"leash",
"proto",
"serde", "serde",
"serde_json",
"shared", "shared",
"tokio", "tokio",
"tokio-scoped",
"tokio-stream",
"tonic",
"tracing",
"twilight-http-ratelimiting 0.14.0 (git+https://github.com/MatthieuCoder/twilight.git)",
"xxhash-rust", "xxhash-rust",
] ]
@ -1706,6 +1970,12 @@ dependencies = [
"base64", "base64",
] ]
[[package]]
name = "rustversion"
version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5583e89e108996506031660fe09baa5011b9dd0341b89029313006d1fb508d70"
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.12" version = "1.0.12"
@ -1906,6 +2176,7 @@ dependencies = [
name = "shared" name = "shared"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow",
"async-nats", "async-nats",
"config", "config",
"enumflags2", "enumflags2",
@ -1951,6 +2222,12 @@ version = "1.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c" checksum = "74233d3b3b2f6d4b006dc19dee745e73e2a6bfb6f93607cd3b02bd5b00797d7c"
[[package]]
name = "siphasher"
version = "0.3.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.7" version = "0.4.7"
@ -2023,6 +2300,12 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "sync_wrapper"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]] [[package]]
name = "synstructure" name = "synstructure"
version = "0.12.6" version = "0.12.6"
@ -2168,6 +2451,16 @@ dependencies = [
"windows-sys 0.42.0", "windows-sys 0.42.0",
] ]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b74022ada614a1b4834de765f9bb43877f910cc8ce4be40e89042c9223a8bf"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "1.8.2" version = "1.8.2"
@ -2211,6 +2504,27 @@ dependencies = [
"webpki", "webpki",
] ]
[[package]]
name = "tokio-scoped"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4beb8ba13bc53ac53ce1d52b42f02e5d8060f0f42138862869beb769722b256"
dependencies = [
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-stream"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d660770404473ccd7bc9f8b28494a811bc18542b915c0855c51e8f419d5223ce"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-tungstenite" name = "tokio-tungstenite"
version = "0.17.2" version = "0.17.2"
@ -2250,6 +2564,96 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "tonic"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f219fad3b929bef19b1f86fbc0358d35daed8f2cac972037ac0dc10bbb8d5fb"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64",
"bytes",
"futures-core",
"futures-util",
"h2",
"http",
"http-body",
"hyper",
"hyper-timeout",
"percent-encoding",
"pin-project",
"prost",
"prost-derive",
"tokio",
"tokio-stream",
"tokio-util",
"tower",
"tower-layer",
"tower-service",
"tracing",
"tracing-futures",
]
[[package]]
name = "tonic-build"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5bf5e9b9c0f7e0a7c027dcfaba7b2c60816c7049171f679d99ee2ff65d0de8c4"
dependencies = [
"prettyplease",
"proc-macro2",
"prost-build",
"quote",
"syn",
]
[[package]]
name = "tower"
version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
dependencies = [
"futures-core",
"futures-util",
"indexmap",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858"
dependencies = [
"bitflags",
"bytes",
"futures-core",
"futures-util",
"http",
"http-body",
"http-range-header",
"pin-project-lite",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "tower-layer"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0"
[[package]] [[package]]
name = "tower-service" name = "tower-service"
version = "0.3.2" version = "0.3.2"
@ -2263,6 +2667,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"log",
"pin-project-lite", "pin-project-lite",
"tracing-attributes", "tracing-attributes",
"tracing-core", "tracing-core",
@ -2288,6 +2693,16 @@ dependencies = [
"once_cell", "once_cell",
] ]
[[package]]
name = "tracing-futures"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project",
"tracing",
]
[[package]] [[package]]
name = "try-lock" name = "try-lock"
version = "0.2.3" version = "0.2.3"
@ -2362,7 +2777,7 @@ dependencies = [
"serde_json", "serde_json",
"tokio", "tokio",
"tracing", "tracing",
"twilight-http-ratelimiting", "twilight-http-ratelimiting 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"twilight-model", "twilight-model",
"twilight-validate", "twilight-validate",
] ]
@ -2379,6 +2794,17 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "twilight-http-ratelimiting"
version = "0.14.0"
source = "git+https://github.com/MatthieuCoder/twilight.git#a7953514373d3e3962435e6a539e0e2504a2c2fd"
dependencies = [
"futures-util",
"http",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "twilight-model" name = "twilight-model"
version = "0.14.0" version = "0.14.0"
@ -2578,13 +3004,13 @@ dependencies = [
name = "webhook" name = "webhook"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"ctor", "anyhow",
"ed25519-dalek", "ed25519-dalek",
"hex", "hex",
"hyper", "hyper",
"lazy_static", "lazy_static",
"libc", "leash",
"rand 0.8.5", "proto",
"serde", "serde",
"serde_json", "serde_json",
"shared", "shared",
@ -2602,6 +3028,17 @@ dependencies = [
"untrusted", "untrusted",
] ]
[[package]]
name = "which"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c831fbbee9e129a8cf93e7747a82da9d95ba8e16621cae60ec2cdc849bacb7b"
dependencies = [
"either",
"libc",
"once_cell",
]
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"

View file

@ -4,7 +4,9 @@ members = [
"exes/gateway/", "exes/gateway/",
"exes/rest/", "exes/rest/",
"exes/webhook/", "exes/webhook/",
"exes/ratelimit/",
"libs/proto/", "libs/proto/",
"libs/shared/" "libs/shared/",
"libs/leash/"
] ]

View file

@ -1,26 +1,92 @@
version: "3.3" version: "3.3"
services: services:
nats:
image: bitnami/nats
restart: always
ports:
- 4222:4222
- 8222:8222
redis:
image: redis
cache: cache:
image: ghcr.io/discordnova/nova/cache image: ghcr.io/discordnova/nova/cache
restart: always
build: build:
context: . context: .
args: args:
- COMPONENT=cache - COMPONENT=cache
volumes:
- ./config.yml:/config/default.yml
environment:
- RUST_LOG=info
depends_on:
- nats
- redis
gateway: gateway:
image: ghcr.io/discordnova/nova/gateway image: ghcr.io/discordnova/nova/gateway
restart: always
build: build:
context: . context: .
args: args:
- COMPONENT=gateway - COMPONENT=gateway
volumes:
- ./config.yml:/config/default.yml
environment:
- RUST_LOG=info
depends_on:
- nats
ports:
- 9000:9000
rest: rest:
image: ghcr.io/discordnova/nova/rest image: ghcr.io/discordnova/nova/rest
restart: always
build: build:
context: . context: .
args: args:
- COMPONENT=rest - COMPONENT=rest
volumes:
- ./config.yml:/config/default.yml
environment:
- RUST_LOG=info
depends_on:
- ratelimit
ports:
- 9001:9000
- 8080:8080
webhook: webhook:
image: ghcr.io/discordnova/nova/webhook image: ghcr.io/discordnova/nova/webhook
restart: always
build: build:
context: . context: .
args: args:
- COMPONENT=webhook - COMPONENT=webhook
volumes:
- ./config.yml:/config/default.yml
environment:
- RUST_LOG=info
depends_on:
- nats
ports:
- 9002:9000
- 8081:8080
ratelimit:
image: ghcr.io/discordnova/nova/ratelimit
restart: always
build:
context: .
args:
- COMPONENT=ratelimit
volumes:
- ./config.yml:/config/default.yml
environment:
- RUST_LOG=info
depends_on:
- nats
- redis
ports:
- 9003:9000
- 8082:8080

1
exes/.gitignore vendored Normal file
View file

@ -0,0 +1 @@
ratelimit/

View file

@ -7,6 +7,7 @@ edition = "2018"
[dependencies] [dependencies]
shared = { path = "../../libs/shared" } shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
async-nats = "0.25.1" async-nats = "0.25.1"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] } serde = { version = "1.0.8", features = ["derive"] }
@ -15,3 +16,4 @@ serde_json = { version = "1.0" }
redis = "*" redis = "*"
futures-util = "*" futures-util = "*"
twilight-model = "0.14" twilight-model = "0.14"
anyhow = "1.0.68"

View file

@ -1,5 +1,4 @@
use serde::Deserialize; use serde::Deserialize;
#[derive(Debug, Deserialize, Clone, Default)] #[derive(Debug, Deserialize, Clone, Default)]
pub struct CacheConfiguration { pub struct CacheConfiguration {
pub toggles: Vec<String> pub toggles: Vec<String>

View file

@ -1,7 +1,7 @@
use std::error::Error; use std::{error::Error, pin::Pin};
use async_nats::{Client, Subscriber}; use async_nats::{Client, Subscriber};
use futures_util::stream::StreamExt; use futures_util::{stream::StreamExt, Future};
use log::info; use log::info;
use managers::{ use managers::{
automoderation::Automoderation, bans::Bans, channels::Channels, automoderation::Automoderation, bans::Bans, channels::Channels,
@ -22,7 +22,7 @@ pub enum CacheSourcedEvents {
} }
#[derive(Default)] #[derive(Default)]
struct MegaCache { struct Cache {
automoderation: Automoderation, automoderation: Automoderation,
channels: Channels, channels: Channels,
bans: Bans, bans: Bans,
@ -42,18 +42,18 @@ struct MegaCache {
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap(); let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
info!("loaded configuration: {:?}", settings); info!("loaded configuration: {:?}", settings);
let nats =
let nats: Client = settings.nats.to_client().await?; Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats).await?;
// let redis: redis::Client = settings.redis.into(); // let redis: redis::Client = settings.redis.into();
let mut cache = MegaCache::default(); let mut cache = Cache::default();
let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?; let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
listen(&mut sub, &mut cache, settings.config.toggles).await; listen(&mut sub, &mut cache, settings.config.toggles).await;
Ok(()) Ok(())
} }
async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) { async fn listen(sub: &mut Subscriber, cache: &mut Cache, features: Vec<String>) {
while let Some(data) = sub.next().await { while let Some(data) = sub.next().await {
let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap(); let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
let event = cp.data.data; let event = cp.data.data;

View file

@ -5,7 +5,6 @@ use crate::CacheSourcedEvents;
use super::CacheManager; use super::CacheManager;
use std::future::Future; use std::future::Future;
#[derive(Default)] #[derive(Default)]
pub struct Bans {} pub struct Bans {}
impl CacheManager for Bans { impl CacheManager for Bans {

View file

@ -5,6 +5,8 @@ edition = "2018"
[dependencies] [dependencies]
shared = { path = "../../libs/shared" } shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
twilight-gateway = { version = "0.14" } twilight-gateway = { version = "0.14" }
twilight-model = "0.14" twilight-model = "0.14"
@ -12,3 +14,4 @@ serde = { version = "1.0.8", features = ["derive"] }
futures = "0.3" futures = "0.3"
serde_json = { version = "1.0" } serde_json = { version = "1.0" }
bytes = "*" bytes = "*"
anyhow = "*"

View file

@ -2,13 +2,20 @@ use shared::serde::{Deserialize, Serialize};
use twilight_gateway::Intents; use twilight_gateway::Intents;
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
pub struct Config { pub struct GatewayConfig {
pub token: String, pub token: String,
pub intents: Intents pub intents: Intents,
pub shard: u64,
pub shard_total: u64,
} }
impl Default for Config { impl Default for GatewayConfig {
fn default() -> Self { fn default() -> Self {
Self { intents: Intents::empty(), token: String::default() } Self {
intents: Intents::empty(),
token: String::default(),
shard_total: 1,
shard: 1,
}
} }
} }

View file

@ -1,21 +1,31 @@
use config::Config; use config::GatewayConfig;
use leash::{ignite, AnyhowResultFuture, Component};
use shared::{ use shared::{
config::Settings, config::Settings,
log::{debug, info}, log::{debug, info},
nats_crate::Client, nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing}, payloads::{CachePayload, DispatchEventTagged, Tracing},
}; };
use std::{convert::TryFrom, error::Error}; use std::{convert::TryFrom, pin::Pin};
use twilight_gateway::{Event, Shard}; use twilight_gateway::{Event, Shard};
mod config; mod config;
use futures::StreamExt; use futures::{Future, StreamExt};
use twilight_model::gateway::event::DispatchEvent; use twilight_model::gateway::event::DispatchEvent;
#[tokio::main] struct GatewayServer {}
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> { impl Component for GatewayServer {
let settings: Settings<Config> = Settings::new("gateway").unwrap(); type Config = GatewayConfig;
let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents); const SERVICE_NAME: &'static str = "gateway";
let nats: Client = settings.nats.to_client().await?;
fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
Box::pin(async move {
let (shard, mut events) = Shard::builder(settings.token.to_owned(), settings.intents)
.shard(settings.shard, settings.shard_total)?
.build();
let nats =
Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats)
.await?;
shard.start().await?; shard.start().await?;
@ -48,4 +58,12 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
} }
Ok(()) Ok(())
})
}
fn new() -> Self {
Self {}
}
} }
ignite!(GatewayServer);

View file

@ -7,6 +7,9 @@ edition = "2018"
[dependencies] [dependencies]
shared = { path = "../../libs/shared" } shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
hyper = { version = "0.14", features = ["full"] } hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] } serde = { version = "1.0.8", features = ["derive"] }
@ -14,3 +17,13 @@ futures-util = "0.3.17"
hyper-tls = "0.5.0" hyper-tls = "0.5.0"
lazy_static = "1.4.0" lazy_static = "1.4.0"
xxhash-rust = { version = "0.8.2", features = ["xxh32"] } xxhash-rust = { version = "0.8.2", features = ["xxh32"] }
twilight-http-ratelimiting = { git = "https://github.com/MatthieuCoder/twilight.git" }
tracing = "0.1.37"
hashring = "0.3.0"
anyhow = "*"
tonic = "0.8.3"
serde_json = { version = "1.0" }
http = "0.2.8"
tokio-stream = "0.1.11"
dns-lookup = "1.0.8"
tokio-scoped = "0.2.0"

View file

@ -1,9 +1,21 @@
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use serde::Deserialize; use serde::Deserialize;
#[derive(Debug, Deserialize, Clone, Default)] fn default_listening_address() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
}
#[derive(Debug, Deserialize, Clone)]
pub struct ServerSettings { pub struct ServerSettings {
pub port: u16, #[serde(default = "default_listening_address")]
pub address: String, pub listening_adress: SocketAddr
}
impl Default for ServerSettings {
fn default() -> Self {
Self {
listening_adress: default_listening_address(),
}
}
} }
#[derive(Debug, Deserialize, Clone, Default)] #[derive(Debug, Deserialize, Clone, Default)]
@ -12,7 +24,7 @@ pub struct Discord {
} }
#[derive(Debug, Deserialize, Clone, Default)] #[derive(Debug, Deserialize, Clone, Default)]
pub struct Config { pub struct ReverseProxyConfig {
pub server: ServerSettings, pub server: ServerSettings,
pub discord: Discord, pub discord: Discord,
} }

141
exes/rest/src/handler.rs Normal file
View file

@ -0,0 +1,141 @@
use std::{
collections::hash_map::DefaultHasher,
convert::TryFrom,
hash::{Hash, Hasher},
str::FromStr,
};
use anyhow::bail;
use http::{
header::{AUTHORIZATION, CONNECTION, HOST, TRANSFER_ENCODING, UPGRADE},
HeaderValue, Method as HttpMethod, Request, Response, Uri,
};
use hyper::{client::HttpConnector, Body, Client};
use hyper_tls::HttpsConnector;
use shared::log::error;
use twilight_http_ratelimiting::{Method, Path};
use crate::ratelimit_client::RemoteRatelimiter;
/// Normalizes the path
fn normalize_path(request_path: &str) -> (&str, &str) {
if let Some(trimmed_path) = request_path.strip_prefix("/api") {
if let Some(maybe_api_version) = trimmed_path.split('/').nth(1) {
if let Some(version_number) = maybe_api_version.strip_prefix('v') {
if version_number.parse::<u8>().is_ok() {
let len = "/api/v".len() + version_number.len();
return (&request_path[..len], &request_path[len..]);
};
};
}
("/api", trimmed_path)
} else {
("/api", request_path)
}
}
pub async fn handle_request(
client: Client<HttpsConnector<HttpConnector>, Body>,
ratelimiter: RemoteRatelimiter,
token: String,
mut request: Request<Body>,
) -> Result<Response<Body>, anyhow::Error> {
let (hash, uri_string) = {
let method = match *request.method() {
HttpMethod::DELETE => Method::Delete,
HttpMethod::GET => Method::Get,
HttpMethod::PATCH => Method::Patch,
HttpMethod::POST => Method::Post,
HttpMethod::PUT => Method::Put,
_ => {
error!("Unsupported HTTP method in request, {}", request.method());
bail!("unsupported method");
}
};
let request_path = request.uri().path();
let (api_path, trimmed_path) = normalize_path(&request_path);
let mut uri_string = format!("http://192.168.0.27:8000{}{}", api_path, trimmed_path);
if let Some(query) = request.uri().query() {
uri_string.push('?');
uri_string.push_str(query);
}
let mut hash = DefaultHasher::new();
match Path::try_from((method, trimmed_path)) {
Ok(path) => path,
Err(e) => {
error!(
"Failed to parse path for {:?} {}: {:?}",
method, trimmed_path, e
);
bail!("failed o parse");
}
}
.hash(&mut hash);
(hash.finish().to_string(), uri_string)
};
let header_sender = match ratelimiter.ticket(hash).await {
Ok(sender) => sender,
Err(e) => {
error!("Failed to receive ticket for ratelimiting: {:?}", e);
bail!("failed to reteive ticket");
}
};
request.headers_mut().insert(
AUTHORIZATION,
HeaderValue::from_bytes(token.as_bytes())
.expect("strings are guaranteed to be valid utf-8"),
);
request
.headers_mut()
.insert(HOST, HeaderValue::from_static("discord.com"));
// Remove forbidden HTTP/2 headers
// https://datatracker.ietf.org/doc/html/rfc7540#section-8.1.2.2
request.headers_mut().remove(CONNECTION);
request.headers_mut().remove("keep-alive");
request.headers_mut().remove("proxy-connection");
request.headers_mut().remove(TRANSFER_ENCODING);
request.headers_mut().remove(UPGRADE);
request.headers_mut().remove(AUTHORIZATION);
request.headers_mut().append(
AUTHORIZATION,
HeaderValue::from_static(
"Bot ODA3MTg4MzM1NzE3Mzg0MjEy.G3sXFM.8gY2sVYDAq2WuPWwDskAAEFLfTg8htooxME-LE",
),
);
let uri = match Uri::from_str(&uri_string) {
Ok(uri) => uri,
Err(e) => {
error!("Failed to create URI for requesting Discord API: {:?}", e);
bail!("failed to create uri");
}
};
*request.uri_mut() = uri;
let resp = match client.request(request).await {
Ok(response) => response,
Err(e) => {
error!("Error when requesting the Discord API: {:?}", e);
bail!("failed to request the discord api");
}
};
let ratelimit_headers = resp
.headers()
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_str().unwrap().to_string()))
.collect();
if header_sender.send(ratelimit_headers).is_err() {
error!("Error when sending ratelimit headers to ratelimiter");
};
Ok(resp)
}

View file

@ -1,46 +1,56 @@
use std::{convert::Infallible, sync::Arc}; use config::ReverseProxyConfig;
use crate::{config::Config, ratelimit::Ratelimiter}; use handler::handle_request;
use shared::{ use hyper::{
config::Settings, server::conn::AddrStream,
log::{error, info}, service::{make_service_fn, service_fn},
redis_crate::Client, Body, Client, Request, Server,
}; };
use hyper::{server::conn::AddrStream, service::make_service_fn, Server}; use hyper_tls::HttpsConnector;
use std::net::ToSocketAddrs; use leash::{ignite, AnyhowResultFuture, Component};
use tokio::sync::Mutex; use shared::config::Settings;
use std::convert::Infallible;
use crate::proxy::ServiceProxy;
mod config; mod config;
mod proxy; mod handler;
mod ratelimit; mod ratelimit_client;
#[tokio::main] struct ReverseProxyServer {}
async fn main() { impl Component for ReverseProxyServer {
let settings: Settings<Config> = Settings::new("rest").unwrap(); type Config = ReverseProxyConfig;
let config = Arc::new(settings.config); const SERVICE_NAME: &'static str = "rest";
let redis_client: Client = settings.redis.into();
let redis = Arc::new(Mutex::new(
redis_client.get_async_connection().await.unwrap(),
));
let ratelimiter = Arc::new(Ratelimiter::new(redis));
let addr = format!("{}:{}", config.server.address, config.server.port) fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
.to_socket_addrs() Box::pin(async move {
.unwrap() // Client to the remote ratelimiters
.next() let ratelimiter = ratelimit_client::RemoteRatelimiter::new();
.unwrap(); let client = Client::builder().build(HttpsConnector::new());
let service_fn = make_service_fn(move |_: &AddrStream| { let service_fn = make_service_fn(move |_: &AddrStream| {
let service_proxy = ServiceProxy::new(config.clone(), ratelimiter.clone()); let client = client.clone();
async move { Ok::<_, Infallible>(service_proxy) } let ratelimiter = ratelimiter.clone();
async move {
Ok::<_, Infallible>(service_fn(move |request: Request<Body>| {
let client = client.clone();
let ratelimiter = ratelimiter.clone();
async move {
handle_request(client, ratelimiter, "token".to_string(), request).await
}
}))
}
}); });
let server = Server::bind(&addr).serve(service_fn); let server = Server::bind(&settings.config.server.listening_adress).serve(service_fn);
info!("starting ratelimit server"); server.await?;
if let Err(e) = server.await {
error!("server error: {}", e); Ok(())
})
}
fn new() -> Self {
Self {}
} }
} }
ignite!(ReverseProxyServer);

View file

@ -1,138 +0,0 @@
use crate::{config::Config, ratelimit::Ratelimiter};
use hyper::{
client::HttpConnector, header::HeaderValue, http::uri::Parts, service::Service, Body, Client,
Request, Response, Uri,
};
use hyper_tls::HttpsConnector;
use shared::{
log::debug,
prometheus::{labels, opts, register_counter, register_histogram_vec, Counter, HistogramVec},
};
use std::{future::Future, pin::Pin, sync::Arc, task::Poll};
use tokio::sync::Mutex;
lazy_static::lazy_static! {
static ref HTTP_COUNTER: Counter = register_counter!(opts!(
"nova_rest_http_requests_total",
"Number of HTTP requests made.",
labels! {"handler" => "all",}
))
.unwrap();
static ref HTTP_REQ_HISTOGRAM: HistogramVec = register_histogram_vec!(
"nova_rest_http_request_duration_seconds",
"The HTTP request latencies in seconds.",
&["handler"]
)
.unwrap();
static ref HTTP_COUNTER_STATUS: Counter = register_counter!(opts!(
"nova_rest_http_requests_status",
"Number of HTTP requests made by status",
labels! {"" => ""}
))
.unwrap();
}
#[derive(Clone)]
pub struct ServiceProxy {
client: Client<HttpsConnector<HttpConnector>>,
ratelimiter: Arc<Ratelimiter>,
config: Arc<Config>,
fail: Arc<Mutex<i32>>,
}
impl Service<Request<Body>> for ServiceProxy {
type Response = Response<Body>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), Self::Error>> {
match self.client.poll_ready(cx) {
Poll::Ready(Ok(())) => Poll::Ready(Ok(())),
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Pending => Poll::Pending,
}
}
fn call(&mut self, mut req: Request<hyper::Body>) -> Self::Future {
HTTP_COUNTER.inc();
let timer = HTTP_REQ_HISTOGRAM.with_label_values(&["all"]).start_timer();
let host = "discord.com";
let mut new_parts = Parts::default();
let path = req.uri().path().to_string();
new_parts.scheme = Some("https".parse().unwrap());
new_parts.authority = Some(host.parse().unwrap());
new_parts.path_and_query = Some(path.parse().unwrap());
*req.uri_mut() = Uri::from_parts(new_parts).unwrap();
let headers = req.headers_mut();
headers.remove("user-agent");
headers.insert("Host", HeaderValue::from_str("discord.com").unwrap());
headers.insert(
"Authorization",
HeaderValue::from_str(&format!("Bot {}", self.config.discord.token)).unwrap(),
);
println!("{:?}", headers);
let client = self.client.clone();
let ratelimiter = self.ratelimiter.clone();
let fail = self.fail.clone();
return Box::pin(async move {
let resp = match ratelimiter.before_request(&req).await {
Ok(allowed) => match allowed {
crate::ratelimit::RatelimiterResponse::Ratelimited => {
debug!("ratelimited");
Ok(Response::builder().body("ratelimited".into()).unwrap())
}
_ => {
debug!("forwarding request");
match client.request(req).await {
Ok(mut response) => {
ratelimiter.after_request(&path, &response).await;
if response.status() != 200 {
*fail.lock().await += 1
}
response.headers_mut().insert(
"x-fails",
HeaderValue::from_str(&format!("{}", fail.lock().await))
.unwrap(),
);
Ok(response)
}
Err(e) => Err(e),
}
}
},
Err(e) => Ok(Response::builder()
.body(format!("server error: {}", e).into())
.unwrap()),
};
timer.observe_duration();
resp
});
}
}
impl ServiceProxy {
pub fn new(config: Arc<Config>, ratelimiter: Arc<Ratelimiter>) -> Self {
let https = HttpsConnector::new();
let client = Client::builder().build::<_, hyper::Body>(https);
let fail = Arc::new(Mutex::new(0));
ServiceProxy {
client,
config,
ratelimiter,
fail,
}
}
}

View file

@ -1,155 +0,0 @@
use shared::{
log::debug,
redis_crate::{aio::Connection, AsyncCommands}, error::GenericError,
};
use hyper::{Body, Request, Response};
use std::{
convert::TryInto,
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use tokio::sync::Mutex;
use xxhash_rust::xxh32::xxh32;
pub enum RatelimiterResponse {
NoSuchUrl,
Ratelimited,
Pass,
}
pub struct Ratelimiter {
redis: Arc<Mutex<Connection>>,
}
impl Ratelimiter {
pub fn new(redis: Arc<Mutex<Connection>>) -> Ratelimiter {
return Ratelimiter { redis };
}
pub async fn before_request(
&self,
request: &Request<Body>,
) -> Result<RatelimiterResponse, GenericError> {
// we lookup if the route hash is stored in the redis table
let path = request.uri().path();
let hash = xxh32(path.as_bytes(), 32);
let mut redis = self.redis.lock().await;
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
// global rate litmit
match redis
.get::<String, Option<i32>>(format!(
"nova:rest:ratelimit:global:{}",
since_the_epoch.as_secs()
))
.await
{
Ok(value) => {
match value {
Some(value) => {
debug!("incr: {}", value);
if value >= 49 {
return Ok(RatelimiterResponse::Ratelimited);
}
}
None => {
let key =
format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs());
// init global ratelimit
redis.set_ex::<String, i32, ()>(key, 0, 2).await.unwrap();
}
}
}
Err(_) => {
return Err(GenericError::StepFailed("radis ratelimit check".to_string()));
}
};
// we lookup the corresponding bucket for this url
match redis
.get::<String, Option<String>>(format!("nova:rest:ratelimit:url_bucket:{}", hash))
.await
{
Ok(bucket) => match bucket {
Some(bucket) => {
match redis
.exists::<String, bool>(format!("nova:rest:ratelimit:lock:{}", bucket))
.await
{
Ok(exists) => {
if exists {
Ok(RatelimiterResponse::Ratelimited)
} else {
Ok(RatelimiterResponse::Pass)
}
}
Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
}
}
None => Ok(RatelimiterResponse::NoSuchUrl),
},
Err(_) => Err(GenericError::StepFailed("radis ratelimit check".to_string())),
}
}
fn parse_headers(&self, response: &Response<Body>) -> Option<(String, i32, i32)> {
if let Some(bucket) = response.headers().get("X-RateLimit-Bucket") {
let bucket = bucket.to_str().unwrap().to_string();
let remaining = response.headers().get("X-RateLimit-Remaining").unwrap();
let reset = response.headers().get("X-RateLimit-Reset-After").unwrap();
let remaining_i32 = remaining.to_str().unwrap().parse::<i32>().unwrap();
let reset_ms_i32 = reset.to_str().unwrap().parse::<f32>().unwrap().ceil() as i32;
return Some((bucket, remaining_i32, reset_ms_i32));
} else {
None
}
}
pub async fn after_request(&self, path: &str, response: &Response<Body>) {
let hash = xxh32(path.as_bytes(), 32);
// verified earlier
let mut redis = self.redis.lock().await;
let start = SystemTime::now();
let since_the_epoch = start
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
redis
.incr::<String, i32, ()>(
format!("nova:rest:ratelimit:global:{}", since_the_epoch.as_secs()),
1,
)
.await
.unwrap();
if let Some((bucket, remaining, reset)) = self.parse_headers(response) {
if remaining <= 1 {
// we set a lock for the bucket until the timeout passes
redis
.set_ex::<String, bool, ()>(
format!("nova:rest:ratelimit:lock:{}", bucket),
true,
reset.try_into().unwrap(),
)
.await
.unwrap();
}
redis
.set_ex::<String, String, ()>(
format!("nova:rest:ratelimit:url_bucket:{}", hash),
bucket,
reset.try_into().unwrap(),
)
.await
.unwrap();
}
}
}

View file

@ -0,0 +1,137 @@
use self::remote_hashring::{HashRingWrapper, VNode};
use futures_util::Future;
use proto::nova::ratelimit::ratelimiter::bucket_submit_ticket_request::{Data, Headers};
use proto::nova::ratelimit::ratelimiter::BucketSubmitTicketRequest;
use shared::log::debug;
use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
use std::time::{Duration, SystemTime};
use tokio::sync::oneshot::{self};
use tokio::sync::{broadcast, mpsc, RwLock};
use tokio_stream::wrappers::ReceiverStream;
mod remote_hashring;
#[derive(Clone, Debug)]
pub struct RemoteRatelimiter {
remotes: Arc<RwLock<HashRingWrapper>>,
stop: Arc<tokio::sync::broadcast::Sender<()>>,
}
impl Drop for RemoteRatelimiter {
fn drop(&mut self) {
self.stop.clone().send(()).unwrap();
}
}
impl RemoteRatelimiter {
async fn get_ratelimiters(&self) -> Result<(), anyhow::Error> {
// get list of dns responses
let responses = dns_lookup::lookup_host("ratelimit")
.unwrap()
.into_iter()
.map(|f| f.to_string());
let mut write = self.remotes.write().await;
for ip in responses {
let a = VNode::new(ip.into()).await?;
write.add(a.clone());
}
return Ok(());
}
#[must_use]
pub fn new() -> Self {
let (rx, mut tx) = broadcast::channel(1);
let obj = Self {
remotes: Arc::new(RwLock::new(HashRingWrapper::default())),
stop: Arc::new(rx),
};
let obj_clone = obj.clone();
// Task to update the ratelimiters in the background
tokio::spawn(async move {
loop {
let sleep = tokio::time::sleep(Duration::from_secs(10));
tokio::pin!(sleep);
debug!("refreshing");
obj_clone.get_ratelimiters().await.unwrap();
tokio::select! {
() = &mut sleep => {
println!("timer elapsed");
},
_ = tx.recv() => {}
}
}
});
obj
}
pub fn ticket(
&self,
path: String,
) -> Pin<
Box<
dyn Future<Output = anyhow::Result<oneshot::Sender<HashMap<String, String>>>>
+ Send
+ 'static,
>,
> {
let remotes = self.remotes.clone();
let (tx, rx) = oneshot::channel::<HashMap<String, String>>();
Box::pin(async move {
// Get node managing this path
let mut node = (*remotes.read().await.get(&path).unwrap()).clone();
// Buffers for the gRPC streaming channel.
let (send, remote) = mpsc::channel(5);
let (do_request, wait) = oneshot::channel();
// Tonic requires a stream to be used; Since we use a mpsc channel, we can create a stream from it
let stream = ReceiverStream::new(remote);
// Start the grpc streaming
let ticket = node.submit_ticket(stream).await?;
// First, send the request
send.send(BucketSubmitTicketRequest {
data: Some(Data::Path(path)),
})
.await?;
// We continuously listen for events in the channel.
tokio::spawn(async move {
let message = ticket.into_inner().message().await.unwrap().unwrap();
if message.accepted == 1 {
do_request.send(()).unwrap();
let headers = rx.await.unwrap();
send.send(BucketSubmitTicketRequest {
data: Some(Data::Headers(Headers {
precise_time: SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("time went backwards")
.as_millis() as u64,
headers,
})),
})
.await
.unwrap();
}
});
// Wait for the message to be sent
wait.await?;
Ok(tx)
})
}
}

View file

@ -0,0 +1,67 @@
use core::fmt::Debug;
use proto::nova::ratelimit::ratelimiter::ratelimiter_client::RatelimiterClient;
use std::hash::Hash;
use std::ops::Deref;
use std::ops::DerefMut;
use tonic::transport::Channel;
#[derive(Debug, Clone)]
pub struct VNode {
address: String,
client: RatelimiterClient<Channel>,
}
impl Deref for VNode {
type Target = RatelimiterClient<Channel>;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl DerefMut for VNode {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
}
impl Hash for VNode {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.address.hash(state);
}
}
impl VNode {
pub async fn new(address: String) -> Result<Self, tonic::transport::Error> {
let client = RatelimiterClient::connect(format!("http://{}:8080", address.clone())).await?;
Ok(VNode { client, address })
}
}
unsafe impl Send for VNode {}
#[repr(transparent)]
#[derive(Default)]
pub struct HashRingWrapper(hashring::HashRing<VNode>);
impl Deref for HashRingWrapper {
type Target = hashring::HashRing<VNode>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl DerefMut for HashRingWrapper {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
impl Debug for HashRingWrapper {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("HashRing").finish()
}
}

View file

@ -7,15 +7,16 @@ edition = "2018"
hyper = { version = "0.14", features = ["full"] } hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
shared = { path = "../../libs/shared" } shared = { path = "../../libs/shared" }
proto = { path = "../../libs/proto" }
leash = { path = "../../libs/leash" }
serde = { version = "1.0.8", features = ["derive"] } serde = { version = "1.0.8", features = ["derive"] }
hex = "0.4.3" hex = "0.4.3"
serde_json = { version = "1.0" } serde_json = { version = "1.0" }
libc = "0.2.101"
lazy_static = "1.4.0" lazy_static = "1.4.0"
ctor = "0.1.21"
ed25519-dalek = "1" ed25519-dalek = "1"
twilight-model = { version = "0.14" } twilight-model = { version = "0.14" }
rand = "0.8" anyhow = "1.0.68"
[[bin]] [[bin]]
name = "webhook" name = "webhook"

View file

@ -1,19 +1,43 @@
use serde::Deserialize; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
#[derive(Debug, Deserialize, Clone, Default)] use ed25519_dalek::PublicKey;
pub struct ServerSettings { use serde::{Deserialize, Deserializer};
pub port: u16,
pub address: String, fn default_listening_address() -> SocketAddr {
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8080))
} }
#[derive(Debug, Deserialize, Clone, Default)] #[derive(Debug, Deserialize, Clone, Copy)]
pub struct ServerSettings {
#[serde(default = "default_listening_address")]
pub listening_adress: SocketAddr,
}
impl Default for ServerSettings {
fn default() -> Self {
Self {
listening_adress: default_listening_address(),
}
}
}
fn deserialize_pk<'de, D>(deserializer: D) -> Result<PublicKey, D::Error>
where
D: Deserializer<'de>,
{
let str = String::deserialize(deserializer)?;
let public_key = PublicKey::from_bytes(&hex::decode(&str).unwrap()).unwrap();
Ok(public_key)
}
#[derive(Debug, Deserialize, Clone, Default, Copy)]
pub struct Discord { pub struct Discord {
pub public_key: String, #[serde(deserialize_with = "deserialize_pk")]
pub public_key: PublicKey,
pub client_id: u32, pub client_id: u32,
} }
#[derive(Debug, Deserialize, Clone, Default)] #[derive(Debug, Deserialize, Clone, Default, Copy)]
pub struct Config { pub struct WebhookConfig {
pub server: ServerSettings, pub server: ServerSettings,
pub discord: Discord, pub discord: Discord,
} }

View file

@ -1,13 +1,12 @@
use super::error::WebhookError; use super::error::WebhookError;
use super::signature::validate_signature; use super::signature::validate_signature;
use crate::config::Config; use crate::config::WebhookConfig;
use ed25519_dalek::PublicKey; use ed25519_dalek::PublicKey;
use hyper::{ use hyper::{
body::{to_bytes, Bytes}, body::{to_bytes, Bytes},
service::Service, service::Service,
Body, Method, Request, Response, StatusCode, Body, Method, Request, Response, StatusCode,
}; };
use serde::{Deserialize, Serialize};
use shared::nats_crate::Client; use shared::nats_crate::Client;
use shared::{ use shared::{
log::{debug, error}, log::{debug, error},
@ -17,10 +16,9 @@ use std::{
future::Future, future::Future,
pin::Pin, pin::Pin,
str::from_utf8, str::from_utf8,
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use twilight_model::gateway::event::{DispatchEvent}; use twilight_model::gateway::event::DispatchEvent;
use twilight_model::{ use twilight_model::{
application::interaction::{Interaction, InteractionType}, application::interaction::{Interaction, InteractionType},
gateway::payload::incoming::InteractionCreate, gateway::payload::incoming::InteractionCreate,
@ -28,14 +26,13 @@ use twilight_model::{
/// Hyper service used to handle the discord webhooks /// Hyper service used to handle the discord webhooks
#[derive(Clone)] #[derive(Clone)]
pub struct HandlerService { pub struct WebhookService {
pub config: Arc<Config>, pub config: WebhookConfig,
pub nats: Arc<Client>, pub nats: Client,
pub public_key: Arc<PublicKey>,
} }
impl HandlerService { impl WebhookService {
async fn check_request(&self, req: Request<Body>) -> Result<Bytes, WebhookError> { async fn check_request(req: Request<Body>, pk: PublicKey) -> Result<Bytes, WebhookError> {
if req.method() == Method::POST { if req.method() == Method::POST {
let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") { let signature = if let Some(sig) = req.headers().get("X-Signature-Ed25519") {
sig.to_owned() sig.to_owned()
@ -57,7 +54,7 @@ impl HandlerService {
let data = to_bytes(req.into_body()).await?; let data = to_bytes(req.into_body()).await?;
if validate_signature( if validate_signature(
&self.public_key, &pk,
&[timestamp.as_bytes().to_vec(), data.to_vec()].concat(), &[timestamp.as_bytes().to_vec(), data.to_vec()].concat(),
signature.to_str()?, signature.to_str()?,
) { ) {
@ -74,10 +71,11 @@ impl HandlerService {
} }
async fn process_request( async fn process_request(
&mut self,
req: Request<Body>, req: Request<Body>,
nats: Client,
pk: PublicKey,
) -> Result<Response<Body>, WebhookError> { ) -> Result<Response<Body>, WebhookError> {
match self.check_request(req).await { match Self::check_request(req, pk).await {
Ok(data) => { Ok(data) => {
let utf8 = from_utf8(&data); let utf8 = from_utf8(&data);
match utf8 { match utf8 {
@ -86,7 +84,7 @@ impl HandlerService {
match value.kind { match value.kind {
InteractionType::Ping => Ok(Response::builder() InteractionType::Ping => Ok(Response::builder()
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.body(serde_json::to_string(&Ping { t: 1 }).unwrap().into()) .body(r#"{"t":1}"#.into())
.unwrap()), .unwrap()),
_ => { _ => {
debug!("calling nats"); debug!("calling nats");
@ -106,10 +104,13 @@ impl HandlerService {
let payload = serde_json::to_string(&data).unwrap(); let payload = serde_json::to_string(&data).unwrap();
match self.nats.request( match nats
.request(
"nova.cache.dispatch.INTERACTION_CREATE".to_string(), "nova.cache.dispatch.INTERACTION_CREATE".to_string(),
Bytes::from(payload), Bytes::from(payload),
).await { )
.await
{
Ok(response) => Ok(Response::builder() Ok(response) => Ok(Response::builder()
.header("Content-Type", "application/json") .header("Content-Type", "application/json")
.body(Body::from(response.reply.unwrap())) .body(Body::from(response.reply.unwrap()))
@ -144,15 +145,9 @@ impl HandlerService {
} }
} }
#[derive(Debug, Serialize, Deserialize)]
pub struct Ping {
#[serde(rename = "type")]
t: i32,
}
/// Implementation of the service /// Implementation of the service
impl Service<Request<Body>> for HandlerService { impl Service<hyper::Request<Body>> for WebhookService {
type Response = Response<Body>; type Response = hyper::Response<Body>;
type Error = hyper::Error; type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
@ -161,9 +156,10 @@ impl Service<Request<Body>> for HandlerService {
} }
fn call(&mut self, req: Request<Body>) -> Self::Future { fn call(&mut self, req: Request<Body>) -> Self::Future {
let mut clone = self.clone(); let future =
Self::process_request(req, self.nats.clone(), self.config.discord.public_key);
Box::pin(async move { Box::pin(async move {
let response = clone.process_request(req).await; let response = future.await;
match response { match response {
Ok(r) => Ok(r), Ok(r) => Ok(r),

View file

@ -1,22 +1,15 @@
use super::handler::HandlerService;
use crate::config::Config;
use hyper::service::Service; use hyper::service::Service;
use shared::nats_crate::Client;
use std::{ use std::{
future::{ready, Ready}, future::{ready, Ready},
sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
}; };
use ed25519_dalek::PublicKey;
pub struct MakeSvc { pub struct MakeSvc<T: Clone> {
pub settings: Arc<Config>, pub service: T,
pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>
} }
impl<T> Service<T> for MakeSvc { impl<T, V: Clone> Service<T> for MakeSvc<V> {
type Response = HandlerService; type Response = V;
type Error = std::io::Error; type Error = std::io::Error;
type Future = Ready<Result<Self::Response, Self::Error>>; type Future = Ready<Result<Self::Response, Self::Error>>;
@ -25,10 +18,12 @@ impl<T> Service<T> for MakeSvc {
} }
fn call(&mut self, _: T) -> Self::Future { fn call(&mut self, _: T) -> Self::Future {
ready(Ok(HandlerService { ready(Ok(self.service.clone()))
config: self.settings.clone(), }
nats: self.nats.clone(), }
public_key: self.public_key.clone()
})) impl<T: Clone> MakeSvc<T> {
pub fn new(service: T) -> Self {
Self { service }
} }
} }

View file

@ -1,5 +1,5 @@
mod error; mod error;
mod handler; pub mod handler;
pub mod make_service; pub mod make_service;
mod signature; mod signature;

View file

@ -5,7 +5,10 @@ use ed25519_dalek::PublicKey;
fn validate_signature_test() { fn validate_signature_test() {
let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
let content = "message de test incroyable".as_bytes().to_vec(); let content = "message de test incroyable".as_bytes().to_vec();
let public_key = PublicKey::from_bytes(&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap()).unwrap(); let public_key = PublicKey::from_bytes(
&hex::decode("eefe0c24473737cb2035232e3b4eb91c206f0a14684168f3503f7d8316058d6f").unwrap(),
)
.unwrap();
assert!(validate_signature(&public_key, &content, signature)) assert!(validate_signature(&public_key, &content, signature))
} }
@ -13,7 +16,10 @@ fn validate_signature_test() {
#[test] #[test]
fn validate_signature_reverse_test() { fn validate_signature_reverse_test() {
let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002"; let signature = "543ec3547d57f9ddb1ec4c5c36503ebf288ffda3da3d510764c9a49c2abb57690ef974c63d174771bdd2481de1066966f57abbec12a3ec171b9f6e2373837002";
let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap(); let public_key = PublicKey::from_bytes(
&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(),
)
.unwrap();
let content = "ceci est un test qui ne fonctionnera pas!" let content = "ceci est un test qui ne fonctionnera pas!"
.as_bytes() .as_bytes()
@ -24,7 +30,10 @@ fn validate_signature_reverse_test() {
#[test] #[test]
fn invalid_hex() { fn invalid_hex() {
let signature = "zzz"; let signature = "zzz";
let public_key = PublicKey::from_bytes(&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap()).unwrap(); let public_key = PublicKey::from_bytes(
&hex::decode("c029eea18437292c87c62aec34e7d1bd4e38fe6126f3f7c446de6375dc666044").unwrap(),
)
.unwrap();
let content = "ceci est un test qui ne fonctionnera pas!" let content = "ceci est un test qui ne fonctionnera pas!"
.as_bytes() .as_bytes()

View file

@ -1,45 +1,47 @@
use std::{net::ToSocketAddrs, sync::Arc};
mod config; mod config;
mod handler; mod handler;
use crate::handler::make_service::MakeSvc; use std::{future::Future, pin::Pin};
use crate::config::Config; use crate::{
use ed25519_dalek::PublicKey; config::WebhookConfig,
handler::{handler::WebhookService, make_service::MakeSvc},
};
use hyper::Server; use hyper::Server;
use shared::config::Settings; use leash::{ignite, AnyhowResultFuture, Component};
use shared::log::{error, info}; use shared::{config::Settings, log::info, nats_crate::Client};
#[tokio::main] #[derive(Clone, Copy)]
async fn main() { struct WebhookServer {}
let settings: Settings<Config> = Settings::new("webhook").unwrap();
start(settings).await;
}
async fn start(settings: Settings<Config>) { impl Component for WebhookServer {
let addr = format!( type Config = WebhookConfig;
"{}:{}", const SERVICE_NAME: &'static str = "webhook";
settings.config.server.address, settings.config.server.port
)
.to_socket_addrs()
.unwrap()
.next()
.unwrap();
info!( fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()> {
"Starting server on {}:{}", Box::pin(async move {
settings.config.server.address, settings.config.server.port info!("Starting server on {}", settings.server.listening_adress);
);
let config = Arc::new(settings.config); let bind = settings.server.listening_adress;
let public_key = let nats =
Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap()); Into::<Pin<Box<dyn Future<Output = anyhow::Result<Client>>>>>::into(settings.nats)
let server = Server::bind(&addr).serve(MakeSvc { .await?;
settings: config,
nats: Arc::new(settings.nats.to_client().await.unwrap()), let make_service = MakeSvc::new(WebhookService {
public_key: public_key, config: settings.config,
nats: nats.clone(),
}); });
if let Err(e) = server.await { let server = Server::bind(&bind).serve(make_service);
error!("server error: {}", e);
server.await?;
Ok(())
})
}
fn new() -> Self {
Self {}
} }
} }
ignite!(WebhookServer);

13
libs/leash/Cargo.toml Normal file
View file

@ -0,0 +1,13 @@
[package]
name = "leash"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
shared = { path = "../shared" }
anyhow = "1.0.68"
tokio = { version = "1.23.0", features = ["full"] }
serde = "1.0.152"

70
libs/leash/src/lib.rs Normal file
View file

@ -0,0 +1,70 @@
use anyhow::Result;
use serde::de::DeserializeOwned;
use shared::config::Settings;
use std::{future::Future, pin::Pin};
pub type AnyhowResultFuture<T> = Pin<Box<dyn Future<Output = Result<T>>>>;
pub trait Component: Send + Sync + 'static + Sized {
type Config: Default + Clone + DeserializeOwned;
const SERVICE_NAME: &'static str;
fn start(&self, settings: Settings<Self::Config>) -> AnyhowResultFuture<()>;
fn new() -> Self;
fn _internal_start(self) -> AnyhowResultFuture<()> {
Box::pin(async move {
let settings = Settings::<Self::Config>::new(Self::SERVICE_NAME);
// Start the grpc healthcheck
tokio::spawn(async move {});
// Start the prometheus monitoring job
tokio::spawn(async move {});
self.start(settings?).await
})
}
}
#[macro_export]
macro_rules! ignite {
($c:ty) => {
#[allow(dead_code)]
fn main() -> anyhow::Result<()> {
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(Box::new(<$c as Component>::new())._internal_start())?;
Ok(())
}
};
}
#[cfg(test)]
mod test {
use serde::Deserialize;
use crate::Component;
#[derive(Clone, Copy)]
struct TestComponent {}
#[derive(Default, Clone, Deserialize, Copy)]
struct TestComponentConfig {}
impl Component for TestComponent {
type Config = TestComponentConfig;
const SERVICE_NAME: &'static str = "test_component";
fn start(
&self,
_settings: shared::config::Settings<Self::Config>,
) -> crate::AnyhowResultFuture<()> {
Box::pin(async move { Ok(()) })
}
fn new() -> Self {
Self {}
}
}
ignite!(TestComponent);
}

View file

@ -4,3 +4,9 @@ version = "0.1.0"
edition = "2018" edition = "2018"
[dependencies] [dependencies]
tonic = "0.8.3"
prost = "0.11.5"
[build-dependencies]
tonic-build = "0.8.4"
glob = "0.3.0"

11
libs/proto/build.rs Normal file
View file

@ -0,0 +1,11 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
let paths: Vec<String> = glob::glob("../../proto/nova/**/*.proto")?
.map(|f| f.unwrap().to_str().unwrap().to_string())
.collect();
tonic_build::configure()
.include_file("genproto.rs")
.compile(&paths, &["../../proto"])?;
Ok(())
}

View file

@ -0,0 +1 @@
include!(concat!(env!("OUT_DIR"), concat!("/", "genproto.rs")));

View file

@ -19,6 +19,7 @@ twilight-model = "0.14"
serde_json = { version = "1.0" } serde_json = { version = "1.0" }
thiserror = "1.0.38" thiserror = "1.0.38"
inner = "0.1.1" inner = "0.1.1"
anyhow = "1.0.68"
[dependencies.redis] [dependencies.redis]
version = "*" version = "*"

View file

@ -1,17 +1,11 @@
use std::env; use std::{env, ops::Deref};
use config::{Config, Environment, File}; use config::{Config, Environment, File};
use log::info; use log::info;
use serde::Deserialize; use serde::{Deserialize, de::DeserializeOwned};
use crate::error::GenericError; use crate::error::GenericError;
/// Settings<T> is the base structure for all the nova's component config
/// you can specify a type T and the name of the component. the "config"
/// field will be equals to the key named after the given component name
/// and will be of type T
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
#[serde(bound(deserialize = "T: Deserialize<'de> + std::default::Default + Clone"))] pub struct Settings<T: Clone + DeserializeOwned + Default> {
pub struct Settings<T> {
#[serde(skip_deserializing)] #[serde(skip_deserializing)]
pub config: T, pub config: T,
pub monitoring: crate::monitoring::MonitoringConfiguration, pub monitoring: crate::monitoring::MonitoringConfiguration,
@ -19,20 +13,11 @@ pub struct Settings<T> {
pub redis: crate::redis::RedisConfiguration, pub redis: crate::redis::RedisConfiguration,
} }
/// impl<'de, T: Clone + DeserializeOwned + Default> Settings<T>
impl<T> Settings<T>
where
T: Deserialize<'static> + std::default::Default + Clone,
{ {
/// Initializes a new configuration like the other components of nova
/// And starts the prometheus metrics server if needed.
pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> { pub fn new(service_name: &str) -> Result<Settings<T>, GenericError> {
pretty_env_logger::init();
let mut builder = Config::builder(); let mut builder = Config::builder();
// this file my be shared with all the components
builder = builder.add_source(File::with_name("config/default")); builder = builder.add_source(File::with_name("config/default"));
let mode = env::var("ENV").unwrap_or_else(|_| "development".into()); let mode = env::var("ENV").unwrap_or_else(|_| "development".into());
info!("Configuration Environment: {}", mode); info!("Configuration Environment: {}", mode);
@ -50,12 +35,14 @@ where
// try to load the config // try to load the config
settings.config = config.get::<T>(service_name)?; settings.config = config.get::<T>(service_name)?;
// start the monitoring system if needed
crate::monitoring::start_monitoring(&settings.monitoring);
Ok(settings) Ok(settings)
} }
} }
pub fn test_init() { impl<T: Clone + DeserializeOwned + Default> Deref for Settings<T> {
pretty_env_logger::init(); type Target = T;
fn deref(&self) -> &Self::Target {
&self.config
}
} }

View file

@ -1,8 +1,7 @@
use std::{future::Future, pin::Pin};
use async_nats::Client; use async_nats::Client;
use serde::Deserialize; use serde::Deserialize;
use std::future::Future;
use crate::error::GenericError;
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct NatsConfigurationClientCert { pub struct NatsConfigurationClientCert {
@ -20,10 +19,8 @@ pub struct NatsConfiguration {
pub host: String, pub host: String,
} }
// todo: Prefer From since it automatically gives a free Into implementation impl From<NatsConfiguration> for Pin<Box<dyn Future<Output = anyhow::Result<Client>>>> {
// Allows the configuration to directly create a nats connection fn from(value: NatsConfiguration) -> Self {
impl NatsConfiguration { Box::pin(async move { Ok(async_nats::connect(value.host).await?) })
pub async fn to_client(self) -> Result<Client, GenericError> {
Ok(async_nats::connect(self.host).await?)
} }
} }

View file

@ -1,6 +1,6 @@
use redis::Client; use redis::{aio::MultiplexedConnection, Client};
use serde::Deserialize; use serde::Deserialize;
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, Deserialize)] #[derive(Clone, Debug, Deserialize)]
pub struct RedisConfiguration { pub struct RedisConfiguration {
@ -13,3 +13,18 @@ impl Into<Client> for RedisConfiguration {
redis::Client::open(self.url).unwrap() redis::Client::open(self.url).unwrap()
} }
} }
impl From<RedisConfiguration>
for Pin<Box<dyn Future<Output = anyhow::Result<MultiplexedConnection>>>>
{
fn from(value: RedisConfiguration) -> Self {
Box::pin(async move {
let con = Client::open(value.url)?;
let (multiplex, ready) = con.create_multiplexed_tokio_connection().await?;
tokio::spawn(ready);
Ok(multiplex)
})
}
}

View file

@ -1,58 +0,0 @@
syntax = "proto3";
package nova.management.v1alpha;
message Empty {}
// Represents the status of a shard
enum ShardStatus {
DISCONNECTED = 0;
RUNNING = 1;
RECONNECTING = 2;
}
// represents the state of a nova shard
message ShardStatusResponse {
// Status of the shard in the cluster
ShardStatus status = 1;
// Index of the discord shard
int64 identifier = 2;
// If the cluster have a node assigned
string cluster = 3;
// the websocket latency of the shard
int64 latency = 4;
}
message ShardStatusRequest {
// the id of the shard
int64 identifier = 1;
}
// represents the status of a cluster
// (an instance of the gateway which holds multiple shards)
message ClusterStatusResponse {
// the unique id of the cluster
string id = 1;
// the node the cluster is running on
string node = 2;
// the average latency of the cluster
int64 average_latency = 3;
// list of all the shards on the cluster
repeated ShardStatusResponse shards = 4;
}
message ClusterStatusRequest {
string id = 1;
}
// Represents the status of all the nova clusters & shards
message GlobalClusterStatusResponse {
int64 size = 1;
repeated ClusterStatusResponse shards = 2;
}
// used by the cli to interact with the nova manager
service ManagementService {
rpc GetGlobalClusterStatus (Empty) returns (GlobalClusterStatusResponse);
rpc GetClusterStatus (ClusterStatusRequest) returns (ClusterStatusResponse);
rpc GetShardStatus (ShardStatusRequest) returns (ShardStatusResponse);
}

View file

@ -1,5 +0,0 @@
syntax = "proto3";
import "common/management/nova.management.v1alpha.proto";
package nova.management.rpc.v1alpha;

View file

@ -0,0 +1,37 @@
syntax = "proto3";
package nova.ratelimit.ratelimiter;
service Ratelimiter {
rpc GetBucketInformation(BucketInformationRequest) returns (BucketInformationResponse);
rpc SubmitTicket(stream BucketSubmitTicketRequest) returns (stream BucketSubmitTicketResponse);
}
message BucketInformationRequest {
string path = 1;
}
message BucketInformationResponse {
uint64 limit = 1;
uint64 remaining = 2;
uint64 reset_after = 3;
uint64 started_at = 4;
}
message BucketSubmitTicketRequest {
oneof data {
string path = 1;
Headers headers = 2;
}
message Headers {
map<string, string> headers = 1;
uint64 precise_time = 2;
}
}
message BucketSubmitTicketResponse {
int64 accepted = 1;
}