evergreen/common/trigger/
processor.rs

//! Main entry point for processing Action/Trigger (A/T) events related
//! to a given event definition.
3use crate as eg;
4use eg::common::trigger::{Event, EventState};
5use eg::idl;
6use eg::util::thread_id;
7use eg::Editor;
8use eg::EgResult;
9use eg::EgValue;
10use std::fmt;
11use std::process;
12
// TODO: Add feature to roll-back failures and reset event states.
// TODO: Add a retry state that's only processed intentionally?
/// Processes A/T events for a single event definition.
///
/// Holds the event definition (fleshed with its hook, env, and params)
/// along with the flesh expression used when fetching event targets.
pub struct Processor<'a> {
    /// Editor used for every database read and write.
    pub editor: &'a mut Editor,
    /// ID of the event definition (`atevdef`) this processor handles.
    event_def_id: i64,
    /// The event definition as retrieved in `new()`.
    event_def: EgValue,
    /// Flesh expression applied when retrieving an event's target;
    /// starts as Null and is compiled by `set_target_flesh()`.
    target_flesh: EgValue,
}
21
22impl fmt::Display for Processor<'_> {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        write!(
25            f,
26            "Processor A/T Definition [id={}] '{}'",
27            self.event_def_id, self.event_def["name"]
28        )
29    }
30}
31
32impl<'a> Processor<'a> {
33    pub fn new(editor: &'a mut Editor, event_def_id: i64) -> EgResult<Processor<'a>> {
34        let flesh = eg::hash! {
35            "flesh": 1,
36            "flesh_fields": {"atevdef": ["hook", "env", "params"]}
37        };
38
39        let event_def = editor
40            .retrieve_with_ops("atevdef", event_def_id, flesh)?
41            .ok_or_else(|| editor.die_event())?;
42
43        let mut proc = Self {
44            event_def,
45            event_def_id,
46            target_flesh: EgValue::Null,
47            editor,
48        };
49
50        proc.set_target_flesh()?;
51
52        Ok(proc)
53    }
54
55    /// One-off single event processor without requiring a standalone Processor
56    pub fn process_event_once(editor: &mut Editor, event_id: i64) -> EgResult<Event> {
57        let jevent = editor
58            .retrieve("atev", event_id)?
59            .ok_or_else(|| editor.die_event())?;
60
61        let mut proc = Processor::new(editor, jevent["event_def"].int()?)?;
62
63        let mut event = Event::from_source(jevent)?;
64
65        proc.process_event(&mut event)?;
66
67        Ok(event)
68    }
69
70    /// Process a single event via an existing Processor
71    pub fn process_event(&mut self, event: &mut Event) -> EgResult<()> {
72        log::info!("{self} processing event {}", event.id());
73
74        self.collect(event)?;
75
76        if self.validate(event)? {
77            self.react(&mut [event])?;
78            self.set_event_state(event, EventState::Complete)?;
79        }
80
81        Ok(())
82    }
83
84    /// One-off event group processor without requiring a standalone Processor
85    ///
86    /// Returns all processed events, even if invalid.
87    pub fn process_event_group_once(
88        editor: &mut Editor,
89        event_ids: &[i64],
90    ) -> EgResult<Vec<Event>> {
91        let query = eg::hash! {"id": event_ids};
92        let mut jevents = editor.search("atev", query)?;
93
94        if jevents.is_empty() {
95            return Err(format!("No such events: {event_ids:?}").into());
96        }
97
98        // Here we trust that events from the database are shaped correctly.
99        let mut events: Vec<Event> = Vec::new();
100        for jevent in jevents.drain(..) {
101            events.push(Event::from_source(jevent)?);
102        }
103
104        let mut proc = Processor::new(editor, events[0].id())?;
105
106        let mut slice = events.iter_mut().collect::<Vec<&mut Event>>();
107        proc.process_event_group(&mut slice[..])?;
108
109        Ok(events)
110    }
111
112    pub fn process_event_group(&mut self, events: &mut [&mut Event]) -> EgResult<()> {
113        let mut valid_events: Vec<&mut Event> = Vec::new();
114        for event in events.iter_mut() {
115            self.collect(event)?;
116            if self.validate(event)? {
117                valid_events.push(event);
118            }
119        }
120
121        if valid_events.is_empty() {
122            // No valid events to react
123            return Ok(());
124        }
125
126        let slice = &mut valid_events[..];
127        self.react(slice)?;
128
129        for event in valid_events {
130            self.set_event_state(event, EventState::Complete)?;
131        }
132
133        Ok(())
134    }
135
    /// ID of the event definition this processor was created for.
    pub fn event_def_id(&self) -> i64 {
        self.event_def_id
    }
    /// The event definition object loaded when the processor was created.
    pub fn event_def(&self) -> &EgValue {
        &self.event_def
    }
142    pub fn core_type(&self) -> &str {
143        self.event_def["hook"]["core_type"].as_str().unwrap()
144    }
145    pub fn user_field(&self) -> Option<&str> {
146        self.event_def["usr_field"].as_str()
147    }
148    pub fn group_field(&self) -> Option<&str> {
149        self.event_def["group_field"].as_str()
150    }
151    pub fn validator(&self) -> &str {
152        self.event_def["validator"].as_str().unwrap()
153    }
154    pub fn reactor(&self) -> &str {
155        self.event_def["reactor"].as_str().unwrap()
156    }
157    pub fn environment(&self) -> &EgValue {
158        &self.event_def["env"]
159    }
160
161    /// Will be a JSON array
162    pub fn params(&self) -> &EgValue {
163        &self.event_def["params"]
164    }
165
166    /// Compile the flesh expression we'll use each time we
167    /// fetch an event from the database.
168    fn set_target_flesh(&mut self) -> EgResult<()> {
169        let mut paths: Vec<&str> = self
170            .environment()
171            .members()
172            .map(|e| e["path"].as_str().unwrap()) // required
173            .collect();
174
175        let group_field: String;
176        if let Some(gfield) = self.group_field() {
177            // If there is a group field path, flesh it as well.
178            let mut gfield: Vec<&str> = gfield.split('.').collect();
179
180            // However, drop the final part which is a field name
181            // and does not need to be fleshed.
182            gfield.pop();
183
184            if !gfield.is_empty() {
185                group_field = gfield.join(".");
186                paths.push(&group_field);
187            }
188        }
189
190        self.target_flesh =
191            idl::parser().field_paths_to_flesh(self.core_type(), paths.as_slice())?;
192
193        Ok(())
194    }
195
196    /// Returns the parameter value with the provided name or None if no
197    /// such parameter exists.
198    pub fn param_value(&mut self, param_name: &str) -> Option<&EgValue> {
199        for param in self.params().members() {
200            if param["param"].as_str() == Some(param_name) {
201                return Some(&param["value"]);
202            }
203        }
204        None
205    }
206
207    /// Returns the parameter value with the provided name as a &str or
208    /// None if no such parameter exists OR the parameter is not a JSON
209    /// string.
210    pub fn param_value_as_str(&mut self, param_name: &str) -> Option<&str> {
211        if let Some(pval) = self.param_value(param_name) {
212            pval["value"].as_str()
213        } else {
214            None
215        }
216    }
217
218    /// Returns true if a parameter value exists and has truthy,
219    /// false otherwise.
220    pub fn param_value_as_bool(&mut self, param_name: &str) -> bool {
221        if let Some(pval) = self.param_value(param_name) {
222            pval["value"].boolish()
223        } else {
224            false
225        }
226    }
227
228    pub fn set_event_state(&mut self, event: &mut Event, state: EventState) -> EgResult<()> {
229        self.set_event_state_impl(event, state, None)
230    }
231
232    pub fn set_event_state_error(&mut self, event: &mut Event, error_text: &str) -> EgResult<()> {
233        self.set_event_state_impl(event, EventState::Error, Some(error_text))
234    }
235
    /// Update the event state and related state-tracking values.
    ///
    /// Sets the state on the in-memory `event`, then, inside a
    /// transaction, re-reads the `atev` row and updates its state,
    /// update time, and update process, plus the start/complete times
    /// where applicable.  When `error_text` is provided, an `ateo` row
    /// is created first and linked as the event's `error_output`.
    ///
    /// # Errors
    ///
    /// Returns an error if any editor call fails or the event row can
    /// no longer be found.
    fn set_event_state_impl(
        &mut self,
        event: &mut Event,
        state: EventState,
        error_text: Option<&str>,
    ) -> EgResult<()> {
        // The in-memory event is updated first, so it reflects the new
        // state even if a database step below fails.
        event.set_state(state);

        let state_str: &str = state.into();

        // NOTE(review): every `?` between here and xact_commit() returns
        // without an explicit rollback in this function -- confirm the
        // Editor cleans up the open transaction in that case.
        self.editor.xact_begin()?;

        // Work from a fresh copy of the row rather than the in-memory event.
        let mut atev = self
            .editor
            .retrieve("atev", event.id())?
            .ok_or_else(|| "Our event disappeared from the DB?".to_string())?;

        if let Some(err) = error_text {
            let mut output = eg::hash! {
                "data": err,
                "is_error": true,
                // TODO locale
            };
            output.bless("ateo")?;

            let mut result = self.editor.create(output)?;

            // Link the newly created output row to the event.
            atev["error_output"] = result["id"].take();
        }

        atev["state"] = EgValue::from(state_str);
        atev["update_time"] = EgValue::from("now");
        // Tag the row with "<process id>-<thread id>" of the updater.
        atev["update_process"] = EgValue::from(format!("{}-{}", process::id(), thread_id()));

        // Stamp the start time on the first transition to any state
        // other than Pending.
        if atev["start_time"].is_null() && state != EventState::Pending {
            atev["start_time"] = EgValue::from("now");
        }

        if state == EventState::Complete {
            atev["complete_time"] = EgValue::from("now");
        }

        self.editor.update(atev)?;

        self.editor.xact_commit()?;

        if state == EventState::Complete || state == EventState::Error {
            // If we're likely done, force a disconnect.
            // This does not prevent additional connects/begins/etc.
            self.editor.disconnect()
        } else {
            Ok(())
        }
    }
291
292    /// Flesh the target linked to this event and set the event
293    /// group value if necessary.
294    pub fn collect(&mut self, event: &mut Event) -> EgResult<()> {
295        log::info!("{self} collecting {event}");
296
297        self.set_event_state(event, EventState::Collecting)?;
298
299        // Fetch our target object with the needed fleshing.
300        // clone() is required for retrieve()
301        let flesh = self.target_flesh.clone();
302        let core_type = self.core_type().to_string(); // parallel mut's
303
304        let target = self
305            .editor
306            .retrieve_with_ops(&core_type, event.target_pkey().clone(), flesh)?
307            .ok_or_else(|| self.editor.die_event())?;
308
309        event.set_target(target);
310
311        self.set_group_value(event)?;
312
313        // TODO additional data is needed for user_message support.
314
315        self.set_event_state(event, EventState::Collected)
316    }
317
318    /// If this is a grouped event, apply the group-specific value
319    /// to the provided event.
320    ///
321    /// This value is used to sort events into like groups.
322    fn set_group_value(&mut self, event: &mut Event) -> EgResult<()> {
323        let gfield_path = match self.group_field() {
324            Some(f) => f,
325            None => return Ok(()),
326        };
327
328        let mut obj = event.target();
329
330        for part in gfield_path.split('.') {
331            obj = &obj[part];
332        }
333
334        let obj_clone;
335        if let Some(pkey) = obj.pkey_value() {
336            // The object may have been fleshed beyond where we
337            // need it during target collection. If so, extract
338            // the pkey value from the fleshed object.
339            obj_clone = pkey.clone();
340        } else {
341            obj_clone = obj.clone();
342        }
343
344        if obj.is_string() || obj.is_number() {
345            event.set_group_value(obj_clone);
346            Ok(())
347        } else {
348            Err(format!("Invalid group field path: {gfield_path}").into())
349        }
350    }
351}