1use super::signals::SignalTracker;
2use super::{Request, RequestHandler};
3use std::fmt;
4use std::sync::mpsc;
5use std::thread;
6use std::time::Duration;
7use std::time::SystemTime;
8
9const SHUTDOWN_POLL_INTERVAL: u64 = 5;
10
11#[derive(Debug, Clone, PartialEq)]
12pub enum WorkerState {
13 Idle,
14 Active,
15 Done,
16}
17
18impl From<&WorkerState> for &'static str {
27 fn from(e: &WorkerState) -> &'static str {
28 match e {
29 WorkerState::Idle => "Idle",
30 WorkerState::Active => "Active",
31 WorkerState::Done => "Done",
32 }
33 }
34}
35
36impl fmt::Display for WorkerState {
37 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
38 let s: &str = self.into();
39 write!(f, "{s}")
40 }
41}
42
43pub struct WorkerStateEvent {
44 worker_id: u64,
45 state: WorkerState,
46}
47
48impl fmt::Display for WorkerStateEvent {
49 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
50 write!(
51 f,
52 "WorkerStateEvent worker={} state={}",
53 self.worker_id, self.state
54 )
55 }
56}
57
58impl WorkerStateEvent {
59 pub fn worker_id(&self) -> u64 {
60 self.worker_id
61 }
62 pub fn state(&self) -> &WorkerState {
63 &self.state
64 }
65}
66
67pub struct WorkerInstance {
69 pub worker_id: u64,
70 pub state: WorkerState,
71 pub join_handle: thread::JoinHandle<()>,
72 pub to_worker_tx: mpsc::Sender<Box<dyn Request>>,
73}
74
75impl WorkerInstance {
76 pub fn worker_id(&self) -> u64 {
77 self.worker_id
78 }
79 pub fn state(&self) -> &WorkerState {
80 &self.state
81 }
82 pub fn join_handle(&self) -> &thread::JoinHandle<()> {
83 &self.join_handle
84 }
85}
86
87impl fmt::Display for WorkerInstance {
88 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
89 write!(
90 f,
91 "WorkerInstance id={} state={}",
92 self.worker_id, self.state
93 )
94 }
95}
96
97pub struct Worker {
98 worker_id: u64,
99 max_requests: usize,
100 request_count: usize,
101 start_time_epoch: u64,
102 to_parent_tx: mpsc::Sender<WorkerStateEvent>,
103 to_worker_rx: mpsc::Receiver<Box<dyn Request>>,
104 handler: Box<dyn RequestHandler>,
105 sig_tracker: SignalTracker,
106}
107
108impl Worker {
109 pub fn new(
110 worker_id: u64,
111 max_requests: usize,
112 sig_tracker: SignalTracker,
113 to_parent_tx: mpsc::Sender<WorkerStateEvent>,
114 to_worker_rx: mpsc::Receiver<Box<dyn Request>>,
115 handler: Box<dyn RequestHandler>,
116 ) -> Worker {
117 let epoch = SystemTime::now()
118 .duration_since(SystemTime::UNIX_EPOCH)
119 .unwrap()
120 .as_secs();
121
122 Worker {
123 worker_id,
124 max_requests,
125 sig_tracker,
126 start_time_epoch: epoch,
127 to_parent_tx,
128 to_worker_rx,
129 request_count: 0,
130 handler,
131 }
132 }
133
134 fn set_as_idle(&mut self) -> Result<(), String> {
135 self.set_state(WorkerState::Idle)
136 }
137
138 fn set_as_done(&mut self) -> Result<(), String> {
139 self.set_state(WorkerState::Done)
140 }
141
142 fn set_state(&mut self, state: WorkerState) -> Result<(), String> {
143 let evt = WorkerStateEvent {
144 worker_id: self.worker_id,
145 state,
146 };
147
148 if let Err(e) = self.to_parent_tx.send(evt) {
149 self.sig_tracker.request_fast_shutdown();
153
154 Err(format!("Error notifying parent of state change: {e}"))
155 } else {
156 Ok(())
157 }
158 }
159
160 fn should_shut_down(&self) -> bool {
161 if self.sig_tracker.any_shutdown_requested() {
162 log::debug!("{self} received shutdown, exiting run loop");
163 println!("{self} received shutdown, exiting run loop");
164 return true;
165 }
166
167 let reload_time = self.sig_tracker.reload_request_time();
168 if reload_time > self.start_time_epoch {
169 log::info!("{self} shutdown_before of {reload_time} issued. That includes us");
170 return true;
171 }
172
173 false
174 }
175
176 pub fn run(&mut self) {
177 log::debug!("{self} starting");
178
179 if let Err(e) = self.handler.worker_start() {
180 log::error!("{self} error starting worker: {e}. Exiting");
181
182 thread::sleep(Duration::from_secs(1));
186 panic!("{} worker_start failed {}. exiting", self, e);
187 }
188
189 loop {
190 if self.should_shut_down() {
191 break;
192 }
193
194 let work_done = match self.process_one_request() {
195 Ok(b) => b,
196 Err(e) => {
197 log::error!("{self} error processing request: {e}; exiting");
198 self.sig_tracker.request_graceful_shutdown();
202 break;
203 }
204 };
205
206 if !work_done {
207 continue;
209 }
210
211 self.request_count += 1;
212
213 if self.max_requests > 0 && self.request_count == self.max_requests {
214 break;
218 }
219
220 if let Err(e) = self.set_as_idle() {
223 log::debug!("{self} exiting on set_as_idle() failure: {e}");
224 break;
225 }
226 }
227
228 let done_result = self.set_as_done();
229
230 log::debug!("{self} exiting main listen loop");
231
232 if let Err(e) = self.handler.worker_end() {
233 log::error!("{self} handler returned on error on exit: {e}");
234 }
235
236 if let Err(e) = done_result {
237 panic!("{self} could not set as done {e}; panic'ing to force cleanup");
238 }
239 }
240
241 fn process_one_request(&mut self) -> Result<bool, String> {
243 let recv_result = self
244 .to_worker_rx
245 .recv_timeout(Duration::from_secs(SHUTDOWN_POLL_INTERVAL));
246
247 let request = match recv_result {
248 Ok(r) => r,
249 Err(e) => {
250 match e {
251 std::sync::mpsc::RecvTimeoutError::Timeout => return Ok(false),
253 _ => return Err(format!("Error receiving request from parent: {e}")),
255 }
256 }
257 };
258
259 if let Err(e) = self.handler.process(request) {
264 log::error!("{self} error processing request: {e}");
267 }
268
269 Ok(true)
270 }
271}
272
273impl fmt::Display for Worker {
274 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
275 write!(
276 f,
277 "Worker id={} requests={} max-requests={}",
278 self.worker_id, self.request_count, self.max_requests
279 )
280 }
281}