Skip to content

Commit

Permalink
Removed arc/mutex from db wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
sparky8251 committed Jan 24, 2024
1 parent af57a5a commit 71e1419
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 33 deletions.
19 changes: 6 additions & 13 deletions src/bot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use anyhow::Context;
use native_db::DatabaseBuilder;
use std::env;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::{mpsc, watch};
use tracing::{info, trace};
Expand All @@ -30,16 +29,13 @@ pub async fn init() -> anyhow::Result<()> {
.define::<AccessToken>()
.context("Unable to load access token database model")?;
//open db
let db = Arc::new(Mutex::new(builder.create(&path).with_context(|| {
format!("Unable to create/open db {}", &path.display())
})?));

let listener_storage = db.clone();
let db = builder
.create(&path)
.with_context(|| format!("Unable to create/open db {}", &path.display()))?;

// fetch access_token

let guard = db.lock().unwrap();
let r = guard
let r = db
.r_transaction()
.context("Unable to get read transaction from db")?;
let access_token = match r
Expand All @@ -52,7 +48,6 @@ pub async fn init() -> anyhow::Result<()> {
};
// drop the guard and rw transaction before await points to avoid deadlocks
std::mem::drop(r);
std::mem::drop(guard);

// Matrix initalization and login
let matrix_listener_client = ruma::client::Client::builder()
Expand All @@ -67,8 +62,7 @@ pub async fn init() -> anyhow::Result<()> {
.await?;

// Save returned session
let guard = db.lock().unwrap();
let rw = guard
let rw = db
.rw_transaction()
.context("Unable to get read transaction from db")?;
trace!("Session retrieved, saving session data...");
Expand All @@ -86,7 +80,6 @@ pub async fn init() -> anyhow::Result<()> {
// since we dont technically return from this function, explicitly drop the guard to free it for future use
rw.commit()
.context("Unable to commit access_token transaction")?;
std::mem::drop(guard);
info!("Successfully logged in as {}", config.mx_uname);

// Clone required clients/servers and channels
Expand All @@ -95,7 +88,7 @@ pub async fn init() -> anyhow::Result<()> {
let webhook_tx = matrix_tx.clone();

// Create thread structures
let mut matrix_listener = MatrixListener::new(&config, matrix_tx, listener_storage)?;
let mut matrix_listener = MatrixListener::new(&config, matrix_tx, &db)?;
let mut matrix_responder = MatrixResponder::new(matrix_rx)?;
let webhook_listener = WebhookListener::new(&config, webhook_tx);

Expand Down
20 changes: 9 additions & 11 deletions src/services/matrix/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use ruma::{
},
presence::PresenceState,
};
use std::sync::{Arc, Mutex};

use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::sync::watch::Receiver;
Expand All @@ -33,16 +33,16 @@ pub struct MatrixListener<'a> {
pub api_client: reqwest::Client,
send: Sender<MatrixMessage>,
/// Storage data.
pub storage: Arc<Mutex<Database<'a>>>,
pub storage: &'a Database<'a>,
}

impl MatrixListener<'_> {
impl<'a> MatrixListener<'a> {
/// Loads storage data, config data, and then creates a reqwest client and then returns a Bot instance.
pub fn new(
config: &Config,
send: Sender<MatrixMessage>,
storage: Arc<Mutex<Database>>,
) -> anyhow::Result<Self> {
storage: &Database<'a>,
) -> anyhow::Result<MatrixListener<'a>> {
let config = MatrixListenerConfig::new(config);
let api_client = reqwest::Client::new();
Ok(Self {
Expand All @@ -60,8 +60,7 @@ impl MatrixListener<'_> {
let mut req = sync_events::v3::Request::new();
req.filter = None;
let last_sync = {
let guard = self.storage.lock().unwrap();
let r = guard.r_transaction().unwrap();
let r = self.storage.r_transaction().unwrap();
r.get()
.primary::<LastSync>(1u8)
.unwrap()
Expand All @@ -88,9 +87,8 @@ impl MatrixListener<'_> {

match response {
Some(v) => {
{
let guard = self.storage.lock().unwrap();
let rw = guard.rw_transaction().unwrap();

let rw = self.storage.rw_transaction().unwrap();
match insert_or_update(&rw, LastSync {id: 1, last_sync: last_sync.map_or(String::new(), |v| v)}, LastSync {id: 1, last_sync: v.next_batch}) {
Ok(_) => (),
Err(e) => error!("Unable to write updated last_sync time to db! Error is {}", e)
Expand All @@ -99,7 +97,7 @@ impl MatrixListener<'_> {
if let Err(e) = rw.commit() {
error!("Unable to commit last_sync write to database! Error is {}", e)
};
}

for (room_id, joined_room) in &v.rooms.join {
for raw_event in &joined_room.timeline.events {
let event = raw_event.deserialize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use ruma::{
RoomId, UserId,
};
use spellcheck::spellcheck;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use text_expansion::text_expansion;
use tokio::sync::mpsc::Sender;
Expand All @@ -36,7 +35,7 @@ pub async fn commandless_handler(
relates_to: Option<&Relation>,
sender: &UserId,
room_id: &RoomId,
storage: &mut Arc<Mutex<Database<'_>>>,
storage: &Database<'_>,
config: &MatrixListenerConfig,
api_client: &reqwest::Client,
send: &mut Sender<MatrixMessage>,
Expand Down Expand Up @@ -126,8 +125,7 @@ pub async fn commandless_handler(
.await
{
Ok(_) => {
let guard = storage.lock().unwrap();
let rw = guard.rw_transaction().unwrap();
let rw = storage.rw_transaction().unwrap();
rw.insert(CorrectionTimeCooldown {
room_id: room_id.to_string(),
last_correction_time: SystemTime::now()
Expand All @@ -151,9 +149,8 @@ pub async fn commandless_handler(
Ok(())
}

fn correction_time_cooldown(storage: &Arc<Mutex<Database>>, room_id: &RoomId) -> bool {
let guard = storage.lock().unwrap();
let rw = guard.rw_transaction().unwrap();
fn correction_time_cooldown(storage: &Database, room_id: &RoomId) -> bool {
let rw = storage.rw_transaction().unwrap();
match rw
.get()
.primary::<CorrectionTimeCooldown>(room_id.to_string())
Expand Down
3 changes: 1 addition & 2 deletions src/services/matrix/matrix_handlers/listeners/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use ruma::{
events::room::message::{Relation, TextMessageEventContent},
RoomId, UserId,
};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc::Sender;
use tracing::{debug, trace};

Expand All @@ -31,7 +30,7 @@ pub async fn handle_text_event(
relates_to: Option<&Relation>,
sender: &UserId,
room_id: &RoomId,
storage: &mut Arc<Mutex<Database<'_>>>,
storage: &Database<'_>,
config: &MatrixListenerConfig,
api_client: &reqwest::Client,
send: &mut Sender<MatrixMessage>,
Expand Down

0 comments on commit 71e1419

Please sign in to comment.