1use crate::init;
2use crate::osrf::app;
3use crate::osrf::client::Client;
4use crate::osrf::conf;
5use crate::osrf::message;
6use crate::osrf::method;
7use crate::osrf::sclient::HostSettings;
8use crate::osrf::session;
9use crate::osrf::worker::{Worker, WorkerState, WorkerStateEvent};
10use crate::util;
11use crate::EgResult;
12use mptc::signals::SignalTracker;
13use std::collections::HashMap;
14use std::env;
15use std::sync::mpsc;
16use std::sync::OnceLock;
17use std::thread;
18use std::time;
19use std::time::Duration;
20use std::time::{SystemTime, UNIX_EPOCH};
21
/// Registry of every published API method, keyed by method name.
///
/// Set exactly once by `Server::register_methods()` and read through
/// `Server::methods()`.
static REGISTERED_METHODS: OnceLock<HashMap<String, method::MethodDef>> = OnceLock::new();

/// Idle-worker count below which `handle_worker_event()` logs a warning.
/// The warning is only considered when the configured minimum idle count
/// is greater than this value.
const IDLE_THREAD_WARN_THRESHOLD: usize = 1;
/// Seconds the main loop waits for a worker event on each iteration.
const IDLE_WAKE_TIME: u64 = 3;
/// Limit handed to `util::Timer` to bound how long `shutdown()` waits for
/// workers (presumably seconds -- confirm against `util::Timer`).
const SHUTDOWN_MAX_WAIT: u64 = 30;
/// Fallback for the `min_children` host setting.
const DEFAULT_MIN_WORKERS: usize = 3;
/// Fallback for the `max_children` host setting.
const DEFAULT_MAX_WORKERS: usize = 30;
/// Fallback for the `max_requests` host setting.
const DEFAULT_MAX_WORKER_REQUESTS: usize = 1000;
/// Fallback for the `keepalive` host setting.
const DEFAULT_WORKER_KEEPALIVE: usize = 5;
/// Fallback for the `min_spare_children` host setting.
const DEFAULT_MIN_IDLE_WORKERS: usize = 1;

/// Interval handed to `util::Timer` to pace the thread-count log line
/// (presumably seconds -- confirm against `util::Timer`).
const LOG_THREAD_STATS_FREQUENCY: u64 = 3;

/// Thread counts are only logged when at least this many workers are active.
const LOG_THREAD_MIN_ACTIVE: usize = 5;
50
/// Server-side bookkeeping for one spawned worker thread.
#[derive(Debug)]
pub struct WorkerThread {
    /// Last state recorded for the worker: `Idle` when spawned, then
    /// whatever the most recent `WorkerStateEvent` reported.
    pub state: WorkerState,
    /// Handle used to detect that the thread has finished and to join it.
    pub join_handle: thread::JoinHandle<()>,
}
56
/// Hosts a single application: spawns worker threads, tracks their state
/// through a shared event channel, and manages router registration and
/// shutdown.
pub struct Server {
    /// The hosted application; supplies the service name, the init and
    /// method-registration hooks, and the worker factory.
    application: Box<dyn app::Application>,
    /// Client returned by `init::osrf_init()`; used for router commands and
    /// cloned for the application hooks.
    client: Client,
    /// Workers currently considered live, keyed by worker ID.
    workers: HashMap<u64, WorkerThread>,
    /// Most recently issued worker ID; incremented for every new worker.
    worker_id_gen: u64,
    /// Sending half of the worker-to-server event channel; cloned into
    /// every worker thread.
    to_parent_tx: mpsc::SyncSender<WorkerStateEvent>,
    /// Receiving half of the worker-to-server event channel.
    to_parent_rx: mpsc::Receiver<WorkerStateEvent>,
    /// `spawn_threads()` tops the worker pool up to this size.
    min_workers: usize,
    /// Upper bound consulted before spawning workers beyond the minimum.
    max_workers: usize,
    /// Passed through to `Worker::new()` (presumably the request budget of
    /// a single worker -- confirm against `Worker`).
    max_worker_requests: usize,
    /// Passed through to `Worker::new()` (presumably a keepalive timeout --
    /// confirm against `Worker`).
    worker_keepalive: usize,

    /// Workers removed from `workers` whose threads still need joining.
    exited_workers: HashMap<u64, WorkerThread>,

    /// Tracks shutdown/reload signals; a clone is given to every worker.
    sig_tracker: SignalTracker,

    /// Idle maintenance spawns a worker when fewer than this many are idle;
    /// a value of 0 disables that maintenance.
    min_idle_workers: usize,
}
83
84impl Server {
85 pub fn start(application: Box<dyn app::Application>) -> EgResult<()> {
86 let service = application.name();
87
88 let mut options = init::InitOptions::new();
89 options.appname = Some(service.to_string());
90
91 let client = init::osrf_init(&options)?;
92
93 let mut min_workers =
94 HostSettings::get(&format!("apps/{service}/unix_config/min_children"))?
95 .as_usize()
96 .unwrap_or(DEFAULT_MIN_WORKERS);
97
98 let mut min_idle_workers =
99 HostSettings::get(&format!("apps/{service}/unix_config/min_spare_children"))?
100 .as_usize()
101 .unwrap_or(DEFAULT_MIN_IDLE_WORKERS);
102
103 let mut max_workers =
104 HostSettings::get(&format!("apps/{service}/unix_config/max_children"))?
105 .as_usize()
106 .unwrap_or(DEFAULT_MAX_WORKERS);
107
108 let mut max_worker_requests =
109 HostSettings::get(&format!("apps/{service}/unix_config/max_requests"))?
110 .as_usize()
111 .unwrap_or(DEFAULT_MAX_WORKER_REQUESTS);
112
113 let mut worker_keepalive =
114 HostSettings::get(&format!("apps/{service}/unix_config/keepalive"))?
115 .as_usize()
116 .unwrap_or(DEFAULT_WORKER_KEEPALIVE);
117
118 if let Ok(num) = env::var("OSRF_SERVER_MIN_WORKERS") {
121 if let Ok(num) = num.parse::<usize>() {
122 min_workers = num;
123 }
124 }
125
126 if let Ok(num) = env::var("OSRF_SERVER_MIN_IDLE_WORKERS") {
127 if let Ok(num) = num.parse::<usize>() {
128 min_idle_workers = num;
129 }
130 }
131
132 if let Ok(num) = env::var("OSRF_SERVER_MAX_WORKERS") {
133 if let Ok(num) = num.parse::<usize>() {
134 max_workers = num;
135 }
136 }
137
138 if let Ok(num) = env::var("OSRF_SERVER_MAX_WORKER_REQUESTS") {
139 if let Ok(num) = num.parse::<usize>() {
140 max_worker_requests = num;
141 }
142 }
143
144 if let Ok(num) = env::var("OSRF_SERVER_WORKER_KEEPALIVE") {
145 if let Ok(num) = num.parse::<usize>() {
146 worker_keepalive = num;
147 }
148 }
149
150 log::info!(
151 "Starting service {} with min-workers={} min-idle={} max-workers={} max-requests={} keepalive={}",
152 service,
153 min_workers,
154 min_idle_workers,
155 max_workers,
156 max_worker_requests,
157 worker_keepalive,
158 );
159
160 let (tx, rx): (
167 mpsc::SyncSender<WorkerStateEvent>,
168 mpsc::Receiver<WorkerStateEvent>,
169 ) = mpsc::sync_channel(0);
170
171 let mut server = Server {
172 client,
173 application,
174 min_workers,
175 max_workers,
176 max_worker_requests,
177 worker_keepalive,
178 min_idle_workers,
179 worker_id_gen: 0,
180 to_parent_tx: tx,
181 to_parent_rx: rx,
182 workers: HashMap::new(),
183 exited_workers: HashMap::new(),
184 sig_tracker: SignalTracker::new(),
185 };
186
187 server.listen()
188 }
189
/// Mutable access to the hosted application.
fn app_mut(&mut self) -> &mut Box<dyn app::Application> {
    &mut self.application
}
193
/// Name of the hosted service, as reported by the application.
fn service(&self) -> &str {
    self.application.name()
}
197
198 fn next_worker_id(&mut self) -> u64 {
199 self.worker_id_gen += 1;
200 self.worker_id_gen
201 }
202
203 fn spawn_threads(&mut self) {
204 if self.sig_tracker.any_shutdown_requested() {
205 return;
206 }
207 while self.workers.len() < self.min_workers {
208 self.spawn_one_thread();
209 }
210 }
211
212 fn spawn_one_thread(&mut self) {
213 let worker_id = self.next_worker_id();
214 let to_parent_tx = self.to_parent_tx.clone();
215 let service = self.service().to_string();
216 let factory = self.application.worker_factory();
217 let sig_tracker = self.sig_tracker.clone();
218 let max_worker_requests = self.max_worker_requests;
219 let worker_keepalive = self.worker_keepalive;
220
221 log::trace!("server: spawning a new worker {worker_id}");
222
223 let handle = thread::spawn(move || {
224 Server::start_worker_thread(
225 sig_tracker,
226 factory,
227 service,
228 worker_id,
229 to_parent_tx,
230 max_worker_requests,
231 worker_keepalive,
232 );
233 });
234
235 self.workers.insert(
236 worker_id,
237 WorkerThread {
238 state: WorkerState::Idle,
239 join_handle: handle,
240 },
241 );
242 }
243
244 fn start_worker_thread(
245 sig_tracker: SignalTracker,
246 factory: app::ApplicationWorkerFactory,
247 service: String,
248 worker_id: u64,
249 to_parent_tx: mpsc::SyncSender<WorkerStateEvent>,
250 max_worker_requests: usize,
251 worker_keepalive: usize,
252 ) {
253 log::debug!("{service} Creating new worker {worker_id}");
254
255 let worker_result = Worker::new(
256 service,
257 worker_id,
258 sig_tracker,
259 to_parent_tx,
260 max_worker_requests,
261 worker_keepalive,
262 );
263
264 let mut worker = match worker_result {
265 Ok(w) => w,
266 Err(e) => {
267 log::error!("Cannot create worker: {e}. Exiting.");
268
269 thread::sleep(Duration::from_secs(5));
273 return;
274 }
275 };
276
277 log::trace!("Worker {worker_id} going into listen()");
278
279 if let Err(e) = worker.listen(factory) {
280 thread::sleep(time::Duration::from_secs(1));
284
285 panic!("{worker} failed; forcing an exit: {e}");
289 }
290 }
291
292 fn hosting_domains(&self) -> Vec<(String, String)> {
295 let mut domains: Vec<(String, String)> = Vec::new();
296 for router in conf::config().client().routers() {
297 match router.services() {
298 Some(services) => {
299 if services.iter().any(|s| s.eq(self.service())) {
300 domains.push((router.username().to_string(), router.domain().to_string()));
301 }
302 }
303 None => {
304 domains.push((router.username().to_string(), router.domain().to_string()));
307 }
308 }
309 }
310
311 domains
312 }
313
314 fn register_routers(&mut self) -> EgResult<()> {
315 for (username, domain) in self.hosting_domains().iter() {
316 log::info!("server: registering with router at {domain}");
317
318 self.client
319 .send_router_command(username, domain, "register", Some(self.service()))?;
320 }
321
322 Ok(())
323 }
324
325 fn unregister_routers(&mut self) -> EgResult<()> {
326 for (username, domain) in self.hosting_domains().iter() {
327 log::info!("server: un-registering with router at {domain}");
328
329 self.client.send_router_command(
330 username,
331 domain,
332 "unregister",
333 Some(self.service()),
334 )?;
335 }
336 Ok(())
337 }
338
339 fn service_init(&mut self) -> EgResult<()> {
340 let client = self.client.clone();
341 self.app_mut().init(client)
342 }
343
344 pub fn methods() -> &'static HashMap<String, method::MethodDef> {
345 if let Some(h) = REGISTERED_METHODS.get() {
346 h
347 } else {
348 log::error!("Cannot call methods() prior to registration");
349 panic!("Cannot call methods() prior to registration");
350 }
351 }
352
353 fn register_methods(&mut self) -> EgResult<()> {
354 let client = self.client.clone();
355 let list = self.app_mut().register_methods(client)?;
356 let mut hash: HashMap<String, method::MethodDef> = HashMap::new();
357 for m in list {
358 hash.insert(m.name().to_string(), m);
359 }
360 self.add_system_methods(&mut hash);
361
362 if REGISTERED_METHODS.set(hash).is_err() {
363 return Err("Cannot call register_methods() more than once".into());
364 }
365
366 Ok(())
367 }
368
369 fn add_system_methods(&self, hash: &mut HashMap<String, method::MethodDef>) {
370 let name = "opensrf.system.echo";
371 let mut method = method::MethodDef::new(name, method::ParamCount::Any, system_method_echo);
372 method.set_desc("Echo back any values sent");
373 hash.insert(name.to_string(), method);
374
375 let name = "opensrf.system.time";
376 let mut method = method::MethodDef::new(name, method::ParamCount::Zero, system_method_time);
377 method.set_desc("Respond with system time in epoch seconds");
378 hash.insert(name.to_string(), method);
379
380 let name = "opensrf.system.method.all";
381 let mut method = method::MethodDef::new(
382 name,
383 method::ParamCount::Range(0, 1),
384 system_method_introspect,
385 );
386 method.set_desc("List published API definitions");
387
388 method.add_param(method::Param {
389 name: String::from("prefix"),
390 datatype: method::ParamDataType::String,
391 desc: Some(String::from("API name prefix filter")),
392 });
393
394 hash.insert(name.to_string(), method);
395
396 let name = "opensrf.system.method.all.summary";
397 let mut method = method::MethodDef::new(
398 name,
399 method::ParamCount::Range(0, 1),
400 system_method_introspect,
401 );
402 method.set_desc("Summary list published API definitions");
403
404 method.add_param(method::Param {
405 name: String::from("prefix"),
406 datatype: method::ParamDataType::String,
407 desc: Some(String::from("API name prefix filter")),
408 });
409
410 hash.insert(name.to_string(), method);
411 }
412
413 pub fn listen(&mut self) -> EgResult<()> {
414 self.service_init()?;
415 self.register_methods()?;
416 self.register_routers()?;
417 self.spawn_threads();
418 self.sig_tracker.track_graceful_shutdown();
419 self.sig_tracker.track_fast_shutdown();
420 self.sig_tracker.track_reload();
421
422 let duration = Duration::from_secs(IDLE_WAKE_TIME);
423 let mut log_timer = util::Timer::new(LOG_THREAD_STATS_FREQUENCY);
424
425 loop {
426 let mut work_performed = false;
429
430 if let Ok(evt) = self.to_parent_rx.recv_timeout(duration) {
437 self.handle_worker_event(&evt);
438 work_performed = true;
439 }
440
441 work_performed = self.check_failed_threads() || work_performed;
443
444 if self.sig_tracker.any_shutdown_requested() {
445 log::info!("We received a stop signal, exiting");
446 break;
447 }
448
449 if !work_performed {
450 self.perform_idle_worker_maint();
453 }
454
455 self.clean_exited_workers(false);
456 self.log_thread_counts(&mut log_timer);
457 }
458
459 self.unregister_routers()?;
460 self.shutdown();
461
462 Ok(())
463 }
464
465 fn log_thread_counts(&self, timer: &mut util::Timer) {
472 if !timer.done() {
473 return;
474 }
475
476 let active_count = self.active_thread_count();
477
478 if active_count < LOG_THREAD_MIN_ACTIVE {
479 return;
480 }
481
482 log::info!(
483 "Service {} max-threads={} active-threads={} idle-threads={} exited-threads={}",
484 self.application.name(),
485 self.max_workers,
486 active_count,
487 self.idle_thread_count(),
488 self.exited_workers.len(),
489 );
490
491 timer.reset();
492 }
493
494 fn perform_idle_worker_maint(&mut self) {
498 let idle_workers = self.idle_thread_count();
499
500 if self.min_idle_workers > 0
501 && self.workers.len() < self.max_workers
502 && idle_workers < self.min_idle_workers
503 {
504 self.spawn_one_thread();
505 log::debug!("Sawned idle worker; idle={idle_workers}");
506 }
507 }
508
509 fn shutdown(&mut self) {
510 let timer = util::Timer::new(SHUTDOWN_MAX_WAIT);
511 let duration = Duration::from_secs(1);
512
513 while !timer.done() && (!self.workers.is_empty() || !self.exited_workers.is_empty()) {
514 let info = format!(
515 "{} shutdown: {} threads; {} active; time remaining {}",
516 self.application.name(),
517 self.workers.len(),
518 self.active_thread_count(),
519 timer.remaining(),
520 );
521
522 println!("{info}...");
524
525 log::info!("{info}");
526
527 if let Ok(evt) = self.to_parent_rx.recv_timeout(duration) {
528 self.handle_worker_event(&evt);
529 }
530
531 self.check_failed_threads();
532 self.clean_exited_workers(true);
533 }
534
535 std::process::exit(0);
538 }
539
540 fn check_failed_threads(&mut self) -> bool {
545 let failed: Vec<u64> = self
546 .workers
547 .iter()
548 .filter(|(_, v)| v.join_handle.is_finished())
549 .map(|(k, _)| *k) .collect();
551
552 let mut handled = false;
553 for worker_id in failed {
554 handled = true;
555 log::info!("Found a thread that exited ungracefully: {worker_id}");
556 self.remove_thread(&worker_id);
557 }
558
559 handled
560 }
561
562 fn remove_thread(&mut self, worker_id: &u64) {
565 log::debug!("server: removing thread {}", worker_id);
566
567 if let Some(worker) = self.workers.remove(worker_id) {
571 self.exited_workers.insert(*worker_id, worker);
572 }
573 self.spawn_threads();
574 }
575
576 fn clean_exited_workers(&mut self, block: bool) {
577 let mut keep = HashMap::new();
578
579 log::debug!("server: exited thread count {}", self.exited_workers.len());
580
581 for (worker_id, worker) in self.exited_workers.drain() {
582 if block || worker.join_handle.is_finished() {
583 log::debug!("server: joining worker: {worker_id}");
584
585 let _ = worker
586 .join_handle
587 .join()
588 .inspect_err(|e| log::error!("server: failure joining worker thread: {e:?}"));
589 } else {
590 log::debug!("server: worker {worker_id} has not finished");
591 keep.insert(worker_id, worker);
592 }
593 }
594
595 self.exited_workers = keep;
596 }
597
598 fn handle_worker_event(&mut self, evt: &WorkerStateEvent) {
601 log::trace!("server received WorkerStateEvent: {:?}", evt);
602
603 let worker_id = evt.worker_id();
604
605 let worker: &mut WorkerThread = match self.workers.get_mut(&worker_id) {
606 Some(w) => w,
607 None => {
608 log::error!("No worker found with id {worker_id}");
609 return;
610 }
611 };
612
613 if evt.state() == WorkerState::Exiting {
614 self.remove_thread(&worker_id);
616 } else {
617 log::trace!("server: updating thread state: {:?}", worker_id);
618 worker.state = evt.state();
619 }
620
621 let idle = self.idle_thread_count();
622 let active = self.active_thread_count();
623
624 log::debug!("server: workers idle={idle} active={active}");
625
626 if self.sig_tracker.any_shutdown_requested() {
627 return;
628 }
629
630 if idle == 0 {
631 if active < self.max_workers {
632 self.spawn_one_thread();
633 } else {
634 log::warn!("server: reached max workers!");
635 }
636 }
637
638 if self.min_idle_workers > IDLE_THREAD_WARN_THRESHOLD && idle < IDLE_THREAD_WARN_THRESHOLD {
639 log::warn!(
640 "server: idle thread count={} is below warning threshold={}",
641 idle,
642 IDLE_THREAD_WARN_THRESHOLD
643 );
644 }
645 }
646
647 fn active_thread_count(&self) -> usize {
648 self.workers
649 .values()
650 .filter(|v| v.state == WorkerState::Active)
651 .count()
652 }
653
654 fn idle_thread_count(&self) -> usize {
655 self.workers
656 .values()
657 .filter(|v| v.state == WorkerState::Idle)
658 .count()
659 }
660}
661
662fn system_method_echo(
664 _worker: &mut Box<dyn app::ApplicationWorker>,
665 session: &mut session::ServerSession,
666 method: message::MethodCall,
667) -> EgResult<()> {
668 let count = method.params().len();
669 for (idx, val) in method.params().iter().enumerate() {
670 if idx == count - 1 {
671 session.respond_complete(val.clone())?;
675 } else {
676 session.respond(val.clone())?;
677 }
678 }
679 Ok(())
680}
681
682fn system_method_time(
683 _worker: &mut Box<dyn app::ApplicationWorker>,
684 session: &mut session::ServerSession,
685 _method: message::MethodCall,
686) -> EgResult<()> {
687 match SystemTime::now().duration_since(UNIX_EPOCH) {
688 Ok(t) => session.respond_complete(t.as_secs()),
689 Err(e) => Err(format!("System time error: {e}").into()),
690 }
691}
692
693fn system_method_introspect(
694 _worker: &mut Box<dyn app::ApplicationWorker>,
695 session: &mut session::ServerSession,
696 method: message::MethodCall,
697) -> EgResult<()> {
698 let prefix = match method.params().first() {
699 Some(p) => p.as_str(),
700 None => None,
701 };
702
703 let mut names: Vec<&str> = match prefix {
705 Some(pfx) => Server::methods()
708 .keys()
709 .filter(|n| n.starts_with(pfx))
710 .map(|n| n.as_str())
711 .collect(),
712 None => Server::methods().keys().map(|n| n.as_str()).collect(),
713 };
714
715 names.sort();
716
717 for name in names {
718 if let Some(meth) = Server::methods().get(name) {
719 if method.method().contains("summary") {
720 session.respond(meth.to_summary_string())?;
721 } else {
722 session.respond(meth.to_eg_value())?;
723 }
724 }
725 }
726
727 Ok(())
728}