add base of cache component

This commit is contained in:
MatthieuCoder 2022-12-31 22:48:40 +04:00
parent 0fcc68291a
commit 46fd26962e
27 changed files with 725 additions and 282 deletions

319
Cargo.lock generated
View file

@ -44,32 +44,39 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "983cd8b9d4b02a6dc6ffa557262eb5858a27a0038ffffe21a0f133eaa819a164"
[[package]]
name = "async-channel"
version = "1.8.0"
name = "async-nats"
version = "0.25.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf46fee83e5ccffc220104713af3292ff9bc7c64c7de289f66dae8e38d826833"
checksum = "f69bf051b7d96b3275cdea9a4abbe2e937ce6de66c742c57050c5c98b4a6db32"
dependencies = [
"concurrent-queue",
"event-listener",
"futures-core",
"base64",
"base64-url",
"bytes",
"futures",
"http",
"itertools",
"itoa",
"lazy_static",
"nkeys",
"nuid",
"once_cell",
"regex",
"ring",
"rustls-native-certs",
"rustls-pemfile",
"serde",
"serde_json",
"serde_nanos",
"serde_repr",
"subslice",
"time 0.3.17",
"tokio",
"tokio-retry",
"tokio-rustls",
"tracing",
"url",
]
[[package]]
name = "async-lock"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8101efe8695a6c17e02911402145357e718ac92d3ff88ae8419e84b1707b685"
dependencies = [
"event-listener",
"futures-lite",
]
[[package]]
name = "async-task"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a40729d2133846d9ed0ea60a8b9541bccddab49cd30f0715a1da672fe9a2524"
[[package]]
name = "async-trait"
version = "0.1.60"
@ -81,12 +88,6 @@ dependencies = [
"syn",
]
[[package]]
name = "atomic-waker"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a"
[[package]]
name = "atty"
version = "0.2.14"
@ -149,20 +150,6 @@ dependencies = [
"generic-array",
]
[[package]]
name = "blocking"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c67b173a56acffd6d2326fb7ab938ba0b00a71480e14902b2591c87bc5741e8"
dependencies = [
"async-channel",
"async-lock",
"async-task",
"atomic-waker",
"fastrand",
"futures-lite",
]
[[package]]
name = "bollard-stubs"
version = "1.41.0"
@ -196,12 +183,15 @@ checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
name = "cache"
version = "0.1.0"
dependencies = [
"async-nats",
"futures-util",
"log",
"nats",
"redis",
"serde",
"serde_json",
"shared",
"tokio",
"twilight-model",
]
[[package]]
@ -256,15 +246,6 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "concurrent-queue"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd7bef69dc86e3c610e4e7aed41035e2a7ed12e72dd7530f61327a6579a4390b"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "config"
version = "0.13.3"
@ -330,25 +311,6 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fb766fa798726286dbbb842f174001dab8abc7b627a1dd86e0b7222a95d929f"
dependencies = [
"cfg-if",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
@ -525,6 +487,12 @@ dependencies = [
"zeroize",
]
[[package]]
name = "either"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90e5c1c8368803113bf0c9584fc495a58b86dc8a29edbf8fe877d21d9507e797"
[[package]]
name = "enumflags2"
version = "0.7.5"
@ -580,12 +548,6 @@ dependencies = [
"libc",
]
[[package]]
name = "event-listener"
version = "2.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0"
[[package]]
name = "fastrand"
version = "1.8.0"
@ -684,21 +646,6 @@ version = "0.3.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "00f5fb52a06bdcadeb54e8d3671f8888a39697dcb0b81b23b55174030427f4eb"
[[package]]
name = "futures-lite"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
dependencies = [
"fastrand",
"futures-core",
"futures-io",
"memchr",
"parking",
"pin-project-lite",
"waker-fn",
]
[[package]]
name = "futures-macro"
version = "0.3.25"
@ -744,6 +691,7 @@ dependencies = [
name = "gateway"
version = "0.1.0"
dependencies = [
"bytes",
"futures",
"serde",
"serde_json",
@ -921,8 +869,8 @@ checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c"
dependencies = [
"http",
"hyper",
"rustls 0.20.7",
"rustls-native-certs 0.6.2",
"rustls",
"rustls-native-certs",
"tokio",
"tokio-rustls",
]
@ -1015,6 +963,15 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "itertools"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.5"
@ -1030,12 +987,6 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "json"
version = "0.12.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "078e285eafdfb6c4b434e0d31e8cfcb5115b651496faca5749b88fafd4f23bfd"
[[package]]
name = "json5"
version = "0.4.1"
@ -1170,42 +1121,6 @@ dependencies = [
"tempfile",
]
[[package]]
name = "nats"
version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3d877cd2e71146efa7065300fc5f5da967f938694b4d65e8bc64cc4a409092c"
dependencies = [
"base64",
"base64-url",
"blocking",
"crossbeam-channel",
"fastrand",
"itoa",
"json",
"lazy_static",
"libc",
"log",
"memchr",
"nkeys",
"nuid",
"once_cell",
"parking_lot",
"regex",
"ring",
"rustls 0.19.1",
"rustls-native-certs 0.5.0",
"rustls-pemfile 0.2.1",
"serde",
"serde_json",
"serde_nanos",
"serde_repr",
"time 0.3.17",
"url",
"webpki 0.21.4",
"winapi",
]
[[package]]
name = "nkeys"
version = "0.2.0"
@ -1346,12 +1261,6 @@ dependencies = [
"hashbrown",
]
[[package]]
name = "parking"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72"
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -1440,6 +1349,26 @@ dependencies = [
"sha1",
]
[[package]]
name = "pin-project"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad29a609b6bcd67fee905812e544992d216af9d755757c05ed2d0e15a74c6ecc"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.9"
@ -1744,19 +1673,6 @@ dependencies = [
"windows-sys 0.42.0",
]
[[package]]
name = "rustls"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35edb675feee39aec9c99fa5ff985081995a06d594114ae14cbe797ad7b7a6d7"
dependencies = [
"base64",
"log",
"ring",
"sct 0.6.1",
"webpki 0.21.4",
]
[[package]]
name = "rustls"
version = "0.20.7"
@ -1765,20 +1681,8 @@ checksum = "539a2bfe908f471bfa933876bd1eb6a19cf2176d375f82ef7f99530a40e48c2c"
dependencies = [
"log",
"ring",
"sct 0.7.0",
"webpki 0.22.0",
]
[[package]]
name = "rustls-native-certs"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a07b7c1885bd8ed3831c289b7870b13ef46fe0e856d288c30d9cc17d75a2092"
dependencies = [
"openssl-probe",
"rustls 0.19.1",
"schannel",
"security-framework",
"sct",
"webpki",
]
[[package]]
@ -1788,20 +1692,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0167bac7a9f490495f3c33013e7722b53cb087ecbe082fb0c6387c96f634ea50"
dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.1",
"rustls-pemfile",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.1"
@ -1839,16 +1734,6 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2"
[[package]]
name = "sct"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b362b83898e0e69f38515b82ee15aa80636befe47c3b6d3d89a911e78fc228ce"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "sct"
version = "0.7.0"
@ -2021,12 +1906,12 @@ dependencies = [
name = "shared"
version = "0.1.0"
dependencies = [
"async-nats",
"config",
"enumflags2",
"hyper",
"inner",
"log",
"nats",
"pretty_env_logger",
"prometheus",
"redis",
@ -2112,6 +1997,15 @@ version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "subslice"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0a8e4809a3bb02de01f1f7faf1ba01a83af9e8eabcd4d31dd6e413d14d56aae"
dependencies = [
"memchr",
]
[[package]]
name = "subtle"
version = "2.4.1"
@ -2295,15 +2189,26 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-retry"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
dependencies = [
"pin-project",
"rand 0.8.5",
"tokio",
]
[[package]]
name = "tokio-rustls"
version = "0.23.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59"
dependencies = [
"rustls 0.20.7",
"rustls",
"tokio",
"webpki 0.22.0",
"webpki",
]
[[package]]
@ -2314,12 +2219,12 @@ checksum = "f714dd15bead90401d77e04243611caec13726c2408afd5b31901dfcdcb3b181"
dependencies = [
"futures-util",
"log",
"rustls 0.20.7",
"rustls-native-certs 0.6.2",
"rustls",
"rustls-native-certs",
"tokio",
"tokio-rustls",
"tungstenite",
"webpki 0.22.0",
"webpki",
]
[[package]]
@ -2402,12 +2307,12 @@ dependencies = [
"httparse",
"log",
"rand 0.8.5",
"rustls 0.20.7",
"rustls",
"sha-1",
"thiserror",
"url",
"utf-8",
"webpki 0.22.0",
"webpki",
]
[[package]]
@ -2420,8 +2325,8 @@ dependencies = [
"flate2",
"futures-util",
"leaky-bucket-lite",
"rustls 0.20.7",
"rustls-native-certs 0.6.2",
"rustls",
"rustls-native-certs",
"serde",
"serde_json",
"tokio",
@ -2577,12 +2482,6 @@ version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waker-fn"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "want"
version = "0.3.0"
@ -2693,16 +2592,6 @@ dependencies = [
"twilight-model",
]
[[package]]
name = "webpki"
version = "0.21.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8e38c0608262c46d4a56202ebabdeb094cef7e560ca7a226c6bf055188aa4ea"
dependencies = [
"ring",
"untrusted",
]
[[package]]
name = "webpki"
version = "0.22.0"

View file

@ -7,8 +7,11 @@ edition = "2018"
[dependencies]
shared = { path = "../../libs/shared" }
nats = "0.23.1"
async-nats = "0.25.1"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0.8", features = ["derive"] }
log = { version = "0.4", features = ["std"] }
serde_json = { version = "1.0" }
redis = "*"
redis = "*"
futures-util = "*"
twilight-model = "0.14"

View file

@ -1,4 +1,6 @@
use serde::Deserialize;
#[derive(Debug, Deserialize, Clone, Default)]
pub struct CacheConfiguration {}
pub struct CacheConfiguration {
pub toggles: Vec<String>
}

193
exes/cache/src/main.rs vendored
View file

@ -1,14 +1,201 @@
use shared::config::Settings;
use std::error::Error;
use async_nats::{Client, Subscriber};
use futures_util::stream::StreamExt;
use log::info;
use managers::{
automoderation::Automoderation, bans::Bans, channels::Channels,
guild_schedules::GuildSchedules, guilds::Guilds, integrations::Integrations, invites::Invites,
members::Members, messages::Messages, reactions::Reactions, roles::Roles,
stage_instances::StageInstances, threads::Threads, CacheManager,
};
use shared::{config::Settings, payloads::CachePayload};
use twilight_model::gateway::event::DispatchEvent;
use crate::config::CacheConfiguration;
mod config;
mod managers;
pub enum CacheSourcedEvents {
None,
}
fn main() {
#[derive(Default)]
struct MegaCache {
automoderation: Automoderation,
channels: Channels,
bans: Bans,
guild_schedules: GuildSchedules,
guilds: Guilds,
integrations: Integrations,
invites: Invites,
members: Members,
messages: Messages,
reactions: Reactions,
roles: Roles,
stage_instances: StageInstances,
threads: Threads,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<CacheConfiguration> = Settings::new("cache").unwrap();
info!("loaded configuration: {:?}", settings);
let nats: Client = settings.nats.to_client().await?;
// let redis: redis::Client = settings.redis.into();
let mut cache = MegaCache::default();
let mut sub = nats.subscribe("nova.cache.dispatch.*".to_string()).await?;
listen(&mut sub, &mut cache, settings.config.toggles).await;
Ok(())
}
async fn listen(sub: &mut Subscriber, cache: &mut MegaCache, features: Vec<String>) {
while let Some(data) = sub.next().await {
let cp: CachePayload = serde_json::from_slice(&data.payload).unwrap();
let event = cp.data.data;
match event {
// Channel events
DispatchEvent::ChannelCreate(_)
| DispatchEvent::ChannelDelete(_)
| DispatchEvent::ChannelPinsUpdate(_)
| DispatchEvent::ChannelUpdate(_)
if features.contains(&"channels_cache".to_string()) =>
{
cache.channels.handle(event);
}
// Guild Cache
DispatchEvent::GuildCreate(_)
| DispatchEvent::GuildDelete(_)
| DispatchEvent::UnavailableGuild(_)
| DispatchEvent::GuildUpdate(_)
| DispatchEvent::WebhooksUpdate(_)
| DispatchEvent::GuildStickersUpdate(_)
| DispatchEvent::GuildEmojisUpdate(_)
| DispatchEvent::VoiceServerUpdate(_)
| DispatchEvent::GuildIntegrationsUpdate(_)
| DispatchEvent::CommandPermissionsUpdate(_)
if features.contains(&"guilds_cache".to_string()) =>
{
cache.guilds.handle(event);
}
// Guild Scheduled event
DispatchEvent::GuildScheduledEventCreate(_)
| DispatchEvent::GuildScheduledEventDelete(_)
| DispatchEvent::GuildScheduledEventUpdate(_)
| DispatchEvent::GuildScheduledEventUserAdd(_)
| DispatchEvent::GuildScheduledEventUserRemove(_)
if features.contains(&"guild_schedules_cache".to_string()) =>
{
cache.guild_schedules.handle(event);
}
// Stage events
DispatchEvent::StageInstanceCreate(_)
| DispatchEvent::StageInstanceDelete(_)
| DispatchEvent::StageInstanceUpdate(_)
if features.contains(&"stage_instances_cache".to_string()) =>
{
cache.stage_instances.handle(event);
}
// Integration events
DispatchEvent::IntegrationCreate(_)
| DispatchEvent::IntegrationDelete(_)
| DispatchEvent::IntegrationUpdate(_)
| DispatchEvent::InteractionCreate(_)
if features.contains(&"integrations_cache".to_string()) =>
{
cache.integrations.handle(event);
}
// Member events
DispatchEvent::MemberAdd(_)
| DispatchEvent::MemberRemove(_)
| DispatchEvent::MemberUpdate(_)
| DispatchEvent::MemberChunk(_)
| DispatchEvent::UserUpdate(_)
if features.contains(&"members_cache".to_string()) =>
{
cache.members.handle(event);
}
// Ban cache
DispatchEvent::BanAdd(_) | DispatchEvent::BanRemove(_)
if features.contains(&"bans_cache".to_string()) =>
{
cache.bans.handle(event);
}
// Reaction cache
DispatchEvent::ReactionAdd(_)
| DispatchEvent::ReactionRemove(_)
| DispatchEvent::ReactionRemoveAll(_)
| DispatchEvent::ReactionRemoveEmoji(_)
if features.contains(&"reactions_cache".to_string()) =>
{
cache.reactions.handle(event);
}
// Message cache
DispatchEvent::MessageCreate(_)
| DispatchEvent::MessageDelete(_)
| DispatchEvent::MessageDeleteBulk(_)
| DispatchEvent::MessageUpdate(_)
if features.contains(&"messages_cache".to_string()) =>
{
cache.messages.handle(event);
}
// Thread cache
DispatchEvent::ThreadCreate(_)
| DispatchEvent::ThreadDelete(_)
| DispatchEvent::ThreadListSync(_)
| DispatchEvent::ThreadMemberUpdate(_)
| DispatchEvent::ThreadMembersUpdate(_)
| DispatchEvent::ThreadUpdate(_)
if features.contains(&"threads_cache".to_string()) =>
{
cache.threads.handle(event);
}
// Invite cache
DispatchEvent::InviteCreate(_) | DispatchEvent::InviteDelete(_)
if features.contains(&"invites_cache".to_string()) =>
{
cache.invites.handle(event);
}
// Roles cache
DispatchEvent::RoleCreate(_)
| DispatchEvent::RoleDelete(_)
| DispatchEvent::RoleUpdate(_)
if features.contains(&"roles_cache".to_string()) =>
{
cache.roles.handle(event);
}
// Automod rules
DispatchEvent::AutoModerationRuleCreate(_)
| DispatchEvent::AutoModerationRuleDelete(_)
| DispatchEvent::AutoModerationRuleUpdate(_)
if features.contains(&"automoderation_cache".to_string()) =>
{
cache.automoderation.handle(event);
}
// Voice State
DispatchEvent::VoiceStateUpdate(_)
if features.contains(&"voice_states_cache".to_string()) => {}
_ => {
// just forward
}
}
}
}

View file

@ -0,0 +1,26 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Automoderation {}
impl CacheManager for Automoderation {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::AutoModerationRuleCreate(_) => {}
DispatchEvent::AutoModerationRuleDelete(_) => {}
DispatchEvent::AutoModerationRuleUpdate(_) => {}
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

26
exes/cache/src/managers/bans.rs vendored Normal file
View file

@ -0,0 +1,26 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Bans {}
impl CacheManager for Bans {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::BanAdd(_) => {}
DispatchEvent::BanRemove(_) => {}
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

28
exes/cache/src/managers/channels.rs vendored Normal file
View file

@ -0,0 +1,28 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Channels {}
impl CacheManager for Channels {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::ChannelCreate(_) => {}
DispatchEvent::ChannelDelete(_) => {}
DispatchEvent::ChannelPinsUpdate(_) => {}
DispatchEvent::ChannelUpdate(_) => {}
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

View file

@ -0,0 +1,29 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct GuildSchedules {}
impl CacheManager for GuildSchedules {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::GuildScheduledEventCreate(_) => {}
DispatchEvent::GuildScheduledEventDelete(_) => {}
DispatchEvent::GuildScheduledEventUpdate(_) => {}
DispatchEvent::GuildScheduledEventUserAdd(_) => {}
DispatchEvent::GuildScheduledEventUserRemove(_) => {}
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

34
exes/cache/src/managers/guilds.rs vendored Normal file
View file

@ -0,0 +1,34 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Guilds {}
impl CacheManager for Guilds {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::GuildCreate(_) => {},
DispatchEvent::GuildDelete(_) => {},
DispatchEvent::UnavailableGuild(_) => {},
DispatchEvent::GuildUpdate(_) => {},
DispatchEvent::WebhooksUpdate(_) => {},
DispatchEvent::GuildStickersUpdate(_) => {},
DispatchEvent::GuildEmojisUpdate(_) => {},
DispatchEvent::VoiceServerUpdate(_) => {},
DispatchEvent::GuildIntegrationsUpdate(_) => {},
DispatchEvent::CommandPermissionsUpdate(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

28
exes/cache/src/managers/integrations.rs vendored Normal file
View file

@ -0,0 +1,28 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Integrations {}
impl CacheManager for Integrations {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::IntegrationCreate(_) => {}
DispatchEvent::IntegrationDelete(_) => {}
DispatchEvent::IntegrationUpdate(_) => {}
DispatchEvent::InteractionCreate(_) => {}
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

26
exes/cache/src/managers/invites.rs vendored Normal file
View file

@ -0,0 +1,26 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Invites {}
impl CacheManager for Invites {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::InviteCreate(_) => {}
DispatchEvent::InviteDelete(_) => {}
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

29
exes/cache/src/managers/members.rs vendored Normal file
View file

@ -0,0 +1,29 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Members {}
impl CacheManager for Members {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::MemberAdd(_) => {},
DispatchEvent::MemberRemove(_) => {},
DispatchEvent::MemberUpdate(_) => {},
DispatchEvent::MemberChunk(_) => {},
DispatchEvent::UserUpdate(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

28
exes/cache/src/managers/messages.rs vendored Normal file
View file

@ -0,0 +1,28 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Messages {}
impl CacheManager for Messages {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::MessageCreate(_) => {},
DispatchEvent::MessageDelete(_) => {},
DispatchEvent::MessageDeleteBulk(_) => {},
DispatchEvent::MessageUpdate(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

23
exes/cache/src/managers/mod.rs vendored Normal file
View file

@ -0,0 +1,23 @@
use std::pin::Pin;
use twilight_model::gateway::event::DispatchEvent;
use std::future::Future;
use crate::CacheSourcedEvents;
pub mod channels;
pub mod guilds;
pub mod guild_schedules;
pub mod stage_instances;
pub mod integrations;
pub mod members;
pub mod bans;
pub mod reactions;
pub mod messages;
pub mod threads;
pub mod invites;
pub mod roles;
pub mod automoderation;
pub trait CacheManager {
fn handle(&self, event: DispatchEvent) -> Pin<Box<dyn Future<Output = CacheSourcedEvents>>>;
}

28
exes/cache/src/managers/reactions.rs vendored Normal file
View file

@ -0,0 +1,28 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Reactions {}
impl CacheManager for Reactions {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::ReactionAdd(_) => {},
DispatchEvent::ReactionRemove(_) => {},
DispatchEvent::ReactionRemoveAll(_) => {},
DispatchEvent::ReactionRemoveEmoji(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

27
exes/cache/src/managers/roles.rs vendored Normal file
View file

@ -0,0 +1,27 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Roles {}
impl CacheManager for Roles {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::RoleCreate(_) => {},
DispatchEvent::RoleDelete(_) => {},
DispatchEvent::RoleUpdate(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

View file

@ -0,0 +1,27 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct StageInstances {}
impl CacheManager for StageInstances {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::StageInstanceCreate(_) => {},
DispatchEvent::StageInstanceDelete(_) => {},
DispatchEvent::StageInstanceUpdate(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

29
exes/cache/src/managers/threads.rs vendored Normal file
View file

@ -0,0 +1,29 @@
use twilight_model::gateway::event::DispatchEvent;
use crate::CacheSourcedEvents;
use super::CacheManager;
use std::future::Future;
#[derive(Default)]
pub struct Threads {}
impl CacheManager for Threads {
fn handle(
&self,
event: twilight_model::gateway::event::DispatchEvent,
) -> std::pin::Pin<Box<dyn Future<Output = crate::CacheSourcedEvents>>> {
Box::pin(async move {
match event {
DispatchEvent::ThreadCreate(_) => {},
DispatchEvent::ThreadDelete(_) => {},
DispatchEvent::ThreadListSync(_) => {},
DispatchEvent::ThreadMemberUpdate(_) => {},
DispatchEvent::ThreadMembersUpdate(_) => {},
DispatchEvent::ThreadUpdate(_) => {},
_ => unreachable!(),
};
CacheSourcedEvents::None
})
}
}

View file

@ -10,4 +10,5 @@ twilight-gateway = { version = "0.14" }
twilight-model = "0.14"
serde = { version = "1.0.8", features = ["derive"] }
futures = "0.3"
serde_json = { version = "1.0" }
serde_json = { version = "1.0" }
bytes = "*"

View file

@ -2,7 +2,7 @@ use config::Config;
use shared::{
config::Settings,
log::{debug, info},
nats_crate::Connection,
nats_crate::Client,
payloads::{CachePayload, DispatchEventTagged, Tracing},
};
use std::{convert::TryFrom, error::Error};
@ -15,7 +15,7 @@ use twilight_model::gateway::event::DispatchEvent;
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
let settings: Settings<Config> = Settings::new("gateway").unwrap();
let (shard, mut events) = Shard::new(settings.config.token, settings.config.intents);
let nats: Connection = settings.nats.into();
let nats: Client = settings.nats.to_client().await?;
shard.start().await?;
@ -26,7 +26,7 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
}
_ => {
let name = event.kind().name().unwrap();
let name = event.kind().name();
if let Ok(dispatch_event) = DispatchEvent::try_from(event) {
let data = CachePayload {
tracing: Tracing {
@ -39,7 +39,9 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
};
let value = serde_json::to_string(&data)?;
debug!("nats send: {}", value);
nats.publish(&format!("nova.cache.dispatch.{}", name), value)?;
let bytes = bytes::Bytes::from(value);
nats.publish(format!("nova.cache.dispatch.{}", name.unwrap()), bytes)
.await?;
}
}
}

View file

@ -8,7 +8,7 @@ use hyper::{
Body, Method, Request, Response, StatusCode,
};
use serde::{Deserialize, Serialize};
use shared::nats_crate::Connection;
use shared::nats_crate::Client;
use shared::{
log::{debug, error},
payloads::{CachePayload, DispatchEventTagged, Tracing},
@ -19,7 +19,6 @@ use std::{
str::from_utf8,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use twilight_model::gateway::event::{DispatchEvent};
use twilight_model::{
@ -31,7 +30,7 @@ use twilight_model::{
#[derive(Clone)]
pub struct HandlerService {
pub config: Arc<Config>,
pub nats: Arc<Connection>,
pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>,
}
@ -107,14 +106,13 @@ impl HandlerService {
let payload = serde_json::to_string(&data).unwrap();
match self.nats.request_timeout(
"nova.cache.dispatch.INTERACTION_CREATE",
payload,
Duration::from_secs(2),
) {
match self.nats.request(
"nova.cache.dispatch.INTERACTION_CREATE".to_string(),
Bytes::from(payload),
).await {
Ok(response) => Ok(Response::builder()
.header("Content-Type", "application/json")
.body(Body::from(response.data))
.body(Body::from(response.reply.unwrap()))
.unwrap()),
Err(error) => {

View file

@ -1,7 +1,7 @@
use super::handler::HandlerService;
use crate::config::Config;
use hyper::service::Service;
use shared::nats_crate::Connection;
use shared::nats_crate::Client;
use std::{
future::{ready, Ready},
sync::Arc,
@ -11,7 +11,7 @@ use ed25519_dalek::PublicKey;
pub struct MakeSvc {
pub settings: Arc<Config>,
pub nats: Arc<Connection>,
pub nats: Arc<Client>,
pub public_key: Arc<PublicKey>
}

View file

@ -4,10 +4,10 @@ mod handler;
use crate::handler::make_service::MakeSvc;
use crate::config::Config;
use shared::config::Settings;
use shared::log::{error, info};
use ed25519_dalek::PublicKey;
use hyper::Server;
use shared::config::Settings;
use shared::log::{error, info};
#[tokio::main]
async fn main() {
@ -35,7 +35,7 @@ async fn start(settings: Settings<Config>) {
Arc::new(PublicKey::from_bytes(&hex::decode(&config.discord.public_key).unwrap()).unwrap());
let server = Server::bind(&addr).serve(MakeSvc {
settings: config,
nats: Arc::new(settings.nats.into()),
nats: Arc::new(settings.nats.to_client().await.unwrap()),
public_key: public_key,
});

View file

@ -1,7 +1,7 @@
[package]
name = "shared"
version = "0.1.0"
edition = "2018"
edition = "2021"
[dependencies]
pretty_env_logger = "0.4"
@ -13,7 +13,7 @@ hyper = { version = "0.14", features = ["full"] }
tokio = { version = "1", features = ["full"] }
enumflags2 = { version = "0.7.1", features = ["serde"] }
prometheus = { version = "0.13", features = ["process"] }
nats = "0.23"
async-nats = "0.25.1"
testcontainers = "0.14"
twilight-model = "0.14"
serde_json = { version = "1.0" }

View file

@ -1,5 +1,5 @@
use config::ConfigError;
use std::fmt::Debug;
use std::{fmt::Debug, io};
use thiserror::Error;
#[derive(Debug, Error)]
@ -12,4 +12,7 @@ pub enum GenericError {
#[error("step `{0}` failed")]
StepFailed(String),
#[error("io error")]
Io(#[from] io::Error)
}

View file

@ -1,5 +1,5 @@
pub use ::config as config_crate;
pub use ::nats as nats_crate;
pub use ::async_nats as nats_crate;
pub use ::redis as redis_crate;
pub use log;
pub use prometheus;

View file

@ -1,5 +1,8 @@
use nats::{Connection, Options};
use async_nats::Client;
use serde::Deserialize;
use std::future::Future;
use crate::error::GenericError;
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfigurationClientCert {
@ -14,46 +17,13 @@ pub struct NatsConfigurationTls {
#[derive(Clone, Debug, Deserialize)]
pub struct NatsConfiguration {
pub client_cert: Option<NatsConfigurationClientCert>,
pub root_cert: Option<Vec<String>>,
pub jetstream_api_prefix: Option<String>,
pub max_reconnects: Option<usize>,
pub reconnect_buffer_size: Option<usize>,
pub tls: Option<NatsConfigurationTls>,
pub client_name: Option<String>,
pub tls_required: Option<bool>,
pub host: String,
}
// todo: Prefer From since it automatically gives a free Into implementation
// Allows the configuration to directly create a nats connection
impl Into<Connection> for NatsConfiguration {
fn into(self) -> Connection {
let mut options = Options::new();
if let Some(client_cert) = self.client_cert {
options = options.client_cert(client_cert.cert, client_cert.key);
}
if let Some(root_certs) = self.root_cert {
for root_cert in root_certs {
options = options.add_root_certificate(root_cert);
}
}
options = options.max_reconnects(self.max_reconnects);
options = options.no_echo();
options = options.reconnect_buffer_size(self.reconnect_buffer_size.unwrap_or(64 * 1024));
options = options.tls_required(self.tls_required.unwrap_or(false));
options = options.with_name(&self.client_name.unwrap_or_else(|| "Nova".to_string()));
if let Some(tls) = self.tls {
let mut config = nats::rustls::ClientConfig::new();
config.set_mtu(&tls.mtu);
// todo: more options?
options = options.tls_client_config(config);
}
options.connect(&self.host).unwrap()
impl NatsConfiguration {
pub async fn to_client(self) -> Result<Client, GenericError> {
Ok(async_nats::connect(self.host).await?)
}
}
}