1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use std::sync::Arc;
use super::durable_changes::{ChangesOpts, DurableChanges};
use super::{Config, CouchDB, CouchError};
use crate::util::{wait_for_ready, RetryOpts};
pub const RECORD_DB_NAME: &str = "records";
pub const META_DB_NAME: &str = "meta";
pub const SEPERATOR: &str = "$";
#[derive(Debug, Clone)]
pub struct CouchManager {
config: Arc<Config>,
client: reqwest::Client,
record_db: CouchDB,
meta_db: CouchDB,
}
pub fn db_name(prefix: &str, name: &str) -> String {
format!("{}{}{}", prefix, SEPERATOR, name)
}
impl CouchManager {
pub fn with_url<S>(url: Option<S>) -> anyhow::Result<Self>
where
S: AsRef<str>,
{
let url = url.map(|s| s.as_ref().to_string());
let config = Config::from_url_or_default(url.as_deref())?;
let db = Self::with_config(config)?;
Ok(db)
}
pub fn with_config(config: Config) -> anyhow::Result<Self> {
let client = reqwest::Client::new();
let mut record_config = config.clone();
let mut meta_config = config.clone();
record_config.database = db_name(&config.database, RECORD_DB_NAME);
meta_config.database = db_name(&config.database, META_DB_NAME);
let meta_db = CouchDB::with_config_and_client(meta_config, client.clone());
let record_db = CouchDB::with_config_and_client(record_config, client.clone());
Ok(Self {
config: Arc::new(config),
client,
record_db,
meta_db,
})
}
fn db(&self, name: &str, include_prefix: bool) -> CouchDB {
let mut config = (*self.config).clone();
config.database = match include_prefix {
true => db_name(&config.database, name),
false => name.to_string(),
};
CouchDB::with_config_and_client(config, self.client.clone())
}
pub async fn wait_for_ready(&self) -> Result<(), CouchError> {
let opts = RetryOpts::with_name("CouchDB".into());
wait_for_ready(&self.client, opts, || {
self.client.get(&self.config.host).build()
})
.await?;
Ok(())
}
pub async fn init(&self) -> anyhow::Result<()> {
self.wait_for_ready().await?;
let res = futures::future::join_all([
self.db("_users", false).init(),
self.db("_replicator", false).init(),
self.db("_global_changes", false).init(),
])
.await;
for err in res.into_iter().filter_map(|r| r.err()) {
log::warn!("Failed to ensure system CouchDB: {}", err);
}
let res = futures::future::join_all([self.record_db().init(), self.meta_db().init()]).await;
for res in res {
res?
}
Ok(())
}
pub async fn destroy_and_init(&self) -> anyhow::Result<()> {
let res = futures::future::join_all(vec![
self.record_db().destroy_and_init(),
self.meta_db().destroy_and_init(),
])
.await;
for res in res {
res?
}
Ok(())
}
pub fn record_db(&self) -> &CouchDB {
&self.record_db
}
pub fn meta_db(&self) -> &CouchDB {
&self.meta_db
}
pub async fn durable_changes(&self, id: impl ToString, opts: ChangesOpts) -> DurableChanges {
let id = id.to_string();
let changes =
DurableChanges::new(self.record_db().clone(), self.meta_db().clone(), id, opts).await;
changes
}
}