1use crate::osrf::addr::BusAddress;
2use crate::osrf::client::{Client, ClientSingleton};
3use crate::osrf::conf;
4use crate::osrf::message;
5use crate::osrf::message::Message;
6use crate::osrf::message::MessageStatus;
7use crate::osrf::message::MessageType;
8use crate::osrf::message::MethodCall;
9use crate::osrf::message::Payload;
10use crate::osrf::message::Status;
11use crate::osrf::message::TransportMessage;
12use crate::osrf::params::ApiParams;
13use crate::util;
14use crate::{EgResult, EgValue};
15use std::cell::RefCell;
16use std::cell::RefMut;
17use std::collections::VecDeque;
18use std::fmt;
19use std::rc::Rc;
20
21const CONNECT_TIMEOUT: u64 = 10;
22pub const DEFAULT_REQUEST_TIMEOUT: u64 = 60;
23
24#[derive(Debug)]
26struct Response {
27 value: Option<EgValue>,
29 complete: bool,
31 partial: bool,
33}
34
35#[derive(Clone)]
37pub struct Request {
38 session: Rc<RefCell<ClientSessionInternal>>,
40
41 complete: bool,
43
44 thread_trace: usize,
46
47 thread: String,
50}
51
52impl Request {
53 fn new(
54 thread: String,
55 session: Rc<RefCell<ClientSessionInternal>>,
56 thread_trace: usize,
57 ) -> Request {
58 Request {
59 session,
60 thread,
61 complete: false,
62 thread_trace,
63 }
64 }
65
66 pub fn thread(&self) -> &str {
67 &self.thread
68 }
69
70 pub fn thread_trace(&self) -> usize {
71 self.thread_trace
72 }
73
74 pub fn complete(&self) -> bool {
78 self.complete
79 }
80
81 pub fn exhausted(&self) -> bool {
87 self.complete() && self.session.borrow().backlog.is_empty()
88 }
89
90 pub fn first(&mut self) -> EgResult<Option<EgValue>> {
96 self.first_with_timeout(DEFAULT_REQUEST_TIMEOUT)
97 }
98
99 pub fn first_with_timeout(&mut self, timeout: u64) -> EgResult<Option<EgValue>> {
105 let mut resp: Option<EgValue> = None;
106 while !self.complete {
107 if let Some(r) = self.recv_with_timeout(timeout)? {
108 if resp.is_none() {
109 resp = Some(r);
110 } }
112 }
113
114 Ok(resp)
115 }
116
117 pub fn recv_with_timeout(&mut self, mut timeout: u64) -> EgResult<Option<EgValue>> {
124 if self.complete {
125 timeout = 0;
129 }
130
131 loop {
132 let response = self.session.borrow_mut().recv(self.thread_trace, timeout)?;
133
134 if let Some(r) = response {
135 if r.partial {
136 continue;
143 }
144 if r.complete {
145 self.complete = true;
146 }
147 return Ok(r.value);
148 } else {
149 return Ok(None);
150 }
151 }
152 }
153
154 pub fn recv(&mut self) -> EgResult<Option<EgValue>> {
155 self.recv_with_timeout(DEFAULT_REQUEST_TIMEOUT)
156 }
157}
158
159struct ClientSessionInternal {
161 client: Client,
163
164 thread: String,
166
167 connected: bool,
170
171 service: String,
173
174 service_addr: BusAddress,
176
177 router_addr: BusAddress,
179
180 worker_addr: Option<BusAddress>,
184
185 last_thread_trace: usize,
190
191 backlog: VecDeque<Message>,
195
196 partial_buffer: Option<String>,
198}
199
200impl fmt::Display for ClientSessionInternal {
201 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202 write!(f, "Session({} {})", self.service(), self.thread())
203 }
204}
205
206impl ClientSessionInternal {
207 fn new(client: Client, service: &str) -> ClientSessionInternal {
208 let router_addr =
209 BusAddress::for_router(conf::config().client().router_name(), client.domain());
210
211 let service_addr = BusAddress::for_bare_service(service);
212
213 ClientSessionInternal {
214 client,
215 router_addr,
216 service_addr,
217 worker_addr: None,
218 service: String::from(service),
219 connected: false,
220 last_thread_trace: 0,
221 partial_buffer: None,
222 backlog: VecDeque::new(),
223 thread: util::random_number(16),
224 }
225 }
226
227 fn service(&self) -> &str {
228 &self.service
229 }
230
231 fn thread(&self) -> &str {
232 &self.thread
233 }
234
235 fn connected(&self) -> bool {
236 self.connected
237 }
238
239 fn reset(&mut self) {
240 self.worker_addr = None;
242 self.connected = false;
243 self.backlog.clear();
244 }
245
246 fn router_addr(&self) -> &BusAddress {
247 &self.router_addr
248 }
249
250 fn worker_addr(&self) -> Option<&BusAddress> {
251 self.worker_addr.as_ref()
252 }
253
254 fn service_addr(&self) -> &BusAddress {
255 &self.service_addr
256 }
257
258 fn client_internal_mut(&self) -> RefMut<ClientSingleton> {
260 self.client.singleton().borrow_mut()
261 }
262
263 fn destination_addr(&self) -> &BusAddress {
267 match self.worker_addr() {
268 Some(a) => a,
269 None => self.service_addr(),
270 }
271 }
272
273 fn recv_from_backlog(&mut self, thread_trace: usize) -> Option<Message> {
274 if let Some(index) = self
275 .backlog
276 .iter()
277 .position(|m| m.thread_trace() == thread_trace)
278 {
279 self.backlog.remove(index)
282 } else {
283 None
284 }
285 }
286
287 fn recv(&mut self, thread_trace: usize, timeout: u64) -> EgResult<Option<Response>> {
288 let mut timer = util::Timer::new(timeout);
289
290 let mut first_loop = true;
291 loop {
292 if let Some(msg) = self.recv_from_backlog(thread_trace) {
300 return self.unpack_reply(&mut timer, msg);
301 }
302
303 if first_loop {
304 first_loop = false;
305 } else if timer.done() {
306 return Ok(None);
309 }
310
311 let mut tmsg = match self
312 .client_internal_mut()
313 .recv_session(&mut timer, self.thread())?
314 {
315 Some(m) => m,
316 None => continue, };
318
319 self.worker_addr = Some(BusAddress::parse_str(tmsg.from())?);
321
322 for msg in tmsg.body_mut().drain(..) {
324 self.backlog.push_back(msg);
325 }
326
327 }
330 }
331
332 fn unpack_reply(
335 &mut self,
336 timer: &mut util::Timer,
337 mut msg: Message,
338 ) -> EgResult<Option<Response>> {
339 if let Payload::Result(resp) = msg.payload_mut() {
340 log::trace!("{self} Unpacking osrf message status={}", resp.status());
341
342 let mut value = resp.take_content();
344
345 if resp.status() == &MessageStatus::Partial {
346 let buf = match self.partial_buffer.as_mut() {
347 Some(b) => b,
348 None => {
349 self.partial_buffer = Some(String::new());
350 self.partial_buffer.as_mut().unwrap()
351 }
352 };
353
354 if let Some(chunk) = value.as_str() {
357 buf.push_str(chunk);
358 }
359
360 return Ok(Some(Response {
361 value: None,
362 complete: false,
363 partial: true,
364 }));
365 } else if resp.status() == &MessageStatus::PartialComplete {
366 let mut buf = self.partial_buffer.take().unwrap_or_default();
368
369 if let Some(chunk) = value.as_str() {
371 buf.push_str(chunk);
372 }
373
374 let jval = json::parse(&buf)
377 .map_err(|e| format!("Error reconstituting partial message: {e}"))?;
378
379 value = match EgValue::from_json_value(jval) {
382 Ok(v) => v,
383 Err(e) => {
384 log::error!("Error translating JSON value into EgValue: {e}");
385 EgValue::Null
386 }
387 };
388
389 log::trace!("Partial message is now complete");
390 }
391
392 return Ok(Some(Response {
393 value: Some(value),
394 complete: false,
395 partial: false,
396 }));
397 }
398
399 let trace = msg.thread_trace();
400
401 if let Payload::Status(stat) = msg.payload() {
402 self.unpack_status_message(trace, timer, stat)
403 .inspect_err(|_| self.reset())
404 } else {
405 self.reset();
406 Err(format!("{self} unexpected response for request {trace}: {msg:?}").into())
407 }
408 }
409
410 fn unpack_status_message(
411 &mut self,
412 trace: usize,
413 timer: &mut util::Timer,
414 statmsg: &Status,
415 ) -> EgResult<Option<Response>> {
416 let stat = statmsg.status();
417
418 match stat {
419 MessageStatus::Ok => {
420 self.connected = true;
421 Ok(None)
422 }
423 MessageStatus::Continue => {
424 timer.reset();
425 Ok(None)
426 }
427 MessageStatus::Complete => {
428 log::trace!("{self} request {trace} complete");
429 Ok(Some(Response {
430 value: None,
431 complete: true,
432 partial: false,
433 }))
434 }
435 _ => {
436 self.reset();
437 Err(format!("{self} request {trace} failed: {}", statmsg).into())
438 }
439 }
440 }
441
442 fn incr_thread_trace(&mut self) -> usize {
443 self.last_thread_trace += 1;
444 self.last_thread_trace
445 }
446
447 fn request(&mut self, method: &str, params: impl Into<ApiParams>) -> EgResult<usize> {
449 log::debug!("{self} sending request {method}");
450
451 let trace = self.incr_thread_trace();
452
453 let mut params: ApiParams = params.into();
454 let params: Vec<EgValue> = params.take_params();
455
456 if !self.connected() {
457 self.worker_addr = None;
460 }
461
462 let tmsg = TransportMessage::with_body(
463 self.destination_addr().as_str(),
464 self.client.address().as_str(),
465 self.thread(),
466 Message::new(
467 MessageType::Request,
468 trace,
469 Payload::Method(MethodCall::new(method, params)),
470 ),
471 );
472
473 if !self.connected() {
474 let router_addr = self.router_addr().as_str();
478 self.client_internal_mut()
479 .bus_mut()
480 .send_to(tmsg, router_addr)?;
481 } else if let Some(a) = self.worker_addr() {
482 self.client_internal_mut()
485 .get_domain_bus(a.domain())?
486 .send(tmsg)?;
487 } else {
488 self.reset();
489 return Err("We are connected, but have no worker_addr()".into());
490 }
491
492 Ok(trace)
493 }
494
495 fn connect(&mut self) -> EgResult<()> {
497 if self.connected() {
498 log::warn!("{self} is already connected");
499 return Ok(());
500 }
501
502 self.worker_addr = None;
505
506 log::debug!("{self} sending CONNECT");
507
508 let trace = self.incr_thread_trace();
509
510 let tm = TransportMessage::with_body(
511 self.destination_addr().as_str(),
512 self.client.address().as_str(),
513 self.thread(),
514 Message::new(MessageType::Connect, trace, Payload::NoPayload),
515 );
516
517 self.client
519 .singleton()
520 .borrow_mut()
521 .bus_mut()
522 .send_to(tm, self.router_addr().as_str())?;
523
524 self.recv(trace, CONNECT_TIMEOUT)?;
525
526 if self.connected() {
527 log::trace!("{self} connected OK");
528 Ok(())
529 } else {
530 self.reset();
531 Err("CONNECT timed out".into())
532 }
533 }
534
535 fn disconnect(&mut self) -> EgResult<()> {
539 if !self.connected() || self.worker_addr().is_none() {
540 self.reset();
541 return Ok(());
542 }
543
544 let trace = self.incr_thread_trace();
545
546 let dest_addr = self.worker_addr().unwrap(); log::debug!("{self} sending DISCONNECT");
549
550 let tmsg = TransportMessage::with_body(
551 dest_addr.as_str(),
552 self.client.address().as_str(),
553 self.thread(),
554 Message::new(MessageType::Disconnect, trace, Payload::NoPayload),
555 );
556
557 self.client_internal_mut()
558 .get_domain_bus(dest_addr.domain())?
559 .send(tmsg)?;
560
561 self.reset();
562
563 Ok(())
564 }
565}
566
567pub struct ClientSession {
569 session: Rc<RefCell<ClientSessionInternal>>,
570}
571
572impl ClientSession {
573 pub fn new(client: Client, service: &str) -> ClientSession {
574 let ses = ClientSessionInternal::new(client, service);
575
576 log::trace!("Created new session {ses}");
577
578 ClientSession {
579 session: Rc::new(RefCell::new(ses)),
580 }
581 }
582
583 pub fn request(&mut self, method: &str, params: impl Into<ApiParams>) -> EgResult<Request> {
587 let thread = self.session.borrow().thread().to_string();
588
589 Ok(Request::new(
590 thread,
591 self.session.clone(),
592 self.session.borrow_mut().request(method, params)?,
593 ))
594 }
595
596 pub fn send_recv(
601 &mut self,
602 method: &str,
603 params: impl Into<ApiParams>,
604 ) -> EgResult<ResponseIterator> {
605 Ok(ResponseIterator::new(self.request(method, params)?))
606 }
607
608 pub fn connect(&self) -> EgResult<()> {
609 self.session.borrow_mut().connect()
610 }
611
612 pub fn disconnect(&self) -> EgResult<()> {
613 self.session.borrow_mut().disconnect()
614 }
615
616 pub fn connected(&self) -> bool {
617 self.session.borrow().connected()
618 }
619}
620
621pub struct ResponseIterator {
623 request: Request,
624}
625
626impl Iterator for ResponseIterator {
627 type Item = EgResult<EgValue>;
628
629 fn next(&mut self) -> Option<Self::Item> {
630 self.request.recv().transpose()
631 }
632}
633
634impl ResponseIterator {
635 pub fn new(request: Request) -> Self {
636 ResponseIterator { request }
637 }
638}
639
640pub struct MultiSession {
652 client: Client,
653 service: String,
654 requests: Vec<Request>,
655}
656
657impl MultiSession {
658 pub fn new(client: Client, service: &str) -> MultiSession {
659 MultiSession {
660 client,
661 service: service.to_string(),
662 requests: Vec::new(),
663 }
664 }
665
666 pub fn request(&mut self, method: &str, params: impl Into<ApiParams>) -> EgResult<String> {
671 let mut ses = self.client.session(&self.service);
672 let req = ses.request(method, params)?;
673 let thread = req.thread().to_string();
674
675 self.requests.push(req);
676
677 Ok(thread)
678 }
679
680 pub fn complete(&mut self) -> bool {
685 self.remove_completed();
686 self.requests.is_empty()
687 }
688
689 pub fn recv(&mut self, timeout: u64) -> EgResult<Option<(String, EgValue)>> {
694 if self.client.wait(timeout)? {
699 for req in self.requests.iter_mut() {
700 if let Some(resp) = req.recv_with_timeout(0)? {
701 return Ok(Some((req.thread.to_string(), resp)));
702 }
703 }
704 }
705
706 self.remove_completed();
707
708 Ok(None)
709 }
710
711 fn remove_completed(&mut self) {
712 let test = |r: &Request| r.exhausted();
716
717 while let Some(pos) = self.requests.iter().position(test) {
718 self.requests.remove(pos);
719 }
720 }
721}
722
723pub struct ServerSession {
724 service: String,
726
727 client: Client,
729
730 thread: String,
732
733 sender: BusAddress,
735
736 responded_complete: bool,
739
740 last_thread_trace: usize,
745
746 atomic_resp_queue: Option<Vec<EgValue>>,
748}
749
750impl fmt::Display for ServerSession {
751 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
752 write!(f, "ServerSession({} {})", self.service(), self.thread())
753 }
754}
755
756impl ServerSession {
757 pub fn new(
758 client: Client,
759 service: &str,
760 thread: &str,
761 last_thread_trace: usize,
762 sender: BusAddress,
763 ) -> ServerSession {
764 ServerSession {
765 client,
766 sender,
767 last_thread_trace,
768 service: service.to_string(),
769 responded_complete: false,
770 thread: thread.to_string(),
771 atomic_resp_queue: None,
772 }
773 }
774
775 pub fn last_thread_trace(&self) -> usize {
776 self.last_thread_trace
777 }
778
779 pub fn set_last_thread_trace(&mut self, trace: usize) {
780 self.last_thread_trace = trace
781 }
782
783 pub fn clear_responded_complete(&mut self) {
784 self.responded_complete = false;
785 }
786
787 pub fn thread(&self) -> &str {
788 &self.thread
789 }
790
791 pub fn service(&self) -> &str {
792 &self.service
793 }
794
795 pub fn sender(&self) -> &BusAddress {
796 &self.sender
797 }
798
799 pub fn new_atomic_resp_queue(&mut self) {
800 log::debug!("{self} starting new atomic queue...");
801 self.atomic_resp_queue = Some(Vec::new());
802 }
803
804 fn client_internal_mut(&self) -> RefMut<ClientSingleton> {
806 self.client.singleton().borrow_mut()
807 }
808
809 pub fn responded_complete(&self) -> bool {
810 self.responded_complete
811 }
812
813 fn build_result_message(
817 &mut self,
818 mut result: Option<EgValue>,
819 complete: bool,
820 ) -> EgResult<Option<Message>> {
821 let result_value;
822
823 if self.atomic_resp_queue.is_some() {
824 let q = self.atomic_resp_queue.as_mut().unwrap();
826
827 if let Some(res) = result.take() {
828 q.push(res);
829 }
830
831 if complete {
832 result_value = self.atomic_resp_queue.take().unwrap().into();
838 } else {
839 return Ok(None);
842 }
843 } else {
844 if let Some(res) = result.take() {
846 result_value = res;
847 } else {
848 return Ok(None);
849 }
850 }
851
852 Ok(Some(Message::new(
853 MessageType::Result,
854 self.last_thread_trace(),
855 Payload::Result(message::Result::new(
856 MessageStatus::Ok,
857 "OK",
858 "osrfResult",
859 result_value,
860 )),
861 )))
862 }
863
864 fn respond_with_parts(&mut self, value: Option<EgValue>, complete: bool) -> EgResult<()> {
866 if self.responded_complete {
867 log::warn!(
868 r#"Dropping trailing replies after already sending a
869 Request Complete message for thread {}"#,
870 self.thread()
871 );
872 return Ok(());
873 }
874
875 let mut complete_msg = None;
876
877 let mut result_msg = self.build_result_message(value, complete)?;
878
879 if complete {
880 self.responded_complete = true;
882
883 complete_msg = Some(Message::new(
884 MessageType::Status,
885 self.last_thread_trace(),
886 Payload::Status(message::Status::new(
887 MessageStatus::Complete,
888 "Request Complete",
889 "osrfConnectStatus",
890 )),
891 ));
892 }
893
894 if result_msg.is_none() && complete_msg.is_none() {
895 return Ok(());
897 }
898
899 let mut tmsg = TransportMessage::new(
903 self.sender.as_str(),
904 self.client.address().as_str(),
905 self.thread(),
906 );
907
908 if let Some(msg) = result_msg.take() {
909 tmsg.body_mut().push(msg);
910 }
911
912 if let Some(msg) = complete_msg.take() {
913 tmsg.body_mut().push(msg);
914 }
915
916 self.client_internal_mut()
917 .get_domain_bus(self.sender.domain())?
918 .send(tmsg)
919 }
920
921 pub fn send_complete(&mut self) -> EgResult<()> {
922 self.respond_with_parts(None, true)
923 }
924
925 pub fn respond(&mut self, value: impl Into<EgValue>) -> EgResult<()> {
926 self.respond_with_parts(Some(value.into()), false)
927 }
928
929 pub fn respond_complete(&mut self, value: impl Into<EgValue>) -> EgResult<()> {
930 self.respond_with_parts(Some(value.into()), true)
931 }
932}