evergreen/common/trigger/
processor.rs1use crate as eg;
4use eg::common::trigger::{Event, EventState};
5use eg::idl;
6use eg::util::thread_id;
7use eg::Editor;
8use eg::EgResult;
9use eg::EgValue;
10use std::fmt;
11use std::process;
12
13pub struct Processor<'a> {
16 pub editor: &'a mut Editor,
17 event_def_id: i64,
18 event_def: EgValue,
19 target_flesh: EgValue,
20}
21
22impl fmt::Display for Processor<'_> {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 write!(
25 f,
26 "Processor A/T Definition [id={}] '{}'",
27 self.event_def_id, self.event_def["name"]
28 )
29 }
30}
31
32impl<'a> Processor<'a> {
33 pub fn new(editor: &'a mut Editor, event_def_id: i64) -> EgResult<Processor<'a>> {
34 let flesh = eg::hash! {
35 "flesh": 1,
36 "flesh_fields": {"atevdef": ["hook", "env", "params"]}
37 };
38
39 let event_def = editor
40 .retrieve_with_ops("atevdef", event_def_id, flesh)?
41 .ok_or_else(|| editor.die_event())?;
42
43 let mut proc = Self {
44 event_def,
45 event_def_id,
46 target_flesh: EgValue::Null,
47 editor,
48 };
49
50 proc.set_target_flesh()?;
51
52 Ok(proc)
53 }
54
55 pub fn process_event_once(editor: &mut Editor, event_id: i64) -> EgResult<Event> {
57 let jevent = editor
58 .retrieve("atev", event_id)?
59 .ok_or_else(|| editor.die_event())?;
60
61 let mut proc = Processor::new(editor, jevent["event_def"].int()?)?;
62
63 let mut event = Event::from_source(jevent)?;
64
65 proc.process_event(&mut event)?;
66
67 Ok(event)
68 }
69
70 pub fn process_event(&mut self, event: &mut Event) -> EgResult<()> {
72 log::info!("{self} processing event {}", event.id());
73
74 self.collect(event)?;
75
76 if self.validate(event)? {
77 self.react(&mut [event])?;
78 self.set_event_state(event, EventState::Complete)?;
79 }
80
81 Ok(())
82 }
83
84 pub fn process_event_group_once(
88 editor: &mut Editor,
89 event_ids: &[i64],
90 ) -> EgResult<Vec<Event>> {
91 let query = eg::hash! {"id": event_ids};
92 let mut jevents = editor.search("atev", query)?;
93
94 if jevents.is_empty() {
95 return Err(format!("No such events: {event_ids:?}").into());
96 }
97
98 let mut events: Vec<Event> = Vec::new();
100 for jevent in jevents.drain(..) {
101 events.push(Event::from_source(jevent)?);
102 }
103
104 let mut proc = Processor::new(editor, events[0].id())?;
105
106 let mut slice = events.iter_mut().collect::<Vec<&mut Event>>();
107 proc.process_event_group(&mut slice[..])?;
108
109 Ok(events)
110 }
111
112 pub fn process_event_group(&mut self, events: &mut [&mut Event]) -> EgResult<()> {
113 let mut valid_events: Vec<&mut Event> = Vec::new();
114 for event in events.iter_mut() {
115 self.collect(event)?;
116 if self.validate(event)? {
117 valid_events.push(event);
118 }
119 }
120
121 if valid_events.is_empty() {
122 return Ok(());
124 }
125
126 let slice = &mut valid_events[..];
127 self.react(slice)?;
128
129 for event in valid_events {
130 self.set_event_state(event, EventState::Complete)?;
131 }
132
133 Ok(())
134 }
135
136 pub fn event_def_id(&self) -> i64 {
137 self.event_def_id
138 }
139 pub fn event_def(&self) -> &EgValue {
140 &self.event_def
141 }
142 pub fn core_type(&self) -> &str {
143 self.event_def["hook"]["core_type"].as_str().unwrap()
144 }
145 pub fn user_field(&self) -> Option<&str> {
146 self.event_def["usr_field"].as_str()
147 }
148 pub fn group_field(&self) -> Option<&str> {
149 self.event_def["group_field"].as_str()
150 }
151 pub fn validator(&self) -> &str {
152 self.event_def["validator"].as_str().unwrap()
153 }
154 pub fn reactor(&self) -> &str {
155 self.event_def["reactor"].as_str().unwrap()
156 }
157 pub fn environment(&self) -> &EgValue {
158 &self.event_def["env"]
159 }
160
161 pub fn params(&self) -> &EgValue {
163 &self.event_def["params"]
164 }
165
166 fn set_target_flesh(&mut self) -> EgResult<()> {
169 let mut paths: Vec<&str> = self
170 .environment()
171 .members()
172 .map(|e| e["path"].as_str().unwrap()) .collect();
174
175 let group_field: String;
176 if let Some(gfield) = self.group_field() {
177 let mut gfield: Vec<&str> = gfield.split('.').collect();
179
180 gfield.pop();
183
184 if !gfield.is_empty() {
185 group_field = gfield.join(".");
186 paths.push(&group_field);
187 }
188 }
189
190 self.target_flesh =
191 idl::parser().field_paths_to_flesh(self.core_type(), paths.as_slice())?;
192
193 Ok(())
194 }
195
196 pub fn param_value(&mut self, param_name: &str) -> Option<&EgValue> {
199 for param in self.params().members() {
200 if param["param"].as_str() == Some(param_name) {
201 return Some(¶m["value"]);
202 }
203 }
204 None
205 }
206
207 pub fn param_value_as_str(&mut self, param_name: &str) -> Option<&str> {
211 if let Some(pval) = self.param_value(param_name) {
212 pval["value"].as_str()
213 } else {
214 None
215 }
216 }
217
218 pub fn param_value_as_bool(&mut self, param_name: &str) -> bool {
221 if let Some(pval) = self.param_value(param_name) {
222 pval["value"].boolish()
223 } else {
224 false
225 }
226 }
227
228 pub fn set_event_state(&mut self, event: &mut Event, state: EventState) -> EgResult<()> {
229 self.set_event_state_impl(event, state, None)
230 }
231
232 pub fn set_event_state_error(&mut self, event: &mut Event, error_text: &str) -> EgResult<()> {
233 self.set_event_state_impl(event, EventState::Error, Some(error_text))
234 }
235
236 fn set_event_state_impl(
238 &mut self,
239 event: &mut Event,
240 state: EventState,
241 error_text: Option<&str>,
242 ) -> EgResult<()> {
243 event.set_state(state);
244
245 let state_str: &str = state.into();
246
247 self.editor.xact_begin()?;
248
249 let mut atev = self
250 .editor
251 .retrieve("atev", event.id())?
252 .ok_or_else(|| "Our event disappeared from the DB?".to_string())?;
253
254 if let Some(err) = error_text {
255 let mut output = eg::hash! {
256 "data": err,
257 "is_error": true,
258 };
260 output.bless("ateo")?;
261
262 let mut result = self.editor.create(output)?;
263
264 atev["error_output"] = result["id"].take();
265 }
266
267 atev["state"] = EgValue::from(state_str);
268 atev["update_time"] = EgValue::from("now");
269 atev["update_process"] = EgValue::from(format!("{}-{}", process::id(), thread_id()));
270
271 if atev["start_time"].is_null() && state != EventState::Pending {
272 atev["start_time"] = EgValue::from("now");
273 }
274
275 if state == EventState::Complete {
276 atev["complete_time"] = EgValue::from("now");
277 }
278
279 self.editor.update(atev)?;
280
281 self.editor.xact_commit()?;
282
283 if state == EventState::Complete || state == EventState::Error {
284 self.editor.disconnect()
287 } else {
288 Ok(())
289 }
290 }
291
292 pub fn collect(&mut self, event: &mut Event) -> EgResult<()> {
295 log::info!("{self} collecting {event}");
296
297 self.set_event_state(event, EventState::Collecting)?;
298
299 let flesh = self.target_flesh.clone();
302 let core_type = self.core_type().to_string(); let target = self
305 .editor
306 .retrieve_with_ops(&core_type, event.target_pkey().clone(), flesh)?
307 .ok_or_else(|| self.editor.die_event())?;
308
309 event.set_target(target);
310
311 self.set_group_value(event)?;
312
313 self.set_event_state(event, EventState::Collected)
316 }
317
318 fn set_group_value(&mut self, event: &mut Event) -> EgResult<()> {
323 let gfield_path = match self.group_field() {
324 Some(f) => f,
325 None => return Ok(()),
326 };
327
328 let mut obj = event.target();
329
330 for part in gfield_path.split('.') {
331 obj = &obj[part];
332 }
333
334 let obj_clone;
335 if let Some(pkey) = obj.pkey_value() {
336 obj_clone = pkey.clone();
340 } else {
341 obj_clone = obj.clone();
342 }
343
344 if obj.is_string() || obj.is_number() {
345 event.set_group_value(obj_clone);
346 Ok(())
347 } else {
348 Err(format!("Invalid group field path: {gfield_path}").into())
349 }
350 }
351}