// mptc/worker.rs

1use super::signals::SignalTracker;
2use super::{Request, RequestHandler};
3use std::fmt;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7use std::time::SystemTime;
8
/// How many seconds a worker blocks waiting for a request before it wakes
/// up to re-check for shutdown and reload requests.
const SHUTDOWN_POLL_INTERVAL: u64 = 5;

/// Lifecycle state of a worker thread, as carried by a `WorkerStateEvent`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WorkerState {
    /// Waiting for a request.  A worker reports this after finishing a
    /// request when it is going back into the listen pool.
    Idle,
    /// Handling a request.  Per the worker's own notes, the parent applies
    /// this state just before sending a request; the worker in this module
    /// never reports it itself.
    Active,
    /// The worker has left its run loop and is exiting.
    Done,
}

/// Static name of a [`WorkerState`]; also used by its `Display` impl.
///
/// # Examples
///
/// ```
/// use mptc::worker::WorkerState;
///
/// let state = WorkerState::Active;
/// let name: &str = (&state).into();
/// assert_eq!(name, "Active");
/// assert_eq!(state.to_string(), name);
/// ```
impl From<&WorkerState> for &'static str {
    fn from(e: &WorkerState) -> &'static str {
        match e {
            WorkerState::Idle => "Idle",
            WorkerState::Active => "Active",
            WorkerState::Done => "Done",
        }
    }
}

impl fmt::Display for WorkerState {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let s: &str = self.into();
        // `pad` (unlike `write!`) honours width/alignment flags such as
        // `{:<6}`, which keeps column-aligned log output tidy.
        f.pad(s)
    }
}

43pub struct WorkerStateEvent {
44    worker_id: u64,
45    state: WorkerState,
46}
47
48impl fmt::Display for WorkerStateEvent {
49    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50        write!(
51            f,
52            "WorkerStateEvent worker={} state={}",
53            self.worker_id, self.state
54        )
55    }
56}
57
58impl WorkerStateEvent {
59    pub fn worker_id(&self) -> u64 {
60        self.worker_id
61    }
62    pub fn state(&self) -> &WorkerState {
63        &self.state
64    }
65}
66
67/// Data for tracking a specific worker thread.
68pub struct WorkerInstance {
69    pub worker_id: u64,
70    pub state: WorkerState,
71    pub join_handle: thread::JoinHandle<()>,
72    pub to_worker_tx: mpsc::Sender<Box<dyn Request>>,
73}
74
75impl WorkerInstance {
76    pub fn worker_id(&self) -> u64 {
77        self.worker_id
78    }
79    pub fn state(&self) -> &WorkerState {
80        &self.state
81    }
82    pub fn join_handle(&self) -> &thread::JoinHandle<()> {
83        &self.join_handle
84    }
85}
86
87impl fmt::Display for WorkerInstance {
88    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89        write!(
90            f,
91            "WorkerInstance id={} state={}",
92            self.worker_id, self.state
93        )
94    }
95}
96
97pub struct Worker {
98    worker_id: u64,
99    max_requests: usize,
100    request_count: usize,
101    start_time_epoch: u64,
102    to_parent_tx: mpsc::Sender<WorkerStateEvent>,
103    to_worker_rx: mpsc::Receiver<Box<dyn Request>>,
104    handler: Box<dyn RequestHandler>,
105    sig_tracker: SignalTracker,
106}
107
108impl Worker {
109    pub fn new(
110        worker_id: u64,
111        max_requests: usize,
112        sig_tracker: SignalTracker,
113        to_parent_tx: mpsc::Sender<WorkerStateEvent>,
114        to_worker_rx: mpsc::Receiver<Box<dyn Request>>,
115        handler: Box<dyn RequestHandler>,
116    ) -> Worker {
117        let epoch = SystemTime::now()
118            .duration_since(SystemTime::UNIX_EPOCH)
119            .unwrap()
120            .as_secs();
121
122        Worker {
123            worker_id,
124            max_requests,
125            sig_tracker,
126            start_time_epoch: epoch,
127            to_parent_tx,
128            to_worker_rx,
129            request_count: 0,
130            handler,
131        }
132    }
133
134    fn set_as_idle(&mut self) -> Result<(), String> {
135        self.set_state(WorkerState::Idle)
136    }
137
138    fn set_as_done(&mut self) -> Result<(), String> {
139        self.set_state(WorkerState::Done)
140    }
141
142    fn set_state(&mut self, state: WorkerState) -> Result<(), String> {
143        let evt = WorkerStateEvent {
144            worker_id: self.worker_id,
145            state,
146        };
147
148        if let Err(e) = self.to_parent_tx.send(evt) {
149            // If we're here, our parent server has exited or failed in
150            // some unrecoverable way.  Tell our fellow workers it's
151            // time to shut down.
152            self.sig_tracker.request_fast_shutdown();
153
154            Err(format!("Error notifying parent of state change: {e}"))
155        } else {
156            Ok(())
157        }
158    }
159
160    fn should_shut_down(&self) -> bool {
161        if self.sig_tracker.any_shutdown_requested() {
162            log::debug!("{self} received shutdown, exiting run loop");
163            println!("{self} received shutdown, exiting run loop");
164            return true;
165        }
166
167        let reload_time = self.sig_tracker.reload_request_time();
168        if reload_time > self.start_time_epoch {
169            log::info!("{self} shutdown_before of {reload_time} issued.  That includes us");
170            return true;
171        }
172
173        false
174    }
175
176    pub fn run(&mut self) {
177        log::debug!("{self} starting");
178
179        if let Err(e) = self.handler.worker_start() {
180            log::error!("{self} error starting worker: {e}.  Exiting");
181
182            // Failure to start the worker likely means a systemic issue
183            // that could result in a lot of thread churn.  Sleep for a
184            // sec here to keep things from getting too chaotic.
185            thread::sleep(Duration::from_secs(1));
186            panic!("{} worker_start failed {}.  exiting", self, e);
187        }
188
189        loop {
190            if self.should_shut_down() {
191                break;
192            }
193
194            let work_done = match self.process_one_request() {
195                Ok(b) => b,
196                Err(e) => {
197                    log::error!("{self} error processing request: {e}; exiting");
198                    // If we're here, our parent server has exited
199                    // or failed in some unrecoverable way.
200                    // Tell our fellow workers it's time to shut down.
201                    self.sig_tracker.request_graceful_shutdown();
202                    break;
203                }
204            };
205
206            if !work_done {
207                // Go back and keep listening for requests.
208                continue;
209            }
210
211            self.request_count += 1;
212
213            if self.max_requests > 0 && self.request_count == self.max_requests {
214                // All done
215                // No need to set_as_idle here since we're just
216                // about to set_as_done.
217                break;
218            }
219
220            // Request complete.  Set ourselves as idle, but only if
221            // we're going back into the listen pool.
222            if let Err(e) = self.set_as_idle() {
223                log::debug!("{self} exiting on set_as_idle() failure: {e}");
224                break;
225            }
226        }
227
228        let done_result = self.set_as_done();
229
230        log::debug!("{self} exiting main listen loop");
231
232        if let Err(e) = self.handler.worker_end() {
233            log::error!("{self} handler returned on error on exit: {e}");
234        }
235
236        if let Err(e) = done_result {
237            panic!("{self} could not set as done {e}; panic'ing to force cleanup");
238        }
239    }
240
241    /// Returns result of true if this worker performed any work.
242    fn process_one_request(&mut self) -> Result<bool, String> {
243        let recv_result = self
244            .to_worker_rx
245            .recv_timeout(Duration::from_secs(SHUTDOWN_POLL_INTERVAL));
246
247        let request = match recv_result {
248            Ok(r) => r,
249            Err(e) => {
250                match e {
251                    // Timeouts are expected.
252                    std::sync::mpsc::RecvTimeoutError::Timeout => return Ok(false),
253                    // Other errors are not.
254                    _ => return Err(format!("Error receiving request from parent: {e}")),
255                }
256            }
257        };
258
259        // NOTE no need to report our status as Active to the parent
260        // server, since it applies the Active state to this worker's
261        // metadata just before sending us this request.
262
263        if let Err(e) = self.handler.process(request) {
264            // This is not necessarily an existential crisis, probably
265            // just a malformed request, etc.
266            log::error!("{self} error processing request: {e}");
267        }
268
269        Ok(true)
270    }
271}
272
273impl fmt::Display for Worker {
274    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275        write!(
276            f,
277            "Worker id={} requests={} max-requests={}",
278            self.worker_id, self.request_count, self.max_requests
279        )
280    }
281}