1use crate::init;
2use crate::osrf::addr::BusAddress;
3use crate::osrf::app;
4use crate::osrf::client::{Client, ClientSingleton};
5use crate::osrf::conf;
6use crate::osrf::logging::Logger;
7use crate::osrf::message;
8use crate::osrf::message::Message;
9use crate::osrf::message::MessageStatus;
10use crate::osrf::message::MessageType;
11use crate::osrf::message::Payload;
12use crate::osrf::message::TransportMessage;
13use crate::osrf::method;
14use crate::osrf::method::ParamCount;
15use crate::osrf::sclient::HostSettings;
16use crate::osrf::session::ServerSession;
17use crate::util;
18use crate::EgResult;
19use mptc::signals::SignalTracker;
20use std::cell::RefMut;
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::OnceLock;
24use std::thread;
25use std::time::{Duration, SystemTime, UNIX_EPOCH};
26
27static REGISTERED_METHODS: OnceLock<HashMap<String, method::MethodDef>> = OnceLock::new();
28
29const IDLE_WAKE_TIME: u64 = 5;
31
32pub struct Microservice {
33 application: Box<dyn app::Application>,
34
35 sig_tracker: SignalTracker,
37
38 client: Client,
40
41 connected: bool,
43
44 session: Option<ServerSession>,
50}
51
52impl fmt::Display for Microservice {
53 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54 write!(f, "Micro")
55 }
56}
57
58impl Microservice {
59 pub fn start(application: Box<dyn app::Application>) -> EgResult<()> {
60 let mut options = init::InitOptions::new();
61 options.appname = Some(application.name().to_string());
62
63 let client = init::osrf_init(&options)?;
64
65 let mut tracker = SignalTracker::new();
66
67 tracker.track_graceful_shutdown();
68 tracker.track_fast_shutdown();
69 tracker.track_reload();
70
71 let mut service = Microservice {
72 application,
73 client,
74 sig_tracker: tracker,
75 connected: false,
76 session: None,
77 };
78
79 let client = service.client.clone();
80
81 service.application.init(client)?;
82
83 service.register_methods()?;
84
85 service.register_routers()?;
86
87 service.listen();
88
89 service.unregister_routers()?;
90
91 Ok(())
92 }
93
94 fn set_idle(&mut self) -> EgResult<()> {
98 Ok(())
99 }
100
101 fn set_active(&mut self) -> EgResult<()> {
105 Ok(())
106 }
107
108 fn methods() -> &'static HashMap<String, method::MethodDef> {
109 if let Some(h) = REGISTERED_METHODS.get() {
110 h
111 } else {
112 log::error!("Cannot call methods() prior to registration");
113 panic!("Cannot call methods() prior to registration");
114 }
115 }
116
117 fn client_internal_mut(&self) -> RefMut<ClientSingleton> {
119 self.client.singleton().borrow_mut()
120 }
121
122 fn session(&self) -> &ServerSession {
126 self.session.as_ref().unwrap()
127 }
128
129 fn session_mut(&mut self) -> &mut ServerSession {
130 self.session.as_mut().unwrap()
131 }
132
133 fn listen(&mut self) {
135 let factory = self.application.worker_factory();
136 let mut app_worker = (factory)();
137
138 if let Err(e) = app_worker.worker_start(self.client.clone()) {
139 log::error!("worker_start failed {e}. Exiting");
140 return;
141 }
142
143 let max_requests: usize = HostSettings::get(&format!(
144 "apps/{}/unix_config/max_requests",
145 self.application.name()
146 ))
147 .expect("Host Settings Not Retrieved")
148 .as_usize()
149 .unwrap_or(5000);
150
151 let keepalive = HostSettings::get(&format!(
152 "apps/{}/unix_config/keepalive",
153 self.application.name()
154 ))
155 .expect("Host Settings Not Retrieved")
156 .as_u64()
157 .unwrap_or(5);
158
159 let mut requests: usize = 0;
160
161 let username = self.client.address().username();
164 let domain = self.client.address().domain();
165
166 let service_addr = BusAddress::for_service(username, domain, self.application.name());
167 let service_addr = service_addr.as_str().to_string();
168
169 let my_addr = self.client.address().as_str().to_string();
170
171 while requests < max_requests {
172 let timeout: u64;
173 let sent_to: &str;
174
175 if self.connected {
176 sent_to = &my_addr;
181 timeout = keepalive;
182 } else {
183 if let Err(e) = self.reset() {
187 log::error!("could not reset {e}. Exiting");
188 break;
189 }
190
191 sent_to = &service_addr;
192 timeout = IDLE_WAKE_TIME;
193 }
194
195 let (work_occurred, msg_handled) =
198 match self.handle_recv(&mut app_worker, timeout, sent_to) {
199 Ok(w) => w,
200 Err(e) => {
201 log::error!("Error in main loop error: {e}");
202 break;
203 }
204 };
205
206 if self.connected {
210 continue;
211 }
212
213 if work_occurred {
214 if let Err(e) = app_worker.end_session() {
220 log::error!("end_session() returned an error: {e}");
221 break;
222 }
223
224 if self.set_idle().is_err() {
225 break;
226 }
227
228 if msg_handled {
229 requests += 1;
232
233 message::reset_thread_locale();
238 }
239 } else {
240 if let Err(e) = app_worker.worker_idle_wake(self.connected) {
243 log::error!("worker_idle_wake() returned an error: {e}");
244 break;
245 }
246 }
247
248 if self.sig_tracker.any_shutdown_requested() {
252 log::info!("received a stop signal");
253 break;
254 }
255 }
256
257 log::debug!("{self} exiting listen loop and cleaning up");
258
259 if let Err(e) = app_worker.worker_end() {
260 log::error!("worker_end failed {e}");
261 }
262
263 self.reset().ok();
265 }
266
267 fn handle_recv(
271 &mut self,
272 app_worker: &mut Box<dyn app::ApplicationWorker>,
273 timeout: u64,
274 sent_to: &str,
275 ) -> EgResult<(bool, bool)> {
276 let selfstr = format!("{self}");
277
278 let recv_result = self
279 .client_internal_mut()
280 .bus_mut()
281 .recv(timeout, Some(sent_to));
282
283 let msg_op = match recv_result {
284 Ok(o) => o,
285 Err(e) => {
286 thread::sleep(Duration::from_secs(1));
291 Err(e)?
292 }
293 };
294
295 let tmsg = match msg_op {
296 Some(v) => v,
297 None => {
298 if !self.connected {
299 return Ok((false, false));
301 }
302
303 self.set_active()?;
304
305 log::warn!("{selfstr} timeout waiting on request while connected");
307
308 if let Err(e) = self.reply_with_status(MessageStatus::Timeout, "Timeout") {
309 Err(format!("server: could not reply with Timeout message: {e}"))?;
310 }
311
312 return Ok((true, false)); }
314 };
315
316 self.set_active()?;
317
318 if !self.connected {
319 app_worker.start_session()?;
324 }
325
326 if let Err(e) = self.handle_transport_message(tmsg, app_worker) {
327 log::error!("{selfstr} error handling message: {e}");
331 self.connected = false;
332 }
333
334 Ok((true, true)) }
336
337 fn handle_transport_message(
338 &mut self,
339 mut tmsg: message::TransportMessage,
340 app_worker: &mut Box<dyn app::ApplicationWorker>,
341 ) -> EgResult<()> {
342 Logger::set_log_trace(tmsg.osrf_xid());
344
345 if self.session.is_none() || self.session().thread().ne(tmsg.thread()) {
346 log::trace!("server: creating new server session for {}", tmsg.thread());
347
348 self.session = Some(ServerSession::new(
349 self.client.clone(),
350 self.application.name(),
351 tmsg.thread(),
352 0, BusAddress::parse_str(tmsg.from())?,
354 ));
355 }
356
357 for msg in tmsg.body_mut().drain(..) {
358 self.handle_message(msg, app_worker)?;
359 }
360
361 Ok(())
362 }
363
364 fn handle_message(
365 &mut self,
366 msg: message::Message,
367 app_worker: &mut Box<dyn app::ApplicationWorker>,
368 ) -> EgResult<()> {
369 self.session_mut().set_last_thread_trace(msg.thread_trace());
370 self.session_mut().clear_responded_complete();
371
372 log::trace!("{self} received message of type {:?}", msg.mtype());
373
374 match msg.mtype() {
375 message::MessageType::Disconnect => {
376 log::trace!("{self} received a DISCONNECT");
377 self.reset()?;
378 Ok(())
379 }
380
381 message::MessageType::Connect => {
382 log::trace!("{self} received a CONNECT");
383
384 if self.connected {
385 return self.reply_bad_request("Worker is already connected");
386 }
387
388 self.connected = true;
389 self.reply_with_status(MessageStatus::Ok, "OK")
390 }
391
392 message::MessageType::Request => {
393 log::trace!("{self} received a REQUEST");
394 self.handle_request(msg, app_worker)
395 }
396
397 _ => self.reply_bad_request("Unexpected message type"),
398 }
399 }
400
401 fn reply_with_status(&mut self, stat: MessageStatus, stat_text: &str) -> EgResult<()> {
402 let tmsg = TransportMessage::with_body(
403 self.session().sender().as_str(),
404 self.client.address().as_str(),
405 self.session().thread(),
406 Message::new(
407 MessageType::Status,
408 self.session().last_thread_trace(),
409 Payload::Status(message::Status::new(stat, stat_text, "osrfStatus")),
410 ),
411 );
412
413 self.client_internal_mut()
414 .get_domain_bus(self.session().sender().domain())?
415 .send(tmsg)
416 }
417
418 fn handle_request(
419 &mut self,
420 mut msg: message::Message,
421 app_worker: &mut Box<dyn app::ApplicationWorker>,
422 ) -> EgResult<()> {
423 let method_call = match msg.take_payload() {
424 message::Payload::Method(m) => m,
425 _ => return self.reply_bad_request("Request sent without a MethoCall payload"),
426 };
427
428 let param_count = method_call.params().len();
429 let api_name = method_call.method().to_string();
430
431 let log_params = util::stringify_params(
432 &api_name,
433 method_call.params(),
434 conf::config().log_protect(),
435 );
436
437 log::info!("CALL: {} {}", api_name, log_params);
439
440 if !self.connected {
444 self.client.clear()?;
445 }
446
447 let mut method_def = Microservice::methods().get(&api_name).cloned();
450
451 if method_def.is_none() {
452 if api_name.ends_with(".atomic") {
456 let meth = api_name.replace(".atomic", "");
457 if let Some(m) = Microservice::methods().get(&meth) {
458 method_def = Some(m.clone());
459
460 self.session_mut().new_atomic_resp_queue();
463 }
464 }
465 }
466
467 if method_def.is_none() {
468 log::warn!("Method not found: {}", api_name);
469
470 return self.reply_with_status(
471 MessageStatus::MethodNotFound,
472 &format!("Method not found: {}", api_name),
473 );
474 }
475
476 let method_def = method_def.unwrap();
477 let pcount = method_def.param_count();
478
479 if !ParamCount::matches(pcount, param_count as u8) {
482 return self.reply_bad_request(&format!(
483 "Invalid param count sent: method={} sent={} needed={}",
484 api_name, param_count, &pcount,
485 ));
486 }
487
488 if let Some(param_defs) = method_def.params() {
491 for (idx, param_def) in param_defs.iter().enumerate() {
492 if let Some(param_val) = method_call.params().get(idx) {
495 if idx >= pcount.minimum() as usize && param_val.is_null() {
496 continue;
499 }
500 if !param_def.datatype.matches(param_val) {
501 return self.reply_bad_request(&format!(
502 "Invalid paramter type: wanted={} got={}",
503 param_def.datatype,
504 param_val.clone().dump()
505 ));
506 }
507 } else {
508 break;
510 }
511 }
512 }
513
514 if let Err(err) = (method_def.handler())(app_worker, self.session_mut(), method_call) {
516 let msg = format!("{self} method {api_name} exited: \"{err}\"");
517 log::error!("{msg}");
518 app_worker.api_call_error(&api_name, err);
519 self.reply_server_error(&msg)?;
520 Err(msg)?;
521 }
522
523 if !self.session().responded_complete() {
524 self.session_mut().send_complete()
525 } else {
526 Ok(())
527 }
528 }
529
530 fn reply_server_error(&mut self, text: &str) -> EgResult<()> {
531 self.connected = false;
532
533 let msg = Message::new(
534 MessageType::Status,
535 self.session().last_thread_trace(),
536 Payload::Status(message::Status::new(
537 MessageStatus::InternalServerError,
538 &format!("Internal Server Error: {text}"),
539 "osrfStatus",
540 )),
541 );
542
543 let tmsg = TransportMessage::with_body(
544 self.session().sender().as_str(),
545 self.client.address().as_str(),
546 self.session().thread(),
547 msg,
548 );
549
550 self.client_internal_mut()
551 .get_domain_bus(self.session().sender().domain())?
552 .send(tmsg)
553 }
554
555 fn reply_bad_request(&mut self, text: &str) -> EgResult<()> {
556 self.connected = false;
557
558 let msg = Message::new(
559 MessageType::Status,
560 self.session().last_thread_trace(),
561 Payload::Status(message::Status::new(
562 MessageStatus::BadRequest,
563 &format!("Bad Request: {text}"),
564 "osrfStatus",
565 )),
566 );
567
568 let tmsg = TransportMessage::with_body(
569 self.session().sender().as_str(),
570 self.client.address().as_str(),
571 self.session().thread(),
572 msg,
573 );
574
575 self.client_internal_mut()
576 .get_domain_bus(self.session().sender().domain())?
577 .send(tmsg)
578 }
579
580 fn register_methods(&mut self) -> EgResult<()> {
581 let client = self.client.clone();
582 let list = self.application.register_methods(client)?;
583 let mut hash = HashMap::new();
584 for m in list {
585 hash.insert(m.name().to_string(), m);
586 }
587 self.add_system_methods(&mut hash);
588
589 if REGISTERED_METHODS.set(hash).is_err() {
590 return Err("Cannot call register_methods() more than once".into());
591 }
592
593 Ok(())
594 }
595
596 fn add_system_methods(&mut self, hash: &mut HashMap<String, method::MethodDef>) {
597 let name = "opensrf.system.echo";
598 let mut method = method::MethodDef::new(name, method::ParamCount::Any, system_method_echo);
599 method.set_desc("Echo back any values sent");
600 hash.insert(name.to_string(), method);
601
602 let name = "opensrf.system.time";
603 let mut method = method::MethodDef::new(name, method::ParamCount::Zero, system_method_time);
604 method.set_desc("Respond with system time in epoch seconds");
605 hash.insert(name.to_string(), method);
606
607 let name = "opensrf.system.method.all";
608 let mut method = method::MethodDef::new(
609 name,
610 method::ParamCount::Range(0, 1),
611 system_method_introspect,
612 );
613 method.set_desc("List published API definitions");
614
615 method.add_param(method::Param {
616 name: String::from("prefix"),
617 datatype: method::ParamDataType::String,
618 desc: Some(String::from("API name prefix filter")),
619 });
620
621 hash.insert(name.to_string(), method);
622
623 let name = "opensrf.system.method.all.summary";
624 let mut method = method::MethodDef::new(
625 name,
626 method::ParamCount::Range(0, 1),
627 system_method_introspect,
628 );
629 method.set_desc("Summary list published API definitions");
630
631 method.add_param(method::Param {
632 name: String::from("prefix"),
633 datatype: method::ParamDataType::String,
634 desc: Some(String::from("API name prefix filter")),
635 });
636
637 hash.insert(name.to_string(), method);
638 }
639
640 fn hosting_domains(&self) -> Vec<(String, String)> {
643 let mut domains: Vec<(String, String)> = Vec::new();
644 for router in conf::config().client().routers() {
645 match router.services() {
646 Some(services) => {
647 if services.iter().any(|s| s.eq(self.application.name())) {
648 domains.push((router.username().to_string(), router.domain().to_string()));
649 }
650 }
651 None => {
652 domains.push((router.username().to_string(), router.domain().to_string()));
655 }
656 }
657 }
658
659 domains
660 }
661
662 fn register_routers(&mut self) -> EgResult<()> {
663 for (username, domain) in self.hosting_domains().iter() {
664 log::info!("server: registering with router at {domain}");
665
666 self.client.send_router_command(
667 username,
668 domain,
669 "register",
670 Some(self.application.name()),
671 )?;
672 }
673
674 Ok(())
675 }
676
677 fn unregister_routers(&mut self) -> EgResult<()> {
678 for (username, domain) in self.hosting_domains().iter() {
679 log::info!("server: un-registering with router at {domain}");
680
681 self.client.send_router_command(
682 username,
683 domain,
684 "unregister",
685 Some(self.application.name()),
686 )?;
687 }
688 Ok(())
689 }
690
691 fn reset(&mut self) -> EgResult<()> {
693 self.connected = false;
694 self.session = None;
695 self.client.clear()
696 }
697}
698
699fn system_method_echo(
701 _worker: &mut Box<dyn app::ApplicationWorker>,
702 session: &mut ServerSession,
703 method: message::MethodCall,
704) -> EgResult<()> {
705 let count = method.params().len();
706 for (idx, val) in method.params().iter().enumerate() {
707 if idx == count - 1 {
708 session.respond_complete(val.clone())?;
712 } else {
713 session.respond(val.clone())?;
714 }
715 }
716 Ok(())
717}
718
719fn system_method_time(
720 _worker: &mut Box<dyn app::ApplicationWorker>,
721 session: &mut ServerSession,
722 _method: message::MethodCall,
723) -> EgResult<()> {
724 match SystemTime::now().duration_since(UNIX_EPOCH) {
725 Ok(t) => session.respond_complete(t.as_secs()),
726 Err(e) => Err(format!("System time error: {e}").into()),
727 }
728}
729
730fn system_method_introspect(
731 _worker: &mut Box<dyn app::ApplicationWorker>,
732 session: &mut ServerSession,
733 method: message::MethodCall,
734) -> EgResult<()> {
735 let prefix = match method.params().first() {
736 Some(p) => p.as_str(),
737 None => None,
738 };
739
740 let mut names: Vec<&str> = match prefix {
742 Some(pfx) => Microservice::methods()
745 .keys()
746 .filter(|n| n.starts_with(pfx))
747 .map(|n| n.as_str())
748 .collect(),
749 None => Microservice::methods().keys().map(|n| n.as_str()).collect(),
750 };
751
752 names.sort();
753
754 for name in names {
755 if let Some(meth) = Microservice::methods().get(name) {
756 if method.method().contains("summary") {
757 session.respond(meth.to_summary_string())?;
758 } else {
759 session.respond(meth.to_eg_value())?;
760 }
761 }
762 }
763
764 Ok(())
765}