1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
use anyhow::Context;
use oas_common::types::{Media, Post};
use oas_common::{Record, RecordMap, Resolver, UntypedRecord};
use crate::couch::durable_changes::ChangesOpts;
use crate::jobs::{typs as job_typs, JobManager};
use crate::State;
const DURABLE_ID: &str = "core.jobs";
pub async fn process_changes(state: State, infinite: bool) -> anyhow::Result<()> {
let opts = ChangesOpts {
infinite,
..Default::default()
};
let mut changes = state.db_manager.durable_changes(DURABLE_ID, opts).await;
while let Some(batch) = changes.next().await? {
process_batch(&state, batch.into_inner())
.await
.context("Failed to process changes batch for jobs")?;
}
Ok(())
}
pub async fn process_batch(state: &State, batch: Vec<UntypedRecord>) -> anyhow::Result<()> {
let mut sorted = RecordMap::from_untyped(batch)?;
let mut posts = sorted.into_vec::<Post>();
let medias = sorted.into_vec::<Media>();
state
.db
.resolve_all_refs(&mut posts)
.await
.context("failed to resolve refs")?;
for record in posts.into_iter() {
let res = on_post_change(state, record).await;
log_if_error(res);
}
for record in medias.into_iter() {
let res = on_media_change(state, record).await;
log_if_error(res);
}
Ok(())
}
async fn on_post_change(state: &State, record: Record<Post>) -> anyhow::Result<()> {
let typ = job_typs::NLP;
if let Some(_opts) = record.meta().jobs().setting(typ) {
if record.value.nlp.is_null() && !has_pending(&state.jobs, typ, record.guid()).await {
let job = job_typs::nlp_job(&record, None);
state.jobs.create_job(job).await?;
}
}
Ok(())
}
async fn on_media_change(state: &State, record: Record<Media>) -> anyhow::Result<()> {
let typ = job_typs::ASR;
if let Some(_opts) = record.meta().jobs().setting(typ) {
if record.value.transcript.is_none() && !has_pending(&state.jobs, typ, record.guid()).await
{
let job = job_typs::asr_job(&record, None);
state.jobs.create_job(job).await?;
}
}
Ok(())
}
async fn has_pending(jobs: &JobManager, typ: &str, guid: &str) -> bool {
match jobs.pending_jobs(guid, typ).await {
Err(_) => false,
Ok(list) => !list.is_empty(),
}
}
fn log_if_error<A>(res: anyhow::Result<A>) {
match res {
Ok(_) => {}
Err(err) => log::error!("{:?}", err),
}
}