Browse Source

added process interface

master
Weird Constructor 3 years ago
parent
commit
5a4df316f6
  1. 2
      .gitignore
  2. 9
      init.wl
  3. 112
      src/event.rs
  4. 137
      src/main.rs
  5. 331
      src/msg.rs
  6. 218
      src/process.rs
  7. 345
      src/tcp_csv_msg_connection.rs
  8. 91
      src/timer.rs

2
.gitignore vendored

@ -1,2 +1,4 @@
/target
src/stuff.rs
*.history
**/*.rs.bk

9
init.wl

@ -1,10 +1,7 @@
!id = dn:timer:oneshot :ms => 1000;
!id2 = dn:timer:interval :ms => 2000;
dn:on id {
std:displayln "timeout";
dn:on (dn:timer:oneshot :ms => 1000) {
std:displayln "timeout" _;
};
dn:on id2 {
std:displayln "timeout2";
};
dn:on id2 { std:displayln "timeout2" _; };

112
src/event.rs

@ -0,0 +1,112 @@
use wlambda::VVal;
use crate::msg;
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum Msg {
JSON(Option<Vec<String>>, String),
MsgPack(Option<Vec<String>>, Vec<u8>),
Bytes(Option<Vec<String>>, Vec<u8>),
Str(Option<Vec<String>>, String),
Direct(Vec<String>),
}
impl Msg {
pub fn from_msg(m: msg::Msg) -> Option<Self> {
match m {
msg::Msg::Quit => None,
msg::Msg::Ok(_) => None,
msg::Msg::Ping => None,
msg::Msg::Hello(_) => None,
msg::Msg::Error(_) => None,
msg::Msg::Direct(args) => Some(Msg::Direct(args)),
msg::Msg::Payload(typ, args, payload) => {
Some(match typ {
"json" => {
match String::from_utf8(payload) {
Ok(s) => Msg::JSON(args, s),
Err(e) => Msg::JSON(args,
String::from_utf8_lossy(e.as_bytes())
.to_string()),
}
},
"msgpack" => Msg::MsgPack(args, payload),
_ => Msg::Bytes(args, payload),
})
},
}
}
pub fn into_vval(self) -> VVal {
match self {
Msg::JSON(args, s) => {
let v =
VVal::from_json(&s)
.unwrap_or_else(|e| VVal::err_msg(&e));
if let Some(args) = args {
let va =
VVal::vec_mv(
args.into_iter().map(
|a| VVal::new_str_mv(a)).collect());
va.push(v);
va
} else {
v
}
},
Msg::MsgPack(args, b) => {
let v =
VVal::from_msgpack(&b)
.unwrap_or_else(|e| VVal::err_msg(&e));
if let Some(args) = args {
let va =
VVal::vec_mv(
args.into_iter().map(
|a| VVal::new_str_mv(a)).collect());
va.push(v);
va
} else {
v
}
},
Msg::Bytes(args, b) => {
let v =
if let Some(args) = args {
VVal::vec_mv(
args.into_iter().map(
|a| VVal::new_str_mv(a)).collect())
} else {
VVal::vec()
};
v.push(VVal::new_byt(b));
v
},
Msg::Str(args, s) => {
let v =
if let Some(args) = args {
VVal::vec_mv(
args.into_iter().map(
|a| VVal::new_str_mv(a)).collect())
} else {
VVal::vec()
};
v.push(VVal::new_str_mv(s));
v
},
Msg::Direct(args) => {
VVal::vec_mv(
args.into_iter().map(
|a| VVal::new_str_mv(a)).collect())
},
}
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum Event {
DeleteCallback(u64),
Timeout(u64),
Message(u64, Msg),
LogErr(String),
}

137
src/main.rs

@ -1,118 +1,38 @@
//mod detached_command;
//use detached_command::*;
mod tcp_csv_msg_connection;
mod sync_event;
use std::thread::{JoinHandle, spawn};
use std::time::{Duration, Instant};
use std::sync::mpsc::*;
//mod tcp_csv_msg_connection;
//mod sync_event;
mod event;
mod timer;
mod msg;
mod process;
use std::rc::Rc;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::mpsc::*;
use wlambda::*;
#[derive(Debug, Clone)]
enum Event {
Timeout(u64),
LogErr(String),
}
#[derive(Debug, Clone, Copy)]
enum Timer {
Timeouted,
Oneshot(u64, u64),
Interval(u64, u64, u64),
}
fn start_timer_thread(event_tx: Sender<Event>, rx: Receiver<Timer>) -> JoinHandle<()> {
spawn(move || {
let mut list = vec![];
let mut free_list = vec![];
let mut max_timeout = 100000000;
let mut min_timeout = 100000000;
loop {
let before_recv = Instant::now();
match rx.recv_timeout(Duration::from_millis(min_timeout)) {
Ok(timer) => {
if free_list.is_empty() {
list.push(timer);
} else {
let idx = free_list.pop().unwrap();
list[idx] = timer;
}
},
Err(RecvTimeoutError::Timeout) => {
()
},
Err(RecvTimeoutError::Disconnected) => {
break;
},
}
let passed_time = before_recv.elapsed().as_millis() as u64;
min_timeout = max_timeout;
for (idx, timer) in list.iter_mut().enumerate() {
match timer {
Timer::Timeouted => (),
Timer::Oneshot(id, tout) => {
if *tout > passed_time {
let new_tout = *tout - passed_time;
if new_tout < min_timeout {
min_timeout = new_tout;
}
*timer = Timer::Oneshot(*id, new_tout);
} else {
event_tx.send(Event::Timeout(*id));
*timer = Timer::Timeouted;
free_list.push(idx);
}
},
Timer::Interval(id, tout, orig_tout) => {
if *tout > passed_time {
let new_tout = *tout - passed_time;
if new_tout < min_timeout {
min_timeout = new_tout;
}
*timer = Timer::Interval(*id, new_tout, *orig_tout);
} else {
event_tx.send(Event::Timeout(*id));
let new_tout = *orig_tout;
if new_tout < min_timeout {
min_timeout = new_tout;
}
*timer = Timer::Interval(*id, *orig_tout, *orig_tout);
}
},
}
}
println!("RR: {:?}", list);
}
})
}
use crate::timer::*;
use crate::event::*;
fn main() {
let (event_tx, event_rx) = channel();
let (timer_tx, timer_rx) = channel();
let timer_thread = start_timer_thread(event_tx.clone(), timer_rx);
let mut cur_id = Rc::new(RefCell::new(0_u64));
let _timer_thread = start_timer_thread(event_tx.clone(), timer_rx);
let cur_id = Rc::new(RefCell::new(0_u64));
let mut callbacks = Rc::new(RefCell::new(HashMap::new()));
let callbacks = Rc::new(RefCell::new(HashMap::new()));
let global = GlobalEnv::new_default();
global.borrow_mut().add_func(
"dn:wsmp:listen",
move |env: &mut Env, _argc: usize| {
move |_env: &mut Env, _argc: usize| {
// create mpsc
// draw new id
// store mpsc sender in global register
@ -175,7 +95,7 @@ fn main() {
global.borrow_mut().add_func(
"dn:send",
move |env: &mut Env, _argc: usize| {
move |_env: &mut Env, _argc: usize| {
// store callback for given id in global register
Ok(VVal::None)
}, Some(0), Some(0));
@ -202,13 +122,31 @@ fn main() {
Event::Timeout(id) => {
if let Some(cb) = callbacks.borrow().get(&id) {
let ret =
ctx.call(cb, &[]).expect("no error in cb");
ctx.call(cb, &[VVal::Int(id as i64)]).expect("no error in cb");
if ret.with_s_ref(|s| s == "delete") {
remove = Some(id);
}
}
},
Event::Message(id, msg) => {
if let Some(cb) = callbacks.borrow().get(&id) {
let ret =
ctx.call(cb, &[VVal::Int(id as i64), msg.into_vval()])
.expect("no error in cb");
if ret.with_s_ref(|s| s == "delete") {
// FIXME: We need to kill the send
// channel of the process! Which should
// also kill the process itself.
remove = Some(id);
}
}
},
Event::DeleteCallback(id) => {
println!("DEBUG: Removed cb {}", id);
remove = Some(id);
},
Event::LogErr(err) => {
eprintln!("Error: {}", err);
},
@ -216,6 +154,9 @@ fn main() {
if let Some(remove_id) = remove {
callbacks.borrow_mut().remove(&remove_id);
// FIXME: We need to kill the send
// channel of the process! Which should
// also kill the process itself.
}
} else {
break;

331
src/msg.rs

@ -0,0 +1,331 @@
use std::fmt::Write;
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
Direct(Vec<String>),
Payload(&'static str, Option<Vec<String>>, Vec<u8>),
Hello(Vec<String>),
Error(Vec<String>),
Ping,
Quit,
Ok(String),
}
fn write_msg_arg(w: &mut dyn std::fmt::Write, arg: &str) {
if let Some(_) = arg.find(|c: char| c.is_whitespace() || c == '"') {
write!(w, "\"").expect("write to work");
for c in arg.chars() {
match c {
'"' => write!(w, "\\\"").expect("write to work"),
'\\' => write!(w, "\\\\").expect("write to work"),
'\r' => write!(w, "\\r").expect("write to work"),
'\n' => write!(w, "\\n").expect("write to work"),
c => w.write_char(c).expect("write to work"),
}
}
write!(w, "\"").expect("write to work");
} else {
write!(w, "{}", arg).expect("write to work");
}
}
fn write_msg_args(w: &mut dyn std::fmt::Write, args: &[String]) {
for a in args {
write!(w, " ").expect("write to work");
write_msg_arg(w, a);
}
}
impl Msg {
pub fn ok_response(&self) -> Option<Self> {
match self {
Msg::Ping => Some(Msg::Ok("ping".to_string())),
Msg::Quit => Some(Msg::Ok("quit".to_string())),
Msg::Ok(_arg) => None,
Msg::Hello(_args) => Some(Msg::Ok("hello".to_string())),
Msg::Error(_args) => Some(Msg::Ok("error".to_string())),
Msg::Direct(_args) => Some(Msg::Ok("direct".to_string())),
Msg::Payload(name, _, _) => Some(Msg::Ok(name.to_string())),
}
}
pub fn to_frame(&self) -> Vec<u8> {
let mut s = String::new();
let mut payload : Option<&[u8]> = None;
match self {
Msg::Ping => write!(&mut s, "ping").expect("write to work"),
Msg::Quit => write!(&mut s, "quit").expect("write to work"),
Msg::Ok(arg) => write!(&mut s, "ok {}", arg).expect("write to work"),
Msg::Hello(args) => {
write!(&mut s, "hello").expect("write to work");
write_msg_args(&mut s, args);
},
Msg::Error(args) => {
write!(&mut s, "error").expect("write to work");
write_msg_args(&mut s, args);
},
Msg::Direct(args) => {
write!(&mut s, "direct").expect("write to work");
write_msg_args(&mut s, args);
},
Msg::Payload(name, args, msg_payload) => {
write!(&mut s, "{}", name).expect("write to work");
write!(&mut s, " {}", msg_payload.len()).expect("write to work");
if let Some(args) = args {
write_msg_args(&mut s, args);
}
write!(&mut s, "\r\n").expect("write to work");
payload = Some(&msg_payload[..]);
},
}
write!(&mut s, "\r\n").expect("write to work");
let mut b = s.into_bytes();
if let Some(payload) = payload {
b.extend_from_slice(payload);
b.push(b'\r');
b.push(b'\n');
}
b
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum ReadMsgError {
TryAgain,
ProtocolError,
Timeout,
EOF,
ReadError(String, String),
}
fn next_arg(s: &str) -> (String, &str) {
let s = s.trim_start();
if let Some(s) = s.strip_prefix('"') {
let mut arg = String::new();
let mut escaped = false;
for (byte_pos, c) in s.char_indices() {
if escaped {
match c {
'r' => arg.push('\r'),
'n' => arg.push('\n'),
_ => arg.push(c),
}
escaped = false;
} else {
if c == '"' {
return (arg, &s[(byte_pos + 1)..].trim_start());
} else if c == '\\' {
escaped = true;
} else {
arg.push(c);
}
}
}
(arg, "")
} else {
let v : Vec<&str> = s.splitn(2, |c: char| c.is_whitespace()).collect();
if v.len() == 0 {
("".to_string(), "")
} else if v.len() == 1 {
(v[0].to_string(), "")
} else {
(v[0].to_string(), v[1].trim_start())
}
}
}
pub struct MessageReader {
payload_name: Option<&'static str>,
payload_buf: Option<Vec<u8>>,
payload_rem_len: Option<u64>,
payload_data: Option<Vec<String>>,
}
impl MessageReader {
pub fn new() -> Self {
Self {
payload_name: None,
payload_buf: None,
payload_rem_len: None,
payload_data: None,
}
}
fn read_payload(&mut self, ep: &str, buf: &mut dyn std::io::BufRead) -> Result<Msg, ReadMsgError> {
let rlen = self.payload_rem_len.unwrap();
if rlen > 0 {
let take_len = if rlen < 4096 { rlen as usize } else { 4096 };
let mut data_buf : [u8; 4096] = [0; 4096];
match buf.read(&mut data_buf[0..take_len]) {
Ok(n) => {
if n == 0 {
return Err(ReadMsgError::EOF);
}
if n > take_len {
return Err(ReadMsgError::ProtocolError);
}
self.payload_rem_len = Some(rlen - (n as u64));
self.payload_buf.as_mut().unwrap().extend_from_slice(&data_buf[0..n]);
},
Err(e) => {
match e.kind() {
std::io::ErrorKind::TimedOut
| std::io::ErrorKind::WouldBlock => {
return Err(ReadMsgError::Timeout);
},
_ => {
return Err(ReadMsgError::ReadError(
ep.to_string(), format!("{}", e)));
}
}
},
}
}
if self.payload_rem_len.unwrap() == 0 {
self.payload_rem_len = None;
return Ok(Msg::Payload(
self.payload_name.take().unwrap(),
self.payload_data.take(),
self.payload_buf.take().unwrap()));
} else {
return Err(ReadMsgError::TryAgain);
}
}
pub fn read_msg(&mut self, ep: &str, buf: &mut dyn std::io::BufRead) -> Result<Msg, ReadMsgError> {
if self.payload_rem_len.is_some() {
return self.read_payload(ep, buf);
}
// use std::io::BufRead;
let mut line = String::new();
match buf.read_line(&mut line) {
Ok(n) => {
if n == 0 {
return Err(ReadMsgError::EOF);
}
//d// println!("READL: [{}]", line);
let v: Vec<&str> =
line.splitn(2, |c: char| c.is_whitespace()).collect();
let command = v[0];
let mut rest = if v.len() > 1 { v[1] } else { "" };
match command {
"direct" => {
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
return Ok(Msg::Direct(args));
},
"payload" => {
let (len_arg, r) = next_arg(rest);
rest = r;
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
if let Ok(len) = u64::from_str_radix(&len_arg, 10) {
self.payload_rem_len = Some(len);
self.payload_buf = Some(vec![]);
self.payload_name = Some("payload");
self.payload_data = Some(args);
return self.read_payload(ep, buf);
} else {
return Err(ReadMsgError::ProtocolError);
}
},
"error" => {
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
return Ok(Msg::Error(args));
},
"msgpack" => {
let (len_arg, _) = next_arg(rest);
if let Ok(len) = u64::from_str_radix(&len_arg, 10) {
self.payload_rem_len = Some(len);
self.payload_buf = Some(vec![]);
self.payload_name = Some("msgpack");
return self.read_payload(ep, buf);
} else {
return Err(ReadMsgError::ProtocolError);
}
},
"json" => {
let (len_arg, _) = next_arg(rest);
if let Ok(len) = u64::from_str_radix(&len_arg, 10) {
self.payload_rem_len = Some(len);
self.payload_buf = Some(vec![]);
self.payload_name = Some("json");
return self.read_payload(ep, buf);
} else {
return Err(ReadMsgError::ProtocolError);
}
},
"hello" => {
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
return Ok(Msg::Hello(args));
},
"quit" => {
return Ok(Msg::Quit);
},
"ping" => {
return Ok(Msg::Ping);
},
"ok" => {
let (arg, _) = next_arg(rest);
return Ok(Msg::Ok(arg));
},
"" => {
return Err(ReadMsgError::TryAgain);
},
_ => {
return Err(ReadMsgError::ProtocolError);
},
}
},
Err(e) => {
match e.kind() {
std::io::ErrorKind::TimedOut
| std::io::ErrorKind::WouldBlock => {
return Err(ReadMsgError::Timeout);
},
_ => {
return Err(ReadMsgError::ReadError(
ep.to_string(), format!("{}", e)));
}
}
},
}
}
}

218
src/process.rs

@ -0,0 +1,218 @@
use std::thread::{JoinHandle, spawn};
use std::sync::mpsc::*;
use std::sync::{Arc, Mutex};
use std::process::Command;
use std::process::Stdio;
use std::time::Duration;
use std::io::Write;
use std::io::BufRead;
use crate::event::*;
use crate::msg;
pub enum Cmd {
Input(Vec<u8>),
Kill,
}
pub enum CmdProtocol {
LineBased,
WSMP,
}
pub fn kill_child(child: &Arc<Mutex<std::process::Child>>) {
if let Ok(mut child) = child.lock() {
child.kill().is_ok();
}
}
pub fn start_process(event_tx: Sender<Event>,
rx: Receiver<Cmd>,
id: u64,
cmd_exe: &str,
args: &[&str],
protocol: CmdProtocol) -> JoinHandle<()>
{
let mut cmd = Command::new(cmd_exe);
let cmd_exe = cmd_exe.to_string();
for a in args.iter() {
cmd.arg(a);
}
spawn(move || {
cmd.stdout(Stdio::piped())
.stderr(Stdio::null())
.stdin(Stdio::piped());
let mut child =
match cmd.spawn() {
Ok(child) => child,
Err(e) => {
event_tx.send(
Event::LogErr(
format!("Error starting '{}': {}",
cmd_exe, e))).is_ok();
return;
},
};
let stdin = child.stdin.take().unwrap();
let stdout = child.stdout.take().unwrap();
let child = Arc::new(Mutex::new(child));
let stop = Arc::new(Mutex::new(false));
let cmd_exe_w = cmd_exe.to_string();
let child_w = child.clone();
let event_tx_w = event_tx.clone();
let stop_w = stop.clone();
let writer_thread = spawn(move || {
let mut bw = std::io::BufWriter::new(stdin);
loop {
match stop_w.lock() {
Ok(stop) => { if *stop { return; } },
Err(_) => { return; },
}
match rx.recv_timeout(Duration::from_millis(1000)) {
Ok(Cmd::Kill) => { kill_child(&child_w); return; },
Ok(Cmd::Input(s)) => {
if let Err(e) = bw.write_all(&s) {
event_tx_w.send(
Event::LogErr(
format!("Writing to '{}' failed: {}",
cmd_exe_w, e))).is_ok();
return;
}
if let Err(e) = bw.flush() {
event_tx_w.send(
Event::LogErr(
format!("Flushing to '{}' failed: {}",
cmd_exe_w, e))).is_ok();
return;
}
},
Err(RecvTimeoutError::Timeout) => {
// nop
},
Err(RecvTimeoutError::Disconnected) => {
std::thread::sleep(
std::time::Duration::from_millis(250));
return;
},
}
}
});
match protocol {
CmdProtocol::LineBased => {
let mut br = std::io::BufReader::new(stdout);
loop {
let mut line = String::new();
match br.read_line(&mut line) {
Ok(s) => {
event_tx.send(
Event::Message(id, Msg::Str(None, line))).is_ok();
if s == 0 { break; }
},
Err(e) => {
event_tx.send(
Event::LogErr(
format!("Error executing '{}': {}",
cmd_exe, e))).is_ok();
kill_child(&child);
break;
}
}
}
},
CmdProtocol::WSMP => {
let mut stdout = std::io::BufReader::new(stdout);
loop {
let mut rd = msg::MessageReader::new();
let mut try_again = true;
while try_again {
try_again = false;
match rd.read_msg(&cmd_exe, &mut stdout) {
Ok(msg) => {
match msg {
msg::Msg::Quit => { kill_child(&child); },
msg::Msg::Ok(_) => { },
msg::Msg::Ping => { },
msg::Msg::Hello(_) => { },
msg::Msg::Error(e) => {
event_tx.send(
Event::LogErr(
format!("Error from '{}': {:?}",
cmd_exe, e))).is_ok();
},
msg => {
if let Some(msg) = Msg::from_msg(msg) {
event_tx.send(
Event::Message(id, msg)).is_ok();
}
},
}
},
Err(msg::ReadMsgError::Timeout) => {
// nop
},
Err(msg::ReadMsgError::TryAgain) => {
try_again = true;
},
Err(e) => {
event_tx.send(
Event::LogErr(
format!("Error executing '{}': {:?}",
cmd_exe, e))).is_ok();
kill_child(&child);
},
}
}
}
},
}
if let Ok(mut stop) = stop.lock() {
*stop = true;
}
match child.lock() {
Ok(mut child) =>
match child.wait() {
Ok(status) => {
event_tx.send(
Event::Message(id,
Msg::Direct(
vec![
"end".to_string(),
status.code().unwrap_or(-1).to_string()])))
.is_ok();
event_tx.send(Event::DeleteCallback(id)).is_ok();
},
Err(e) => {
event_tx.send(
Event::LogErr(
format!("Error waiting for end of '{}': {}",
cmd_exe, e))).is_ok();
},
},
Err(e) => {
event_tx.send(
Event::LogErr(
format!("Error lock child waiting for end of '{}': {}",
cmd_exe, e))).is_ok();
},
}
writer_thread.join();
})
}

345
src/tcp_csv_msg_connection.rs

@ -1,11 +1,10 @@
use wlambda::VVal;
use std::sync::*;
use std::sync::atomic::AtomicBool;
use std::net::TcpStream;
use std::net::TcpListener;
use std::fmt::Write;
use crate::sync_event::SyncEvent;
use crate::msg::*;
macro_rules! send_event {
($where: expr, $user_id: expr, $event_tx: expr, $event: expr) => {
@ -20,337 +19,6 @@ macro_rules! send_event {
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum Msg {
Direct(Vec<String>),
Payload(&'static str, Option<Vec<String>>, Vec<u8>),
Hello(Vec<String>),
Error(Vec<String>),
Ping,
Quit,
Ok(String),
}
fn write_msg_arg(w: &mut std::fmt::Write, arg: &str) {
if let Some(_) = arg.find(|c: char| c.is_whitespace() || c == '"') {
write!(w, "\"").expect("write to work");
for c in arg.chars() {
match c {
'"' => write!(w, "\\\"").expect("write to work"),
'\\' => write!(w, "\\\\").expect("write to work"),
'\r' => write!(w, "\\r").expect("write to work"),
'\n' => write!(w, "\\n").expect("write to work"),
c => w.write_char(c).expect("write to work"),
}
}
write!(w, "\"").expect("write to work");
} else {
write!(w, "{}", arg).expect("write to work");
}
}
fn write_msg_args(w: &mut std::fmt::Write, args: &[String]) {
for a in args {
write!(w, " ").expect("write to work");
write_msg_arg(w, a);
}
}
impl Msg {
pub fn ok_response(&self) -> Option<Self> {
match self {
Msg::Ping => Some(Msg::Ok("ping".to_string())),
Msg::Quit => Some(Msg::Ok("quit".to_string())),
Msg::Ok(arg) => None,
Msg::Hello(args) => Some(Msg::Ok("hello".to_string())),
Msg::Error(args) => Some(Msg::Ok("error".to_string())),
Msg::Direct(args) => Some(Msg::Ok("direct".to_string())),
Msg::Payload(name, _, _) => Some(Msg::Ok(name.to_string())),
}
}
pub fn to_frame(&self) -> Vec<u8> {
let mut s = String::new();
let mut payload : Option<&[u8]> = None;
match self {
Msg::Ping => write!(&mut s, "ping").expect("write to work"),
Msg::Quit => write!(&mut s, "quit").expect("write to work"),
Msg::Ok(arg) => write!(&mut s, "ok {}", arg).expect("write to work"),
Msg::Hello(args) => {
write!(&mut s, "hello").expect("write to work");
write_msg_args(&mut s, args);
},
Msg::Error(args) => {
write!(&mut s, "error").expect("write to work");
write_msg_args(&mut s, args);
},
Msg::Direct(args) => {
write!(&mut s, "direct").expect("write to work");
write_msg_args(&mut s, args);
},
Msg::Payload(name, args, msg_payload) => {
write!(&mut s, "{}", name).expect("write to work");
write!(&mut s, " {}", msg_payload.len()).expect("write to work");
if let Some(args) = args {
write_msg_args(&mut s, args);
}
write!(&mut s, "\r\n").expect("write to work");
payload = Some(&msg_payload[..]);
},
}
write!(&mut s, "\r\n").expect("write to work");
let mut b = s.into_bytes();
if let Some(payload) = payload {
b.extend_from_slice(payload);
b.push(b'\r');
b.push(b'\n');
}
b
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum ReadMsgError {
TryAgain,
ProtocolError,
Timeout,
EOF,
ReadError(String, String),
}
fn next_arg(s: &str) -> (String, &str) {
let s = s.trim_start();
if let Some(s) = s.strip_prefix('"') {
let mut arg = String::new();
let mut escaped = false;
for (byte_pos, c) in s.char_indices() {
if escaped {
match c {
'r' => arg.push('\r'),
'n' => arg.push('\n'),
_ => arg.push(c),
}
escaped = false;
} else {
if c == '"' {
return (arg, &s[(byte_pos + 1)..].trim_start());
} else if c == '\\' {
escaped = true;
} else {
arg.push(c);
}
}
}
(arg, "")
} else {
let v : Vec<&str> = s.splitn(2, |c: char| c.is_whitespace()).collect();
if v.len() == 0 {
("".to_string(), "")
} else if v.len() == 1 {
(v[0].to_string(), "")
} else {
(v[0].to_string(), v[1].trim_start())
}
}
}
pub struct MessageReader {
payload_name: Option<&'static str>,
payload_buf: Option<Vec<u8>>,
payload_rem_len: Option<u64>,
payload_data: Option<Vec<String>>,
}
impl MessageReader {
pub fn new() -> Self {
Self {
payload_name: None,
payload_buf: None,
payload_rem_len: None,
payload_data: None,
}
}
fn read_payload(&mut self, ep: &str, buf: &mut dyn std::io::BufRead) -> Result<Msg, ReadMsgError> {
let rlen = self.payload_rem_len.unwrap();
if rlen > 0 {
let take_len = if rlen < 4096 { rlen as usize } else { 4096 };
let mut data_buf : [u8; 4096] = [0; 4096];
match buf.read(&mut data_buf[0..take_len]) {
Ok(n) => {
if n == 0 {
return Err(ReadMsgError::EOF);
}
if n > take_len {
return Err(ReadMsgError::ProtocolError);
}
self.payload_rem_len = Some(rlen - (n as u64));
self.payload_buf.as_mut().unwrap().extend_from_slice(&data_buf[0..n]);
},
Err(e) => {
match e.kind() {
std::io::ErrorKind::TimedOut
| std::io::ErrorKind::WouldBlock => {
return Err(ReadMsgError::Timeout);
},
_ => {
return Err(ReadMsgError::ReadError(
ep.to_string(), format!("{}", e)));
}
}
},
}
}
if self.payload_rem_len.unwrap() == 0 {
self.payload_rem_len = None;
return Ok(Msg::Payload(
self.payload_name.take().unwrap(),
self.payload_data.take(),
self.payload_buf.take().unwrap()));
} else {
return Err(ReadMsgError::TryAgain);
}
}
pub fn read_msg(&mut self, ep: &str, buf: &mut dyn std::io::BufRead) -> Result<Msg, ReadMsgError> {
if self.payload_rem_len.is_some() {
return self.read_payload(ep, buf);
}
use std::io::BufRead;
let mut line = String::new();
match buf.read_line(&mut line) {
Ok(n) => {
if n == 0 {
return Err(ReadMsgError::EOF);
}
//d// println!("READL: [{}]", line);
let v: Vec<&str> =
line.splitn(2, |c: char| c.is_whitespace()).collect();
let command = v[0];
let mut rest = if v.len() > 1 { v[1] } else { "" };
match command {
"direct" => {
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
return Ok(Msg::Direct(args));
},
"payload" => {
let (len_arg, r) = next_arg(rest);
rest = r;
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
if let Ok(len) = u64::from_str_radix(&len_arg, 10) {
self.payload_rem_len = Some(len);
self.payload_buf = Some(vec![]);
self.payload_name = Some("payload");
self.payload_data = Some(args);
return self.read_payload(ep, buf);
} else {
return Err(ReadMsgError::ProtocolError);
}
},
"error" => {
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
return Ok(Msg::Error(args));
},
"msgpack" => {
let (len_arg, _) = next_arg(rest);
if let Ok(len) = u64::from_str_radix(&len_arg, 10) {
self.payload_rem_len = Some(len);
self.payload_buf = Some(vec![]);
self.payload_name = Some("msgpack");
return self.read_payload(ep, buf);
} else {
return Err(ReadMsgError::ProtocolError);
}
},
"json" => {
let (len_arg, _) = next_arg(rest);
if let Ok(len) = u64::from_str_radix(&len_arg, 10) {
self.payload_rem_len = Some(len);
self.payload_buf = Some(vec![]);
self.payload_name = Some("json");
return self.read_payload(ep, buf);
} else {
return Err(ReadMsgError::ProtocolError);
}
},
"hello" => {
let mut args = vec![];
while rest.len() > 0 {
let (arg, r) = next_arg(rest);
rest = r;
args.push(arg);
}
return Ok(Msg::Hello(args));
},
"quit" => {
return Ok(Msg::Quit);
},
"ping" => {
return Ok(Msg::Ping);
},
"ok" => {
let (arg, _) = next_arg(rest);
return Ok(Msg::Ok(arg));
},
"" => {
return Err(ReadMsgError::TryAgain);
},
_ => {
return Err(ReadMsgError::ProtocolError);
},
}
},
Err(e) => {
match e.kind() {
std::io::ErrorKind::TimedOut
| std::io::ErrorKind::WouldBlock => {
return Err(ReadMsgError::Timeout);
},
_ => {
return Err(ReadMsgError::ReadError(
ep.to_string(), format!("{}", e)));
}
}
},
}
}
}
#[derive(Debug, Clone)]
pub enum Event {
RecvMessage(Msg),
@ -575,7 +243,7 @@ impl TCPCSVConnection {
reader = None;
}
if let Some(s) = &stream {
if let Some(_s) = &stream {
println!("R: {}", ep);
} else {
// println!("R----");
@ -634,9 +302,6 @@ impl TCPCSVConnection {
}
}
}
stream = None;
reader = None;
})
}
@ -743,10 +408,6 @@ impl TCPCSVConnection {
}
}
}
stream = None;
cur_frame = None;
cur_write_ptr = None;
})
}
@ -791,7 +452,7 @@ impl TCPCSVConnection {
}
use std::str::FromStr;
let mut stream =
let stream =
TcpStream::connect_timeout(
&std::net::SocketAddr::from_str(&ep)
.expect("Valid endpoint address"),

91
src/timer.rs